
TungstenAggregationIterator

TungstenAggregationIterator is an AggregationIterator that is used by HashAggregateExec physical operator to process rows in a partition.

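TungstenAggregationIterator starts in Hash-Based Processing Mode, using an UnsafeFixedWidthAggregationMap, until the map runs out of memory. At that point the map's contents are spilled to disk (via an UnsafeKVExternalSorter) and, once the whole partition has been processed, TungstenAggregationIterator switches to Sort-Based Aggregation.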
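The idea can be illustrated with a toy model in plain Scala (not Spark code). It assumes a hypothetical key limit, maxKeys, as a stand-in for the memory available to the hash map, and uses per-key sums of Long values as aggregation buffers. When a new key does not fit, the map is dumped as a sorted run (playing the role of the UnsafeKVExternalSorter) and cleared. If any run was spilled, all runs are sorted by key and consecutive equal keys are combined, which is the essence of sort-based aggregation.

```scala
import scala.collection.mutable

// Toy model (not Spark code): hash-based aggregation with a sort-based fallback.
// `maxKeys` is a hypothetical stand-in for the memory available to the hash map.
object HashThenSortAgg {
  // Returns the per-key sums (sorted by key) and whether the fallback happened.
  def aggregate(rows: Seq[(String, Long)], maxKeys: Int): (Seq[(String, Long)], Boolean) = {
    val map = mutable.HashMap.empty[String, Long]
    // Sorted runs dumped from the map (stand-in for UnsafeKVExternalSorter spills)
    val spills = mutable.ArrayBuffer.empty[Seq[(String, Long)]]

    for ((key, value) <- rows) {
      if (!map.contains(key) && map.size >= maxKeys) {
        // No room for a new key: spill the map as a sorted run and start afresh
        spills += map.toSeq.sortBy(_._1)
        map.clear()
      }
      map(key) = map.getOrElse(key, 0L) + value
    }

    if (spills.isEmpty) {
      // Hash-based mode: the map already holds the final result
      (map.toSeq.sortBy(_._1), false)
    } else {
      // Sort-based mode: sort all runs (plus the map's leftovers) by key
      // and combine consecutive entries that share a key
      val sorted = (spills.flatten ++ map.toSeq).sortBy(_._1)
      val out = mutable.ArrayBuffer.empty[(String, Long)]
      for ((k, v) <- sorted) {
        if (out.nonEmpty && out.last._1 == k) out(out.length - 1) = (k, out.last._2 + v)
        else out += ((k, v))
      }
      (out.toSeq, true)
    }
  }
}
```

The Boolean in the result plays the role of the sortBased flag: both modes give the same sums, only the path differs.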

Lifecycle

TungstenAggregationIterator is created for HashAggregateExec physical operator when executed with a non-empty partition or no Grouping Keys.
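A minimal sketch of that per-partition decision, assuming two simplified inputs: whether the partition has rows and whether there are grouping keys. The Outcome names are hypothetical. A TungstenAggregationIterator is created in the last two cases, and for an empty partition without grouping keys HashAggregateExec returns the single row produced by outputForEmptyGroupingKeyWithoutInput.

```scala
// Hypothetical model of how HashAggregateExec handles one partition
object PartitionDecision {
  sealed trait Outcome
  // Grouped aggregate over an empty partition: nothing to output
  case object EmptyIterator extends Outcome
  // No grouping keys and an empty partition: the iterator is created and
  // produces a single row (outputForEmptyGroupingKeyWithoutInput)
  case object SingleRowForEmptyInput extends Outcome
  // Non-empty partition: the iterator itself is returned
  case object IterateAggregationResults extends Outcome

  def decide(partitionHasRows: Boolean, hasGroupingKeys: Boolean): Outcome =
    if (!partitionHasRows && hasGroupingKeys) EmptyIterator
    else if (!partitionHasRows) SingleRowForEmptyInput
    else IterateAggregationResults

  // A TungstenAggregationIterator is created for every outcome but EmptyIterator
  def createsIterator(partitionHasRows: Boolean, hasGroupingKeys: Boolean): Boolean =
    decide(partitionHasRows, hasGroupingKeys) != EmptyIterator
}
```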

There is one TungstenAggregationIterator created per partition of HashAggregateExec physical operator.

TungstenAggregationIterator immediately initializes the internal registries:

TungstenAggregationIterator immediately starts processing input rows and, if not switched to sort-based aggregation, initializes the other internal registries:

TungstenAggregationIterator frees up the memory associated with UnsafeFixedWidthAggregationMap if the map is empty.

TungstenAggregationIterator registers a task completion listener to be executed at the end of this task.

TungstenAggregationIterator is an Iterator[UnsafeRow] (indirectly, as an AggregationIterator), i.e. a data structure that allows iterating over a sequence of UnsafeRows. The UnsafeRows are the aggregation results of a partition (one row per group).

As with any Iterator, TungstenAggregationIterator comes with the following:

  • hasNext method for checking if there is a next row available
  • next method which returns the next row and advances itself

TaskCompletionListener

TungstenAggregationIterator registers a TaskCompletionListener (Spark Core) that is executed at the end of this task (one per partition).

When executed, the TaskCompletionListener updates the metrics.

  • peak memory: the maximum of the getPeakMemoryUsedBytes of this UnsafeFixedWidthAggregationMap and the getPeakMemoryUsedBytes of this UnsafeKVExternalSorter for sort-based aggregation, if a switch happened
  • spillSize
  • avgHashProbe: the getAvgHashProbesPerKey of this UnsafeFixedWidthAggregationMap

The TaskCompletionListener also requests the TaskMetrics (Spark Core) to incPeakExecutionMemory by the peak memory value.
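The peak memory value can be sketched as follows, assuming plain byte counts as inputs (PeakMemory is a hypothetical name). Taking the maximum rather than the sum presumably reflects that the map's memory is released when the sorter is created from it, so the two usages do not overlap.

```scala
object PeakMemory {
  // mapPeakBytes: peak memory of the hash map
  // sorterPeakBytes: Some(...) only if an external sorter was created (after a fallback)
  def peakMemory(mapPeakBytes: Long, sorterPeakBytes: Option[Long]): Long =
    math.max(mapPeakBytes, sorterPeakBytes.getOrElse(0L))
}
```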

Creating Instance

TungstenAggregationIterator takes the following to be created:

TungstenAggregationIterator starts processing input rows and pre-loads the first key-value pair from the UnsafeFixedWidthAggregationMap unless switched to a sort-based aggregation.

TungstenAggregationIterator is created when:

  • HashAggregateExec physical operator is executed

Performance Metrics

TungstenAggregationIterator is given the performance metrics of the owning HashAggregateExec aggregate physical operator when created.

The metrics are displayed as part of HashAggregateExec aggregate physical operator (e.g. in web UI in Details for Query).

HashAggregateExec in web UI (Details for Query)

number of sort fallback tasks

TungstenAggregationIterator is given number of sort fallback tasks performance metric when created.

The metric is number of sort fallback tasks metric of the owning HashAggregateExec physical operator.

The metric is incremented only when TungstenAggregationIterator is requested to fall back to sort-based aggregation.

peak memory

TungstenAggregationIterator is given peak memory performance metric when created.

The metric is peak memory metric of the owning HashAggregateExec physical operator.

The metric is set at task completion.

Checking for Next Row Available

hasNext: Boolean

hasNext is part of the Iterator (Scala) abstraction.

hasNext is true when either of the following holds:

  1. This TungstenAggregationIterator is sort-based and sortedInputHasNewGroup is true
  2. This TungstenAggregationIterator is not sort-based and mapIteratorHasNext is true
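The condition boils down to a single Boolean expression. The case class below is a hypothetical stand-in for the iterator's internal state, used only to make the two cases explicit.

```scala
// Hypothetical stand-in for the state hasNext reads
final case class AggIteratorState(
    sortBased: Boolean,
    sortedInputHasNewGroup: Boolean,
    mapIteratorHasNext: Boolean) {
  // Each mode consults only its own pre-loaded flag
  def hasNext: Boolean =
    (sortBased && sortedInputHasNewGroup) || (!sortBased && mapIteratorHasNext)
}
```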

Next Row

next(): UnsafeRow

next is part of the Iterator (Scala) abstraction.

next...FIXME

processCurrentSortedGroup

processCurrentSortedGroup(): Unit

processCurrentSortedGroup...FIXME

UnsafeFixedWidthAggregationMap

When created, TungstenAggregationIterator creates an UnsafeFixedWidthAggregationMap with the following:

Used when:

Hash- vs Sort-Based Aggregations

sortBased: Boolean = false

TungstenAggregationIterator turns sortBased flag off (false) when created.

sortBased flag is turned on (true) at the end of switching to sort-based aggregation (alongside incrementing number of sort fallback tasks metric).

In other words, sortBased flag indicates whether TungstenAggregationIterator has switched (fallen back) to sort-based aggregation (from initial hash-based processing mode) while processing input rows.

As long as the underlying UnsafeFixedWidthAggregationMap has got enough memory to hold grouping keys (and externalSorter is not created), TungstenAggregationIterator uses processRow function to process rows.

It is only when externalSorter is created, TungstenAggregationIterator switches to sort-based aggregation.

Initial Aggregation Buffer

initialAggregationBuffer: UnsafeRow

TungstenAggregationIterator initializes initialAggregationBuffer (as a new UnsafeRow) when created.

initialAggregationBuffer is used as the emptyAggregationBuffer of the UnsafeFixedWidthAggregationMap.

When requested for next row in sortBased aggregation, TungstenAggregationIterator copies the initialAggregationBuffer to the sortBasedAggregationBuffer.

When requested to outputForEmptyGroupingKeyWithoutInput with no groupingExpressions, TungstenAggregationIterator copies the initialAggregationBuffer to the sortBasedAggregationBuffer.

Sort-Based Aggregation Buffer

sortBasedAggregationBuffer: UnsafeRow

TungstenAggregationIterator initializes sortBasedAggregationBuffer to be a new UnsafeRow when created.

sortBasedAggregationBuffer is used when TungstenAggregationIterator is requested for next row in sort-based processing mode.

sortBasedAggregationBuffer is copied from the initialAggregationBuffer when requested for:

createNewAggregationBuffer

createNewAggregationBuffer(): UnsafeRow

createNewAggregationBuffer creates an UnsafeRow.


createNewAggregationBuffer...FIXME


createNewAggregationBuffer is used when:

Processing Input Rows

processInputs(
  fallbackStartsAt: (Int, Int)): Unit

processInputs is a procedure: it returns Unit (nothing) and whatever happens inside stays inside (just like in Las Vegas, doesn't it?! 😉)

processInputs is used when:

  • TungstenAggregationIterator is created

processInputs branches off based on whether grouping expressions are specified or not.

Grouping Expressions Specified

processInputs...FIXME

No Grouping Expressions

With no grouping expressions, processInputs generates one single grouping key (an UnsafeRow) for all the partition rows. processInputs executes (applies) the grouping projection to a null (undefined) row.

processInputs looks up the aggregation buffer (UnsafeRow) in the UnsafeFixedWidthAggregationMap for the generated grouping key.

In the end, processInputs uses processRow to process every InternalRow in the inputIter one by one, all with the same aggregation buffer.
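A toy model of this single-group case, assuming a hypothetical (sum, count) buffer and immutable updates (Spark instead updates the aggregation buffer, an UnsafeRow, in place). Every row of the partition is folded into the one buffer, which starts from the initial values.

```scala
// Toy model: with no grouping expressions, all rows share one aggregation buffer
object GlobalAggregation {
  // Hypothetical buffer layout for e.g. sum(x) and count(x)
  final case class SumCountBuffer(sum: Long, count: Long)

  val initialBuffer: SumCountBuffer = SumCountBuffer(0L, 0L)

  // Stand-in for processRow: merges one input value into the buffer
  def processRow(buffer: SumCountBuffer, value: Long): SumCountBuffer =
    SumCountBuffer(buffer.sum + value, buffer.count + 1)

  // Every row of the partition goes into the same (single) buffer
  def processPartition(rows: Iterator[Long]): SumCountBuffer =
    rows.foldLeft(initialBuffer)(processRow)
}
```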

Falling Back to Sort-Based Aggregation

switchToSortBasedAggregation(): Unit

switchToSortBasedAggregation is a procedure: it returns Unit (nothing) and whatever happens inside stays inside (just like in Las Vegas, doesn't it?! 😉)

switchToSortBasedAggregation prints out the following INFO message to the logs:

falling back to sort based aggregation.

switchToSortBasedAggregation initializes the sortBasedProcessRow to be generateProcessRow with the newExpressions, newFunctions, newInputAttributes:

switchToSortBasedAggregation initializes the sortedKVIterator to be the KVSorterIterator of this UnsafeKVExternalSorter.

switchToSortBasedAggregation pre-loads the first key-value pair from the sorted iterator (to make hasNext idempotent): it requests the sortedKVIterator for the next element and stores the answer (whether there is one) in this sortedInputHasNewGroup.

With sortedInputHasNewGroup enabled (true), switchToSortBasedAggregation...FIXME

In the end, switchToSortBasedAggregation turns this sortBased flag on and increments number of sort fallback tasks metric.
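Once the key-value pairs are sorted by grouping key, each group is a run of consecutive pairs with equal keys. The class below is a simplified sketch of merging such runs (not Spark's code), assuming (key, partial sum) pairs as input. Like TungstenAggregationIterator, it pre-loads the first pair of the next group so that hasNext only inspects state and is idempotent.

```scala
// Simplified sort-based aggregation over (key, partialSum) pairs sorted by key
final class SortedGroupIterator(sorted: Iterator[(String, Long)])
    extends Iterator[(String, Long)] {

  // Pre-loaded first pair of the next group (None once the input is exhausted)
  private var nextGroupFirstPair: Option[(String, Long)] =
    if (sorted.hasNext) Some(sorted.next()) else None

  // Idempotent: only inspects the pre-loaded state, never consumes input
  override def hasNext: Boolean = nextGroupFirstPair.isDefined

  override def next(): (String, Long) = {
    val (key, firstValue) = nextGroupFirstPair.getOrElse(throw new NoSuchElementException)
    var buffer = 0L              // fresh buffer per group (the initial value of a sum)
    buffer += firstValue         // process the pre-loaded first pair of the group
    nextGroupFirstPair = None
    var sameGroup = true
    while (sameGroup && sorted.hasNext) {
      val (k, v) = sorted.next()
      if (k == key) buffer += v  // same key: merge into the current buffer
      else {
        nextGroupFirstPair = Some((k, v))  // first pair of the next group
        sameGroup = false
      }
    }
    (key, buffer)
  }
}
```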

mapIteratorHasNext

var mapIteratorHasNext: Boolean = false

mapIteratorHasNext is an internal variable that starts disabled (false) when TungstenAggregationIterator is created.

TungstenAggregationIterator uses mapIteratorHasNext in hash-based aggregation (not sort-based) to indicate whether the aggregationBufferMapIterator has a next key-value pair or not when:

mapIteratorHasNext is used to pre-load the next key-value pair from aggregationBufferMapIterator to make hasNext idempotent.

mapIteratorHasNext is also used to control whether to free up the memory associated with the UnsafeFixedWidthAggregationMap while in hash-based processing mode.
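The pre-loading pattern can be sketched with a KVIterator-style cursor whose next() advances and returns whether a pair is available. KVCursor, PreloadingIterator and the onExhausted callback (standing in for freeing the UnsafeFixedWidthAggregationMap) are hypothetical names for this illustration.

```scala
// Toy KVIterator-style cursor: next() advances and reports whether a pair is available
final class KVCursor[K, V](pairs: Seq[(K, V)]) {
  private var i = -1
  def next(): Boolean = { i += 1; i < pairs.length }
  def getKey: K = pairs(i)._1
  def getValue: V = pairs(i)._2
}

// Adapts the cursor to hasNext/next by pre-loading one pair, so hasNext is idempotent
final class PreloadingIterator[K, V](cursor: KVCursor[K, V], onExhausted: () => Unit)
    extends Iterator[(K, V)] {

  private var mapIteratorHasNext: Boolean = cursor.next()  // pre-load the first pair
  if (!mapIteratorHasNext) onExhausted()                    // empty map: free it right away

  override def hasNext: Boolean = mapIteratorHasNext

  override def next(): (K, V) = {
    if (!mapIteratorHasNext) throw new NoSuchElementException
    val result = (cursor.getKey, cursor.getValue)
    mapIteratorHasNext = cursor.next()                      // pre-load the next pair
    if (!mapIteratorHasNext) onExhausted()                  // last pair consumed: free the map
    result
  }
}
```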

Output Row for Empty Partition and No Grouping Keys

outputForEmptyGroupingKeyWithoutInput(): UnsafeRow

outputForEmptyGroupingKeyWithoutInput requests the sortBasedAggregationBuffer to copy the bytes off from the initialAggregationBuffer.

outputForEmptyGroupingKeyWithoutInput generates the result using the generateOutput function with an empty UnsafeRow and the sortBasedAggregationBuffer.

In the end, outputForEmptyGroupingKeyWithoutInput frees up the memory associated with this UnsafeFixedWidthAggregationMap.

groupingExpressions Should Be Empty or IllegalStateException

outputForEmptyGroupingKeyWithoutInput throws an IllegalStateException when executed with the groupingExpressions specified:

This method should not be called when groupingExpressions is not empty.

That cannot really happen since HashAggregateExec physical operator makes sure to execute outputForEmptyGroupingKeyWithoutInput only when there are no groupingExpressions.


outputForEmptyGroupingKeyWithoutInput is used when:

  • HashAggregateExec physical operator is requested to execute (with an empty partition and no grouping expressions)
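In SQL terms, a global aggregate over no rows still yields one row computed from the initial buffer values: COUNT gives 0 and SUM gives NULL. The sketch below models that with a hypothetical (count, sum) buffer, where Option stands in for a nullable column, together with the IllegalStateException guard.

```scala
object EmptyInputOutput {
  // Hypothetical buffer for count(x) and sum(x); None models a SQL NULL
  final case class CountSumBuffer(count: Long, sum: Option[Long])

  val initialAggregationBuffer: CountSumBuffer = CountSumBuffer(0L, None)

  def outputForEmptyGroupingKeyWithoutInput(groupingExpressions: Seq[String]): CountSumBuffer = {
    if (groupingExpressions.nonEmpty) {
      throw new IllegalStateException(
        "This method should not be called when groupingExpressions is not empty.")
    }
    // No input rows: the output is computed from the initial buffer alone
    initialAggregationBuffer
  }
}
```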

UnsafeKVExternalSorter

var externalSorter: UnsafeKVExternalSorter = null

externalSorter is an UnsafeKVExternalSorter.

externalSorter is uninitialized (null) when TungstenAggregationIterator is created.

TungstenAggregationIterator initializes the externalSorter to be the UnsafeKVExternalSorter (of the UnsafeFixedWidthAggregationMap) when, while processing input rows (processInputs), the UnsafeFixedWidthAggregationMap returns no buffer (null) for a grouping key from getAggregationBufferFromUnsafeRow (likely due to running out of memory).

If processing the whole partition ended up with an UnsafeKVExternalSorter, TungstenAggregationIterator switches to sort-based aggregation (switchToSortBasedAggregation) right afterwards.

In switchToSortBasedAggregation, TungstenAggregationIterator requests the UnsafeKVExternalSorter for a KVSorterIterator (sortedKVIterator).

Peak memory consumption can be monitored using peakMemory performance metric.

Generating Result Projection

generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow

generateResultProjection is part of the AggregationIterator abstraction.

generateResultProjection uses an UnsafeRowJoiner for a fast(er) path (than projection) for partial aggregation (when the aggregateExpressions have aggregation modes that are neither Final nor Complete aggregation mode).
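A simplified model of the two paths, assuming rows modeled as Vector[Long] and a hypothetical evaluate function standing in for the general result projection. The mode names mirror Spark's aggregate modes (Partial, PartialMerge, Final, Complete). In the fast path the grouping key and the aggregation buffer are simply placed side by side, which is what joining two rows amounts to; only Final or Complete modes need the buffer turned into final values.

```scala
object ResultProjectionModel {
  // Mirrors the names of Spark's aggregate modes
  sealed trait AggregateMode
  case object Partial extends AggregateMode
  case object PartialMerge extends AggregateMode
  case object Final extends AggregateMode
  case object Complete extends AggregateMode

  type Row = Vector[Long]  // hypothetical row model

  // `evaluate` is a hypothetical stand-in for computing final values from a buffer
  def generateResultProjection(modes: Seq[AggregateMode], evaluate: Row => Row): (Row, Row) => Row = {
    val distinctModes = modes.distinct
    if (distinctModes.nonEmpty && !distinctModes.contains(Final) && !distinctModes.contains(Complete)) {
      // Fast path for partial aggregation: emit grouping key and buffer side by side
      (key: Row, buffer: Row) => key ++ buffer
    } else {
      // General path: turn the buffer into final values first
      (key: Row, buffer: Row) => key ++ evaluate(buffer)
    }
  }
}
```

For example, with an average buffered as (sum, count), a partial aggregation emits the buffer as-is so that a later Final step can merge it.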

Demo

val q = spark.range(10).
  groupBy('id % 2 as "group").
  agg(sum("id") as "sum")
val execPlan = q.queryExecution.sparkPlan
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#11L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#11L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#11L, sum#13L])
02    +- Range (0, 10, step=1, splits=8)

import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
val hashAggExecRDD = hashAggExec.execute

// MapPartitionsRDD is in private[spark] scope
// Use :paste -raw for the following helper object
package org.apache.spark
object AccessPrivateSpark {
  import org.apache.spark.rdd.RDD
  def mapPartitionsRDD[T](hashAggExecRDD: RDD[T]) = {
    import org.apache.spark.rdd.MapPartitionsRDD
    hashAggExecRDD.asInstanceOf[MapPartitionsRDD[_, _]]
  }
}
// END :paste -raw

import org.apache.spark.AccessPrivateSpark
val mpRDD = AccessPrivateSpark.mapPartitionsRDD(hashAggExecRDD)
val f = mpRDD.iterator(_, _)

import org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator
// FIXME How to show that TungstenAggregationIterator is used?

Logging

Enable ALL logging level for org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

logger.TungstenAggregationIterator.name = org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator
logger.TungstenAggregationIterator.level = all

Refer to Logging.