Skip to content

HashAggregateExec Physical Operator

HashAggregateExec is an AggregateCodegenSupport physical operator for Hash-Based Aggregation (with UnsafeRow keys and values) that uses TungstenAggregationIterator (to iterate over UnsafeRows in partitions) when executed.

HashAggregateExec in web UI (Details for Query)

HashAggregateExec is the preferred aggregate physical operator for Aggregate logical operator.

Falling Back To Sort-Based Aggregation

HashAggregateExec uses TungstenAggregationIterator that can (theoretically) switch to a sort-based aggregation when the hash-based processing mode is unable to acquire enough memory.

See testFallbackStartsAt internal property and spark.sql.TungstenAggregate.testFallbackStartsAt configuration property.

Search logs for the following INFO message to know whether the switch has happened.

falling back to sort based aggregation.

Creating Instance

HashAggregateExec takes the following to be created:

HashAggregateExec is created (indirectly through AggUtils.createAggregate) when:

  • Aggregation execution planning strategy is executed (to select the aggregate physical operator for an Aggregate logical operator)
  • StatefulAggregationStrategy (Spark Structured Streaming) execution planning strategy creates plan for streaming EventTimeWatermark or Aggregate logical operators

Grouping Keys

BaseAggregateExec
groupingExpressions: Seq[NamedExpression]

groupingExpressions is part of the BaseAggregateExec abstraction.

HashAggregateExec is given grouping keys (NamedExpressions) when created. There can be no grouping keys.

The grouping keys are the groupingExpressions of Aggregate, if any.

Performance Metrics

avg hash probes per key

Average hash map probe per lookup (i.e. numProbes / numKeyLookups)

numProbes and numKeyLookups are used in BytesToBytesMap (Spark Core) append-only hash map for the number of iteration to look up a single key and the number of all the lookups in total, respectively.

number of output rows

Average hash map probe per lookup (i.e. numProbes / numKeyLookups)

Number of groups (per partition) that (depending on the number of partitions and the side of ShuffleExchangeExec.md[ShuffleExchangeExec] operator) is the number of groups

  • 0 for no input with a grouping expression, e.g. spark.range(0).groupBy($"id").count.show

  • 1 for no grouping expression and no input, e.g. spark.range(0).groupBy().count.show

Tip

Use different number of elements and partitions in range operator to observe the difference in numOutputRows metric, e.g.

spark.
  range(0, 10, 1, numPartitions = 1).
  groupBy($"id" % 5 as "gid").
  count.
  show
spark.
  range(0, 10, 1, numPartitions = 5).
  groupBy($"id" % 5 as "gid").
  count.
  show

number of sort fallback tasks

This metric is managed entirely by the TungstenAggregationIterator.

Indicates how many tasks (that used TungstenAggregationIterator to process input rows for aggregates) switched (fell back) to sort-based aggregation

Number of Input Rows in Partition Matters

Not every task will use TungstenAggregationIterator and may even have a chance to fall back to sort-based aggregation.

peak memory

This metric is managed entirely by the TungstenAggregationIterator.

spill size

Used to create a TungstenAggregationIterator when doExecute

time in aggregation build

Query Planning

HashAggregateExec is the preferred aggregate physical operator in Aggregation execution planning strategy (over ObjectHashAggregateExec and SortAggregateExec operators).

HashAggregateExec is selected as the aggregate physical operator for Aggregate logical operator when all the AggregateFunctions (of the AggregateExpressions) use aggBufferAttributes with mutable data types:

  • BooleanType
  • ByteType
  • CalendarIntervalType
  • DateType
  • DayTimeIntervalType
  • DecimalType
  • DoubleType
  • FloatType
  • IntegerType
  • LongType
  • NullType
  • ShortType
  • TimestampNTZType
  • TimestampType
  • UserDefinedType with a mutable data type
  • YearMonthIntervalType

Required Child Distribution

SparkPlan
requiredChildDistribution: List[Distribution]

requiredChildDistribution is part of the SparkPlan abstraction.

requiredChildDistribution varies per the input required child distribution expressions:

