TungstenAggregationIterator¶
TungstenAggregationIterator is an AggregationIterator that is used by HashAggregateExec physical operator to process rows in a partition.
TungstenAggregationIterator starts processing using UnsafeFixedWidthAggregationMap in Hash-Based Processing Mode until it runs out of memory (starts spilling to disk) and switches to Sort-Based Aggregation.
Lifecycle¶
TungstenAggregationIterator is created for HashAggregateExec physical operator when executed with a non-empty partition or no Grouping Keys.
There is one TungstenAggregationIterator created per partition of HashAggregateExec physical operator.
TungstenAggregationIterator immediately initializes the internal registries:
- Spill Size Before Execution
- Initial Aggregation Buffer
- UnsafeFixedWidthAggregationMap
- Sort-Based Aggregation Buffer
TungstenAggregationIterator immediately starts processing input rows and, unless it has switched to sort-based aggregation, initializes the remaining internal registries.
TungstenAggregationIterator frees up the memory associated with UnsafeFixedWidthAggregationMap if the map is empty.
TungstenAggregationIterator registers a task completion listener to be executed at the end of this task.
TungstenAggregationIterator is an Iterator[UnsafeRow] (indirectly, as an AggregationIterator) and so is a data structure for iterating over a sequence of UnsafeRows. The sequence of UnsafeRows is the partition data.
As with any Iterator, TungstenAggregationIterator comes with the following:
- hasNext method for checking if there is a next row available
- next method which returns the next row and advances itself
TaskCompletionListener¶
TungstenAggregationIterator registers a TaskCompletionListener (Spark Core) that is executed at the end of this task (one per partition).
When executed, the TaskCompletionListener updates the metrics.
| Metric | Value |
|---|---|
| peak memory | The maximum of the getPeakMemoryUsedBytes of this UnsafeFixedWidthAggregationMap and the getPeakMemoryUsedBytes of this UnsafeKVExternalSorter for sort-based aggregation, if a switch happened |
| spillSize | |
| avgHashProbe | The getAvgHashProbesPerKey of this UnsafeFixedWidthAggregationMap |
The TaskCompletionListener requests the TaskMetrics (Spark Core) to incPeakExecutionMemory.
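The peak memory rule from the table can be sketched in plain Scala, without Spark on the classpath. The names PeakMemorySketch and peakMemory are hypothetical, and the sorter's value is modelled as an Option that is defined only if a switch to sort-based aggregation happened.

```scala
object PeakMemorySketch {
  // mapPeak:    peak bytes reported by the hash map
  // sorterPeak: peak bytes reported by the external sorter (None if never created)
  def peakMemory(mapPeak: Long, sorterPeak: Option[Long]): Long =
    math.max(mapPeak, sorterPeak.getOrElse(0L))
}
```

With no sorter, the map's own peak is reported; otherwise the larger of the two values wins.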
Creating Instance¶
TungstenAggregationIterator takes the following to be created:
- Partition ID
- Grouping Named Expressions
- AggregateExpressions
- Aggregate Attributes
- Initial input buffer offset
- Result NamedExpressions
- Function to create a new MutableProjection given expressions and attributes ((Seq[Expression], Seq[Attribute]) => MutableProjection)
- Original Input Attributes
- Input Iterator of InternalRows (from a single partition of the child of the HashAggregateExec physical operator)
- (only for testing) Optional HashAggregateExec's testFallbackStartsAt
- numOutputRows Performance Metric
- peak memory Performance Metric
- spillSize Performance Metric
- avgHashProbe Performance Metric
- number of sort fallback tasks Performance Metric
TungstenAggregationIterator starts processing input rows and pre-loads the first key-value pair from the UnsafeFixedWidthAggregationMap unless switched to a sort-based aggregation.
TungstenAggregationIterator is created when:
- HashAggregateExec physical operator is executed
Performance Metrics¶
TungstenAggregationIterator is given the performance metrics of the owning HashAggregateExec aggregate physical operator when created.
- numOutputRows is used when TungstenAggregationIterator is requested for the next UnsafeRow (and it has one)
- peak memory, spillSize and avgHashProbe are used at the end of every task (one per partition)
The metrics are displayed as part of HashAggregateExec aggregate physical operator (e.g. in web UI in Details for Query).

number of sort fallback tasks¶
TungstenAggregationIterator is given number of sort fallback tasks performance metric when created.
The metric is number of sort fallback tasks metric of the owning HashAggregateExec physical operator.
The metric is incremented only when TungstenAggregationIterator is requested to fall back to sort-based aggregation.
peak memory¶
TungstenAggregationIterator is given peak memory performance metric when created.
The metric is peak memory metric of the owning HashAggregateExec physical operator.
The metric is set at task completion.
Checking for Next Row Available¶
hasNext is enabled (true) when one of the following holds:
- Either this TungstenAggregationIterator is sort-based and sortedInputHasNewGroup
- Or this TungstenAggregationIterator is not sort-based and mapIteratorHasNext
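The two conditions can be written as a single boolean expression. The following is a minimal sketch in plain Scala. State is a hypothetical stand-in that carries only the three flags named above, not the Spark class itself.

```scala
object HasNextSketch {
  // Simplified stand-in for the iterator's internal flags
  final case class State(
      sortBased: Boolean,
      sortedInputHasNewGroup: Boolean,
      mapIteratorHasNext: Boolean)

  // Only the flag that belongs to the current processing mode is consulted
  def hasNext(s: State): Boolean =
    (s.sortBased && s.sortedInputHasNewGroup) ||
      (!s.sortBased && s.mapIteratorHasNext)
}
```

Because both flags are pre-loaded elsewhere, a check of this shape has no side effects and can be called repeatedly.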
Next Row¶
next...FIXME
processCurrentSortedGroup¶
processCurrentSortedGroup(): Unit
processCurrentSortedGroup...FIXME
UnsafeFixedWidthAggregationMap¶
When created, TungstenAggregationIterator creates an UnsafeFixedWidthAggregationMap with the following:
- initialAggregationBuffer
- Schema built from the attributes of the aggregation buffers of all the AggregateFunctions
- Schema built from the attributes of all the grouping expressions
Used when:
- TungstenAggregationIterator is requested for the next UnsafeRow, to outputForEmptyGroupingKeyWithoutInput, to process input rows, to initialize the aggregationBufferMapIterator and every time a partition has been processed
Hash- vs Sort-Based Aggregations¶
sortBased: Boolean = false
TungstenAggregationIterator turns sortBased flag off (false) when created.
sortBased flag is turned on (true) at the end of switching to sort-based aggregation (alongside incrementing number of sort fallback tasks metric).
In other words, sortBased flag indicates whether TungstenAggregationIterator has switched (fallen back) to sort-based aggregation (from initial hash-based processing mode) while processing input rows.
As long as the underlying UnsafeFixedWidthAggregationMap has enough memory to hold grouping keys (and externalSorter is not created), TungstenAggregationIterator uses the processRow function to process rows.
Only when externalSorter is created does TungstenAggregationIterator switch to sort-based aggregation.
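The hash-first, sort-on-fallback idea can be illustrated with a toy model in plain Scala. This is a sketch under loud assumptions, not Spark's implementation: maxKeys is a hypothetical stand-in for "the map has enough memory", an in-memory buffer stands in for the external sorter, and the aggregate is a simple sum per key.

```scala
import scala.collection.mutable

object FallbackSketch {
  // Returns the per-key sums and whether the sort-based path was taken
  def aggregate(rows: Seq[(String, Long)], maxKeys: Int): (Seq[(String, Long)], Boolean) = {
    val map = mutable.LinkedHashMap.empty[String, Long]
    // Stand-in for the external sorter: stays empty unless the map fills up
    val spilled = mutable.ArrayBuffer.empty[(String, Long)]
    var sorterCreated = false

    rows.foreach { case (key, value) =>
      if (!map.contains(key) && map.size >= maxKeys) {
        // No room for a new key: hand the partial results over to the sorter
        spilled ++= map
        map.clear()
        sorterCreated = true
      }
      map(key) = map.getOrElse(key, 0L) + value
    }

    if (!sorterCreated) {
      (map.toSeq, false) // hash-based all the way
    } else {
      // Sort-based phase: sort the partial results and merge equal neighbours
      spilled ++= map
      val merged = spilled.sortBy(_._1).foldLeft(List.empty[(String, Long)]) {
        case ((k, sum) :: rest, (key, v)) if k == key => (k, sum + v) :: rest
        case (acc, kv)                                => kv :: acc
      }
      (merged.reverse, true)
    }
  }
}
```

Both paths produce the same sums. The sort-based path merges partial results rather than raw rows, which is why the fallback needs a different row-processing function.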
Initial Aggregation Buffer¶
initialAggregationBuffer: UnsafeRow
TungstenAggregationIterator initializes initialAggregationBuffer (as a new UnsafeRow) when created.
initialAggregationBuffer is used as the emptyAggregationBuffer of the UnsafeFixedWidthAggregationMap.
When requested for next row in sortBased aggregation, TungstenAggregationIterator copies the initialAggregationBuffer to the sortBasedAggregationBuffer.
When requested to outputForEmptyGroupingKeyWithoutInput with no groupingExpressions, TungstenAggregationIterator copies the initialAggregationBuffer to the sortBasedAggregationBuffer.
Sort-Based Aggregation Buffer¶
sortBasedAggregationBuffer: UnsafeRow
TungstenAggregationIterator initializes sortBasedAggregationBuffer to be a new UnsafeRow when created.
sortBasedAggregationBuffer is used when TungstenAggregationIterator is requested for next row in sort-based processing mode.
sortBasedAggregationBuffer is copied from the initialAggregationBuffer when requested for:
- Next row (in sort-based processing mode)
- outputForEmptyGroupingKeyWithoutInput
createNewAggregationBuffer¶
createNewAggregationBuffer(): UnsafeRow
createNewAggregationBuffer creates an UnsafeRow.
createNewAggregationBuffer...FIXME
createNewAggregationBuffer is used when:
- TungstenAggregationIterator is created (and creates the initialAggregationBuffer and sortBasedAggregationBuffer buffers)
Processing Input Rows¶
processInputs(
fallbackStartsAt: (Int, Int)): Unit
Procedure
processInputs returns Unit (nothing) and whatever happens inside stays inside (just like in Las Vegas, doesn't it?! 😉)
processInputs is used when:
- TungstenAggregationIterator is created
processInputs branches off based on the grouping expressions, specified or not.
Grouping Expressions Specified¶
processInputs...FIXME
No Grouping Expressions¶
With no grouping expressions, processInputs generates one single grouping key (an UnsafeRow) for all the partition rows. processInputs executes (applies) the grouping projection to a null (undefined) row.
processInputs looks up the aggregation buffer (UnsafeRow) in the UnsafeFixedWidthAggregationMap for the generated grouping key.
In the end, processInputs calls processRow for every InternalRow in the inputIter, one by one (with the same aggregation buffer).
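The no-grouping case boils down to one buffer that every row updates in place. Here is a minimal sketch in plain Scala. Buffer, and the sum and count it holds, are hypothetical stand-ins for an UnsafeRow aggregation buffer, and rows are simplified to Long values.

```scala
object NoGroupingSketch {
  // Mutable stand-in for an aggregation buffer
  final class Buffer(var sum: Long = 0L, var count: Long = 0L)

  // Stand-in for processRow: updates the buffer in place with one input row
  def processRow(buffer: Buffer, row: Long): Unit = {
    buffer.sum += row
    buffer.count += 1
  }

  def processInputs(inputIter: Iterator[Long]): Buffer = {
    // Looked up once, for the single grouping key shared by all rows
    val buffer = new Buffer()
    inputIter.foreach(row => processRow(buffer, row))
    buffer
  }
}
```

Since the key never changes, the lookup happens once, outside the loop over the input rows.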
Falling Back to Sort-Based Aggregation¶
switchToSortBasedAggregation(): Unit
Procedure
switchToSortBasedAggregation returns Unit (nothing) and whatever happens inside stays inside (just like in Las Vegas, doesn't it?! 😉)
switchToSortBasedAggregation prints out the following INFO message to the logs:
falling back to sort based aggregation.
switchToSortBasedAggregation initializes the sortBasedProcessRow to be generateProcessRow with the newExpressions, newFunctions, newInputAttributes:
- newExpressions is this AggregateExpressions with the AggregateExpressions in Partial or Complete aggregation modes changed to PartialMerge or Final, respectively
- newFunctions is initializeAggregateFunctions with the newExpressions and startingInputBufferOffset as 0
- newInputAttributes is the inputAggBufferAttributes of the newFunctions aggregate functions
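The mode rewrite for newExpressions can be sketched as a simple mapping. The types below are hypothetical stand-ins written in plain Scala. They only mirror the four aggregation mode names and are not Spark's AggregateExpression or AggregateMode.

```scala
object ModeSwitchSketch {
  sealed trait Mode
  case object Partial extends Mode
  case object PartialMerge extends Mode
  case object Final extends Mode
  case object Complete extends Mode

  // Stand-in for an aggregate expression: a name and a mode only
  final case class AggExpr(name: String, mode: Mode)

  // Partial -> PartialMerge, Complete -> Final, everything else unchanged
  def forSortBased(exprs: Seq[AggExpr]): Seq[AggExpr] = exprs.map {
    case e @ AggExpr(_, Partial)  => e.copy(mode = PartialMerge)
    case e @ AggExpr(_, Complete) => e.copy(mode = Final)
    case other                    => other
  }
}
```

The likely motivation is that, after the fallback, the sorted input consists of aggregation buffers rather than raw rows, so the functions have to merge buffers instead of updating them.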
switchToSortBasedAggregation initializes the sortedKVIterator to be the KVSorterIterator of this UnsafeKVExternalSorter.
switchToSortBasedAggregation pre-loads the first key-value pair from the sorted iterator (to make hasNext idempotent). switchToSortBasedAggregation requests the sortedKVIterator for the next element and stores the answer in this sortedInputHasNewGroup.
With sortedInputHasNewGroup enabled (true), switchToSortBasedAggregation...FIXME
In the end, switchToSortBasedAggregation turns this sortBased flag on and increments number of sort fallback tasks metric.
mapIteratorHasNext¶
var mapIteratorHasNext: Boolean = false
mapIteratorHasNext is an internal variable that starts disabled (false) when TungstenAggregationIterator is created.
TungstenAggregationIterator uses mapIteratorHasNext for hash-based aggregation (not sort-based) to indicate whether the aggregationBufferMapIterator has a next key-value pair.
mapIteratorHasNext is used to pre-load the next key-value pair from the aggregationBufferMapIterator to make hasNext idempotent.
mapIteratorHasNext is also used to control whether to free up the memory associated with the UnsafeFixedWidthAggregationMap while in hash-based processing mode.
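The pre-loading pattern is needed whenever the underlying source advances as a side effect of asking whether more data exists. Below is a minimal sketch in plain Scala. Cursor, cursorOf and Preloaded are hypothetical names, and the free callback stands in for releasing the map's memory.

```scala
object PreloadSketch {
  // Cursor-style source: next() both advances and reports whether a value exists
  trait Cursor[A] {
    def next(): Boolean
    def value: A
  }

  def cursorOf[A](items: Seq[A]): Cursor[A] = new Cursor[A] {
    private var i = -1
    def next(): Boolean = { i += 1; i < items.length }
    def value: A = items(i)
  }

  final class Preloaded[A](cursor: Cursor[A], free: () => Unit) extends Iterator[A] {
    // Pre-load the first element so that hasNext only reads a flag
    private var cursorHasNext: Boolean = cursor.next()
    if (!cursorHasNext) free() // empty source: release resources right away

    def hasNext: Boolean = cursorHasNext

    def next(): A = {
      if (!cursorHasNext) throw new NoSuchElementException("no more elements")
      val result = cursor.value
      cursorHasNext = cursor.next() // pre-load the following element
      if (!cursorHasNext) free()    // release resources once exhausted
      result
    }
  }
}
```

Calling hasNext any number of times never moves the cursor, and free runs exactly once, as soon as the last element has been handed out.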
Output Row for Empty Partition and No Grouping Keys¶
outputForEmptyGroupingKeyWithoutInput(): UnsafeRow
outputForEmptyGroupingKeyWithoutInput requests the sortBasedAggregationBuffer to copy the bytes off from the initialAggregationBuffer.
outputForEmptyGroupingKeyWithoutInput generates the result using the generateOutput function with an empty UnsafeRow and the sortBasedAggregationBuffer.
In the end, outputForEmptyGroupingKeyWithoutInput frees up the memory associated with this UnsafeFixedWidthAggregationMap.
groupingExpressions Should Be Empty or IllegalStateException
outputForEmptyGroupingKeyWithoutInput throws an IllegalStateException when executed with the groupingExpressions specified:
This method should not be called when groupingExpressions is not empty.
That cannot really happen since HashAggregateExec physical operator makes sure to execute outputForEmptyGroupingKeyWithoutInput when there are no groupingExpressions.
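The guard-then-copy shape of the method can be sketched in plain Scala. All parameter types here are simplified assumptions: grouping expressions are plain strings, buffers are arrays of Long, and generateOutput is any function from a buffer to an output row. Freeing the map is left out.

```scala
object EmptyGroupOutputSketch {
  def outputForEmptyGroupingKeyWithoutInput(
      groupingExpressions: Seq[String],
      initialBuffer: Array[Long],
      generateOutput: Array[Long] => Seq[Long]): Seq[Long] = {
    if (groupingExpressions.nonEmpty) {
      throw new IllegalStateException(
        "This method should not be called when groupingExpressions is not empty.")
    }
    // Work on a copy, so that the initial buffer stays untouched
    val sortBasedBuffer = initialBuffer.clone()
    generateOutput(sortBasedBuffer)
  }
}
```

With no input rows, the result is whatever the aggregate functions produce from their initial values, for example 0 for a count.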
outputForEmptyGroupingKeyWithoutInput is used when:
- HashAggregateExec physical operator is requested to execute (with an empty partition and no grouping expressions)
UnsafeKVExternalSorter¶
var externalSorter: UnsafeKVExternalSorter = null
externalSorter is an UnsafeKVExternalSorter.
externalSorter is uninitialized (null) when TungstenAggregationIterator is created.
TungstenAggregationIterator initializes the externalSorter to be the UnsafeKVExternalSorter (of the UnsafeFixedWidthAggregationMap) when, while processing input rows (processInputs), the UnsafeFixedWidthAggregationMap returns no buffer (null) for a grouping key from getAggregationBufferFromUnsafeRow (likely due to running out of memory).
TungstenAggregationIterator switches to sort-based aggregation (switchToSortBasedAggregation) right after processing the whole partition that ended up with an UnsafeKVExternalSorter.
While switching to sort-based aggregation, TungstenAggregationIterator requests the UnsafeKVExternalSorter for a KVSorterIterator (sortedKVIterator).
Peak memory consumption can be monitored using peakMemory performance metric.
Generating Result Projection¶
AggregationIterator
generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow
generateResultProjection is part of the AggregationIterator abstraction.
generateResultProjection uses an UnsafeRowJoiner for a fast(er) path (than projection) for partial aggregation (when the aggregateExpressions have aggregation modes that are neither Final nor Complete aggregation mode).
Demo¶
val q = spark.range(10).
groupBy('id % 2 as "group").
agg(sum("id") as "sum")
val execPlan = q.queryExecution.sparkPlan
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#11L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#11L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#11L, sum#13L])
02 +- Range (0, 10, step=1, splits=8)
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
val hashAggExecRDD = hashAggExec.execute
// MapPartitionsRDD is in private[spark] scope
// Use :paste -raw for the following helper object
package org.apache.spark
object AccessPrivateSpark {
import org.apache.spark.rdd.RDD
def mapPartitionsRDD[T](hashAggExecRDD: RDD[T]) = {
import org.apache.spark.rdd.MapPartitionsRDD
hashAggExecRDD.asInstanceOf[MapPartitionsRDD[_, _]]
}
}
// END :paste -raw
import org.apache.spark.AccessPrivateSpark
val mpRDD = AccessPrivateSpark.mapPartitionsRDD(hashAggExecRDD)
val f = mpRDD.iterator(_, _)
import org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator
// FIXME How to show that TungstenAggregationIterator is used?
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator logger to see what happens inside.
Add the following line to conf/log4j2.properties:
logger.TungstenAggregationIterator.name = org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator
logger.TungstenAggregationIterator.level = all
Refer to Logging.