HashAggregateExec Physical Operator¶
HashAggregateExec is a unary physical operator for hash-based aggregation.
HashAggregateExec is a BlockingOperatorWithCodegen.
HashAggregateExec is an AliasAwareOutputPartitioning.
Note
HashAggregateExec is the preferred aggregate physical operator for Aggregation execution planning strategy (over ObjectHashAggregateExec and SortAggregateExec).
HashAggregateExec supports Java code generation (aka codegen).
HashAggregateExec uses TungstenAggregationIterator (to iterate over UnsafeRows in partitions) when executed.
Note
HashAggregateExec uses TungstenAggregationIterator that can (theoretically) switch to a sort-based aggregation when the hash-based approach is unable to acquire enough memory.
See the testFallbackStartsAt internal property and the spark.sql.TungstenAggregate.testFallbackStartsAt configuration property.
Search logs for the following INFO message to know whether the switch has happened:
falling back to sort based aggregation.
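As a rough way to see the fallback in action, the internal property could be set before an aggregation runs. This is a hedged sketch only: whether the property is honored at runtime, and the meaning of the two comma-separated thresholds, are assumptions that may vary across Spark versions.

// Hedged sketch: ask TungstenAggregationIterator to fall back early
// (the two numbers are assumed thresholds for the fast and regular hash maps)
spark.conf.set("spark.sql.TungstenAggregate.testFallbackStartsAt", "2, 3")
spark.range(100).groupBy($"id" % 10 as "gid").count.show
// then search the executor logs for: falling back to sort based aggregation.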
Creating Instance¶
HashAggregateExec takes the following to be created:
- Required child distribution expressions
- Grouping Keys (as NamedExpressions)
- AggregateExpressions
- Aggregate attributes
- Initial input buffer offset
- Output named expressions
- Child physical operator
HashAggregateExec is created (indirectly through AggUtils.createAggregate) when:
- Aggregation execution planning strategy is executed (to select the aggregate physical operator for an Aggregate logical operator)
- StatefulAggregationStrategy (Spark Structured Streaming) execution planning strategy creates a plan for streamingEventTimeWatermark or Aggregate logical operators
Performance Metrics¶
avg hash probe bucket list iters¶
Average hash map probe per lookup (i.e. numProbes / numKeyLookups)
numProbes and numKeyLookups are used in the BytesToBytesMap (Spark Core) append-only hash map for the number of iterations to look up a single key and the total number of lookups, respectively.
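The metric shows up in the Details for Query page of the web UI and can also be read programmatically after execution. The snippet below is a hedged sketch: the internal metric keys (e.g. avgHashProbe, numOutputRows) and the AQE handling are assumptions that may differ across Spark versions.

// Hedged sketch: print the SQL metrics of HashAggregateExec operators after execution
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

val q = spark.range(10).groupBy($"id" % 5 as "gid").count
q.collect()  // execute the query so the metrics get populated

// With AQE enabled the final plan hides behind AdaptiveSparkPlanExec (see the Demo below)
val finalPlan = q.queryExecution.executedPlan match {
  case aqe: AdaptiveSparkPlanExec => aqe.executedPlan
  case plan => plan
}
finalPlan.collect { case agg: HashAggregateExec => agg }.foreach { agg =>
  agg.metrics.foreach { case (key, metric) => println(s"$key: ${metric.value}") }
}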
number of output rows¶
Number of groups (per partition) which, depending on the number of partitions and the side of the ShuffleExchangeExec operator, is the number of output rows:
- 0 for no input with a grouping expression, e.g. spark.range(0).groupBy($"id").count.show
- 1 for no grouping expression and no input, e.g. spark.range(0).groupBy().count.show
Tip
Use a different number of elements and partitions in the range operator to observe the difference in the numOutputRows metric, e.g.
spark.
range(0, 10, 1, numPartitions = 1).
groupBy($"id" % 5 as "gid").
count.
show
spark.
range(0, 10, 1, numPartitions = 5).
groupBy($"id" % 5 as "gid").
count.
show
number of sort fallback tasks¶
peak memory¶
spill size¶
Used to create a TungstenAggregationIterator when doExecute is executed
time in aggregation build¶
Required Child Distribution¶
requiredChildDistribution: List[Distribution]
requiredChildDistribution is part of the SparkPlan abstraction.
requiredChildDistribution varies per the input required child distribution expressions:
- AllTuples when defined, but empty
- ClusteredDistribution for non-empty expressions
- UnspecifiedDistribution when undefined
Note
requiredChildDistributionExpressions is exactly requiredChildDistributionExpressions from AggUtils.createAggregate and is undefined by default.
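The three cases above can be paraphrased as the following pattern match (a sketch of the rule, not a verbatim copy of the operator's source):

// Paraphrased sketch of the requiredChildDistribution rule
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution}

def requiredChildDistribution(
    requiredChildDistributionExpressions: Option[Seq[Expression]]): List[Distribution] =
  requiredChildDistributionExpressions match {
    case Some(exprs) if exprs.isEmpty => AllTuples :: Nil           // defined, but empty
    case Some(exprs) => ClusteredDistribution(exprs) :: Nil         // non-empty expressions
    case None => UnspecifiedDistribution :: Nil                     // undefined
  }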
(No distinct in aggregation) requiredChildDistributionExpressions is undefined when HashAggregateExec is created for partial aggregations (i.e. mode is Partial for aggregate expressions).
requiredChildDistributionExpressions is defined, but could possibly be empty, when HashAggregateExec is created for final aggregations (i.e. mode is Final for aggregate expressions).
(One distinct in aggregation) requiredChildDistributionExpressions is undefined when HashAggregateExec is created for partial aggregations (i.e. mode is Partial for aggregate expressions) with one distinct in aggregation.
requiredChildDistributionExpressions is defined, but could possibly be empty, when HashAggregateExec is created for partial merge aggregations (i.e. mode is PartialMerge for aggregate expressions).
FIXME for the following two cases in aggregation with one distinct.
Note
The prefix for variable names for HashAggregateExec operators in CodegenSupport-generated code is agg.
Executing Physical Operator¶
doExecute(): RDD[InternalRow]
doExecute is part of the SparkPlan abstraction.
doExecute requests the child physical operator to execute (that triggers physical query planning and generates an RDD[InternalRow]).
doExecute transforms the RDD[InternalRow] with a transformation function for every partition.
Processing Partition Rows¶
doExecute uses RDD.mapPartitionsWithIndex to process InternalRows per partition (with a partition ID).
For every partition, mapPartitionsWithIndex records the start execution time (beforeAgg).
mapPartitionsWithIndex branches off based on whether there are input rows and grouping keys.
For a grouped aggregate (grouping keys defined) with no input rows (an empty partition), doExecute returns an empty iterator.
Otherwise, doExecute creates a TungstenAggregationIterator.
With no grouping keys defined and no input rows (an empty partition), doExecute increments the numOutputRows metric and returns a single-element Iterator[UnsafeRow] from the TungstenAggregationIterator.
With input rows available (and regardless of grouping keys), doExecute returns the TungstenAggregationIterator.
In the end, doExecute calculates the aggTime metric and returns an Iterator[UnsafeRow] that can be as follows:
- Empty
- A single-element one
- The TungstenAggregationIterator
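The branching can be summarized with a small standalone sketch (plain Scala, not Spark's actual code; the aggregate function and the empty-result value are illustrative stand-ins for TungstenAggregationIterator):

// Standalone sketch of doExecute's per-partition branching (illustrative only)
def processPartition[T](
    rows: Iterator[T],
    hasGroupingKeys: Boolean,
    aggregate: Iterator[T] => Iterator[T],
    emptyResult: => T): Iterator[T] = {
  if (!rows.hasNext && hasGroupingKeys) {
    Iterator.empty                // grouped aggregate over an empty partition: no output rows
  } else if (!rows.hasNext) {
    Iterator.single(emptyResult)  // global aggregate over an empty partition: a single row
  } else {
    aggregate(rows)               // otherwise: hand the rows over to the aggregation iterator
  }
}

// e.g. a global sum over an empty partition still produces one (zero-valued) row
processPartition[Long](
  rows = Iterator.empty,
  hasGroupingKeys = false,
  aggregate = rows => Iterator(rows.sum),
  emptyResult = 0L)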
mapPartitionsWithIndex¶
mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
RDD.mapPartitionsWithIndex adds a new MapPartitionsRDD to the RDD lineage.
val ids = spark.range(1)
scala> println(ids.queryExecution.toRdd.toDebugString)
(8) MapPartitionsRDD[12] at toRdd at <console>:29 []
| MapPartitionsRDD[11] at toRdd at <console>:29 []
| ParallelCollectionRDD[10] at toRdd at <console>:29 []
// Use groupBy that gives HashAggregateExec operator
val q = ids.groupBy('id).count
scala> q.explain
== Physical Plan ==
*(2) HashAggregate(keys=[id#30L], functions=[count(1)])
+- Exchange hashpartitioning(id#30L, 200)
+- *(1) HashAggregate(keys=[id#30L], functions=[partial_count(1)])
+- *(1) Range (0, 1, step=1, splits=8)
val rdd = q.queryExecution.toRdd
scala> println(rdd.toDebugString)
(200) MapPartitionsRDD[18] at toRdd at <console>:28 []
| ShuffledRowRDD[17] at toRdd at <console>:28 []
+-(8) MapPartitionsRDD[16] at toRdd at <console>:28 []
| MapPartitionsRDD[15] at toRdd at <console>:28 []
| MapPartitionsRDD[14] at toRdd at <console>:28 []
| ParallelCollectionRDD[13] at toRdd at <console>:28 []
Generating Java Code for Consume Path¶
doConsume(
ctx: CodegenContext,
input: Seq[ExprCode],
row: ExprCode): String
doConsume is part of the CodegenSupport abstraction.
doConsume executes doConsumeWithoutKeys when no named expressions for the grouping keys were specified for the HashAggregateExec or doConsumeWithKeys otherwise.
doConsumeWithKeys¶
doConsumeWithKeys(
ctx: CodegenContext,
input: Seq[ExprCode]): String
doConsumeWithKeys
...FIXME
doConsumeWithoutKeys¶
doConsumeWithoutKeys(
ctx: CodegenContext,
input: Seq[ExprCode]): String
doConsumeWithoutKeys
...FIXME
Generating Java Code for Produce Path¶
doProduce(
ctx: CodegenContext): String
doProduce is part of the CodegenSupport abstraction.
doProduce executes doProduceWithoutKeys when no named expressions for the grouping keys were specified for the HashAggregateExec or doProduceWithKeys otherwise.
doProduceWithoutKeys¶
doProduceWithoutKeys(
ctx: CodegenContext): String
doProduceWithoutKeys
...FIXME
generateResultFunction¶
generateResultFunction(
ctx: CodegenContext): String
generateResultFunction
...FIXME
finishAggregate¶
finishAggregate(
hashMap: UnsafeFixedWidthAggregationMap,
sorter: UnsafeKVExternalSorter,
peakMemory: SQLMetric,
spillSize: SQLMetric,
avgHashProbe: SQLMetric): KVIterator[UnsafeRow, UnsafeRow]
finishAggregate
...FIXME
doProduceWithKeys¶
doProduceWithKeys(
ctx: CodegenContext): String
doProduceWithKeys is part of the AggregateCodegenSupport abstraction.
doProduceWithKeys uses the following configuration properties:
- spark.sql.codegen.aggregate.map.twolevel.enabled
- spark.sql.codegen.aggregate.map.vectorized.enable
- spark.sql.codegen.aggregate.fastHashMap.capacityBit
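A hedged sketch of toggling these internal properties before an aggregation query runs (whether they are accepted at runtime, and their exact effect, depends on the Spark version; the values below are illustrative only):

// Hedged sketch: property names as listed above, values illustrative
spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", true)
spark.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", true)
spark.conf.set("spark.sql.codegen.aggregate.fastHashMap.capacityBit", 16L)

spark.range(10).groupBy($"id" % 5 as "gid").count.show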
doProduceWithKeys
...FIXME
In the end, doProduceWithKeys generates the following Java code (with the []-marked sections filled out):
if (![initAgg]) {
[initAgg] = true;
[createFastHashMap]
[addHookToCloseFastHashMap]
[hashMapTerm] = [thisPlan].createHashMap();
long [beforeAgg] = System.nanoTime();
[doAggFuncName]();
[aggTime].add((System.nanoTime() - [beforeAgg]) / [NANOS_PER_MILLIS]);
}
// output the result
[outputFromFastHashMap]
[outputFromRegularHashMap]
Creating HashMap¶
createHashMap(): UnsafeFixedWidthAggregationMap
createHashMap requests all the DeclarativeAggregate functions for the Catalyst expressions to initialize aggregation buffers.
createHashMap creates an UnsafeProjection for the expressions and executes it (with an "empty" null row).
Note
Executing an UnsafeProjection produces an UnsafeRow that becomes an empty aggregation buffer of an UnsafeFixedWidthAggregationMap to be created.
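A minimal standalone sketch of that idea, assuming stand-in Literal expressions in place of the DeclarativeAggregate functions' initial values:

// Standalone sketch: project initial values against an empty row to get an
// "empty aggregation buffer" (an UnsafeRow). The Literals are stand-ins for
// DeclarativeAggregate.initialValues.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Literal, UnsafeProjection}

val initialValues = Seq(Literal(0L), Literal(0L))
val projection = UnsafeProjection.create(initialValues)
val emptyAggregationBuffer = projection(InternalRow.empty)  // an UnsafeRow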
In the end, createHashMap creates an UnsafeFixedWidthAggregationMap with the following:
UnsafeFixedWidthAggregationMap | Value |
---|---|
emptyAggregationBuffer | The UnsafeRow after executing the UnsafeProjection to initialize aggregation buffers |
aggregationBufferSchema | bufferSchema |
groupingKeySchema | groupingKeySchema |
Demo¶
Aggregation Query¶
val data = spark.range(10)
val q = data
.groupBy('id % 2 as "group")
.agg(sum("id") as "sum")
HashAggregateExec operator should be selected due to:
- sum uses mutable types for aggregate expression
- just a single id column reference of LongType data type
scala> println(q.queryExecution.executedPlan.numberedTreeString)
00 AdaptiveSparkPlan isFinalPlan=false
01 +- HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
02 +- Exchange hashpartitioning(_groupingexpression#8L, 200), ENSURE_REQUIREMENTS, [plan_id=15]
03 +- HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
04 +- Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
05 +- Range (0, 10, step=1, splits=16)
Generate Final Plan¶
isFinalPlan flag is false. Let's execute it and access the final plan.
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
val op = q
.queryExecution
.executedPlan
.collect { case op: AdaptiveSparkPlanExec => op }
.head
Execute the adaptive operator to generate the final execution plan.
op.executeTake(1)
isFinalPlan flag should now be true.
scala> println(op.treeString)
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
*(2) HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 0
+- Exchange hashpartitioning(_groupingexpression#8L, 200), ENSURE_REQUIREMENTS, [plan_id=25]
+- *(1) HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
+- *(1) Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
+- *(1) Range (0, 10, step=1, splits=16)
+- == Initial Plan ==
HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
+- Exchange hashpartitioning(_groupingexpression#8L, 200), ENSURE_REQUIREMENTS, [plan_id=15]
+- HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
+- Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
+- Range (0, 10, step=1, splits=16)
With the isFinalPlan flag true, it is possible to print out the WholeStageCodegen subtrees.
scala> q.queryExecution.debug.codegen
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 (maxMethodCodeSize:284; maxConstantPoolSize:337(0.51% used); numInnerClasses:2) ==
*(1) HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
+- *(1) Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
+- *(1) Range (0, 10, step=1, splits=16)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private boolean hashAgg_initAgg_0;
/* 010 */ private org.apache.spark.unsafe.KVIterator hashAgg_mapIter_0;
/* 011 */ private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap hashAgg_hashMap_0;
/* 012 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter hashAgg_sorter_0;
/* 013 */ private scala.collection.Iterator inputadapter_input_0;
/* 014 */ private boolean hashAgg_hashAgg_isNull_2_0;
...
Let's access the generated source code via WholeStageCodegenExec physical operator.
val aqe = op
import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsce = aqe.executedPlan
.collect { case op: WholeStageCodegenExec => op }
.head
val (_, source) = wsce.doCodeGen
import org.apache.spark.sql.catalyst.expressions.codegen.CodeFormatter
val formattedCode = CodeFormatter.format(source)
scala> println(formattedCode)
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=2
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private boolean hashAgg_initAgg_0;
/* 010 */ private org.apache.spark.unsafe.KVIterator hashAgg_mapIter_0;
/* 011 */ private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap hashAgg_hashMap_0;
/* 012 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter hashAgg_sorter_0;
/* 013 */ private scala.collection.Iterator inputadapter_input_0;
/* 014 */ private boolean hashAgg_hashAgg_isNull_2_0;
...
val execPlan = q.queryExecution.sparkPlan
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
01 +- HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
02 +- Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
03 +- Range (0, 10, step=1, splits=16)
Going low level. Watch your steps :)
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
val aggLog = q.queryExecution.optimizedPlan.asInstanceOf[Aggregate]
import org.apache.spark.sql.catalyst.planning.PhysicalAggregation
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
val (_, aggregateExpressions: Seq[AggregateExpression], _, _) = PhysicalAggregation.unapply(aggLog).get
val aggregateBufferAttributes =
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
Here comes the very reason why HashAggregateExec was selected. Aggregation execution planning strategy prefers HashAggregateExec when aggregateBufferAttributes are supported.
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
assert(Aggregate.supportsHashAggregate(aggregateBufferAttributes))
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
01 +- HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
02 +- Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
03 +- Range (0, 10, step=1, splits=16)
Execute HashAggregateExec¶
val hashAggExecRDD = hashAggExec.execute
println(hashAggExecRDD.toDebugString)
(16) MapPartitionsRDD[4] at execute at <console>:1 []
| MapPartitionsRDD[3] at execute at <console>:1 []
| MapPartitionsRDD[2] at execute at <console>:1 []
| MapPartitionsRDD[1] at execute at <console>:1 []
| ParallelCollectionRDD[0] at execute at <console>:1 []
Java Code for Produce Execution Path¶
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext
val parent = hashAggExec
val doProduceWithKeysCode = hashAggExec.produce(ctx, parent)
scala> println(doProduceWithKeysCode)
if (!hashAgg_initAgg_0) {
hashAgg_initAgg_0 = true;
hashAgg_hashMap_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).createHashMap();
long hashAgg_beforeAgg_1 = System.nanoTime();
hashAgg_doAggregateWithKeys_0();
((org.apache.spark.sql.execution.metric.SQLMetric) references[16] /* aggTime */).add((System.nanoTime() - hashAgg_beforeAgg_1) / 1000000);
}
// output the result
while ( hashAgg_mapIter_0.next()) {
UnsafeRow hashAgg_aggKey_1 = (UnsafeRow) hashAgg_mapIter_0.getKey();
UnsafeRow hashAgg_aggBuffer_1 = (UnsafeRow) hashAgg_mapIter_0.getValue();
hashAgg_doAggregateWithKeysOutput_1(hashAgg_aggKey_1, hashAgg_aggBuffer_1);
if (shouldStop()) return;
}
hashAgg_mapIter_0.close();
if (hashAgg_sorter_0 == null) {
hashAgg_hashMap_0.free();
}