ObjectHashAggregateExec Physical Operator

ObjectHashAggregateExec is an aggregate unary physical operator for object aggregation.

ObjectHashAggregateExec uses ObjectAggregationIterator for aggregation (one per partition).

Figure: ObjectHashAggregateExec in web UI (Details for Query)

Creating Instance

ObjectHashAggregateExec takes the following to be created:

  * Required child distribution expressions (optional)
  * isStreaming flag
  * Grouping expressions (named expressions)
  * Aggregate expressions (AggregateExpressions)
  * Aggregate attributes
  * Initial input buffer offset
  * Result expressions (named expressions)
  * Child physical operator

ObjectHashAggregateExec is created when AggUtils is requested to create an aggregate physical operator (with spark.sql.execution.useObjectHashAggregateExec enabled and the Selection Requirements met).

isStreaming Flag

ObjectHashAggregateExec is given the isStreaming flag when created.

The isStreaming flag is always false, except when AggUtils is requested to create a streaming aggregate physical operator (in which case it is true).
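
As a quick (hypothetical) way to observe this, a streaming aggregation with a typed imperative aggregate like collect_list is planned with ObjectHashAggregateExec operators whose isStreaming flag is true. A minimal sketch, assuming a Spark shell; the rate source, the grouping and the collect_list aggregate are arbitrary choices for illustration:

import org.apache.spark.sql.functions.collect_list
val counts = spark.readStream
  .format("rate")
  .load()
  .groupBy($"value" % 10 as "bucket")
  .agg(collect_list($"value") as "values")
val query = counts.writeStream
  .format("memory")
  .queryName("isStreaming_demo")
  .outputMode("complete")
  .start()
query.processAllAvailable()
query.explain() // the physical plan shows ObjectHashAggregate operators
query.stop()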

Performance Metrics

  * time in aggregation build: Time to execute a single partition
  * number of output rows: Used to create an ObjectAggregationIterator
  * number of sort fallback tasks: Number of tasks that crossed spark.sql.objectHashAggregate.sortBased.fallbackThreshold and fell back to sort-based aggregation. Used to create an ObjectAggregationIterator
  * spill size: Used to create an ObjectAggregationIterator
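
The metrics can be inspected programmatically once a query has run. A minimal sketch, assuming a Spark shell; the query, the demoQuery name and the AQE setting are illustration choices (collect_list forces ObjectHashAggregateExec):

// Keep the executed plan directly inspectable for this demo (no AQE wrapper)
spark.conf.set("spark.sql.adaptive.enabled", false)
import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
import org.apache.spark.sql.functions.collect_list
val demoQuery = Seq((0, "a"), (0, "b"), (1, "c")).toDF("id", "v")
  .groupBy("id")
  .agg(collect_list("v") as "vs")
demoQuery.collect() // execute so the metrics get values
val agg = demoQuery.queryExecution.executedPlan.collectFirst {
  case op: ObjectHashAggregateExec => op
}.get
agg.metrics.foreach { case (key, metric) =>
  println(s"$key: ${metric.value}")
}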

Executing Physical Operator

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan abstraction.


doExecute requests the child physical operator to execute (generating an RDD[InternalRow]) and then uses mapPartitionsWithIndexInternal on that RDD to process partitions.

Note

doExecute adds a new MapPartitionsRDD (Spark Core) to the RDD lineage.

Processing Partition

While processing a partition, the function passed to mapPartitionsWithIndexInternal branches based on the availability of input rows and the grouping expressions:

  1. No input rows but there are grouping expressions
  2. No input rows and no grouping expressions
  3. Input rows are available (regardless of the grouping expressions)

No Input Rows with Grouping Expression

For no input rows (in a partition) and non-empty grouping expressions, doExecute returns an empty Iterator.

No Input Rows and No Grouping Expression

In the two remaining cases, doExecute creates an ObjectAggregationIterator.

For no input rows (in a partition) and no grouping expressions, doExecute increments the numOutputRows metric (to 1) and requests the ObjectAggregationIterator for outputForEmptyGroupingKeyWithoutInput (which is the only output row).

Input Rows Available

With input rows available (regardless of the grouping expressions), doExecute returns the ObjectAggregationIterator.
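
Putting the branches together, the partition-processing function has roughly the following shape (a simplified sketch of the operator's doExecute, with the ObjectAggregationIterator constructor arguments elided, so not runnable as-is):

child.execute().mapPartitionsWithIndexInternal { (partIndex, iter) =>
  if (!iter.hasNext && groupingExpressions.nonEmpty) {
    // 1. No input rows but there are grouping expressions: no output
    Iterator.empty
  } else {
    // Cases 2 and 3 share an ObjectAggregationIterator
    val aggregationIterator = new ObjectAggregationIterator(/* ...elided... */)
    if (!iter.hasNext && groupingExpressions.isEmpty) {
      // 2. No input rows and no grouping expressions: exactly one output row
      numOutputRows += 1
      Iterator.single(aggregationIterator.outputForEmptyGroupingKeyWithoutInput())
    } else {
      // 3. Input rows available: the iterator yields the aggregated rows
      aggregationIterator
    }
  }
}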

Selection Requirements

supportsAggregate(
  aggregateExpressions: Seq[AggregateExpression]): Boolean

supportsAggregate is enabled (true) when there is a TypedImperativeAggregate aggregate function among the AggregateFunctions of the given AggregateExpressions.
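
Concretely, the check scans the aggregate functions for a TypedImperativeAggregate. A minimal sketch of the equivalent logic (not copied from the Spark source):

import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, TypedImperativeAggregate}

def supportsAggregate(aggregateExpressions: Seq[AggregateExpression]): Boolean =
  aggregateExpressions.exists(_.aggregateFunction.isInstanceOf[TypedImperativeAggregate[_]])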


supportsAggregate is used when AggUtils is requested to create an aggregate physical operator (to select between HashAggregateExec, ObjectHashAggregateExec and SortAggregateExec).

Demo

ObjectHashAggregateExec is selected when spark.sql.execution.useObjectHashAggregateExec configuration property is enabled (and HashAggregateExec could not be used).

assert(spark.sessionState.conf.useObjectHashAggregation)

Use immutable data types for aggregateBufferAttributes to have ObjectHashAggregateExec considered (HashAggregateExec supports mutable aggregation buffer types only).

val dataset = Seq(
  (0, Seq.empty[Int]),
  (1, Seq(1, 1)),
  (2, Seq(2, 2))).toDF("id", "nums")
import org.apache.spark.sql.functions.{collect_list, size}
val q = dataset.
  groupBy(size($"nums") as "group"). // <-- size over array
  agg(collect_list("id") as "ids")
scala> q.explain
== Physical Plan ==
ObjectHashAggregate(keys=[size(nums#8, true)#18], functions=[collect_list(id#7, 0, 0)])
+- Exchange hashpartitioning(size(nums#8, true)#18, 200), ENSURE_REQUIREMENTS, [id=#10]
   +- ObjectHashAggregate(keys=[size(nums#8, true) AS size(nums#8, true)#18], functions=[partial_collect_list(id#7, 0, 0)])
      +- LocalTableScan [id#7, nums#8]
scala> println(q.queryExecution.sparkPlan.numberedTreeString)
00 ObjectHashAggregate(keys=[size(nums#8, true)#18], functions=[collect_list(id#7, 0, 0)], output=[group#11, ids#15])
01 +- ObjectHashAggregate(keys=[size(nums#8, true) AS size(nums#8, true)#18], functions=[partial_collect_list(id#7, 0, 0)], output=[size(nums#8, true)#18, buf#20])
02    +- LocalTableScan [id#7, nums#8]

Going low level. Watch your steps :)

// copied from HashAggregateExec as it is the preferred aggregate physical operator
// and HashAggregateExec is checked first
// When that check fails, ObjectHashAggregateExec is checked next
import q.queryExecution.optimizedPlan
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
val aggLog = optimizedPlan.asInstanceOf[Aggregate]
import org.apache.spark.sql.catalyst.planning.PhysicalAggregation
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
// groupingExpressions, aggregateExpressions, resultExpressions, child
val (_, aggregateExpressions: Seq[AggregateExpression], _, _) = PhysicalAggregation.unapply(aggLog).get
val aggregateBufferAttributes =
  aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)

One of the reasons why ObjectHashAggregateExec was selected is that HashAggregateExec did not meet the requirements.

import org.apache.spark.sql.execution.aggregate.HashAggregateExec
assert(HashAggregateExec.supportsAggregate(aggregateBufferAttributes) == false)
// collect_list aggregate function uses CollectList TypedImperativeAggregate under the covers
import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
assert(ObjectHashAggregateExec.supportsAggregate(aggregateExpressions))
val aggExec = q.queryExecution.sparkPlan.children.head.asInstanceOf[ObjectHashAggregateExec]
scala> println(aggExec.aggregateExpressions.head.numberedTreeString)
00 partial_collect_list(id#7, 0, 0)
01 +- collect_list(id#7, 0, 0)
02    +- id#7: int
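
As a follow-up to the demo, lowering spark.sql.objectHashAggregate.sortBased.fallbackThreshold makes the in-memory hash map give up sooner, which shows up in the number of sort fallback tasks metric. A hypothetical continuation (reusing dataset from above; look for the metric in the SQL tab of the web UI):

// Fall back to sort-based aggregation after a single distinct grouping key
spark.conf.set("spark.sql.objectHashAggregate.sortBased.fallbackThreshold", 1)
val q2 = dataset
  .groupBy(size($"nums") as "group")
  .agg(collect_list("id") as "ids")
q2.collect() // tasks that cross the threshold fall back to sorting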