
TungstenAggregationIterator

TungstenAggregationIterator is an AggregationIterator for the HashAggregateExec physical operator.

TungstenAggregationIterator starts with hash-based aggregation and switches (falls back) to sort-based aggregation only when needed.

Creating Instance

TungstenAggregationIterator takes the following to be created:

TungstenAggregationIterator is created when:

  • HashAggregateExec physical operator is executed

When created, TungstenAggregationIterator starts processing input rows and, unless it has switched to sort-based aggregation, pre-loads the first key-value pair from the UnsafeFixedWidthAggregationMap.

Performance Metrics

When created, TungstenAggregationIterator gets SQLMetrics from the HashAggregateExec physical operator being executed.

The metrics are displayed as part of the HashAggregateExec physical operator (e.g. in the web UI, in Details for Query).

HashAggregateExec in web UI (Details for Query)

Next UnsafeRow

next(): UnsafeRow

next is part of the Iterator (Scala) abstraction.


next...FIXME

processCurrentSortedGroup

processCurrentSortedGroup(): Unit

processCurrentSortedGroup...FIXME

UnsafeFixedWidthAggregationMap

When created, TungstenAggregationIterator creates an UnsafeFixedWidthAggregationMap with the following:

Used when:

TaskCompletionListener

TungstenAggregationIterator registers a TaskCompletionListener that is executed on task completion (for every task that processes a partition).

When executed (once per partition), the TaskCompletionListener updates the following metrics:

outputForEmptyGroupingKeyWithoutInput

outputForEmptyGroupingKeyWithoutInput(): UnsafeRow

outputForEmptyGroupingKeyWithoutInput...FIXME


outputForEmptyGroupingKeyWithoutInput is used when:

  • HashAggregateExec physical operator is requested to execute (with no input rows and no grouping expressions)
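
The special case above (no input rows and no grouping keys) can be illustrated with a toy model in plain Scala. This is only a sketch of the idea, not Spark's implementation: the names EmptyInputAggregation, Buffer, globalAggregate and groupedAggregate are hypothetical, and the buffer simply holds a count and a nullable sum. The assumption illustrated is that a global aggregation over empty input still produces exactly one row built from the initial aggregation buffer, while a grouped aggregation over empty input produces no rows.

```scala
// Toy model only. All names here are hypothetical and not part of Spark.
object EmptyInputAggregation {
  // Aggregation buffer for count(*) and sum(value); sum is "null" (None) until a row is seen
  final case class Buffer(count: Long, sum: Option[Long])

  val initialBuffer: Buffer = Buffer(0L, None)

  def update(buffer: Buffer, value: Long): Buffer =
    Buffer(buffer.count + 1, Some(buffer.sum.getOrElse(0L) + value))

  // No grouping keys: always exactly one output row, even for empty input
  def globalAggregate(rows: Iterator[Long]): Iterator[Buffer] =
    if (rows.isEmpty) Iterator.single(initialBuffer) // the "empty grouping key without input" case
    else Iterator.single(rows.foldLeft(initialBuffer)(update))

  // With grouping keys: one output row per key, so empty input gives no rows
  def groupedAggregate[K](rows: Iterator[(K, Long)]): Map[K, Buffer] =
    rows.foldLeft(Map.empty[K, Buffer]) { case (groups, (key, value)) =>
      groups.updated(key, update(groups.getOrElse(key, initialBuffer), value))
    }
}
```

For example, EmptyInputAggregation.globalAggregate(Iterator.empty) yields a single Buffer(0, None), whereas groupedAggregate over an empty iterator yields an empty map.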

Processing Input Rows

processInputs(
  fallbackStartsAt: (Int, Int)): Unit

processInputs...FIXME


processInputs is used when:

  • TungstenAggregationIterator is created

Hash- vs Sort-Based Aggregations

sortBased: Boolean = false

TungstenAggregationIterator initializes the sortBased flag to false when created.

The flag indicates whether TungstenAggregationIterator has switched (fallen back) to sort-based aggregation while processing input rows.

The sortBased flag is turned on (true) while switching to sort-based aggregation (and the numTasksFallBacked metric is incremented).

The switch from hash-based to sort-based aggregation happens when the external sorter (that is used for sort-based aggregation) is initialized.
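
The general idea of starting with a hash map and falling back to sorting can be sketched in plain Scala. The following is a toy model under stated assumptions, not Spark's code: ToyAggregator, sumByKey and maxHashKeys are hypothetical names, an in-memory Vector stands in for the external sorter, and "the hash map is full" is simulated with a simple key-count limit.

```scala
import scala.collection.mutable

// Toy model only. ToyAggregator, sumByKey and maxHashKeys are hypothetical names.
object ToyAggregator {
  /** Sums values per key. Returns the result and whether the sort-based path was taken. */
  def sumByKey(rows: Seq[(Int, Long)], maxHashKeys: Int): (Map[Int, Long], Boolean) = {
    val hashMap = mutable.LinkedHashMap.empty[Int, Long]
    var spilled = Vector.empty[(Int, Long)] // stands in for the external sorter
    var sortBased = false                   // false until the first fallback

    rows.foreach { case (key, value) =>
      if (hashMap.contains(key) || hashMap.size < maxHashKeys) {
        // Hash-based path: update the buffer for the key in place
        hashMap.update(key, hashMap.getOrElse(key, 0L) + value)
      } else {
        // No room for a new key: move the partial aggregates to the "sorter",
        // flip the flag and start over with an empty hash map
        sortBased = true
        spilled ++= hashMap.toSeq
        hashMap.clear()
        hashMap.update(key, value)
      }
    }

    if (!sortBased) {
      (hashMap.toMap, false)
    } else {
      // Sort-based path: sort all partial aggregates by key and merge adjacent entries
      val sorted = (spilled ++ hashMap.toSeq).sortBy(_._1)
      val merged = sorted.foldLeft(List.empty[(Int, Long)]) {
        case ((k, acc) :: tail, (key, value)) if k == key => (k, acc + value) :: tail
        case (groups, row) => row :: groups
      }
      (merged.toMap, true)
    }
  }
}
```

Both paths are expected to give the same sums; only the flag differs. With a large enough maxHashKeys the sorter is never touched, which is the case the hash-based approach optimizes for.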

Demo

val q = spark.range(10).
  groupBy('id % 2 as "group").
  agg(sum("id") as "sum")
val execPlan = q.queryExecution.sparkPlan
println(execPlan.numberedTreeString)
// 00 HashAggregate(keys=[(id#0L % 2)#11L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
// 01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#11L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#11L, sum#13L])
// 02    +- Range (0, 10, step=1, splits=8)

import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
val hashAggExecRDD = hashAggExec.execute

// MapPartitionsRDD is in private[spark] scope
// Use :paste -raw for the following helper object
package org.apache.spark
object AccessPrivateSpark {
  import org.apache.spark.rdd.RDD
  def mapPartitionsRDD[T](hashAggExecRDD: RDD[T]) = {
    import org.apache.spark.rdd.MapPartitionsRDD
    hashAggExecRDD.asInstanceOf[MapPartitionsRDD[_, _]]
  }
}
// END :paste -raw

import org.apache.spark.AccessPrivateSpark
val mpRDD = AccessPrivateSpark.mapPartitionsRDD(hashAggExecRDD)
val f = mpRDD.iterator(_, _)

import org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator
// FIXME How to show that TungstenAggregationIterator is used?