# ObjectHashAggregateExec Physical Operator

`ObjectHashAggregateExec` is an aggregate unary physical operator for object aggregation.

`ObjectHashAggregateExec` uses an `ObjectAggregationIterator` for aggregation (one per partition).
## Creating Instance

`ObjectHashAggregateExec` takes the following to be created:

- Required Child Distribution Expressions
- `isStreaming` flag
- Number of Shuffle Partitions (always `None`)
- Grouping `NamedExpression`s
- `AggregateExpression`s
- Aggregate Attributes
- Initial Input Buffer Offset
- Result `NamedExpression`s
- Child Physical Operator
`ObjectHashAggregateExec` is created when `AggUtils` is requested to create a physical operator for aggregation.
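For reference, the parameters map onto the case class constructor roughly as follows (a sketch based on recent Spark sources; check the exact definition in your Spark version):

```scala
// A sketch of the constructor shape; the nine parameters mirror the list above
// (not guaranteed to match your Spark version byte for byte)
case class ObjectHashAggregateExec(
    requiredChildDistributionExpressions: Option[Seq[Expression]],
    isStreaming: Boolean,
    numShufflePartitions: Option[Int], // always None
    groupingExpressions: Seq[NamedExpression],
    aggregateExpressions: Seq[AggregateExpression],
    aggregateAttributes: Seq[Attribute],
    initialInputBufferOffset: Int,
    resultExpressions: Seq[NamedExpression],
    child: SparkPlan)
  extends BaseAggregateExec
```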
## isStreaming Flag

`ObjectHashAggregateExec` is given an `isStreaming` flag when created.

The `isStreaming` flag is always `false` except when `AggUtils` is requested to create a streaming aggregate physical operator.
## Performance Metrics

### time in aggregation build

Time to execute a single partition

### number of output rows

`1` when there are neither input rows (in a partition) nor grouping expressions

Used to create an `ObjectAggregationIterator`

### number of sort fallback tasks

Number of tasks that crossed the `spark.sql.objectHashAggregate.sortBased.fallbackThreshold` threshold

Used to create an `ObjectAggregationIterator`
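The fallback threshold is a configuration property, so it can be tuned at runtime. A minimal sketch of lowering it so the sort-based fallback (and this metric) kicks in sooner; the property is internal and defaults to 128 in recent Spark versions (verify against yours):

```scala
// Lower the number of distinct grouping keys the in-memory hash map may hold
// before ObjectHashAggregateExec falls back to sort-based aggregation.
// Internal configuration property; the default is 128 (verify for your version).
spark.conf.set("spark.sql.objectHashAggregate.sortBased.fallbackThreshold", 2)
```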
### spill size

Used to create an `ObjectAggregationIterator`
## Executing Physical Operator

```scala
doExecute(): RDD[InternalRow]
```

`doExecute` is part of the `SparkPlan` abstraction.

`doExecute` requests the child physical operator to execute (to generate an `RDD[InternalRow]`) and then uses `mapPartitionsWithIndexInternal` to process partitions.
!!! note
    `doExecute` adds a new `MapPartitionsRDD` (Spark Core) to the RDD lineage.
### Processing Partition

While processing a partition, `mapPartitionsWithIndexInternal` branches off based on the availability of input rows and grouping expressions (illustrated with a standalone sketch after this list):

- No input rows but there are grouping expressions
- No input rows and no grouping expressions
- Input rows are available (regardless of the grouping expressions)
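`mapPartitionsWithIndexInternal` is an internal variant of the public `RDD.mapPartitionsWithIndex`. The following standalone sketch (a hypothetical illustration for `spark-shell`, not Spark's source) mimics the per-partition branching on input-row availability:

```scala
// Illustration of per-partition branching like doExecute's, using the public
// mapPartitionsWithIndex counterpart of mapPartitionsWithIndexInternal
val rdd = spark.sparkContext.parallelize(1 to 6, numSlices = 4)
val out = rdd.mapPartitionsWithIndex { (partIndex, rows) =>
  if (rows.isEmpty) Iterator.empty // "no input rows" branch
  else rows.map(n => s"partition=$partIndex value=$n") // "input rows available" branch
}
out.collect.foreach(println)
```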
#### No Input Rows with Grouping Expressions

For no input rows (in a partition) and non-empty grouping expressions, `doExecute` returns an empty `Iterator`.
#### No Input Rows and No Grouping Expressions

Otherwise, `doExecute` creates an `ObjectAggregationIterator`.

For no input rows (in a partition) and no grouping expressions, `doExecute` increments the numOutputRows metric (to be `1`) and requests the `ObjectAggregationIterator` for `outputForEmptyGroupingKeyWithoutInput` (that is the only output row).
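This branch is observable from the public API: a global aggregation over an empty Dataset still produces exactly one row. A minimal `spark-shell` demo (`collect_list` is backed by a `TypedImperativeAggregate`, so `ObjectHashAggregateExec` applies):

```scala
// "No input rows and no grouping expressions": the single output row comes
// from outputForEmptyGroupingKeyWithoutInput (an empty collect_list here)
import spark.implicits._
import org.apache.spark.sql.functions.collect_list
val emptyRows = Seq.empty[Int].toDF("id")
val result = emptyRows.agg(collect_list("id") as "ids")
assert(result.count == 1) // exactly one output row, with an empty array
```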
#### Input Rows Available

Otherwise, `doExecute` returns the `ObjectAggregationIterator`.
## Selection Requirements

```scala
supportsAggregate(
  aggregateExpressions: Seq[AggregateExpression]): Boolean
```

`supportsAggregate` is positive (`true`) when there is a `TypedImperativeAggregate` aggregate function among the aggregate functions of the given `AggregateExpression`s.
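Conceptually, the check is an existence test over the aggregate functions. A paraphrase (with a hypothetical name, `hasTypedImperativeAggregate`, to avoid confusion with the real method):

```scala
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, TypedImperativeAggregate}

// A paraphrase of supportsAggregate: positive when at least one aggregate
// function is a TypedImperativeAggregate
def hasTypedImperativeAggregate(aggregateExpressions: Seq[AggregateExpression]): Boolean =
  aggregateExpressions
    .map(_.aggregateFunction)
    .exists(_.isInstanceOf[TypedImperativeAggregate[_]])
```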
`supportsAggregate` is used when `AggUtils` utility is used to select an aggregate physical operator.
## Demo

`ObjectHashAggregateExec` is selected when the `spark.sql.execution.useObjectHashAggregateExec` configuration property is enabled (and `HashAggregateExec` could not be used).

```scala
assert(spark.sessionState.conf.useObjectHashAggregation)
```

Use immutable data types for `aggregateBufferAttributes`.
```scala
import org.apache.spark.sql.functions.{collect_list, size}

val dataset = Seq(
  (0, Seq.empty[Int]),
  (1, Seq(1, 1)),
  (2, Seq(2, 2))).toDF("id", "nums")

val q = dataset.
  groupBy(size($"nums") as "group"). // <-- size over array
  agg(collect_list("id") as "ids")
```
```text
scala> q.explain
== Physical Plan ==
ObjectHashAggregate(keys=[size(nums#8, true)#18], functions=[collect_list(id#7, 0, 0)])
+- Exchange hashpartitioning(size(nums#8, true)#18, 200), ENSURE_REQUIREMENTS, [id=#10]
   +- ObjectHashAggregate(keys=[size(nums#8, true) AS size(nums#8, true)#18], functions=[partial_collect_list(id#7, 0, 0)])
      +- LocalTableScan [id#7, nums#8]
```
```text
scala> println(q.queryExecution.sparkPlan.numberedTreeString)
00 ObjectHashAggregate(keys=[size(nums#8, true)#18], functions=[collect_list(id#7, 0, 0)], output=[group#11, ids#15])
01 +- ObjectHashAggregate(keys=[size(nums#8, true) AS size(nums#8, true)#18], functions=[partial_collect_list(id#7, 0, 0)], output=[size(nums#8, true)#18, buf#20])
02    +- LocalTableScan [id#7, nums#8]
```
Going low level. Watch your steps :)
```scala
// copied from HashAggregateExec as it is the preferred aggregate physical operator
// and HashAggregateExec is checked first
// When the check fails, ObjectHashAggregateExec is then checked
import q.queryExecution.optimizedPlan
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
val aggLog = optimizedPlan.asInstanceOf[Aggregate]

import org.apache.spark.sql.catalyst.planning.PhysicalAggregation
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression

// groupingExpressions, aggregateExpressions, resultExpressions, child
val (_, aggregateExpressions: Seq[AggregateExpression], _, _) =
  PhysicalAggregation.unapply(aggLog).get

val aggregateBufferAttributes =
  aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
```
One of the reasons why `ObjectHashAggregateExec` was selected is that `HashAggregateExec` did not meet the requirements.
```scala
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
assert(HashAggregateExec.supportsAggregate(aggregateBufferAttributes) == false)

// collect_list aggregate function uses CollectList TypedImperativeAggregate under the covers
import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
assert(ObjectHashAggregateExec.supportsAggregate(aggregateExpressions))
```
```scala
val aggExec = q.queryExecution.sparkPlan.children.head.asInstanceOf[ObjectHashAggregateExec]
```

```text
scala> println(aggExec.aggregateExpressions.head.numberedTreeString)
00 partial_collect_list(id#7, 0, 0)
01 +- collect_list(id#7, 0, 0)
02    +- id#7: int
```