Note

requiredChildDistributionExpressions is exactly requiredChildDistributionExpressions from AggUtils.createAggregate and is undefined by default.


(No distinct in aggregation) requiredChildDistributionExpressions is undefined when HashAggregateExec is created for partial aggregations (i.e. mode is Partial for aggregate expressions).

requiredChildDistributionExpressions is defined, but could possibly be empty, when HashAggregateExec is created for final aggregations (i.e. mode is Final for aggregate expressions).


(one distinct in aggregation) requiredChildDistributionExpressions is undefined when HashAggregateExec is created for partial aggregations (i.e. mode is Partial for aggregate expressions) with one distinct in aggregation.

requiredChildDistributionExpressions is defined, but could possibly be empty, when HashAggregateExec is created for partial merge aggregations (i.e. mode is PartialMerge for aggregate expressions).

FIXME for the following two cases in aggregation with one distinct.

Executing Physical Operator

SparkPlan
doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan abstraction.

doExecute requests the child physical operator to execute (that triggers physical query planning and generates an RDD[InternalRow]).

doExecute transforms the RDD[InternalRow] with the transformation f function for every partition (using RDD.mapPartitionsWithIndex transformation).

Transformation Function

doExecute uses RDD.mapPartitionsWithIndex to process partition InternalRows (with a partition ID).

f: (Int, Iterator[T]) => Iterator[U]

For every partition, mapPartitionsWithIndex records the start execution time (beforeAgg).

mapPartitionsWithIndex branches off based on whether there are input rows and grouping keys.

For a grouped aggregate (grouping keys defined) with no input rows (an empty partition), doExecute returns an empty iterator.

Otherwise, doExecute creates a TungstenAggregationIterator.

With no grouping keys defined and no input rows (an empty partition), doExecute increments the numOutputRows metric and returns a single-element Iterator[UnsafeRow] from the TungstenAggregationIterator.

With input rows available (and regardless of grouping keys), doExecute returns the TungstenAggregationIterator.

In the end, doExecute calculates the aggTime metric and returns an Iterator[UnsafeRow] that can be as follows:

RDD.mapPartitionsWithIndex

RDD.mapPartitionsWithIndex

mapPartitionsWithIndex[U: ClassTag](
  f: (Int, Iterator[T]) => Iterator[U],
  preservesPartitioning: Boolean = false): RDD[U]

mapPartitionsWithIndex transformation adds a new MapPartitionsRDD to the RDD lineage.

FIXME Disable AQE

val ids = spark.range(1)
println(ids.queryExecution.toRdd.toDebugString)
(12) SQLExecutionRDD[3] at toRdd at <console>:1 []
 |   MapPartitionsRDD[2] at toRdd at <console>:1 []
 |   MapPartitionsRDD[1] at toRdd at <console>:1 []
 |   ParallelCollectionRDD[0] at toRdd at <console>:1 []
