ObjectHashAggregateExec Physical Operator¶
ObjectHashAggregateExec is an aggregate unary physical operator for object aggregation.
ObjectHashAggregateExec uses ObjectAggregationIterator for aggregation (one per partition).

Creating Instance¶
ObjectHashAggregateExec takes the following to be created:
- Required Child Distribution Expressions
- isStreaming flag
- Number of Shuffle Partitions (always None)
- Grouping NamedExpressions
- AggregateExpressions
- Aggregate Attributes
- Initial Input Buffer Offset
- Result NamedExpressions
- Child Physical Operator
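The above corresponds to the shape of the case class (a sketch based on the Spark source; exact parameters and their types can vary across Spark versions, and type imports are omitted):
case class ObjectHashAggregateExec(
    requiredChildDistributionExpressions: Option[Seq[Expression]],
    isStreaming: Boolean,
    numShufflePartitions: Option[Int],
    groupingExpressions: Seq[NamedExpression],
    aggregateExpressions: Seq[AggregateExpression],
    aggregateAttributes: Seq[Attribute],
    initialInputBufferOffset: Int,
    resultExpressions: Seq[NamedExpression],
    child: SparkPlan)
  extends BaseAggregateExec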
ObjectHashAggregateExec is created when:
AggUtils is requested to create a physical operator for aggregation
isStreaming Flag¶
ObjectHashAggregateExec is given isStreaming flag when created.
isStreaming is always false except when AggUtils is requested to create a streaming aggregate physical operator.
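For example, a streaming aggregation with an object-based aggregate function like collect_list (a hypothetical sketch; the rate source and the grouping are just for illustration) is planned through the streaming code path:
import org.apache.spark.sql.functions.collect_list
val ratings = spark
  .readStream
  .format("rate")
  .load
  .groupBy($"value" % 10 as "bucket")
  .agg(collect_list("timestamp") as "ts")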
Performance Metrics¶
time in aggregation build¶
Time to execute a single partition
number of output rows¶
1 when there are neither input rows in a partition nor grouping expressions
Used to create an ObjectAggregationIterator
number of sort fallback tasks¶
Number of tasks that crossed spark.sql.objectHashAggregate.sortBased.fallbackThreshold
Used to create an ObjectAggregationIterator
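The threshold itself is configurable (a sketch; the default is 128 entries in the in-memory hash map before falling back to sort-based aggregation):
spark.conf.set("spark.sql.objectHashAggregate.sortBased.fallbackThreshold", 256)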
spill size¶
Used to create an ObjectAggregationIterator
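All the metrics are available on the operator instance itself, e.g. using the query q from the Demo below (a sketch; the query has to be executed first for the metrics to be populated):
import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
q.collect  // trigger execution so the metrics get updated
q.queryExecution.executedPlan.collectFirst {
  case agg: ObjectHashAggregateExec => agg
}.foreach { agg =>
  agg.metrics.foreach { case (name, metric) =>
    println(s"$name: ${metric.value}")
  }
}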
Executing Physical Operator¶
doExecute(): RDD[InternalRow]
doExecute is part of the SparkPlan abstraction.
doExecute requests the child physical operator to execute (and generate an RDD[InternalRow]) and then uses mapPartitionsWithIndexInternal to process partitions.
Note
doExecute adds a new MapPartitionsRDD (Spark Core) to the RDD lineage.
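A quick way to see that in action is to print the lineage of the underlying RDD (using the query q from the Demo below):
scala> println(q.queryExecution.toRdd.toDebugString)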
Processing Partition¶
While processing a partition, mapPartitionsWithIndexInternal branches off based on availability of input rows and the grouping expressions:
- No input rows but there are grouping expressions
- No input rows and no grouping expressions
- Input rows are available (regardless of the grouping expressions)
No Input Rows with Grouping Expression¶
For no input records (in a partition) and non-empty grouping expressions, doExecute returns an empty Iterator.
Otherwise, doExecute creates an ObjectAggregationIterator.
No Input Rows and No Grouping Expression¶
For no input records (in a partition) and no grouping expressions, doExecute increments the numOutputRows metric (to be 1) and requests the ObjectAggregationIterator for outputForEmptyGroupingKeyWithoutInput (that is the only output row).
Input Rows Available¶
Otherwise, doExecute returns the ObjectAggregationIterator.
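Put together, the partition-processing logic can be sketched as follows (simplified pseudo-Scala, not the exact Spark source):
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsWithIndexInternal { (partIndex, iter) =>
  val hasInput = iter.hasNext
  if (!hasInput && groupingExpressions.nonEmpty) {
    // Case 1: no input rows but grouping expressions exist => no output
    Iterator.empty
  } else {
    // Cases 2 and 3: an ObjectAggregationIterator is always created
    val aggregationIterator = new ObjectAggregationIterator(/* partIndex, buffers, metrics, ... */)
    if (!hasInput && groupingExpressions.isEmpty) {
      // Case 2: no input rows and no grouping expressions => exactly one output row
      numOutputRows += 1
      Iterator.single(aggregationIterator.outputForEmptyGroupingKeyWithoutInput())
    } else {
      // Case 3: input rows available => return the iterator itself
      aggregationIterator
    }
  }
}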
Selection Requirements¶
supportsAggregate(
aggregateExpressions: Seq[AggregateExpression]): Boolean
supportsAggregate is enabled (true) when there is a TypedImperativeAggregate aggregate function among the AggregateFunctions of the given AggregateExpressions.
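In other words, supportsAggregate boils down to a single check (a sketch that matches the description above):
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, TypedImperativeAggregate}
def supportsAggregate(aggregateExpressions: Seq[AggregateExpression]): Boolean =
  aggregateExpressions
    .map(_.aggregateFunction)
    .exists(_.isInstanceOf[TypedImperativeAggregate[_]])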
supportsAggregate is used when:
AggUtils utility is used to select an aggregate physical operator
Demo¶
ObjectHashAggregateExec is selected when spark.sql.execution.useObjectHashAggregateExec configuration property is enabled (and HashAggregateExec could not be used).
assert(spark.sessionState.conf.useObjectHashAggregation)
Use immutable data types for aggregateBufferAttributes.
val dataset = Seq(
(0, Seq.empty[Int]),
(1, Seq(1, 1)),
(2, Seq(2, 2))).toDF("id", "nums")
import org.apache.spark.sql.functions.{collect_list, size}
val q = dataset.
groupBy(size($"nums") as "group"). // <-- size over array
agg(collect_list("id") as "ids")
scala> q.explain
== Physical Plan ==
ObjectHashAggregate(keys=[size(nums#8, true)#18], functions=[collect_list(id#7, 0, 0)])
+- Exchange hashpartitioning(size(nums#8, true)#18, 200), ENSURE_REQUIREMENTS, [id=#10]
+- ObjectHashAggregate(keys=[size(nums#8, true) AS size(nums#8, true)#18], functions=[partial_collect_list(id#7, 0, 0)])
+- LocalTableScan [id#7, nums#8]
scala> println(q.queryExecution.sparkPlan.numberedTreeString)
00 ObjectHashAggregate(keys=[size(nums#8, true)#18], functions=[collect_list(id#7, 0, 0)], output=[group#11, ids#15])
01 +- ObjectHashAggregate(keys=[size(nums#8, true) AS size(nums#8, true)#18], functions=[partial_collect_list(id#7, 0, 0)], output=[size(nums#8, true)#18, buf#20])
02 +- LocalTableScan [id#7, nums#8]
Going low level. Watch your steps :)
// copied from HashAggregateExec as it is the preferred aggregate physical operator
// and HashAggregateExec is checked first
// When the check fails, ObjectHashAggregateExec is then checked
import q.queryExecution.optimizedPlan
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
val aggLog = optimizedPlan.asInstanceOf[Aggregate]
import org.apache.spark.sql.catalyst.planning.PhysicalAggregation
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
// groupingExpressions, aggregateExpressions, resultExpressions, child
val (_, aggregateExpressions: Seq[AggregateExpression], _, _) = PhysicalAggregation.unapply(aggLog).get
val aggregateBufferAttributes =
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
One of the reasons why ObjectHashAggregateExec was selected is that HashAggregateExec did not meet the requirements.
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
assert(HashAggregateExec.supportsAggregate(aggregateBufferAttributes) == false)
// collect_list aggregate function uses CollectList TypedImperativeAggregate under the covers
import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
assert(ObjectHashAggregateExec.supportsAggregate(aggregateExpressions))
val aggExec = q.queryExecution.sparkPlan.children.head.asInstanceOf[ObjectHashAggregateExec]
scala> println(aggExec.aggregateExpressions.head.numberedTreeString)
00 partial_collect_list(id#7, 0, 0)
01 +- collect_list(id#7, 0, 0)
02 +- id#7: int