HashAggregateExec Physical Operator

HashAggregateExec is a unary physical operator for hash-based aggregation.

HashAggregateExec in web UI (Details for Query)

HashAggregateExec is a BlockingOperatorWithCodegen.

HashAggregateExec is an AliasAwareOutputPartitioning.

Note

HashAggregateExec is the preferred aggregate physical operator for Aggregation execution planning strategy (over ObjectHashAggregateExec and SortAggregateExec).

HashAggregateExec supports Java code generation (aka codegen).

HashAggregateExec uses TungstenAggregationIterator (to iterate over UnsafeRows in partitions) when executed.

Note

HashAggregateExec uses TungstenAggregationIterator that can (theoretically) switch to a sort-based aggregation when the hash-based approach is unable to acquire enough memory.

See testFallbackStartsAt internal property and spark.sql.TungstenAggregate.testFallbackStartsAt configuration property.

Search logs for the following INFO message to know whether the switch has happened.

falling back to sort based aggregation.
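
The fallback idea can be illustrated with a minimal, self-contained sketch. This is not Spark's TungstenAggregationIterator: FallbackAggSketch, sumByKey and maxKeys are hypothetical names, and a limit on the number of keys stands in for "unable to acquire enough memory".

```scala
// Simplified model of hash-based aggregation with a sort-based fallback.
// Not Spark code: maxKeys is an assumed stand-in for a failed memory acquisition.
object FallbackAggSketch {
  import scala.collection.mutable

  /** Sums values per key; returns the result and whether the fallback was used. */
  def sumByKey(rows: Seq[(Int, Long)], maxKeys: Int): (Seq[(Int, Long)], Boolean) = {
    val hashMap = mutable.LinkedHashMap.empty[Int, Long]
    val spilled = mutable.ArrayBuffer.empty[(Int, Long)]
    var fellBack = false

    rows.foreach { case (key, value) =>
      if (fellBack) {
        // After the switch, rows are only collected for sorting
        spilled += ((key, value))
      } else if (hashMap.contains(key) || hashMap.size < maxKeys) {
        hashMap(key) = hashMap.getOrElse(key, 0L) + value
      } else {
        // The map is "full": move the partial results out and switch strategies
        fellBack = true
        spilled ++= hashMap
        hashMap.clear()
        spilled += ((key, value))
      }
    }

    if (!fellBack) {
      (hashMap.toSeq.sortBy(_._1), false)
    } else {
      // Sort-based aggregation: sort by key, then merge adjacent equal keys
      val merged = mutable.ArrayBuffer.empty[(Int, Long)]
      spilled.sortBy(_._1).foreach { case (k, v) =>
        if (merged.nonEmpty && merged.last._1 == k) {
          merged(merged.length - 1) = k -> (merged.last._2 + v)
        } else {
          merged += ((k, v))
        }
      }
      (merged.toSeq, true)
    }
  }
}
```

Both paths give the same sums. Only the returned flag tells whether the sort-based path was taken, much like the INFO message above does for the real operator.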

Creating Instance

HashAggregateExec takes the following to be created:

HashAggregateExec is created (indirectly through AggUtils.createAggregate) when:

  • Aggregation execution planning strategy is executed (to select the aggregate physical operator for an Aggregate logical operator)

  • StatefulAggregationStrategy (Spark Structured Streaming) execution planning strategy creates a plan for streaming EventTimeWatermark or Aggregate logical operators

Performance Metrics

avg hash probe bucket list iters

Average hash map probe per lookup (i.e. numProbes / numKeyLookups)

numProbes and numKeyLookups are used in BytesToBytesMap (Spark Core) append-only hash map for the number of iterations to look up a single key and the total number of lookups, respectively.
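
To make the ratio concrete, here is a minimal sketch of a linear-probing hash map that keeps the two counters. It is not BytesToBytesMap. ProbeCountingMap is a hypothetical class that assumes the capacity is larger than the number of distinct keys (otherwise a lookup would never terminate).

```scala
// Simplified open-addressing hash map that counts probes and lookups.
// Not Spark's BytesToBytesMap; it only mirrors the two counters named above.
object ProbeCountingMapSketch {
  final class ProbeCountingMap(capacity: Int) {
    private val keys = Array.fill[Option[Long]](capacity)(None)
    private var numProbes = 0L
    private var numKeyLookups = 0L

    /** Looks up a key (inserting it if absent) using linear probing; returns its slot. */
    def lookup(key: Long): Int = {
      numKeyLookups += 1
      var pos = java.lang.Math.floorMod(key, capacity.toLong).toInt
      var found = false
      while (!found) {
        numProbes += 1 // every inspected slot counts as one probe
        keys(pos) match {
          case None =>
            keys(pos) = Some(key)
            found = true
          case Some(k) if k == key =>
            found = true
          case _ =>
            pos = (pos + 1) % capacity // collision: try the next slot
        }
      }
      pos
    }

    /** numProbes / numKeyLookups */
    def avgHashProbe: Double = numProbes.toDouble / numKeyLookups
  }
}
```

With a capacity of 4, looking up the keys 0, 4 and 0 again takes 1, 2 and 1 probes, so avgHashProbe is 4 / 3. A value close to 1 means few collisions.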

number of output rows

Number of groups (per partition). Depending on the number of partitions and the side of the ShuffleExchangeExec operator, the number of groups is:

  • 0 for no input with a grouping expression, e.g. spark.range(0).groupBy($"id").count.show

  • 1 for no grouping expression and no input, e.g. spark.range(0).groupBy().count.show

Tip

Use different numbers of elements and partitions in the range operator to observe the difference in the numOutputRows metric, e.g.

spark.
  range(0, 10, 1, numPartitions = 1).
  groupBy($"id" % 5 as "gid").
  count.
  show

spark.
  range(0, 10, 1, numPartitions = 5).
  groupBy($"id" % 5 as "gid").
  count.
  show

number of sort fallback tasks

peak memory

spill size

Used when doExecute creates a TungstenAggregationIterator

time in aggregation build

Required Child Distribution

requiredChildDistribution: List[Distribution]

requiredChildDistribution is part of the SparkPlan abstraction.

requiredChildDistribution varies per the input required child distribution expressions:

Note

requiredChildDistributionExpressions is exactly requiredChildDistributionExpressions from AggUtils.createAggregate and is undefined by default.


(No distinct in aggregation) requiredChildDistributionExpressions is undefined when HashAggregateExec is created for partial aggregations (i.e. mode is Partial for aggregate expressions).

requiredChildDistributionExpressions is defined, but could possibly be empty, when HashAggregateExec is created for final aggregations (i.e. mode is Final for aggregate expressions).


(one distinct in aggregation) requiredChildDistributionExpressions is undefined when HashAggregateExec is created for partial aggregations (i.e. mode is Partial for aggregate expressions) with one distinct in aggregation.

requiredChildDistributionExpressions is defined, but could possibly be empty, when HashAggregateExec is created for partial merge aggregations (i.e. mode is PartialMerge for aggregate expressions).

FIXME for the following two cases in aggregation with one distinct.

Note

The prefix for variable names for HashAggregateExec operators in CodegenSupport-generated code is hashAgg (e.g. hashAgg_initAgg_0).

Executing Physical Operator

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan abstraction.


doExecute requests the child physical operator to execute (that triggers physical query planning and generates an RDD[InternalRow]).

doExecute transforms the RDD[InternalRow] with a transformation function for every partition.

Processing Partition Rows

doExecute uses RDD.mapPartitionsWithIndex to process InternalRows per partition (with a partition ID).

For every partition, mapPartitionsWithIndex records the start execution time (beforeAgg).

mapPartitionsWithIndex branches off based on whether there are input rows and grouping keys.

For a grouped aggregate (grouping keys defined) with no input rows (an empty partition), doExecute returns an empty iterator.

Otherwise, doExecute creates a TungstenAggregationIterator.

With no grouping keys defined and no input rows (an empty partition), doExecute increments the numOutputRows metric and returns a single-element Iterator[UnsafeRow] from the TungstenAggregationIterator.

With input rows available (and regardless of grouping keys), doExecute returns the TungstenAggregationIterator.

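In the end, doExecute calculates the aggTime metric and returns an Iterator[UnsafeRow] that is one of the following:

  • An empty iterator (grouping keys defined and no input rows)

  • A single-element iterator (no grouping keys and no input rows)

  • The TungstenAggregationIterator (input rows available)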
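
The per-partition branching can be modelled with a small sketch. It is not Spark code. aggregatePartition is a hypothetical function in which rows are plain Longs, the only aggregate is count, and grouped stands for "grouping keys are defined".

```scala
// Simplified model of the per-partition branching described above.
// Not Spark code: a row is a Long, the aggregate is count, the key is the row itself.
object PartitionBranchingSketch {
  def aggregatePartition(
      rows: Iterator[Long],
      grouped: Boolean): Iterator[(Option[Long], Long)] = {
    val hasInput = rows.hasNext
    if (!hasInput && grouped) {
      // Grouped aggregate over an empty partition: no groups, no output rows
      Iterator.empty
    } else if (!hasInput) {
      // Global aggregate over an empty partition: a single output row (count = 0)
      Iterator.single((None, 0L))
    } else if (grouped) {
      // Input rows with grouping keys: one output row per group
      rows.toSeq.groupBy(identity).iterator.map { case (k, vs) => (Some(k), vs.size.toLong) }
    } else {
      // Input rows without grouping keys: one output row for the whole partition
      Iterator.single((None, rows.size.toLong))
    }
  }
}
```

The first two branches correspond to the 0 and 1 values of the number of output rows metric for empty input.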

mapPartitionsWithIndex

mapPartitionsWithIndex[U: ClassTag](
  f: (Int, Iterator[T]) => Iterator[U],
  preservesPartitioning: Boolean = false): RDD[U]

RDD.mapPartitionsWithIndex adds a new MapPartitionsRDD to the RDD lineage.

val ids = spark.range(1)
scala> println(ids.queryExecution.toRdd.toDebugString)
(8) MapPartitionsRDD[12] at toRdd at <console>:29 []
|  MapPartitionsRDD[11] at toRdd at <console>:29 []
|  ParallelCollectionRDD[10] at toRdd at <console>:29 []

// Use groupBy that gives HashAggregateExec operator
val q = ids.groupBy('id).count
scala> q.explain
== Physical Plan ==
*(2) HashAggregate(keys=[id#30L], functions=[count(1)])
+- Exchange hashpartitioning(id#30L, 200)
  +- *(1) HashAggregate(keys=[id#30L], functions=[partial_count(1)])
      +- *(1) Range (0, 1, step=1, splits=8)

val rdd = q.queryExecution.toRdd
scala> println(rdd.toDebugString)
(200) MapPartitionsRDD[18] at toRdd at <console>:28 []
  |   ShuffledRowRDD[17] at toRdd at <console>:28 []
  +-(8) MapPartitionsRDD[16] at toRdd at <console>:28 []
    |  MapPartitionsRDD[15] at toRdd at <console>:28 []
    |  MapPartitionsRDD[14] at toRdd at <console>:28 []
    |  ParallelCollectionRDD[13] at toRdd at <console>:28 []

Generating Java Code for Consume Path

doConsume(
  ctx: CodegenContext,
  input: Seq[ExprCode],
  row: ExprCode): String

doConsume is part of the CodegenSupport abstraction.


doConsume executes doConsumeWithoutKeys when no named expressions for the grouping keys were specified for the HashAggregateExec or doConsumeWithKeys otherwise.

doConsumeWithKeys

doConsumeWithKeys(
  ctx: CodegenContext,
  input: Seq[ExprCode]): String

doConsumeWithKeys...FIXME

doConsumeWithoutKeys

doConsumeWithoutKeys(
  ctx: CodegenContext,
  input: Seq[ExprCode]): String

doConsumeWithoutKeys...FIXME

Generating Java Code for Produce Path

doProduce(
  ctx: CodegenContext): String

doProduce is part of the CodegenSupport abstraction.


doProduce executes doProduceWithoutKeys when no named expressions for the grouping keys were specified for the HashAggregateExec or doProduceWithKeys otherwise.

doProduceWithoutKeys

doProduceWithoutKeys(
  ctx: CodegenContext): String

doProduceWithoutKeys...FIXME

generateResultFunction

generateResultFunction(
  ctx: CodegenContext): String

generateResultFunction...FIXME

finishAggregate

finishAggregate(
  hashMap: UnsafeFixedWidthAggregationMap,
  sorter: UnsafeKVExternalSorter,
  peakMemory: SQLMetric,
  spillSize: SQLMetric,
  avgHashProbe: SQLMetric): KVIterator[UnsafeRow, UnsafeRow]

finishAggregate...FIXME

doProduceWithKeys

doProduceWithKeys(
  ctx: CodegenContext): String

doProduceWithKeys is part of the AggregateCodegenSupport abstraction.


doProduceWithKeys uses the following configuration properties:

doProduceWithKeys...FIXME

In the end, doProduceWithKeys generates the following Java code (with the []-marked sections filled out):

if (![initAgg]) {
  [initAgg] = true;
  [createFastHashMap]
  [addHookToCloseFastHashMap]
  [hashMapTerm] = [thisPlan].createHashMap();
  long [beforeAgg] = System.nanoTime();
  [doAggFuncName]();
  [aggTime].add((System.nanoTime() - [beforeAgg]) / [NANOS_PER_MILLIS]);
}
// output the result
[outputFromFastHashMap]
[outputFromRegularHashMap]
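
As an illustration of how []-marked sections of such a template can be filled, the following sketch uses Scala string interpolation. It is a hypothetical helper (ProduceTemplateSketch.fill and all its parameters are made up for this example), not the code HashAggregateExec uses, and it covers only the initialization part of the template.

```scala
// Hypothetical sketch of filling a code template with Scala string interpolation.
// The parameter names mirror the []-marked sections; none of this is Spark's code.
object ProduceTemplateSketch {
  def fill(
      initAgg: String,
      hashMapTerm: String,
      thisPlan: String,
      beforeAgg: String,
      doAggFuncName: String,
      aggTime: String): String =
    s"""if (!$initAgg) {
       |  $initAgg = true;
       |  $hashMapTerm = $thisPlan.createHashMap();
       |  long $beforeAgg = System.nanoTime();
       |  $doAggFuncName();
       |  $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
       |}""".stripMargin
}
```

Calling fill with names such as hashAgg_initAgg_0 and hashAgg_hashMap_0 gives text of the same shape as the generated code in the demo at the end of this page.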

Creating HashMap

createHashMap(): UnsafeFixedWidthAggregationMap

createHashMap requests all the DeclarativeAggregate functions for the Catalyst expressions to initialize aggregation buffers.

createHashMap creates an UnsafeProjection for the expressions and executes it (with an "empty" null row).

In the end, createHashMap creates an UnsafeFixedWidthAggregationMap with the following:

  • emptyAggregationBuffer: The UnsafeRow after executing the UnsafeProjection to initialize aggregation buffers

  • aggregationBufferSchema: bufferSchema

  • groupingKeySchema: groupingKeySchema
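
The role of the empty aggregation buffer can be sketched as follows. This is a simplified model, not UnsafeFixedWidthAggregationMap: AggregationMap is a hypothetical class that stores buffers as Array[Long], and sumAndCount assumes a [sum, count] buffer layout for illustration.

```scala
// Simplified model of a map from grouping keys to mutable aggregation buffers.
// Not Spark code: buffers are plain arrays instead of UnsafeRows.
object AggregationMapSketch {
  import scala.collection.mutable

  /** Every new group starts from a copy of the (initialized) empty buffer. */
  final class AggregationMap[K](emptyAggregationBuffer: Array[Long]) {
    private val buffers = mutable.LinkedHashMap.empty[K, Array[Long]]

    def getAggregationBuffer(key: K): Array[Long] =
      buffers.getOrElseUpdate(key, emptyAggregationBuffer.clone())

    def iterator: Iterator[(K, Array[Long])] = buffers.iterator
  }

  /** Sum and count per group, with the buffer laid out as [sum, count]. */
  def sumAndCount(rows: Seq[(String, Long)]): Map[String, (Long, Long)] = {
    val map = new AggregationMap[String](Array(0L, 0L))
    rows.foreach { case (key, value) =>
      val buffer = map.getAggregationBuffer(key)
      buffer(0) += value // buffers are updated in place
      buffer(1) += 1
    }
    map.iterator.map { case (k, b) => (k, (b(0), b(1))) }.toMap
  }
}
```

Copying the empty buffer per group is what keeps groups independent. Updating a shared buffer in place would mix their values.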

Demo

Aggregation Query

val data = spark.range(10)
val q = data
  .groupBy('id % 2 as "group")
  .agg(sum("id") as "sum")

HashAggregateExec operator should be selected due to:

  1. sum uses mutable types for its aggregation buffer
  2. there is just a single id column reference of LongType data type

scala> println(q.queryExecution.executedPlan.numberedTreeString)
00 AdaptiveSparkPlan isFinalPlan=false
01 +- HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
02    +- Exchange hashpartitioning(_groupingexpression#8L, 200), ENSURE_REQUIREMENTS, [plan_id=15]
03       +- HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
04          +- Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
05             +- Range (0, 10, step=1, splits=16)
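
The plan above splits the aggregation into a partial HashAggregate, an Exchange and a final HashAggregate. The following sketch models these three steps with plain Scala collections. It is not how Spark executes the query: TwoPhaseAggSketch, rowsPerPartition and shufflePartitions are hypothetical names, and the grouping expression id % 2 is hard-coded.

```scala
// Simplified model of partial aggregation, exchange and final aggregation.
// Not Spark code: partitions are Seqs and the grouping expression is id % 2.
object TwoPhaseAggSketch {
  /** Partial aggregation: one (group, partial sum) pair per group of an input partition. */
  def partialSum(partition: Seq[Long]): Map[Long, Long] =
    partition.groupBy(_ % 2).map { case (g, ids) => g -> ids.sum }

  /** Exchange: route every partial result to the partition that owns its group. */
  def shuffle(partials: Seq[Map[Long, Long]], numPartitions: Int): Seq[Seq[(Long, Long)]] = {
    val all = partials.flatMap(_.toSeq)
    (0 until numPartitions).map { p =>
      all.filter { case (g, _) => (g % numPartitions).toInt == p }
    }
  }

  /** Final aggregation: merge the partial sums of each group. */
  def finalSum(partition: Seq[(Long, Long)]): Map[Long, Long] =
    partition.groupBy(_._1).map { case (g, sums) => g -> sums.map(_._2).sum }

  def run(ids: Seq[Long], rowsPerPartition: Int, shufflePartitions: Int): Map[Long, Long] = {
    val inputPartitions = ids.grouped(rowsPerPartition).toSeq
    val partials = inputPartitions.map(partialSum)
    shuffle(partials, shufflePartitions).flatMap(finalSum).toMap
  }
}
```

For the ids 0 to 9, the result is 20 for group 0 and 25 for group 1, regardless of how the input is split into partitions.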

Generate Final Plan

The isFinalPlan flag is false. Let's execute the query and access the final plan.

import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
val op = q
  .queryExecution
  .executedPlan
  .collect { case op: AdaptiveSparkPlanExec => op }
  .head

Execute the adaptive operator to generate the final execution plan.

op.executeTake(1)

isFinalPlan flag should now be true.

scala> println(op.treeString)
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 0
         +- Exchange hashpartitioning(_groupingexpression#8L, 200), ENSURE_REQUIREMENTS, [plan_id=25]
            +- *(1) HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
               +- *(1) Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
                  +- *(1) Range (0, 10, step=1, splits=16)
+- == Initial Plan ==
   HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
   +- Exchange hashpartitioning(_groupingexpression#8L, 200), ENSURE_REQUIREMENTS, [plan_id=15]
      +- HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
         +- Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
            +- Range (0, 10, step=1, splits=16)

With the isFinalPlan flag true, it is possible to print out the WholeStageCodegen subtrees.

scala> q.queryExecution.debug.codegen
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 (maxMethodCodeSize:284; maxConstantPoolSize:337(0.51% used); numInnerClasses:2) ==
*(1) HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
+- *(1) Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
   +- *(1) Range (0, 10, step=1, splits=16)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean hashAgg_initAgg_0;
/* 010 */   private org.apache.spark.unsafe.KVIterator hashAgg_mapIter_0;
/* 011 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap hashAgg_hashMap_0;
/* 012 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter hashAgg_sorter_0;
/* 013 */   private scala.collection.Iterator inputadapter_input_0;
/* 014 */   private boolean hashAgg_hashAgg_isNull_2_0;
...

Let's access the generated source code via WholeStageCodegenExec physical operator.

val aqe = op
import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsce = aqe.executedPlan
  .collect { case op: WholeStageCodegenExec => op }
  .head
val (_, source) = wsce.doCodeGen
import org.apache.spark.sql.catalyst.expressions.codegen.CodeFormatter
val formattedCode = CodeFormatter.format(source)
scala> println(formattedCode)
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=2
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean hashAgg_initAgg_0;
/* 010 */   private org.apache.spark.unsafe.KVIterator hashAgg_mapIter_0;
/* 011 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap hashAgg_hashMap_0;
/* 012 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter hashAgg_sorter_0;
/* 013 */   private scala.collection.Iterator inputadapter_input_0;
/* 014 */   private boolean hashAgg_hashAgg_isNull_2_0;
...
val execPlan = q.queryExecution.sparkPlan
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
01 +- HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
02    +- Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
03       +- Range (0, 10, step=1, splits=16)

Going low level. Watch your steps :)

import org.apache.spark.sql.catalyst.plans.logical.Aggregate
val aggLog = q.queryExecution.optimizedPlan.asInstanceOf[Aggregate]

import org.apache.spark.sql.catalyst.planning.PhysicalAggregation
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
val (_, aggregateExpressions: Seq[AggregateExpression], _, _) = PhysicalAggregation.unapply(aggLog).get
val aggregateBufferAttributes =
  aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)

Here comes the very reason why HashAggregateExec was selected. Aggregation execution planning strategy prefers HashAggregateExec when aggregateBufferAttributes are supported.

import org.apache.spark.sql.catalyst.plans.logical.Aggregate
assert(Aggregate.supportsHashAggregate(aggregateBufferAttributes))
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
01 +- HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
02    +- Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
03       +- Range (0, 10, step=1, splits=16)

Execute HashAggregateExec

val hashAggExecRDD = hashAggExec.execute
println(hashAggExecRDD.toDebugString)
(16) MapPartitionsRDD[4] at execute at <console>:1 []
 |   MapPartitionsRDD[3] at execute at <console>:1 []
 |   MapPartitionsRDD[2] at execute at <console>:1 []
 |   MapPartitionsRDD[1] at execute at <console>:1 []
 |   ParallelCollectionRDD[0] at execute at <console>:1 []

Java Code for Produce Execution Path

import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext
val parent = hashAggExec
val doProduceWithKeysCode = hashAggExec.produce(ctx, parent)
scala> println(doProduceWithKeysCode)
if (!hashAgg_initAgg_0) {
  hashAgg_initAgg_0 = true;


  hashAgg_hashMap_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).createHashMap();
  long hashAgg_beforeAgg_1 = System.nanoTime();
  hashAgg_doAggregateWithKeys_0();
  ((org.apache.spark.sql.execution.metric.SQLMetric) references[16] /* aggTime */).add((System.nanoTime() - hashAgg_beforeAgg_1) / 1000000);
}
// output the result


while ( hashAgg_mapIter_0.next()) {
  UnsafeRow hashAgg_aggKey_1 = (UnsafeRow) hashAgg_mapIter_0.getKey();
  UnsafeRow hashAgg_aggBuffer_1 = (UnsafeRow) hashAgg_mapIter_0.getValue();
  hashAgg_doAggregateWithKeysOutput_1(hashAgg_aggKey_1, hashAgg_aggBuffer_1);
  if (shouldStop()) return;
}
hashAgg_mapIter_0.close();
if (hashAgg_sorter_0 == null) {
  hashAgg_hashMap_0.free();
}