// Use groupBy that gives HashAggregateExec operator
val q = ids.groupBy('id).count
q.explain
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[id#2L], functions=[count(1)])
   +- HashAggregate(keys=[id#2L], functions=[partial_count(1)])
      +- Range (0, 1, step=1, splits=12)
val rdd = q.queryExecution.toRdd
println(rdd.toDebugString)
(12) SQLExecutionRDD[11] at toRdd at <console>:1 []
 |   MapPartitionsRDD[10] at toRdd at <console>:1 []
 |   MapPartitionsRDD[9] at toRdd at <console>:1 []
 |   ParallelCollectionRDD[8] at toRdd at <console>:1 []

Whole-Stage Code Generation

HashAggregateExec supports Whole-Stage Code Generation (as an AggregateCodegenSupport physical operator) only when supportCodegen flag is enabled.

HashAggregateExec is given hashAgg as the variable prefix.

doConsumeWithKeys

AggregateCodegenSupport
doConsumeWithKeys(
  ctx: CodegenContext,
  input: Seq[ExprCode]): String

doConsumeWithKeys is part of the AggregateCodegenSupport abstraction.

doConsumeWithKeys...FIXME

doProduceWithKeys

AggregateCodegenSupport
doProduceWithKeys(
  ctx: CodegenContext): String

doProduceWithKeys is part of the AggregateCodegenSupport abstraction.

doProduceWithKeys uses the following configuration properties:

doProduceWithKeys...FIXME

In the end, doProduceWithKeys generates the following Java code (with the []-marked sections filled out):

if (![initAgg]) {
  [initAgg] = true;
  [createFastHashMap]
  [addHookToCloseFastHashMap]
  [hashMapTerm] = [thisPlan].createHashMap();
  long [beforeAgg] = System.nanoTime();
  [doAggFuncName]();
  [aggTime].add((System.nanoTime() - [beforeAgg]) / [NANOS_PER_MILLIS]);
}
// output the result
[outputFromFastHashMap]
[outputFromRegularHashMap]

Creating HashMap

createHashMap(): UnsafeFixedWidthAggregationMap

Note

createHashMap is used in the Java code from doProduceWithKeys.

createHashMap requests all the DeclarativeAggregate functions for the Catalyst expressions to initialize aggregation buffers.

createHashMap creates an UnsafeProjection for the expressions and executes it (with an "empty" null row).

In the end, createHashMap creates an UnsafeFixedWidthAggregationMap with the following:

UnsafeFixedWidthAggregationMap Value
emptyAggregationBuffer The UnsafeRow after executing the UnsafeProjection to initialize aggregation buffers
aggregationBufferSchema bufferSchema
groupingKeySchema groupingKeySchema

finishAggregate

finishAggregate(
  hashMap: UnsafeFixedWidthAggregationMap,
  sorter: UnsafeKVExternalSorter,
  peakMemory: SQLMetric,
  spillSize: SQLMetric,
  avgHashProbe: SQLMetric,
  numTasksFallBacked: SQLMetric): KVIterator[UnsafeRow, UnsafeRow]

finishAggregate...FIXME

DeclarativeAggregate Functions

declFunctions: Seq[DeclarativeAggregate]

declFunctions is the DeclarativeAggregate expressions among the AggregateFunctions of this AggregateExpressions.

Demo

Aggregation Query

val data = spark.range(10)
val q = data
  .groupBy('id % 2 as "group")
  .agg(sum("id") as "sum")

HashAggregateExec operator should be selected due to:

  1. sum uses mutable types for aggregate expression
  2. just a single id column reference of LongType data type
scala> println(q.queryExecution.executedPlan.numberedTreeString)
00 AdaptiveSparkPlan isFinalPlan=false
01 +- HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
02    +- Exchange hashpartitioning(_groupingexpression#8L, 200), ENSURE_REQUIREMENTS, [plan_id=15]
03       +- HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
04          +- Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
05             +- Range (0, 10, step=1, splits=16)

Generate Final Plan

isFinalPlan flag is false. Let's execute it and access the final plan.

import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
val op = q
  .queryExecution
  .executedPlan
  .collect { case op: AdaptiveSparkPlanExec => op }
  .head

Execute the adaptive operator to generate the final execution plan.

op.executeTake(1)

isFinalPlan flag should now be true.

scala> println(op.treeString)
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 0
         +- Exchange hashpartitioning(_groupingexpression#8L, 200), ENSURE_REQUIREMENTS, [plan_id=25]
            +- *(1) HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
               +- *(1) Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
                  +- *(1) Range (0, 10, step=1, splits=16)
+- == Initial Plan ==
   HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
   +- Exchange hashpartitioning(_groupingexpression#8L, 200), ENSURE_REQUIREMENTS, [plan_id=15]
      +- HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
         +- Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
            +- Range (0, 10, step=1, splits=16)

With the isFinalPlan flag true, it is possible to print out the WholeStageCodegen subtrees.

scala> q.queryExecution.debug.codegen
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 (maxMethodCodeSize:284; maxConstantPoolSize:337(0.51% used); numInnerClasses:2) ==
*(1) HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
+- *(1) Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
   +- *(1) Range (0, 10, step=1, splits=16)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean hashAgg_initAgg_0;
/* 010 */   private org.apache.spark.unsafe.KVIterator hashAgg_mapIter_0;
/* 011 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap hashAgg_hashMap_0;
/* 012 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter hashAgg_sorter_0;
/* 013 */   private scala.collection.Iterator inputadapter_input_0;
/* 014 */   private boolean hashAgg_hashAgg_isNull_2_0;
...

Let's access the generated source code via WholeStageCodegenExec physical operator.

val aqe = op
import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsce = aqe.executedPlan
  .collect { case op: WholeStageCodegenExec => op }
  .head
val (_, source) = wsce.doCodeGen
import org.apache.spark.sql.catalyst.expressions.codegen.CodeFormatter
val formattedCode = CodeFormatter.format(source)
scala> println(formattedCode)
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=2
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean hashAgg_initAgg_0;
/* 010 */   private org.apache.spark.unsafe.KVIterator hashAgg_mapIter_0;
/* 011 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap hashAgg_hashMap_0;
/* 012 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter hashAgg_sorter_0;
/* 013 */   private scala.collection.Iterator inputadapter_input_0;
/* 014 */   private boolean hashAgg_hashAgg_isNull_2_0;
...
val execPlan = q.queryExecution.sparkPlan
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
01 +- HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
02    +- Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
03       +- Range (0, 10, step=1, splits=16)

Going low level. Mind your steps! 😊

import org.apache.spark.sql.catalyst.plans.logical.Aggregate
val aggLog = q.queryExecution.optimizedPlan.asInstanceOf[Aggregate]

import org.apache.spark.sql.catalyst.planning.PhysicalAggregation
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
val (_, aggregateExpressions: Seq[AggregateExpression], _, _) = PhysicalAggregation.unapply(aggLog).get
val aggregateBufferAttributes =
  aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)

Here comes the very reason why HashAggregateExec was selected. Aggregation execution planning strategy prefers HashAggregateExec when aggregateBufferAttributes are supported.

import org.apache.spark.sql.catalyst.plans.logical.Aggregate
assert(Aggregate.supportsHashAggregate(aggregateBufferAttributes))
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
01 +- HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
02    +- Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
03       +- Range (0, 10, step=1, splits=16)

Execute HashAggregateExec

val hashAggExecRDD = hashAggExec.execute
println(hashAggExecRDD.toDebugString)
(16) MapPartitionsRDD[4] at execute at <console>:1 []
 |   MapPartitionsRDD[3] at execute at <console>:1 []
 |   MapPartitionsRDD[2] at execute at <console>:1 []
 |   MapPartitionsRDD[1] at execute at <console>:1 []
 |   ParallelCollectionRDD[0] at execute at <console>:1 []

Java Code for Produce Execution Path

import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext
val parent = hashAggExec
val doProduceWithKeysCode = hashAggExec.produce(ctx, parent)
scala> println(doProduceWithKeysCode)
if (!hashAgg_initAgg_0) {
  hashAgg_initAgg_0 = true;


  hashAgg_hashMap_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).createHashMap();
  long hashAgg_beforeAgg_1 = System.nanoTime();
  hashAgg_doAggregateWithKeys_0();
  ((org.apache.spark.sql.execution.metric.SQLMetric) references[16] /* aggTime */).add((System.nanoTime() - hashAgg_beforeAgg_1) / 1000000);
}
// output the result


while ( hashAgg_mapIter_0.next()) {
  UnsafeRow hashAgg_aggKey_1 = (UnsafeRow) hashAgg_mapIter_0.getKey();
  UnsafeRow hashAgg_aggBuffer_1 = (UnsafeRow) hashAgg_mapIter_0.getValue();
  hashAgg_doAggregateWithKeysOutput_1(hashAgg_aggKey_1, hashAgg_aggBuffer_1);
  if (shouldStop()) return;
}
hashAgg_mapIter_0.close();
if (hashAgg_sorter_0 == null) {
  hashAgg_hashMap_0.free();
}