HashAggregateExec Physical Operator¶
HashAggregateExec is an AggregateCodegenSupport physical operator for Hash-Based Aggregation (with UnsafeRow keys and values) that uses TungstenAggregationIterator (to iterate over UnsafeRows in partitions) when executed.
HashAggregateExec is the preferred aggregate physical operator for Aggregate logical operator.
Falling Back To Sort-Based Aggregation
HashAggregateExec uses TungstenAggregationIterator that can (theoretically) switch to a sort-based aggregation when the hash-based processing mode is unable to acquire enough memory.
See testFallbackStartsAt internal property and spark.sql.TungstenAggregate.testFallbackStartsAt configuration property.
Search logs for the following INFO message to know whether the switch has happened.
falling back to sort based aggregation.
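The fallback idea can be illustrated with a toy model in plain Scala. This is only a sketch and not Spark's implementation: `aggregateWithFallback` and `maxKeys` are hypothetical names, and `maxKeys` stands in for the memory limit of the hash-based mode.

```scala
import scala.collection.mutable

// Toy model (NOT Spark's code): sums values per key using a hash map first
// and switches to sort-based aggregation once the map would exceed maxKeys.
// Returns the aggregated rows (sorted by key) and whether a fallback happened.
def aggregateWithFallback(
    rows: Seq[(String, Long)],
    maxKeys: Int): (Seq[(String, Long)], Boolean) = {
  val hashMap = mutable.HashMap.empty[String, Long]
  val input = rows.iterator
  val spilled = mutable.ArrayBuffer.empty[(String, Long)]
  var fellBack = false

  while (input.hasNext && !fellBack) {
    val (key, value) = input.next()
    if (hashMap.contains(key) || hashMap.size < maxKeys) {
      hashMap(key) = hashMap.getOrElse(key, 0L) + value
    } else {
      // "Out of memory": hand the partial results and the remaining input to the sorter
      fellBack = true
      spilled ++= hashMap
      spilled += ((key, value))
      spilled ++= input
    }
  }

  if (!fellBack) {
    (hashMap.toSeq.sortBy(_._1), false)
  } else {
    // Sort-based aggregation: sort by key, then merge adjacent rows with equal keys
    val merged = spilled.sortBy(_._1).foldLeft(List.empty[(String, Long)]) {
      case ((prevKey, acc) :: rest, (key, value)) if prevKey == key =>
        (prevKey, acc + value) :: rest
      case (result, row) => row :: result
    }.reverse
    (merged, true)
  }
}

val rows = Seq("a" -> 1L, "b" -> 2L, "a" -> 3L)
val hashOnly = aggregateWithFallback(rows, maxKeys = 10)
val withFallback = aggregateWithFallback(rows, maxKeys = 1)
```

Both calls produce the same aggregates. Only the second one reports a fallback, which is the event the INFO message above signals in Spark.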
Creating Instance¶
HashAggregateExec takes the following to be created:
- Required child distribution expressions
- isStreaming flag
- Number of Shuffle Partitions (optional)
- Grouping Keys
- Aggregates (AggregateExpressions)
- Aggregate Attributes
- Initial Input Buffer Offset
- Result (NamedExpressions)
- Child Physical Operator
HashAggregateExec is created (indirectly through AggUtils.createAggregate) when:
- Aggregation execution planning strategy is executed (to select the aggregate physical operator for an Aggregate logical operator)
- StatefulAggregationStrategy (Spark Structured Streaming) execution planning strategy creates a plan for streaming EventTimeWatermark or Aggregate logical operators
Grouping Keys¶
BaseAggregateExec
groupingExpressions: Seq[NamedExpression]
groupingExpressions is part of the BaseAggregateExec abstraction.
HashAggregateExec is given grouping keys (NamedExpressions) when created. There can be no grouping keys.
The grouping keys are the groupingExpressions of Aggregate, if any.
Performance Metrics¶
avg hash probes per key¶
Average number of hash map probes per lookup (i.e. numProbes / numKeyLookups)
numProbes and numKeyLookups are used in BytesToBytesMap (Spark Core) append-only hash map for the number of iterations to look up a single key and the total number of lookups, respectively.
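To make the ratio concrete, here is a minimal sketch of a linear-probing key set that keeps the two counters. It is a simplified stand-in written for this page, not BytesToBytesMap itself: `ProbeCountingMap` and its members are hypothetical names, and the map assumes there is always a free slot left.

```scala
// Toy open-addressing map (NOT BytesToBytesMap) that counts probes and lookups
final class ProbeCountingMap(capacity: Int) {
  private val slots = Array.fill[Option[Long]](capacity)(None)
  var numProbes = 0L
  var numKeyLookups = 0L

  // Finds (or claims) the slot of a key; assumes the map never gets full
  def lookup(key: Long): Int = {
    numKeyLookups += 1
    var pos = Math.floorMod(key, capacity.toLong).toInt
    numProbes += 1
    // Step over slots occupied by other keys (linear probing)
    while (slots(pos).exists(_ != key)) {
      pos = (pos + 1) % capacity
      numProbes += 1
    }
    if (slots(pos).isEmpty) slots(pos) = Some(key)
    pos
  }

  def avgProbesPerKey: Double = numProbes.toDouble / numKeyLookups
}

val probeMap = new ProbeCountingMap(capacity = 4)
probeMap.lookup(0L) // free slot: 1 probe
probeMap.lookup(4L) // collides with key 0: 2 probes
probeMap.lookup(0L) // found immediately: 1 probe
```

Three lookups needed four probes in total, so the average is 4 / 3. A value close to 1 means few collisions.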
number of output rows¶
Number of groups (per partition). Depending on the number of partitions and the side of the ShuffleExchangeExec operator, the number of groups can be:
- 0 for no input with a grouping expression, e.g. spark.range(0).groupBy($"id").count.show
- 1 for no grouping expression and no input, e.g. spark.range(0).groupBy().count.show
Tip
Use a different number of elements and partitions in the range operator to observe the difference in the numOutputRows metric, e.g.
spark.
range(0, 10, 1, numPartitions = 1).
groupBy($"id" % 5 as "gid").
count.
show
spark.
range(0, 10, 1, numPartitions = 5).
groupBy($"id" % 5 as "gid").
count.
show
number of sort fallback tasks¶
This metric is managed entirely by the TungstenAggregationIterator.
Indicates how many tasks (that used TungstenAggregationIterator to process input rows for aggregates) switched (fell back) to sort-based aggregation.
Number of Input Rows in Partition Matters
Not every task uses TungstenAggregationIterator, and only the tasks that do have a chance to fall back to sort-based aggregation.
peak memory¶
This metric is managed entirely by the TungstenAggregationIterator.
spill size¶
Used to create a TungstenAggregationIterator when executed (doExecute).
time in aggregation build¶
Query Planning¶
HashAggregateExec is the preferred aggregate physical operator in Aggregation execution planning strategy (over ObjectHashAggregateExec and SortAggregateExec operators).
HashAggregateExec is selected as the aggregate physical operator for Aggregate logical operator when all the AggregateFunctions (of the AggregateExpressions) use aggBufferAttributes with mutable data types:
- BooleanType
- ByteType
- CalendarIntervalType
- DateType
- DayTimeIntervalType
- DecimalType
- DoubleType
- FloatType
- IntegerType
- LongType
- NullType
- ShortType
- TimestampNTZType
- TimestampType
- UserDefinedType with a mutable data type
- YearMonthIntervalType
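The selection rule boils down to "every aggregation buffer type is mutable". The sketch below models it with a few stand-in types defined locally (they only mimic the names of Spark's data types). `StringType` is included as an assumed example of a type that is not on the list above, and `isMutable` and `supportsHashAggregate` are simplified re-creations for illustration.

```scala
// Stand-in data types (NOT Spark's classes), just enough to model the rule
sealed trait DataType
case object LongType extends DataType
case object DoubleType extends DataType
case object StringType extends DataType // assumed example of a non-mutable type
final case class UserDefinedType(sqlType: DataType) extends DataType

// A type is mutable when it is on the list; a UDT follows its underlying type
def isMutable(dataType: DataType): Boolean = dataType match {
  case LongType | DoubleType => true
  case UserDefinedType(sqlType) => isMutable(sqlType)
  case StringType => false
}

// Hash-based aggregation is possible only if all buffer types are mutable
def supportsHashAggregate(bufferTypes: Seq[DataType]): Boolean =
  bufferTypes.forall(isMutable)

val sumAndAvgBuffers = Seq(LongType, DoubleType)
val withStringBuffer = Seq(LongType, StringType)
```

Under these assumptions `supportsHashAggregate(sumAndAvgBuffers)` holds while `supportsHashAggregate(withStringBuffer)` does not, so the second query would need a different aggregate operator.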
Required Child Distribution¶
SparkPlan
requiredChildDistribution: List[Distribution]
requiredChildDistribution is part of the SparkPlan abstraction.
requiredChildDistribution varies per the input required child distribution expressions:
- AllTuples when defined, but empty
- ClusteredDistribution for non-empty expressions
- UnspecifiedDistribution when undefined
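The three cases above map naturally onto a pattern match over an optional sequence. The following is a minimal sketch with locally defined stand-ins for the distribution types (expressions are plain strings here), not the operator's actual source.

```scala
// Stand-ins for Spark's distribution types (expressions modelled as strings)
sealed trait Distribution
case object AllTuples extends Distribution
case object UnspecifiedDistribution extends Distribution
final case class ClusteredDistribution(clustering: Seq[String]) extends Distribution

def requiredChildDistribution(
    requiredChildDistributionExpressions: Option[Seq[String]]): List[Distribution] =
  requiredChildDistributionExpressions match {
    case Some(exprs) if exprs.isEmpty => AllTuples :: Nil            // defined, but empty
    case Some(exprs) => ClusteredDistribution(exprs) :: Nil          // non-empty expressions
    case None => UnspecifiedDistribution :: Nil                      // undefined
  }

val global = requiredChildDistribution(Some(Seq.empty))
val grouped = requiredChildDistribution(Some(Seq("id")))
val partial = requiredChildDistribution(None)
```

In this model a partial aggregation (no expressions given) puts no requirement on its child, while a final aggregation without grouping keys asks for all rows in a single partition.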
Note
requiredChildDistributionExpressions is exactly requiredChildDistributionExpressions from AggUtils.createAggregate and is undefined by default.
(No distinct in aggregation) requiredChildDistributionExpressions is undefined when HashAggregateExec is created for partial aggregations (i.e. mode is Partial for aggregate expressions).
requiredChildDistributionExpressions is defined, but could possibly be empty, when HashAggregateExec is created for final aggregations (i.e. mode is Final for aggregate expressions).
(One distinct in aggregation) requiredChildDistributionExpressions is undefined when HashAggregateExec is created for partial aggregations (i.e. mode is Partial for aggregate expressions) with one distinct in aggregation.
requiredChildDistributionExpressions is defined, but could possibly be empty, when HashAggregateExec is created for partial merge aggregations (i.e. mode is PartialMerge for aggregate expressions).
FIXME for the following two cases in aggregation with one distinct.
Executing Physical Operator¶
doExecute requests the child physical operator to execute (that triggers physical query planning and generates an RDD[InternalRow]).
doExecute transforms the RDD[InternalRow] with the transformation f function for every partition (using RDD.mapPartitionsWithIndex transformation).
Transformation Function¶
doExecute uses RDD.mapPartitionsWithIndex to process partition InternalRows (with a partition ID).
f: (Int, Iterator[T]) => Iterator[U]
For every partition, mapPartitionsWithIndex records the start execution time (beforeAgg).
mapPartitionsWithIndex branches off based on whether there are input rows and grouping keys.
For a grouped aggregate (grouping keys defined) with no input rows (an empty partition), doExecute returns an empty iterator.
Otherwise, doExecute creates a TungstenAggregationIterator.
With no grouping keys defined and no input rows (an empty partition), doExecute increments the numOutputRows metric and returns a single-element Iterator[UnsafeRow] from the TungstenAggregationIterator.
With input rows available (and regardless of grouping keys), doExecute returns the TungstenAggregationIterator.
In the end, doExecute calculates the aggTime metric and returns an Iterator[UnsafeRow] that can be as follows:
- Empty
- A single-element one
- The TungstenAggregationIterator
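The branching described above depends on two flags only, which the following sketch captures as a small decision function. The `AggOutput` type and `partitionOutput` are hypothetical names introduced for illustration; the real operator returns iterators of UnsafeRows rather than markers.

```scala
// Markers for the three possible per-partition results (illustration only)
sealed trait AggOutput
case object EmptyOutput extends AggOutput       // no rows at all
case object SingleRowOutput extends AggOutput   // one row with the initial aggregate values
case object IteratorOutput extends AggOutput    // the aggregation iterator over the groups

def partitionOutput(hasInput: Boolean, hasGroupingKeys: Boolean): AggOutput =
  if (!hasInput && hasGroupingKeys) EmptyOutput  // grouped aggregate over an empty partition
  else if (!hasInput) SingleRowOutput            // global aggregate over an empty partition
  else IteratorOutput                            // input rows available

val groupedNoInput = partitionOutput(hasInput = false, hasGroupingKeys = true)
val globalNoInput = partitionOutput(hasInput = false, hasGroupingKeys = false)
val withInput = partitionOutput(hasInput = true, hasGroupingKeys = true)
```

This also matches the 0 and 1 cases of the number of output rows metric: an empty input gives no rows for a grouped aggregate and exactly one row for a global one.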
RDD.mapPartitionsWithIndex¶
RDD.mapPartitionsWithIndex
mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
mapPartitionsWithIndex transformation adds a new MapPartitionsRDD to the RDD lineage.
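As a rough mental model of what the function f sees, partitions can be pictured as local collections. The helper below is a plain-Scala imitation written for this page (it is not the RDD API and does nothing distributed); it only shows that f is called once per partition with the partition index and an iterator over that partition's rows.

```scala
// Local imitation of the per-partition call pattern (NOT the RDD API)
def mapPartitionsWithIndex[T, U](partitions: Seq[Seq[T]])(
    f: (Int, Iterator[T]) => Iterator[U]): Seq[Seq[U]] =
  partitions.zipWithIndex.map { case (rows, index) => f(index, rows.iterator).toSeq }

// Three "partitions", the middle one is empty
val partitions = Seq(Seq(1L, 2L), Seq.empty[Long], Seq(3L))

// f emits one (partition index, row count) pair per partition
val counts = mapPartitionsWithIndex(partitions) { (index, rows) =>
  Iterator((index, rows.size))
}
```

Note that f is invoked for the empty partition too, which is why the operator has to handle the no-input-rows case explicitly.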
FIXME Disable AQE
val ids = spark.range(1)
println(ids.queryExecution.toRdd.toDebugString)
(12) SQLExecutionRDD[3] at toRdd at <console>:1 []
| MapPartitionsRDD[2] at toRdd at <console>:1 []
| MapPartitionsRDD[1] at toRdd at <console>:1 []
| ParallelCollectionRDD[0] at toRdd at <console>:1 []
// Use groupBy that gives HashAggregateExec operator
val q = ids.groupBy('id).count
q.explain
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[id#2L], functions=[count(1)])
+- HashAggregate(keys=[id#2L], functions=[partial_count(1)])
+- Range (0, 1, step=1, splits=12)
val rdd = q.queryExecution.toRdd
println(rdd.toDebugString)
(12) SQLExecutionRDD[11] at toRdd at <console>:1 []
| MapPartitionsRDD[10] at toRdd at <console>:1 []
| MapPartitionsRDD[9] at toRdd at <console>:1 []
| ParallelCollectionRDD[8] at toRdd at <console>:1 []
Whole-Stage Code Generation¶
HashAggregateExec supports Whole-Stage Code Generation (as an AggregateCodegenSupport physical operator) only when supportCodegen flag is enabled.
HashAggregateExec is given hashAgg as the variable prefix.
doConsumeWithKeys¶
AggregateCodegenSupport
doConsumeWithKeys(
ctx: CodegenContext,
input: Seq[ExprCode]): String
doConsumeWithKeys is part of the AggregateCodegenSupport abstraction.
doConsumeWithKeys ...FIXME
doProduceWithKeys¶
AggregateCodegenSupport
doProduceWithKeys(
ctx: CodegenContext): String
doProduceWithKeys is part of the AggregateCodegenSupport abstraction.
doProduceWithKeys uses the following configuration properties:
- spark.sql.codegen.aggregate.map.twolevel.enabled
- spark.sql.codegen.aggregate.map.vectorized.enable
- spark.sql.codegen.aggregate.fastHashMap.capacityBit
doProduceWithKeys ...FIXME
In the end, doProduceWithKeys generates the following Java code (with the []-marked sections filled out):
if (![initAgg]) {
[initAgg] = true;
[createFastHashMap]
[addHookToCloseFastHashMap]
[hashMapTerm] = [thisPlan].createHashMap();
long [beforeAgg] = System.nanoTime();
[doAggFuncName]();
[aggTime].add((System.nanoTime() - [beforeAgg]) / [NANOS_PER_MILLIS]);
}
// output the result
[outputFromFastHashMap]
[outputFromRegularHashMap]
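The shape of the generated code (build the hash map once, guarded by a flag, then emit results on every call) can be mimicked in a few lines of Scala. This is a sketch of the pattern only: `LazyAggIterator`, `processNext` and `buildCount` are hypothetical names, and a standard-library `groupBy` replaces the hash map that Spark builds.

```scala
// Sketch of the "aggregate once, then output" pattern (NOT generated Spark code)
final class LazyAggIterator(input: Iterator[(String, Long)]) {
  private var initAgg = false
  private var results: Iterator[(String, Long)] = Iterator.empty
  var aggTimeMs = 0L
  var buildCount = 0

  def processNext(): Option[(String, Long)] = {
    if (!initAgg) {
      initAgg = true
      val beforeAgg = System.nanoTime()
      // Consume the whole input and aggregate it (stands in for doAggregateWithKeys)
      results = input.toSeq
        .groupBy(_._1)
        .map { case (key, rows) => key -> rows.map(_._2).sum }
        .toSeq
        .sortBy(_._1)
        .iterator
      buildCount += 1
      aggTimeMs += (System.nanoTime() - beforeAgg) / 1000000
    }
    // output the result
    if (results.hasNext) Some(results.next()) else None
  }
}

val agg = new LazyAggIterator(Iterator("a" -> 1L, "b" -> 2L, "a" -> 3L))
val first = agg.processNext()
val second = agg.processNext()
val third = agg.processNext()
```

However many times `processNext` is called, the aggregation itself runs only once, which is what the `initAgg` flag in the generated code guarantees.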
Creating HashMap¶
createHashMap(): UnsafeFixedWidthAggregationMap
Note
createHashMap is used in the Java code from doProduceWithKeys.
createHashMap requests all the DeclarativeAggregate functions for the Catalyst expressions to initialize aggregation buffers.
createHashMap creates an UnsafeProjection for the expressions and executes it (with an "empty" null row).
Note
Executing an UnsafeProjection produces an UnsafeRow that becomes an empty aggregation buffer of an UnsafeFixedWidthAggregationMap to be created.
In the end, createHashMap creates an UnsafeFixedWidthAggregationMap with the following:
UnsafeFixedWidthAggregationMap | Value |
---|---|
emptyAggregationBuffer | The UnsafeRow after executing the UnsafeProjection to initialize aggregation buffers |
aggregationBufferSchema | bufferSchema |
groupingKeySchema | groupingKeySchema |
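The role of the empty aggregation buffer can be sketched as a template that is copied for every new grouping key. The class below is a simplified stand-in written for this page, not UnsafeFixedWidthAggregationMap: `ToyAggregationMap` is a hypothetical name, keys are strings, buffers are plain arrays, and the initial values `(0L, 0L)` are an assumed example for a (sum, count) pair.

```scala
import scala.collection.mutable

// Toy model (NOT UnsafeFixedWidthAggregationMap): every new grouping key
// starts from its own copy of the empty aggregation buffer.
final class ToyAggregationMap(emptyAggregationBuffer: Array[Any]) {
  private val buffers = mutable.HashMap.empty[String, Array[Any]]

  def getAggregationBuffer(groupingKey: String): Array[Any] =
    buffers.getOrElseUpdate(groupingKey, emptyAggregationBuffer.clone())

  def size: Int = buffers.size
}

// Assumed initial values of a hypothetical (sum, count) buffer
val emptyBuffer: Array[Any] = Array(0L, 0L)
val aggMap = new ToyAggregationMap(emptyBuffer)

// Update the buffer of key "a" in place; key "b" still starts from the template
val bufferA = aggMap.getAggregationBuffer("a")
bufferA(0) = bufferA(0).asInstanceOf[Long] + 5L
val bufferB = aggMap.getAggregationBuffer("b")
```

Buffers are updated in place, which is why the buffer types have to be mutable for hash-based aggregation to be selected.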
finishAggregate¶
finishAggregate(
hashMap: UnsafeFixedWidthAggregationMap,
sorter: UnsafeKVExternalSorter,
peakMemory: SQLMetric,
spillSize: SQLMetric,
avgHashProbe: SQLMetric,
numTasksFallBacked: SQLMetric): KVIterator[UnsafeRow, UnsafeRow]
finishAggregate
...FIXME
DeclarativeAggregate Functions¶
declFunctions: Seq[DeclarativeAggregate]
declFunctions are the DeclarativeAggregate expressions among the AggregateFunctions of the AggregateExpressions.
Demo¶
Aggregation Query¶
val data = spark.range(10)
val q = data
.groupBy('id % 2 as "group")
.agg(sum("id") as "sum")
HashAggregateExec operator should be selected due to:
- sum uses mutable types for aggregate expression
- just a single id column reference of LongType data type
scala> println(q.queryExecution.executedPlan.numberedTreeString)
00 AdaptiveSparkPlan isFinalPlan=false
01 +- HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
02 +- Exchange hashpartitioning(_groupingexpression#8L, 200), ENSURE_REQUIREMENTS, [plan_id=15]
03 +- HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
04 +- Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
05 +- Range (0, 10, step=1, splits=16)
Generate Final Plan¶
isFinalPlan flag is false. Let's execute it and access the final plan.
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
val op = q
.queryExecution
.executedPlan
.collect { case op: AdaptiveSparkPlanExec => op }
.head
Execute the adaptive operator to generate the final execution plan.
op.executeTake(1)
isFinalPlan flag should now be true.
scala> println(op.treeString)
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
*(2) HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 0
+- Exchange hashpartitioning(_groupingexpression#8L, 200), ENSURE_REQUIREMENTS, [plan_id=25]
+- *(1) HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
+- *(1) Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
+- *(1) Range (0, 10, step=1, splits=16)
+- == Initial Plan ==
HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
+- Exchange hashpartitioning(_groupingexpression#8L, 200), ENSURE_REQUIREMENTS, [plan_id=15]
+- HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
+- Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
+- Range (0, 10, step=1, splits=16)
With the isFinalPlan flag true, it is possible to print out the WholeStageCodegen subtrees.
scala> q.queryExecution.debug.codegen
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 (maxMethodCodeSize:284; maxConstantPoolSize:337(0.51% used); numInnerClasses:2) ==
*(1) HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
+- *(1) Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
+- *(1) Range (0, 10, step=1, splits=16)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private boolean hashAgg_initAgg_0;
/* 010 */ private org.apache.spark.unsafe.KVIterator hashAgg_mapIter_0;
/* 011 */ private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap hashAgg_hashMap_0;
/* 012 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter hashAgg_sorter_0;
/* 013 */ private scala.collection.Iterator inputadapter_input_0;
/* 014 */ private boolean hashAgg_hashAgg_isNull_2_0;
...
Let's access the generated source code via WholeStageCodegenExec physical operator.
val aqe = op
import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsce = aqe.executedPlan
.collect { case op: WholeStageCodegenExec => op }
.head
val (_, source) = wsce.doCodeGen
import org.apache.spark.sql.catalyst.expressions.codegen.CodeFormatter
val formattedCode = CodeFormatter.format(source)
scala> println(formattedCode)
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=2
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private boolean hashAgg_initAgg_0;
/* 010 */ private org.apache.spark.unsafe.KVIterator hashAgg_mapIter_0;
/* 011 */ private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap hashAgg_hashMap_0;
/* 012 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter hashAgg_sorter_0;
/* 013 */ private scala.collection.Iterator inputadapter_input_0;
/* 014 */ private boolean hashAgg_hashAgg_isNull_2_0;
...
val execPlan = q.queryExecution.sparkPlan
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
01 +- HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
02 +- Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
03 +- Range (0, 10, step=1, splits=16)
Going low level. Mind your steps! 😊
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
val aggLog = q.queryExecution.optimizedPlan.asInstanceOf[Aggregate]
import org.apache.spark.sql.catalyst.planning.PhysicalAggregation
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
val (_, aggregateExpressions: Seq[AggregateExpression], _, _) = PhysicalAggregation.unapply(aggLog).get
val aggregateBufferAttributes =
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
Here comes the very reason why HashAggregateExec
was selected. Aggregation execution planning strategy prefers HashAggregateExec
when aggregateBufferAttributes
are supported.
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
assert(Aggregate.supportsHashAggregate(aggregateBufferAttributes))
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[_groupingexpression#8L], functions=[sum(id#0L)], output=[group#2L, sum#5L])
01 +- HashAggregate(keys=[_groupingexpression#8L], functions=[partial_sum(id#0L)], output=[_groupingexpression#8L, sum#10L])
02 +- Project [id#0L, (id#0L % 2) AS _groupingexpression#8L]
03 +- Range (0, 10, step=1, splits=16)
Execute HashAggregateExec¶
val hashAggExecRDD = hashAggExec.execute
println(hashAggExecRDD.toDebugString)
(16) MapPartitionsRDD[4] at execute at <console>:1 []
| MapPartitionsRDD[3] at execute at <console>:1 []
| MapPartitionsRDD[2] at execute at <console>:1 []
| MapPartitionsRDD[1] at execute at <console>:1 []
| ParallelCollectionRDD[0] at execute at <console>:1 []
Java Code for Produce Execution Path¶
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext
val parent = hashAggExec
val doProduceWithKeysCode = hashAggExec.produce(ctx, parent)
scala> println(doProduceWithKeysCode)
if (!hashAgg_initAgg_0) {
hashAgg_initAgg_0 = true;
hashAgg_hashMap_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).createHashMap();
long hashAgg_beforeAgg_1 = System.nanoTime();
hashAgg_doAggregateWithKeys_0();
((org.apache.spark.sql.execution.metric.SQLMetric) references[16] /* aggTime */).add((System.nanoTime() - hashAgg_beforeAgg_1) / 1000000);
}
// output the result
while ( hashAgg_mapIter_0.next()) {
UnsafeRow hashAgg_aggKey_1 = (UnsafeRow) hashAgg_mapIter_0.getKey();
UnsafeRow hashAgg_aggBuffer_1 = (UnsafeRow) hashAgg_mapIter_0.getValue();
hashAgg_doAggregateWithKeysOutput_1(hashAgg_aggKey_1, hashAgg_aggBuffer_1);
if (shouldStop()) return;
}
hashAgg_mapIter_0.close();
if (hashAgg_sorter_0 == null) {
hashAgg_hashMap_0.free();
}