TungstenAggregationIterator¶
TungstenAggregationIterator is an AggregationIterator that is used by the HashAggregateExec physical operator to process rows in a partition.
TungstenAggregationIterator starts processing using UnsafeFixedWidthAggregationMap in hash-based processing mode until it runs out of memory (and starts spilling to disk), at which point it switches to sort-based aggregation.
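The fallback can be pictured with a toy model in plain Scala (no Spark). This is a sketch under stated assumptions, not Spark's implementation: input rows are hypothetical (key, value) pairs summed per key, "running out of memory" is modeled as the map already holding maxKeys keys, and the sorted spills stand in for UnsafeKVExternalSorter.

```scala
import scala.collection.mutable

// Toy model of hash-based aggregation with a fallback to sort-based aggregation.
// Assumption: "running out of memory" means the map already holds maxKeys keys.
def hashThenSortAggregate(
    rows: Iterator[(String, Long)],
    maxKeys: Int): (Seq[(String, Long)], Boolean) = {
  val hashMap = mutable.HashMap.empty[String, Long]
  // Sorted runs of partial sums, standing in for UnsafeKVExternalSorter
  val spills = mutable.ArrayBuffer.empty[Seq[(String, Long)]]

  rows.foreach { case (key, value) =>
    if (!hashMap.contains(key) && hashMap.size >= maxKeys) {
      // No room for a new key: sort and spill the map, then continue with an empty one
      spills += hashMap.toSeq.sortBy(_._1)
      hashMap.clear()
    }
    hashMap(key) = hashMap.getOrElse(key, 0L) + value
  }

  if (spills.isEmpty) {
    // Hash-based mode was enough (sorted only to make the output deterministic)
    (hashMap.toSeq.sortBy(_._1), false)
  } else {
    // Sort-based mode: sort all partial results by key, then combine adjacent equal keys
    spills += hashMap.toSeq.sortBy(_._1)
    val sorted = spills.flatten.sortBy(_._1)
    val out = mutable.ArrayBuffer.empty[(String, Long)]
    sorted.foreach { case (key, partialSum) =>
      if (out.nonEmpty && out.last._1 == key) out(out.length - 1) = key -> (out.last._2 + partialSum)
      else out += (key -> partialSum)
    }
    (out.toSeq, true)
  }
}
```

With a small maxKeys the function spills and finishes in sort-based mode, yet returns the same sums as a purely hash-based run; only the execution strategy changes.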
Lifecycle¶
TungstenAggregationIterator is created for the HashAggregateExec physical operator when executed with a non-empty partition or no Grouping Keys.
There is one TungstenAggregationIterator created per partition of the HashAggregateExec physical operator.
TungstenAggregationIterator immediately initializes the internal registries:
- Spill Size Before Execution
- Initial Aggregation Buffer
- UnsafeFixedWidthAggregationMap
- Sort-Based Aggregation Buffer
TungstenAggregationIterator immediately starts processing input rows and, if not switched to sort-based aggregation, initializes the other internal registries:
- aggregationBufferMapIterator
- mapIteratorHasNext
TungstenAggregationIterator frees up the memory associated with UnsafeFixedWidthAggregationMap if the map is empty.
TungstenAggregationIterator registers a task completion listener to be executed at the end of this task.
TungstenAggregationIterator is an Iterator[UnsafeRow] (indirectly as an AggregationIterator) and so is a data structure that allows iterating over a sequence of UnsafeRows. The sequence of UnsafeRows is the partition data.
As with any Iterator, TungstenAggregationIterator comes with the following:
- hasNext method for checking if there is a next row available
- next method which returns the next row and advances itself
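A minimal sketch of that contract in Scala, using a hypothetical PartitionIterator over an in-memory sequence instead of real UnsafeRows:

```scala
// Hypothetical stand-in for an Iterator[UnsafeRow] over one partition
final class PartitionIterator[T](rows: IndexedSeq[T]) extends Iterator[T] {
  private var pos = 0

  // Is there a next row? Reading it does not advance the iterator.
  override def hasNext: Boolean = pos < rows.length

  // Return the next row and advance; fail once the partition is exhausted
  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("no more rows")
    val row = rows(pos)
    pos += 1
    row
  }
}

// Consumers drive the iterator with the usual hasNext/next loop
val it = new PartitionIterator(Vector("row0", "row1", "row2"))
val consumed = scala.collection.mutable.ArrayBuffer.empty[String]
while (it.hasNext) consumed += it.next()
```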
TaskCompletionListener¶
TungstenAggregationIterator registers a TaskCompletionListener (Spark Core) that is executed at the end of this task (one per partition).
When executed, the TaskCompletionListener updates the following metrics:
Metric | Value |
---|---|
peak memory | The maximum of the getPeakMemoryUsedBytes of this UnsafeFixedWidthAggregationMap and the getPeakMemoryUsedBytes of this UnsafeKVExternalSorter for sort-based aggregation, if a switch happened |
spillSize | The memoryBytesSpilled of the task's TaskMetrics minus the Spill Size Before Execution |
avgHashProbe | The getAvgHashProbesPerKey of this UnsafeFixedWidthAggregationMap |
The TaskCompletionListener requests the TaskMetrics (Spark Core) to incPeakExecutionMemory by the peak memory value.
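The metric updates can be sketched as a pure function. The names below are hypothetical, and the spill computation assumes spillSize is the task's spilled bytes at completion minus the Spill Size Before Execution. Taking the maximum (rather than the sum) of the two peaks is reasonable because the map is destroyed to create the sorter, so their memory usage should not overlap.

```scala
// Hypothetical holder for the two values the listener computes
final case class CompletionMetrics(peakMemory: Long, spillSize: Long)

def onTaskCompletion(
    mapPeakBytes: Long,            // peak of the UnsafeFixedWidthAggregationMap
    sorterPeakBytes: Option[Long], // None when there was no switch to sort-based aggregation
    memoryBytesSpilled: Long,      // task's spilled bytes at completion
    spillSizeBefore: Long          // task's spilled bytes before execution
): CompletionMetrics = {
  // Map and sorter never hold memory at the same time, so the max is the peak
  val peak = math.max(mapPeakBytes, sorterPeakBytes.getOrElse(0L))
  CompletionMetrics(peakMemory = peak, spillSize = memoryBytesSpilled - spillSizeBefore)
}
```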
Creating Instance¶
TungstenAggregationIterator
takes the following to be created:
- Partition ID
- Grouping Named Expressions
- AggregateExpressions
- Aggregate Attributes
- Initial input buffer offset
- Result NamedExpressions
- Function to create a new MutableProjection given expressions and attributes ((Seq[Expression], Seq[Attribute]) => MutableProjection)
- Original Input Attributes
- Input Iterator of InternalRows (from a single partition of the child of the HashAggregateExec physical operator)
- (only for testing) Optional HashAggregateExec's testFallbackStartsAt
- numOutputRows Performance Metric
- peak memory Performance Metric
- spillSize Performance Metric
- avgHashProbe Performance Metric
- number of sort fallback tasks Performance Metric
TungstenAggregationIterator starts processing input rows and pre-loads the first key-value pair from the UnsafeFixedWidthAggregationMap unless switched to a sort-based aggregation.
TungstenAggregationIterator is created when:
- HashAggregateExec physical operator is executed
Performance Metrics¶
TungstenAggregationIterator is given the performance metrics of the owning HashAggregateExec aggregate physical operator when created.
- numOutputRows is used when TungstenAggregationIterator is requested for the next UnsafeRow (and it has one)
- peak memory, spillSize and avgHashProbe are used at the end of every task (one per partition)
The metrics are displayed as part of HashAggregateExec aggregate physical operator (e.g. in web UI in Details for Query).
number of sort fallback tasks¶
TungstenAggregationIterator is given the number of sort fallback tasks performance metric when created.
The metric is the number of sort fallback tasks metric of the owning HashAggregateExec physical operator.
The metric is incremented only when TungstenAggregationIterator is requested to fall back to sort-based aggregation.
peak memory¶
TungstenAggregationIterator is given the peak memory performance metric when created.
The metric is the peak memory metric of the owning HashAggregateExec physical operator.
The metric is set at task completion.
Checking for Next Row Available¶
hasNext is true when either of the following holds:
- This TungstenAggregationIterator is sort-based and sortedInputHasNewGroup
- This TungstenAggregationIterator is not sort-based and mapIteratorHasNext
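The condition reads naturally as a boolean expression. The sketch below models the three flags as a hypothetical case class; since hasNext only reads flags that were pre-loaded earlier, calling it repeatedly never advances anything.

```scala
// Hypothetical snapshot of the flags hasNext depends on
final case class IterState(
    sortBased: Boolean,
    sortedInputHasNewGroup: Boolean,
    mapIteratorHasNext: Boolean)

// Sort-based mode consults the sorted input; hash-based mode consults the map iterator
def hasNext(s: IterState): Boolean =
  (s.sortBased && s.sortedInputHasNewGroup) || (!s.sortBased && s.mapIteratorHasNext)
```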
Next Row¶
next
...FIXME
processCurrentSortedGroup¶
processCurrentSortedGroup(): Unit
processCurrentSortedGroup
...FIXME
UnsafeFixedWidthAggregationMap¶
When created, TungstenAggregationIterator creates an UnsafeFixedWidthAggregationMap with the following:
- initialAggregationBuffer
- Schema built from the attributes of the aggregation buffers of all the AggregateFunctions
- Schema built from the attributes of all the grouping expressions

The UnsafeFixedWidthAggregationMap is used when:
- TungstenAggregationIterator is requested for the next UnsafeRow
- TungstenAggregationIterator is requested to outputForEmptyGroupingKeyWithoutInput
- TungstenAggregationIterator processes input rows
- TungstenAggregationIterator initializes the aggregationBufferMapIterator
- A partition has been processed (at the end of every task)
Hash- vs Sort-Based Aggregations¶
sortBased: Boolean = false
TungstenAggregationIterator turns the sortBased flag off (false) when created.
The sortBased flag is turned on (true) at the end of switching to sort-based aggregation (alongside incrementing the number of sort fallback tasks metric).
In other words, the sortBased flag indicates whether TungstenAggregationIterator has switched (fallen back) to sort-based aggregation (from the initial hash-based processing mode) while processing input rows.
As long as the underlying UnsafeFixedWidthAggregationMap has enough memory to hold grouping keys (and externalSorter is not created), TungstenAggregationIterator uses the processRow function to process rows.
Only when externalSorter is created does TungstenAggregationIterator switch to sort-based aggregation.
Initial Aggregation Buffer¶
initialAggregationBuffer: UnsafeRow
TungstenAggregationIterator initializes initialAggregationBuffer (as a new UnsafeRow) when created.
initialAggregationBuffer is used as the emptyAggregationBuffer of the UnsafeFixedWidthAggregationMap.
When requested for the next row in sort-based aggregation, TungstenAggregationIterator copies the initialAggregationBuffer to the sortBasedAggregationBuffer.
When requested to outputForEmptyGroupingKeyWithoutInput with no groupingExpressions, TungstenAggregationIterator copies the initialAggregationBuffer to the sortBasedAggregationBuffer.
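The role of initialAggregationBuffer as a template can be sketched with fixed-width Array[Long] buffers standing in for UnsafeRows (an assumption; the [count, sum] layout is made up for illustration). One buffer is reused across sorted groups and reset by copying the template over it, similar in spirit to UnsafeRow.copyFrom.

```scala
// Assumption: buffers are fixed-width Array[Long] with layout [count, sum]
val initialAggregationBuffer: Array[Long] = Array(0L, 0L)
val sortBasedAggregationBuffer: Array[Long] = new Array[Long](initialAggregationBuffer.length)

// Overwrite the target in place (no new allocation), like UnsafeRow.copyFrom
def copyFrom(target: Array[Long], source: Array[Long]): Unit =
  System.arraycopy(source, 0, target, 0, source.length)

// Aggregate already-sorted groups one after another, reusing a single buffer
def aggregateSortedGroups(groups: Seq[Seq[Long]]): Seq[(Long, Long)] =
  groups.map { values =>
    copyFrom(sortBasedAggregationBuffer, initialAggregationBuffer) // fresh start for this group
    values.foreach { v =>
      sortBasedAggregationBuffer(0) += 1L
      sortBasedAggregationBuffer(1) += v
    }
    (sortBasedAggregationBuffer(0), sortBasedAggregationBuffer(1))
  }
```

The template itself is never modified, so every group starts from the same initial values.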
Sort-Based Aggregation Buffer¶
sortBasedAggregationBuffer: UnsafeRow
TungstenAggregationIterator initializes sortBasedAggregationBuffer to be a new UnsafeRow when created.
sortBasedAggregationBuffer is used when TungstenAggregationIterator is requested for the next row in sort-based processing mode.
sortBasedAggregationBuffer is copied from the initialAggregationBuffer when requested for:
- Next row (in sort-based processing mode)
- outputForEmptyGroupingKeyWithoutInput
createNewAggregationBuffer¶
createNewAggregationBuffer(): UnsafeRow
createNewAggregationBuffer creates an UnsafeRow.
createNewAggregationBuffer...FIXME
createNewAggregationBuffer is used when:
- TungstenAggregationIterator is created (and creates the initialAggregationBuffer and sortBasedAggregationBuffer buffers)
Processing Input Rows¶
processInputs(fallbackStartsAt: (Int, Int)): Unit
Procedure: processInputs returns Unit (nothing) and whatever happens inside stays inside (just like in Las Vegas, doesn't it?! 😉)
processInputs is used when:
- TungstenAggregationIterator is created
processInputs branches off based on whether grouping expressions are specified or not.
Grouping Expressions Specified¶
processInputs
...FIXME
No Grouping Expressions¶
With no grouping expressions, processInputs generates one single grouping key (an UnsafeRow) for all the partition rows by executing (applying) the grouping projection to a null (undefined) row.
processInputs looks up the aggregation buffer (UnsafeRow) in the UnsafeFixedWidthAggregationMap for the generated grouping key.
In the end, processInputs calls processRow for every InternalRow in the inputIter, one by one (with the same aggregation buffer).
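A sketch of the no-grouping case in plain Scala, assuming a hypothetical [count, sum] buffer: Unit plays the role of the single grouping key, so there is exactly one map lookup and every row updates the same buffer.

```scala
import scala.collection.mutable

def globalAggregate(input: Iterator[Long]): (Long, Long) = {
  // Unit stands in for the single, field-less grouping key
  val hashMap = mutable.HashMap.empty[Unit, Array[Long]]
  val groupingKey: Unit = ()
  // Looked up once; a new entry starts from the initial [count, sum] buffer
  val buffer = hashMap.getOrElseUpdate(groupingKey, Array(0L, 0L))
  while (input.hasNext) {
    val value = input.next()
    buffer(0) += 1L   // "processRow": update the same buffer in place
    buffer(1) += value
  }
  (buffer(0), buffer(1))
}
```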
Falling Back to Sort-Based Aggregation¶
switchToSortBasedAggregation(): Unit
Procedure: switchToSortBasedAggregation returns Unit (nothing) and whatever happens inside stays inside (just like in Las Vegas, doesn't it?! 😉)
switchToSortBasedAggregation prints out the following INFO message to the logs:
falling back to sort based aggregation.
switchToSortBasedAggregation initializes the sortBasedProcessRow to be generateProcessRow with the newExpressions, newFunctions and newInputAttributes:
- newExpressions are these AggregateExpressions with the ones in Partial or Complete aggregation modes changed to PartialMerge or Final, respectively
- newFunctions are initializeAggregateFunctions with the newExpressions and startingInputBufferOffset as 0
- newInputAttributes are the inputAggBufferAttributes of the newFunctions aggregate functions
switchToSortBasedAggregation initializes the sortedKVIterator to be the KVSorterIterator of this UnsafeKVExternalSorter.
switchToSortBasedAggregation pre-loads the first key-value pair from the sorted iterator (to make hasNext idempotent): it requests the sortedKVIterator to advance to the first key-value pair and stores the answer (whether there is one) in sortedInputHasNewGroup.
With sortedInputHasNewGroup enabled (true), switchToSortBasedAggregation...FIXME
In the end, switchToSortBasedAggregation turns this sortBased flag on and increments the number of sort fallback tasks metric.
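The mode rewrite and the final bookkeeping can be sketched as follows. The AggMode objects are hypothetical stand-ins named after Spark's aggregation modes, and FallbackState is made up for illustration. The rewrite makes sense because, after spilling, the sorted records hold aggregation buffers rather than raw input rows, so the functions have to merge buffers instead of consuming input.

```scala
// Hypothetical stand-ins named after Spark's aggregation modes
sealed trait AggMode
case object Partial extends AggMode
case object PartialMerge extends AggMode
case object Final extends AggMode
case object Complete extends AggMode

// Modes that consumed raw input switch to their buffer-merging counterparts
def sortBasedMode(mode: AggMode): AggMode = mode match {
  case Partial  => PartialMerge
  case Complete => Final
  case other    => other
}

// Hypothetical bookkeeping done at the end of the switch
final class FallbackState {
  var sortBased: Boolean = false
  var numTasksFallBacked: Long = 0L
  def switchToSortBased(): Unit = {
    sortBased = true
    numTasksFallBacked += 1
  }
}
```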
mapIteratorHasNext¶
var mapIteratorHasNext: Boolean = false
mapIteratorHasNext is an internal variable that starts disabled (false) when TungstenAggregationIterator is created.
TungstenAggregationIterator uses mapIteratorHasNext for hash-based aggregation (not sort-based) to indicate whether the aggregationBufferMapIterator has a next key-value pair or not when:
- TungstenAggregationIterator is created (and pre-loads the first key-value pair unless switched to sort-based aggregation)
- TungstenAggregationIterator is requested for the next row
mapIteratorHasNext is used to pre-load the next key-value pair from aggregationBufferMapIterator to make hasNext idempotent.
mapIteratorHasNext is also used to control whether to free up the memory associated with the UnsafeFixedWidthAggregationMap while in hash-based processing mode.
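The pre-loading pattern can be sketched with a hypothetical KVCursor whose next() advances and returns whether a pair is available (the style of Spark's KVIterator). The wrapper pre-loads one pair so hasNext only reads a flag, and calls a free callback once the last pair has been handed out. Unlike Spark, the sketch returns immutable tuples, so no copy of the last result is needed before freeing.

```scala
// Hypothetical cursor: next() advances and reports whether a pair is available
final class KVCursor[K, V](pairs: Seq[(K, V)]) {
  private var idx = -1
  def next(): Boolean = { idx += 1; idx < pairs.length }
  def getKey: K = pairs(idx)._1
  def getValue: V = pairs(idx)._2
}

final class PreloadingIterator[K, V](cursor: KVCursor[K, V], free: () => Unit)
    extends Iterator[(K, V)] {
  // Pre-load the first pair so that hasNext only reads a flag (idempotent)
  private var mapIteratorHasNext: Boolean = cursor.next()
  if (!mapIteratorHasNext) free() // empty map: free it right away

  override def hasNext: Boolean = mapIteratorHasNext

  override def next(): (K, V) = {
    if (!hasNext) throw new NoSuchElementException
    val result = (cursor.getKey, cursor.getValue)
    mapIteratorHasNext = cursor.next() // pre-load the next pair
    if (!mapIteratorHasNext) free()    // last pair handed out: release the map
    result
  }
}
```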
Output Row for Empty Partition and No Grouping Keys¶
outputForEmptyGroupingKeyWithoutInput(): UnsafeRow
outputForEmptyGroupingKeyWithoutInput requests the sortBasedAggregationBuffer to copy the bytes off from the initialAggregationBuffer.
outputForEmptyGroupingKeyWithoutInput generates the result using the generateOutput function with an empty UnsafeRow and the sortBasedAggregationBuffer.
In the end, outputForEmptyGroupingKeyWithoutInput frees up the memory associated with this UnsafeFixedWidthAggregationMap.
groupingExpressions Should Be Empty or IllegalStateException
outputForEmptyGroupingKeyWithoutInput throws an IllegalStateException when executed with the groupingExpressions specified:
This method should not be called when groupingExpressions is not empty.
That cannot really happen since the HashAggregateExec physical operator executes outputForEmptyGroupingKeyWithoutInput only when there are no groupingExpressions.
outputForEmptyGroupingKeyWithoutInput is used when:
- HashAggregateExec physical operator is requested to execute (with an empty partition and no grouping expressions)
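The SQL-level effect of this method can be sketched in plain Scala: a global aggregate over an empty partition still yields exactly one row built from the initial buffer, while a grouped aggregate yields none. The [count, sum] buffer and the function names are assumptions for illustration, with None standing for SQL NULL.

```scala
// Assumed buffer: count(*) and sum(x), where sum is None (SQL NULL) until a row is seen
final case class Buffer(count: Long, sum: Option[Long])
val initialAggregationBuffer: Buffer = Buffer(count = 0L, sum = None)

def outputForEmptyGroupingKeyWithoutInput(groupingExpressions: Seq[String]): (Long, Option[Long]) = {
  if (groupingExpressions.nonEmpty)
    throw new IllegalStateException(
      "This method should not be called when groupingExpressions is not empty.")
  // No input rows: the result comes straight from the initial buffer
  (initialAggregationBuffer.count, initialAggregationBuffer.sum)
}

// Hypothetical dispatch for an empty partition
def resultForEmptyPartition(groupingExpressions: Seq[String]): Seq[(Long, Option[Long])] =
  if (groupingExpressions.nonEmpty) Seq.empty                          // grouped: no groups, no rows
  else Seq(outputForEmptyGroupingKeyWithoutInput(groupingExpressions)) // global: exactly one row
```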
UnsafeKVExternalSorter¶
var externalSorter: UnsafeKVExternalSorter = null
externalSorter is an UnsafeKVExternalSorter.
externalSorter is uninitialized (null) when TungstenAggregationIterator is created.
TungstenAggregationIterator initializes the externalSorter to be the UnsafeKVExternalSorter (of the UnsafeFixedWidthAggregationMap) when the UnsafeFixedWidthAggregationMap returns no buffer (null) for a grouping key in getAggregationBufferFromUnsafeRow (likely due to running out of memory) while processing input rows.
TungstenAggregationIterator switches to sort-based aggregation right after processing the whole partition if it ended up with an UnsafeKVExternalSorter.
While switching to sort-based aggregation, TungstenAggregationIterator requests the UnsafeKVExternalSorter for a KVSorterIterator (sortedKVIterator).
Peak memory consumption can be monitored using peakMemory performance metric.
Generating Result Projection¶
generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow
generateResultProjection is part of the AggregationIterator abstraction.
generateResultProjection uses an UnsafeRowJoiner for a fast(er) path (than projection) for partial aggregation, i.e. when none of the aggregateExpressions is in Final or Complete aggregation mode.
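The fast path can be sketched with rows modeled as Vector[Any] (an assumption; UnsafeRowJoiner works on raw UnsafeRow bytes). For partial aggregation the output is simply the grouping key followed by the buffer. Otherwise, in this simplified sketch, result values are computed from the buffer by a caller-supplied evaluate function, a hypothetical parameter standing in for the regular result projection.

```scala
// Hypothetical stand-ins named after Spark's aggregation modes
sealed trait Mode
case object Partial extends Mode
case object PartialMerge extends Mode
case object Final extends Mode
case object Complete extends Mode

def generateResultProjection(
    modes: Seq[Mode],
    evaluate: Vector[Any] => Vector[Any]): (Vector[Any], Vector[Any]) => Vector[Any] = {
  val distinctModes = modes.distinct
  if (distinctModes.nonEmpty && !distinctModes.contains(Final) && !distinctModes.contains(Complete))
    // Partial aggregation: concatenate key and raw buffer (the joiner's job)
    (key, buffer) => key ++ buffer
  else
    // Final/Complete: compute result values from the buffer
    (key, buffer) => key ++ evaluate(buffer)
}
```

For example, with an avg buffer of [sum, count], a partial aggregation ships the buffer as-is so that the final aggregation can merge it later.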
Demo¶
```scala
// In spark-shell (spark.implicits._ is imported automatically)
import org.apache.spark.sql.functions.sum

val q = spark.range(10).
  groupBy($"id" % 2 as "group").
  agg(sum("id") as "sum")
val execPlan = q.queryExecution.sparkPlan

println(execPlan.numberedTreeString)
// 00 HashAggregate(keys=[(id#0L % 2)#11L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
// 01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#11L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#11L, sum#13L])
// 02    +- Range (0, 10, step=1, splits=8)

import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
val hashAggExecRDD = hashAggExec.execute

// MapPartitionsRDD is in private[spark] scope
// Use :paste -raw for the following helper object
package org.apache.spark
object AccessPrivateSpark {
  import org.apache.spark.rdd.RDD
  def mapPartitionsRDD[T](hashAggExecRDD: RDD[T]) = {
    import org.apache.spark.rdd.MapPartitionsRDD
    hashAggExecRDD.asInstanceOf[MapPartitionsRDD[_, _]]
  }
}
// END :paste -raw

import org.apache.spark.AccessPrivateSpark
val mpRDD = AccessPrivateSpark.mapPartitionsRDD(hashAggExecRDD)
val f = mpRDD.iterator(_, _)

import org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator
// FIXME How to show that TungstenAggregationIterator is used?
```
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
```text
logger.TungstenAggregationIterator.name = org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator
logger.TungstenAggregationIterator.level = all
```
Refer to Logging.