AggregationIterator

AggregationIterator is an abstraction of aggregation iterators (of UnsafeRows) that are used by aggregate physical operators to process rows in a partition.

abstract class AggregationIterator(...)
extends Iterator[UnsafeRow]

From scala.collection.Iterator:

Iterators are data structures that allow to iterate over a sequence of elements. They have a hasNext method for checking if there is a next element available, and a next method which returns the next element and discards it from the iterator.
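The hasNext / next contract quoted above can be illustrated with a minimal sketch. CountdownIterator is a hypothetical class made up for this example (it is not part of Spark), and it iterates over plain Ints rather than UnsafeRows.

```scala
// Hypothetical iterator used only to illustrate the Iterator contract.
class CountdownIterator(start: Int) extends Iterator[Int] {
  private var current = start

  // True while there is another element to return.
  override def hasNext: Boolean = current > 0

  // Returns the next element and advances past it.
  override def next(): Int = {
    if (!hasNext) throw new NoSuchElementException("next on empty iterator")
    val value = current
    current -= 1
    value
  }
}

// Draining the iterator yields every element exactly once.
val remaining = new CountdownIterator(3).toList
```

A concrete AggregationIterator follows the same contract, except that every element is an UnsafeRow with an aggregate result.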

Implementations

  • ObjectAggregationIterator
  • SortBasedAggregationIterator
  • TungstenAggregationIterator

Creating Instance

AggregationIterator takes the following to be created:

Abstract Class

AggregationIterator is an abstract class and cannot be created directly. It is created indirectly for the concrete AggregationIterators.

AggregateModes

When created, AggregationIterator makes sure that there are at most 2 distinct AggregateModes of the AggregateExpressions.

The AggregateModes have to be a subset of one of the following mode pairs:

  • Partial and PartialMerge
  • Final and Complete
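The check described above could look roughly like the following sketch. The AggregateMode objects are simplified stand-ins defined here for the example, and validModes is a hypothetical helper name, not a Spark method.

```scala
// Simplified stand-ins for the aggregate modes (assumption of this sketch).
sealed trait AggregateMode
case object Partial extends AggregateMode
case object PartialMerge extends AggregateMode
case object Final extends AggregateMode
case object Complete extends AggregateMode

// Hypothetical helper: at most 2 distinct modes, all from one of the two pairs.
def validModes(modes: Seq[AggregateMode]): Boolean = {
  val distinctModes: Set[AggregateMode] = modes.toSet
  distinctModes.size <= 2 &&
    (distinctModes.subsetOf(Set[AggregateMode](Partial, PartialMerge)) ||
      distinctModes.subsetOf(Set[AggregateMode](Final, Complete)))
}
```

Under these assumptions, Seq(Partial, PartialMerge) and an empty sequence pass the check, while Seq(Partial, Final) does not, since it mixes modes from both pairs.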

Process Row Function

processRow: (InternalRow, InternalRow) => Unit

AggregationIterator generates a processRow function when created.

processRow is a procedure

processRow is a procedure that takes two InternalRows and produces no output (returns Unit).

processRow is similar to the following definition:

def processRow(currentBuffer: InternalRow, row: InternalRow): Unit = {
  ...
}

AggregationIterator uses the aggregateExpressions, the aggregateFunctions and the inputAttributes to generate the processRow procedure.


processRow is used when:

AggregateFunctions

aggregateFunctions: Array[AggregateFunction]

When created, AggregationIterator initializes AggregateFunctions in the aggregateExpressions (with initialInputBufferOffset).

initializeAggregateFunctions

initializeAggregateFunctions(
  expressions: Seq[AggregateExpression],
  startingInputBufferOffset: Int): Array[AggregateFunction]

initializeAggregateFunctions...FIXME

initializeAggregateFunctions is used when:

Generate Output Function

generateOutput: (UnsafeRow, InternalRow) => UnsafeRow

AggregationIterator creates the generateOutput function (a result projection, using generateResultProjection) when created.

generateOutput is used by the aggregate iterators when they are requested for the next element (aggregate result) and to generate an output for an empty grouping with no input.

Aggregate iterators that use generateOutput:

  • ObjectAggregationIterator
  • SortBasedAggregationIterator
  • TungstenAggregationIterator

Generating Result Projection

generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow
TungstenAggregationIterator

TungstenAggregationIterator overrides generateResultProjection for partial aggregation (non-Final and non-Complete aggregate modes).

generateResultProjection branches off based on the aggregate modes of the aggregates:

  1. Final and Complete
  2. Partial and PartialMerge
  3. No modes
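The three-way branching can be sketched as follows. The mode objects are simplified stand-ins, resultProjectionBranch is a hypothetical name, and the string labels exist only to make the chosen branch visible. That the first matching mode decides the branch is an assumption of this sketch.

```scala
// Simplified stand-ins for the aggregate modes (assumption of this sketch).
sealed trait AggregateMode
case object Partial extends AggregateMode
case object PartialMerge extends AggregateMode
case object Final extends AggregateMode
case object Complete extends AggregateMode

// Hypothetical helper that names the branch taken for the given modes.
def resultProjectionBranch(modes: Seq[AggregateMode]): String =
  if (modes.contains(Final) || modes.contains(Complete)) {
    "final-or-complete"
  } else if (modes.contains(Partial) || modes.contains(PartialMerge)) {
    "partial-or-partial-merge"
  } else {
    // No aggregate expressions at all, so there are no modes either.
    "no-modes"
  }
```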

Main Differences between Aggregate Modes

Final and Complete:

  • Focus on DeclarativeAggregates to execute the evaluateExpressions (while the allImperativeAggregateFunctions simply eval)
  • An UnsafeProjection binds the resultExpressions to the groupingAttributes followed by the aggregateAttributes
  • The UnsafeProjection generates an UnsafeRow from the current grouping key and the aggregate results

Partial and PartialMerge:

  • Focus on TypedImperativeAggregates so they can serializeAggregateBufferInPlace
  • An UnsafeProjection binds the groupingAttributes and bufferAttributes to the groupingAttributes followed by the bufferAttributes (the same sequence appears twice, intentionally)
  • The UnsafeProjection generates an UnsafeRow from the current grouping key and the current buffer

Final and Complete

For Final or Complete modes, generateResultProjection does the following:

  1. Collects expressions to evaluate the final values of the DeclarativeAggregates, and NoOps for the other AggregateFunctions among the aggregateFunctions. generateResultProjection preserves the order of the evaluation expressions and NoOps (so the i-th aggregate function uses the i-th evaluation expression).
  2. Executes the newMutableProjection with the evaluation expressions and the aggBufferAttributes of the aggregateFunctions to create a MutableProjection
  3. Requests the MutableProjection to store the aggregate results (of all the DeclarativeAggregates) in a SpecificInternalRow
  4. Creates an UnsafeProjection for the resultExpressions and the groupingAttributes with the aggregateAttributes (for the input schema)
  5. Initializes the UnsafeProjection with the partIndex

In the end, generateResultProjection creates a result projection function that does the following:

  1. Generates results for all expression-based aggregate functions (using the MutableProjection with the given currentBuffer)
  2. Generates results for all imperative aggregate functions
  3. Uses the UnsafeProjection to generate an UnsafeRow for the current grouping key and the aggregate results
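The shape of the resulting function can be sketched with plain Scala collections. Everything here is a simplified stand-in: Row replaces UnsafeRow, an Array[Double] replaces the aggregation buffer, the buffer layout is invented, and generateResultProjectionSketch is a hypothetical name. Appending the imperative results after the expression-based ones is also a simplification.

```scala
// Hypothetical stand-in for a row (assumption of this sketch).
type Row = IndexedSeq[Any]

// Assumed buffer layout: slot 0 = sum, slot 1 = count, slot 2 = max.
// Expression-based aggregates: the result is an expression over the buffer.
val evaluateExpressions: Seq[Array[Double] => Double] = Seq(
  buffer => buffer(0),            // sum
  buffer => buffer(0) / buffer(1) // avg = sum / count
)

// Imperative aggregates: each one evaluates itself against the buffer.
val imperativeEvals: Seq[Array[Double] => Double] = Seq(
  buffer => buffer(2)             // max
)

def generateResultProjectionSketch(): (Row, Array[Double]) => Row =
  (currentGroupingKey, currentBuffer) => {
    // 1. Results of all expression-based aggregate functions
    val expressionResults = evaluateExpressions.map(eval => eval(currentBuffer))
    // 2. Results of all imperative aggregate functions
    val imperativeResults = imperativeEvals.map(eval => eval(currentBuffer))
    // 3. One output row: the grouping key followed by the aggregate results
    currentGroupingKey ++ expressionResults ++ imperativeResults
  }

val generateOutput = generateResultProjectionSketch()
val result = generateOutput(Vector("a"), Array(6.0, 3.0, 4.0))
```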

Partial and PartialMerge

For Partial or PartialMerge modes, generateResultProjection does the following:

  1. Creates an UnsafeProjection for the groupingAttributes with the aggBufferAttributes of the aggregateFunctions
  2. Initializes the UnsafeProjection with the partIndex
  3. Collects the TypedImperativeAggregates from the aggregateFunctions (as they store a generic object in an aggregation buffer, and require calling serialization before shuffling)

In the end, generateResultProjection creates a result projection function that does the following:

  1. Requests the TypedImperativeAggregates (from the aggregateFunctions) to serializeAggregateBufferInPlace with the given currentBuffer
  2. Uses the UnsafeProjection to generate an UnsafeRow with the current grouping key and buffer
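A minimal sketch of the partial case, under heavy simplification: Row and Array[Any] stand in for UnsafeRow and the aggregation buffer, CollectDistinct is a hypothetical typed aggregate that keeps a Set in one buffer slot, and its comma-separated string is an invented serialization format.

```scala
// Hypothetical stand-in for a row (assumption of this sketch).
type Row = IndexedSeq[Any]

// Hypothetical typed aggregate that stores a generic object (a Set) in a buffer slot.
final class CollectDistinct(slot: Int) {
  // Replaces the object in the slot with a serialized form before shuffling.
  def serializeAggregateBufferInPlace(buffer: Array[Any]): Unit = {
    buffer(slot) = buffer(slot) match {
      case set: Set[_] => set.map(_.toString).toSeq.sorted.mkString(",")
      case other       => other // already serialized, leave as-is
    }
  }
}

def generateResultProjectionSketch(
    typedImperativeAggregates: Seq[CollectDistinct]): (Row, Array[Any]) => Row =
  (currentGroupingKey, currentBuffer) => {
    // 1. Serialize the object-based buffers in place
    typedImperativeAggregates.foreach(_.serializeAggregateBufferInPlace(currentBuffer))
    // 2. One output row: the grouping key followed by the (serialized) buffer
    currentGroupingKey ++ currentBuffer.toIndexedSeq
  }

val generateOutput = generateResultProjectionSketch(Seq(new CollectDistinct(1)))
val partialRow = generateOutput(Vector("k"), Array[Any](3L, Set("b", "a")))
```

Unlike the Final and Complete case, the output carries the buffer itself rather than evaluated results, because a later aggregation stage still has to merge it.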

No Modes

For no aggregate modes, generateResultProjection...FIXME

Initializing Aggregation Buffer

initializeBuffer(
  buffer: InternalRow): Unit

initializeBuffer requests the expressionAggInitialProjection to store an execution result of an empty row in the given InternalRow (buffer).

initializeBuffer requests all the ImperativeAggregate functions to initialize with the buffer internal row.
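The two steps can be sketched as follows. The buffer is modeled as an Array[Long], the initial values of the expression-based aggregates as a slot-to-value map, and MaxAggregate is a hypothetical imperative function; none of these are Spark types.

```scala
// Hypothetical imperative aggregate that owns one buffer slot.
final class MaxAggregate(slot: Int) {
  def initialize(buffer: Array[Long]): Unit = {
    buffer(slot) = Long.MinValue
  }
}

// Stand-in for expressionAggInitialProjection: slot -> initial value (assumed layout).
val expressionAggInitialValues: Map[Int, Long] = Map(0 -> 0L, 1 -> 0L)

def initializeBufferSketch(buffer: Array[Long], imperative: Seq[MaxAggregate]): Unit = {
  // Step 1: store the initial values of the expression-based aggregates
  expressionAggInitialValues.foreach { case (slot, value) => buffer(slot) = value }
  // Step 2: let every imperative aggregate initialize its own part of the buffer
  imperative.foreach(_.initialize(buffer))
}

val buffer = Array(9L, 9L, 9L)
initializeBufferSketch(buffer, Seq(new MaxAggregate(2)))
```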


initializeBuffer is used when:

Generating Process Row Function

generateProcessRow(
  expressions: Seq[AggregateExpression],
  functions: Seq[AggregateFunction],
  inputAttributes: Seq[Attribute]): (InternalRow, InternalRow) => Unit

generateProcessRow creates a mutable JoinedRow (of two InternalRows).

generateProcessRow branches off based on whether the given AggregateExpressions are specified (non-empty) or not.

Where AggregateExpressions come from (caller: AggregateExpressions):

  • AggregationIterator: aggregateExpressions
  • ObjectAggregationIterator: aggregateExpressions
  • TungstenAggregationIterator: aggregateExpressions

functions Argument

generateProcessRow works differently based on the type of the given AggregateFunctions:


generateProcessRow is used when:

Aggregate Expressions Specified

Merge Expressions

With AggregateExpressions specified, generateProcessRow determines so-called "merge expressions" (mergeExpressions) as follows:

Initialize Predicates

generateProcessRow finds AggregateExpressions with filters specified.

When in Partial or Complete aggregate modes, generateProcessRow...FIXME

Update Functions

generateProcessRow determines so-called "update functions" (updateFunctions) among ImperativeAggregate functions (in the given AggregateFunctions) to be as follows:

  • FIXME

Update Projection

generateProcessRow uses the newMutableProjection generator function to create a MutableProjection based on the mergeExpressions and the aggBufferAttributes of the given AggregateFunctions with the given inputAttributes.

Process Row Function

In the end, generateProcessRow creates a procedure that accepts two InternalRows (currentBuffer and row) and does the following:

  1. Processes all expression-based aggregate functions (using updateProjection). The procedure requests the MutableProjection to store the output in the currentBuffer. The output is created based on the currentBuffer and the row.
  2. Processes all imperative aggregate functions. The procedure requests every "update function" (in updateFunctions) to execute with the given currentBuffer and the row.
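The two steps of the generated procedure can be sketched with simplified stand-ins: the buffer is an Array[Long], an input row is a single Long, and generateProcessRowSketch is a hypothetical name. The buffer layout (a sum and a max) is invented for the example.

```scala
// Assumed buffer layout: slot 0 = sum (expression-based), slot 1 = max (imperative).
def generateProcessRowSketch(): (Array[Long], Long) => Unit = {
  // Stand-in for the MutableProjection: writes its output into the buffer.
  val updateProjection: (Array[Long], Long) => Unit =
    (buffer, value) => { buffer(0) = buffer(0) + value }

  // Stand-ins for the "update functions" of the imperative aggregates.
  val updateFunctions: Seq[(Array[Long], Long) => Unit] = Seq(
    (buffer, value) => { buffer(1) = math.max(buffer(1), value) }
  )

  (currentBuffer, row) => {
    // 1. Process all expression-based aggregate functions
    updateProjection(currentBuffer, row)
    // 2. Process all imperative aggregate functions
    updateFunctions.foreach(update => update(currentBuffer, row))
  }
}

val processRow = generateProcessRowSketch()
val buffer = Array(0L, Long.MinValue)
processRow(buffer, 5L)
processRow(buffer, 2L)
```

The function returns Unit because its only effect is the mutation of currentBuffer, which is what makes it a procedure.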

No Aggregate Expressions

With no AggregateExpressions (expressions), generateProcessRow creates a function that does nothing ("swallows" the input).
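A sketch of the two branches side by side, with the aggregate expressions reduced to a Seq[String] and the buffer to an Array[Long]. generateProcessRowSketch is a hypothetical name, and the sum in the first branch is only a placeholder for real aggregation work.

```scala
def generateProcessRowSketch(expressions: Seq[String]): (Array[Long], Long) => Unit =
  if (expressions.nonEmpty) {
    // Aggregate expressions specified: update the buffer (placeholder: a running sum)
    (currentBuffer, row) => { currentBuffer(0) += row }
  } else {
    // Grouping only: there is nothing to aggregate, so the input is ignored
    (_, _) => ()
  }

val buffer = Array(1L)
generateProcessRowSketch(Seq.empty)(buffer, 10L)
val afterNoOp = buffer(0)
generateProcessRowSketch(Seq("sum"))(buffer, 10L)
val afterSum = buffer(0)
```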