Skip to content


TungstenAggregationIterator is an AggregationIterator for HashAggregateExec physical operator.

TungstenAggregationIterator prefers hash-based aggregation (before switching to a sort-based aggregation).

Creating Instance

TungstenAggregationIterator takes the following to be created:

TungstenAggregationIterator is created when:

  • HashAggregateExec physical operator is requested to doExecute


The SQL metrics (numOutputRows, peakMemory, spillSize and avgHashProbe) belong to the HashAggregateExec physical operator that created the TungstenAggregationIterator.

TungstenAggregationIterator starts processing input rows and pre-loads the first key-value pair from the UnsafeFixedWidthAggregationMap if did not switch to a sort-based aggregation.

Performance Metrics

When created, TungstenAggregationIterator gets SQLMetrics from the HashAggregateExec aggregate physical operator being executed.

The metrics are displayed as part of HashAggregateExec aggregate physical operator (e.g. in web UI in Details for Query).

HashAggregateExec in web UI (Details for Query)

Next UnsafeRow

next(): UnsafeRow

next is part of the Iterator (Scala) abstraction.



processCurrentSortedGroup(): Unit




Used when:


TungstenAggregationIterator registers a TaskCompletionListener that is executed on task completion (for every task that processes a partition).

When executed (once per partition), the TaskCompletionListener updates the following metrics:


outputForEmptyGroupingKeyWithoutInput(): UnsafeRow


outputForEmptyGroupingKeyWithoutInput is used when:

  • HashAggregateExec physical operator is requested to execute (with no input rows and grouping expressions)


val q = spark.range(10).
  groupBy('id % 2 as "group").
  agg(sum("id") as "sum")
val execPlan = q.queryExecution.sparkPlan
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#11L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#11L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#11L, sum#13L])
02    +- Range (0, 10, step=1, splits=8)

import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
val hashAggExecRDD = hashAggExec.execute

// MapPartitionsRDD is in private[spark] scope
// Use :paste -raw for the following helper object
package org.apache.spark
object AccessPrivateSpark {
  import org.apache.spark.rdd.RDD
  def mapPartitionsRDD[T](hashAggExecRDD: RDD[T]) = {
    import org.apache.spark.rdd.MapPartitionsRDD
    hashAggExecRDD.asInstanceOf[MapPartitionsRDD[_, _]]
// END :paste -raw

import org.apache.spark.AccessPrivateSpark
val mpRDD = AccessPrivateSpark.mapPartitionsRDD(hashAggExecRDD)
val f = mpRDD.iterator(_, _)

import org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator
// FIXME How to show that TungstenAggregationIterator is used?