TungstenAggregationIterator

TungstenAggregationIterator is an AggregationIterator for the HashAggregateExec physical operator.

TungstenAggregationIterator prefers hash-based aggregation and falls back to sort-based aggregation when necessary.
Creating Instance

TungstenAggregationIterator takes the following to be created:

- Partition ID
- Grouping NamedExpressions
- AggregateExpressions
- Aggregate Attributes
- Initial input buffer offset
- Result NamedExpressions
- Function to create a new MutableProjection given expressions and attributes ((Seq[Expression], Seq[Attribute]) => MutableProjection)
- Original Input Attributes
- Input Iterator of InternalRows (from a single partition of the child of the HashAggregateExec physical operator)
- (only for testing) Optional HashAggregateExec's testFallbackStartsAt
- numOutputRows SQLMetric
- peakMemory SQLMetric
- spillSize SQLMetric
- avgHashProbe SQLMetric
TungstenAggregationIterator is created when the HashAggregateExec physical operator is executed.
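A hedged sketch of where the Partition ID and the input Iterator of InternalRows listed above come from: the parent operator typically builds one iterator per partition with RDD.mapPartitionsWithIndex over its child's RDD (names and structure below are illustrative, not Spark's actual code).

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

// Illustrative sketch: one TungstenAggregationIterator per partition, fed with
// the partition index and the partition's input rows. The remaining constructor
// arguments (grouping expressions, aggregate expressions, metrics, ...) come
// from the operator itself, as listed above.
def executeSketch(childRDD: RDD[InternalRow]): RDD[InternalRow] =
  childRDD.mapPartitionsWithIndex { (partitionIndex, inputRows) =>
    // new TungstenAggregationIterator(partitionIndex, ..., inputRows, ...)
    inputRows // placeholder: the real code returns the aggregation iterator
  }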
TungstenAggregationIterator starts processing input rows and pre-loads the first key-value pair from the UnsafeFixedWidthAggregationMap, unless it has switched to sort-based aggregation.
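Pre-loading is a common look-ahead pattern so that hasNext can be answered up front. A minimal, purely illustrative sketch (hypothetical class, not Spark's code):

class LookAheadIterator[T](underlying: Iterator[T]) extends Iterator[T] {
  // Pre-load the first element eagerly (mirrors pre-loading the first
  // key-value pair from the aggregation map).
  private var lookAhead: Option[T] =
    if (underlying.hasNext) Some(underlying.next()) else None

  override def hasNext: Boolean = lookAhead.isDefined

  override def next(): T = {
    val result = lookAhead.getOrElse(throw new NoSuchElementException)
    lookAhead = if (underlying.hasNext) Some(underlying.next()) else None
    result
  }
}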
Performance Metrics

When created, TungstenAggregationIterator gets SQLMetrics from the HashAggregateExec aggregate physical operator being executed:

- numOutputRows is used when TungstenAggregationIterator is requested for the next UnsafeRow (and it has one)
- peakMemory, spillSize and avgHashProbe are used at the end of every task (one per partition)
The metrics are displayed as part of the HashAggregateExec aggregate physical operator (e.g. in the web UI in Details for Query).
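The metrics can also be inspected programmatically through SparkPlan.longMetric. A small sketch (the metric keys are the ones listed above; hashAggExec is assumed to be a HashAggregateExec instance such as the one in the Demo below):

import org.apache.spark.sql.execution.aggregate.HashAggregateExec

def printAggMetrics(hashAggExec: HashAggregateExec): Unit = {
  // Look up the SQLMetrics by their keys and print their current values.
  Seq("numOutputRows", "peakMemory", "spillSize", "avgHashProbe").foreach { key =>
    val metric = hashAggExec.longMetric(key)
    println(s"$key: ${metric.value}")
  }
}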
Next UnsafeRow

next(): UnsafeRow

next is part of the Iterator (Scala) abstraction.

next...FIXME
processCurrentSortedGroup

processCurrentSortedGroup(): Unit

processCurrentSortedGroup...FIXME
UnsafeFixedWidthAggregationMap

When created, TungstenAggregationIterator creates an UnsafeFixedWidthAggregationMap with the following:

- initialAggregationBuffer
- Schema built from the attributes of the aggregation buffers of all the AggregateFunctions
- Schema built from the attributes of all the grouping expressions
The UnsafeFixedWidthAggregationMap is used when TungstenAggregationIterator is requested for the next UnsafeRow, for outputForEmptyGroupingKeyWithoutInput, to process input rows, to initialize the aggregationBufferMapIterator, and every time a partition has been processed.
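The two schemas above are ordinary StructTypes derived from Catalyst attributes. A minimal, illustrative sketch of that derivation (not necessarily the exact internal call Spark uses):

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.types.{StructField, StructType}

// Build a StructType from a sequence of Catalyst attributes, keeping name,
// data type and nullability (the aggregation-buffer schema and the
// grouping-key schema are derived from attributes in a similar way).
def schemaFromAttributes(attributes: Seq[Attribute]): StructType =
  StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable)))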
TaskCompletionListener

TungstenAggregationIterator registers a TaskCompletionListener that is executed on task completion (for every task that processes a partition).

When executed (once per partition), the TaskCompletionListener updates the peakMemory, spillSize and avgHashProbe metrics.
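A hedged sketch of how such a listener can be registered from code running inside a task (TaskContext.addTaskCompletionListener is the public API; the metric values below are placeholders, not what TungstenAggregationIterator actually records):

import org.apache.spark.TaskContext
import org.apache.spark.sql.execution.metric.SQLMetric

// Register a callback that runs once per task (i.e. once per processed
// partition) and pushes final values into the task-level metrics.
def registerCompletionListener(
    peakMemory: SQLMetric,
    spillSize: SQLMetric,
    avgHashProbe: SQLMetric): Unit = {
  TaskContext.get().addTaskCompletionListener[Unit] { _ =>
    // In TungstenAggregationIterator the values come from the hash map and
    // the external sorter; here they are placeholders.
    peakMemory.set(0L)
    spillSize.set(0L)
    avgHashProbe.set(0L)
  }
}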
outputForEmptyGroupingKeyWithoutInput

outputForEmptyGroupingKeyWithoutInput(): UnsafeRow

outputForEmptyGroupingKeyWithoutInput...FIXME

outputForEmptyGroupingKeyWithoutInput is used when the HashAggregateExec physical operator is requested to execute (with no input rows and no grouping expressions).
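For context, this corresponds to the familiar behavior that a global aggregation (no grouping expressions) over an empty dataset still returns exactly one row (whether a given query actually goes through TungstenAggregationIterator or the whole-stage-codegen path depends on the plan):

// A global aggregation over an empty dataset still produces a single row.
import org.apache.spark.sql.functions.count

spark.range(0).agg(count("id")).show()
// +---------+
// |count(id)|
// +---------+
// |        0|
// +---------+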
Processing Input Rows

processInputs(
  fallbackStartsAt: (Int, Int)): Unit

processInputs...FIXME

processInputs is used when TungstenAggregationIterator is created.
Hash- vs Sort-Based Aggregations

sortBased: Boolean = false

TungstenAggregationIterator creates and initializes the sortBased flag to false when created.

The flag indicates whether TungstenAggregationIterator has switched (fallen back) to sort-based aggregation while processing input rows.

The sortBased flag is turned on (true) while switching to sort-based aggregation (and the numTasksFallBacked metric is incremented).

Switching from hash-based to sort-based aggregation happens when the external sorter (used for sort-based aggregation) is initialized.
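A minimal, purely illustrative sketch of the fallback pattern (all names below are hypothetical; the real implementation spills the UnsafeFixedWidthAggregationMap into an external sorter rather than into an in-memory buffer):

import scala.collection.mutable

// Aggregate hash-first; once the hash map hits a size limit, flip sortBased,
// spill the map contents and handle all remaining rows via sorting.
def aggregate[K: Ordering, V](
    rows: Iterator[(K, V)],
    merge: (V, V) => V,
    maxHashEntries: Int): (Map[K, V], Boolean) = {
  val hashMap = mutable.HashMap.empty[K, V]
  val spilled = mutable.ArrayBuffer.empty[(K, V)]
  var sortBased = false

  rows.foreach { case (k, v) =>
    if (!sortBased && (hashMap.contains(k) || hashMap.size < maxHashEntries)) {
      hashMap(k) = hashMap.get(k).map(merge(_, v)).getOrElse(v)
    } else {
      if (!sortBased) {
        sortBased = true            // fall back once and stay sort-based
        spilled ++= hashMap         // "spill" the current hash map contents
        hashMap.clear()
      }
      spilled += ((k, v))
    }
  }

  // Sort-based phase: sort by key and merge adjacent entries with equal keys.
  val sortedMerged = spilled.sortBy(_._1).foldLeft(List.empty[(K, V)]) {
    case ((pk, pv) :: rest, (k, v)) if pk == k => (pk, merge(pv, v)) :: rest
    case (acc, kv) => kv :: acc
  }

  ((hashMap ++ sortedMerged).toMap, sortBased)
}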
Demo

// Run in spark-shell (spark.implicits._ and org.apache.spark.sql.functions._ are already in scope there)
val q = spark.range(10).
  groupBy('id % 2 as "group").
  agg(sum("id") as "sum")
val execPlan = q.queryExecution.sparkPlan

scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#11L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#11L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#11L, sum#13L])
02 +- Range (0, 10, step=1, splits=8)
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
val hashAggExecRDD = hashAggExec.execute()

// MapPartitionsRDD is in private[spark] scope
// Use :paste -raw for the following helper object
package org.apache.spark

object AccessPrivateSpark {
  import org.apache.spark.rdd.RDD
  def mapPartitionsRDD[T](hashAggExecRDD: RDD[T]) = {
    import org.apache.spark.rdd.MapPartitionsRDD
    hashAggExecRDD.asInstanceOf[MapPartitionsRDD[_, _]]
  }
}
// END :paste -raw

import org.apache.spark.AccessPrivateSpark
val mpRDD = AccessPrivateSpark.mapPartitionsRDD(hashAggExecRDD)
val f = mpRDD.iterator(_, _)
import org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator
// FIXME How to show that TungstenAggregationIterator is used?