AggregationIterator
AggregationIterator is an abstraction of aggregation iterators (of UnsafeRows) that are used by aggregate physical operators to process rows in a partition.
```scala
abstract class AggregationIterator(...)
  extends Iterator[UnsafeRow]
```
From scala.collection.Iterator:
Iterators are data structures that allow to iterate over a sequence of elements. They have a hasNext method for checking if there is a next element available, and a next method which returns the next element and discards it from the iterator.
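As a reminder of that contract (which AggregationIterator inherits), here is a minimal Scala sketch of an Iterator together with the usual consumption loop. CountdownIterator and IteratorContractDemo are hypothetical names used only for illustration.

```scala
// Hypothetical iterator that yields start, start - 1, ..., 1 and is then exhausted.
class CountdownIterator(start: Int) extends Iterator[Int] {
  private var current = start

  // True while there is at least one element left to return.
  override def hasNext: Boolean = current > 0

  // Returns the next element and advances past it (the element is "discarded").
  override def next(): Int = {
    if (!hasNext) throw new NoSuchElementException("next on empty iterator")
    val result = current
    current -= 1
    result
  }
}

object IteratorContractDemo {
  // Typical consumption loop: check hasNext, then call next.
  def drain(it: Iterator[Int]): List[Int] = {
    val out = List.newBuilder[Int]
    while (it.hasNext) out += it.next()
    out.result()
  }
}
```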
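Implementations
- MergingSessionsIterator
- ObjectAggregationIterator
- SortBasedAggregationIterator
- TungstenAggregationIterator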
Creating Instance
AggregationIterator takes the following to be created:
- Partition ID
- Grouping NamedExpressions
- Input Attributes
- AggregateExpressions
- Aggregate Attributes
- Initial input buffer offset
- Result NamedExpressions
- Function to create a new MutableProjection given expressions and attributes ((Seq[Expression], Seq[Attribute]) => MutableProjection)
Abstract Class
AggregationIterator is an abstract class and cannot be created directly. It is created indirectly when one of the concrete AggregationIterators is created.
AggregateModes
When created, AggregationIterator makes sure that there are at most 2 distinct AggregateModes among the AggregateExpressions.
The AggregateModes have to be a subset of one of the following mode pairs:
- Partial and PartialMerge
- Final and Complete
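The mode check can be sketched as follows. This is a simplified model, not Spark's code: the AggregateMode values are redeclared here as plain case objects (an assumption made so the block is self-contained), and AggregateModeCheck is a hypothetical helper.

```scala
// Simplified stand-ins for the AggregateMode values (assumption: a sealed trait).
sealed trait AggregateMode
case object Partial extends AggregateMode
case object PartialMerge extends AggregateMode
case object Final extends AggregateMode
case object Complete extends AggregateMode

object AggregateModeCheck {
  private val partialModes: Set[AggregateMode] = Set(Partial, PartialMerge)
  private val finalModes: Set[AggregateMode] = Set(Final, Complete)

  // Throws IllegalArgumentException (via require) when the modes cannot be combined.
  def validate(modes: Seq[AggregateMode]): Unit = {
    val distinct = modes.distinct.toSet
    require(distinct.size <= 2, s"more than 2 distinct modes: $distinct")
    require(
      distinct.subsetOf(partialModes) || distinct.subsetOf(finalModes),
      s"cannot mix Partial/PartialMerge with Final/Complete: $distinct")
  }

  // Convenience wrapper: true when validate would succeed.
  def isValid(modes: Seq[AggregateMode]): Boolean =
    scala.util.Try(validate(modes)).isSuccess
}
```

The subset condition alone already limits the set to at most 2 modes; the separate size check simply mirrors the two conditions stated above.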
Process Row Function
```scala
processRow: (InternalRow, InternalRow) => Unit
```
AggregationIterator generates a processRow function when created.
processRow is a procedure that takes two InternalRows and produces no output (returns Unit).
processRow is similar to the following definition:
```scala
def processRow(currentBuffer: InternalRow, row: InternalRow): Unit = {
  // ... (currentBuffer is updated in place using row)
}
```
AggregationIterator uses the aggregateExpressions, the aggregateFunctions and the inputAttributes to generate the processRow procedure.
processRow is used when:
- MergingSessionsIterator is requested to processCurrentSortedGroup
- ObjectAggregationIterator is requested to process input rows
- SortBasedAggregationIterator is requested to processCurrentSortedGroup
- TungstenAggregationIterator is requested to process input rows
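To illustrate how a concrete iterator might drive processRow, the following sketch models a sort-based iteration over input that is already sorted by grouping key, loosely in the spirit of processCurrentSortedGroup. It assumes rows are (key, value) pairs and the buffer is an Array[Long] holding a sum and a count; SortedGroupDemo and its members are hypothetical stand-ins, not Spark APIs.

```scala
// Hypothetical, simplified model: a "row" is (groupingKey, value) and the
// aggregation buffer is a mutable Array[Long] holding (sum, count).
object SortedGroupDemo {
  type Row = (String, Long)

  // Stand-in for initializeBuffer: resets the buffer to its initial values.
  def initializeBuffer(buffer: Array[Long]): Unit = {
    buffer(0) = 0L // sum
    buffer(1) = 0L // count
  }

  // Stand-in for processRow: folds one input row into the buffer in place.
  val processRow: (Array[Long], Row) => Unit = (buffer, row) => {
    buffer(0) += row._2
    buffer(1) += 1
  }

  // Drives processRow over input sorted by grouping key and emits
  // one (key, sum, count) result per group.
  def aggregateSorted(sortedRows: Iterator[Row]): List[(String, Long, Long)] = {
    val results = List.newBuilder[(String, Long, Long)]
    val buffer = new Array[Long](2)
    val it = sortedRows.buffered
    while (it.hasNext) {
      val currentKey = it.head._1
      initializeBuffer(buffer)
      // Consume every row of the current group (consecutive rows with the same key).
      while (it.hasNext && it.head._1 == currentKey) {
        processRow(buffer, it.next())
      }
      results += ((currentKey, buffer(0), buffer(1)))
    }
    results.result()
  }
}
```

Reusing a single mutable buffer and resetting it per group keeps allocation per group at zero in this model, which is the reason processRow is shaped as a procedure that mutates its first argument.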
AggregateFunctions
```scala
aggregateFunctions: Array[AggregateFunction]
```
When created, AggregationIterator initializes AggregateFunctions in the aggregateExpressions (with initialInputBufferOffset).
initializeAggregateFunctions
```scala
initializeAggregateFunctions(
  expressions: Seq[AggregateExpression],
  startingInputBufferOffset: Int): Array[AggregateFunction]
```
initializeAggregateFunctions...FIXME
initializeAggregateFunctions is used when:
- AggregationIterator is requested for the aggregateFunctions
- ObjectAggregationIterator is requested for the mergeAggregationBuffers
- TungstenAggregationIterator is requested to switchToSortBasedAggregation
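Since initializeAggregateFunctions is still a FIXME above, here is only a simplified model of one idea related to its startingInputBufferOffset parameter: laying out each function's buffer slots one after another. It assumes each function is described solely by how many buffer slots it needs, and that input buffers are laid out the same way after startingInputBufferOffset. FunctionSpec, PlacedFunction and BufferOffsets are hypothetical names; the sketch ignores anything else the real method does.

```scala
// Hypothetical model: a function is described only by its name and the
// number of aggregation-buffer slots it needs.
final case class FunctionSpec(name: String, bufferSize: Int)

// A function paired with the buffer offsets assigned to it.
final case class PlacedFunction(spec: FunctionSpec, mutableOffset: Int, inputOffset: Int)

object BufferOffsets {
  // Lays the functions' slots out contiguously: the mutable buffer starts at 0,
  // the input buffer starts at startingInputBufferOffset.
  def place(specs: Seq[FunctionSpec], startingInputBufferOffset: Int): Vector[PlacedFunction] = {
    // Running total of slot counts: 0, s1, s1 + s2, ...
    val mutableOffsets = specs.map(_.bufferSize).scanLeft(0)(_ + _)
    specs.zip(mutableOffsets).map { case (spec, offset) =>
      PlacedFunction(spec, offset, startingInputBufferOffset + offset)
    }.toVector
  }
}
```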
Generate Output Function
```scala
generateOutput: (UnsafeRow, InternalRow) => UnsafeRow
```
AggregationIterator creates the generateOutput function (using generateResultProjection) when created.
generateOutput is used by the aggregate iterators when they are requested for the next element (an aggregate result) and to generate an output for an empty grouping key with no input.
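| Aggregate Iterators | Operations |
|---|---|
| ObjectAggregationIterator | next, outputForEmptyGroupingKeyWithoutInput |
| SortBasedAggregationIterator | next, outputForEmptyGroupingKeyWithoutInput |
| TungstenAggregationIterator | next, outputForEmptyGroupingKeyWithoutInput |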
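The empty-grouping case matters because a global aggregate (one without grouping expressions) must return exactly one row even when its input is empty; for example, count(*) over an empty table returns a single row with 0. The following sketch models that case for a count with a hypothetical EmptyInputDemo object: the output is generated from a freshly initialized buffer and an empty grouping key. It is a simplified model, not Spark's implementation.

```scala
// Hypothetical model of a global COUNT aggregate (no grouping key).
object EmptyInputDemo {
  // Stand-in for buffer initialization: the initial COUNT value is 0.
  def newBuffer(): Array[Long] = Array(0L)

  // Stand-in for generateOutput: the grouping key (empty here) followed by the result.
  def generateOutput(groupingKey: Vector[Any], buffer: Array[Long]): Vector[Any] =
    groupingKey :+ buffer(0)

  // Without grouping keys, exactly one output row is produced, even for empty input.
  def globalCount(input: Iterator[Any]): Vector[Any] = {
    val buffer = newBuffer()
    input.foreach(_ => buffer(0) += 1)
    generateOutput(Vector.empty, buffer)
  }
}
```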
Generating Result Projection
```scala
generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow
```
TungstenAggregationIterator
TungstenAggregationIterator overrides generateResultProjection for partial aggregation (non-Final and non-Complete aggregate modes).
generateResultProjection branches off based on the aggregate modes of the aggregates:
Main Differences between Aggregate Modes
| Final and Complete | Partial and PartialMerge |
|---|---|
| Focus on DeclarativeAggregates to execute the evaluateExpressions (while the allImperativeAggregateFunctions simply eval) | Focus on TypedImperativeAggregates so they can serializeAggregateBufferInPlace |
| An UnsafeProjection binds the resultExpressions to the groupingAttributes and the aggregateAttributes | An UnsafeProjection binds the groupingAttributes and bufferAttributes to the same groupingAttributes and bufferAttributes (used as both the expressions and the input schema) |
| Uses an UnsafeProjection to generate an UnsafeRow for the current grouping key and the aggregate results | Uses an UnsafeProjection to generate an UnsafeRow for the current grouping key and the aggregation buffer |
Final and Complete
For Final or Complete modes, generateResultProjection does the following:
- Collects expressions to evaluate the final values of the DeclarativeAggregates and NoOps for the other AggregateFunctions among the aggregateFunctions. generateResultProjection preserves the order of the evaluate expressions and NoOps (so the i-th aggregate function uses the i-th evaluation expression)
- Executes the newMutableProjection function with the evaluation expressions and the aggBufferAttributes of the aggregateFunctions to create a MutableProjection
- Requests the MutableProjection to store the aggregate results (of all the DeclarativeAggregates) in a SpecificInternalRow
- Creates an UnsafeProjection for the resultExpressions and the groupingAttributes with the aggregateAttributes (for the input schema)
- Initializes the UnsafeProjection with the partIndex
In the end, generateResultProjection creates a result projection function that does the following:
- Generates results for all expression-based aggregate functions (using the MutableProjection with the given currentBuffer)
- Generates results for all imperative aggregate functions
- Uses the UnsafeProjection to generate an UnsafeRow with the current grouping key and the aggregate results
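The Final/Complete steps can be modeled as follows. In this sketch (an assumption for illustration, not Spark's code) the aggregation buffer is an Array[Double], each function reads its own slice of it, and the hypothetical Declarative and Imperative case classes stand in for DeclarativeAggregate and ImperativeAggregate. Declarative results are computed first (with NoOp placeholders keeping positions for the others), then imperative results, so the i-th function's result lands at position i.

```scala
// Hypothetical model of the Final/Complete result projection.
object FinalResultProjection {
  sealed trait AggFunction { def offset: Int; def size: Int }
  // Stand-in for a DeclarativeAggregate: the final value is an expression over its buffer slice.
  final case class Declarative(offset: Int, size: Int, evaluate: Seq[Double] => Double) extends AggFunction
  // Stand-in for an ImperativeAggregate: the final value comes from its own eval.
  final case class Imperative(offset: Int, size: Int, eval: Seq[Double] => Double) extends AggFunction

  private def slice(buffer: Array[Double], f: AggFunction): Seq[Double] =
    buffer.slice(f.offset, f.offset + f.size).toSeq

  def generateResultProjection(
      functions: Seq[AggFunction]): (Seq[Any], Array[Double]) => Seq[Any] = {
    (groupingKey, currentBuffer) =>
      val results = new Array[Double](functions.length)
      // Step 1: expression-based aggregates; imperative positions are NoOps for now.
      functions.zipWithIndex.foreach {
        case (d: Declarative, i) => results(i) = d.evaluate(slice(currentBuffer, d))
        case (_: Imperative, _)  => () // NoOp: filled in step 2
      }
      // Step 2: imperative aggregates evaluate themselves.
      functions.zipWithIndex.foreach {
        case (m: Imperative, i) => results(i) = m.eval(slice(currentBuffer, m))
        case _                  => ()
      }
      // Step 3: the output row is the grouping key followed by the aggregate results.
      groupingKey ++ results.toSeq
  }
}
```

For instance, an average whose buffer slice holds (sum, count) = (10.0, 4.0) evaluates to 2.5.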
Partial and PartialMerge
For Partial or PartialMerge modes, generateResultProjection does the following:
- Creates an UnsafeProjection for the groupingAttributes with the aggBufferAttributes of the aggregateFunctions
- Initializes the UnsafeProjection with the partIndex
- Collects the TypedImperativeAggregates from the aggregateFunctions (as they store a generic object in an aggregation buffer and require serialization before shuffling)
In the end, generateResultProjection creates a result projection function that does the following:
- Requests the TypedImperativeAggregates (from the aggregateFunctions) to serializeAggregateBufferInPlace with the given currentBuffer
- Uses the UnsafeProjection to generate an UnsafeRow with the current grouping key and buffer
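The Partial/PartialMerge steps can be modeled like this. The sketch assumes the buffer is an Array[Any] and that a typed imperative aggregate keeps a mutable.Set[Int] in a single slot (as a collect_set-like function might). TypedSetAggregate is hypothetical, and its serializeAggregateBufferInPlace only borrows the name of the real method; the signature here is made up. The set is encoded with java.nio.ByteBuffer purely for illustration.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

// Hypothetical model of the Partial/PartialMerge result projection.
object PartialResultProjection {
  // Stand-in for a TypedImperativeAggregate occupying a single buffer slot.
  final case class TypedSetAggregate(slot: Int) {
    // Replaces the in-memory object with a byte array so the buffer can be shuffled.
    def serializeAggregateBufferInPlace(buffer: Array[Any]): Unit = {
      val values = buffer(slot).asInstanceOf[mutable.Set[Int]].toSeq.sorted
      val bytes = ByteBuffer.allocate(4 * values.length)
      values.foreach(v => bytes.putInt(v))
      buffer(slot) = bytes.array()
    }
  }

  // Decodes the serialized form back into a set (what the merging side would need).
  def deserialize(bytes: Array[Byte]): Set[Int] = {
    val buf = ByteBuffer.wrap(bytes)
    Iterator.fill(bytes.length / 4)(buf.getInt()).toSet
  }

  def generateResultProjection(
      typedAggs: Seq[TypedSetAggregate]): (Seq[Any], Array[Any]) => Seq[Any] = {
    (groupingKey, currentBuffer) =>
      typedAggs.foreach(_.serializeAggregateBufferInPlace(currentBuffer))
      // The output row is the grouping key followed by the (now serializable) buffer.
      groupingKey ++ currentBuffer.toSeq
  }
}
```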
No Modes
For no aggregate modes, generateResultProjection...FIXME
Initializing Aggregation Buffer
```scala
initializeBuffer(
  buffer: InternalRow): Unit
```
initializeBuffer requests the expressionAggInitialProjection to store an execution result of an empty row in the given InternalRow (buffer).
initializeBuffer then requests all the ImperativeAggregate functions to initialize the buffer internal row.
initializeBuffer is used when:
- MergingSessionsIterator is requested to newBuffer, initialize, next and outputForEmptyGroupingKeyWithoutInput
- SortBasedAggregationIterator is requested to newBuffer, initialize, next and outputForEmptyGroupingKeyWithoutInput
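The two steps of initializeBuffer can be modeled as follows. This is a simplified sketch under stated assumptions: the buffer is an Array[Long] whose first two slots belong to a declarative aggregate and whose third slot belongs to an imperative one; initialValues stands in for expressionAggInitialProjection, with None marking a position assumed to be a NoOp (as with the NoOp merge expressions of generateProcessRow) that the projection leaves untouched. ImperativeMax and InitializeBufferDemo are hypothetical.

```scala
// Hypothetical model of initializeBuffer over an Array[Long] buffer.
object InitializeBufferDemo {
  // Stand-in for an ImperativeAggregate that initializes its own slot.
  final case class ImperativeMax(mutableAggBufferOffset: Int) {
    def initialize(buffer: Array[Long]): Unit =
      buffer(mutableAggBufferOffset) = Long.MinValue
  }

  // Stand-in for expressionAggInitialProjection: initial values of the
  // declarative slots; None is a NoOp position (assumption) left as is.
  val initialValues: Vector[Option[Long]] = Vector(Some(0L), Some(0L), None)

  def initializeBuffer(buffer: Array[Long], imperatives: Seq[ImperativeMax]): Unit = {
    // Step 1: write the declarative initial values, skipping NoOp positions.
    initialValues.zipWithIndex.foreach {
      case (Some(value), i) => buffer(i) = value
      case (None, _)        => ()
    }
    // Step 2: every imperative aggregate initializes its own slot.
    imperatives.foreach(_.initialize(buffer))
  }
}
```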
Generating Process Row Function
```scala
generateProcessRow(
  expressions: Seq[AggregateExpression],
  functions: Seq[AggregateFunction],
  inputAttributes: Seq[Attribute]): (InternalRow, InternalRow) => Unit
```
generateProcessRow creates a mutable JoinedRow (of two InternalRows).
generateProcessRow branches off based on whether the given AggregateExpressions are specified (non-empty) or not.
Where AggregateExpressions come from
| Caller | AggregateExpressions |
|---|---|
| AggregationIterator | aggregateExpressions |
| ObjectAggregationIterator | aggregateExpressions |
| TungstenAggregationIterator | aggregateExpressions |
functions Argument
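generateProcessRow works differently based on the type of the given AggregateFunctions: DeclarativeAggregates contribute merge expressions (executed using a MutableProjection), while ImperativeAggregates contribute update functions.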
generateProcessRow is used when:
- AggregationIterator is requested for the process row function
- ObjectAggregationIterator is requested for the mergeAggregationBuffers function
- TungstenAggregationIterator is requested to switch to sort-based aggregation
Aggregate Expressions Specified
Merge Expressions
With AggregateExpressions specified, generateProcessRow determines so-called "merge expressions" (mergeExpressions) as follows:
- For DeclarativeAggregate functions, the merge expressions are chosen based on the AggregateMode of the corresponding AggregateExpression:

  | AggregateMode | Merge Expressions |
  |---|---|
  | Partial or Complete | Update Expressions of a DeclarativeAggregate |
  | PartialMerge or Final | Merge Expressions of a DeclarativeAggregate |

- For all other AggregateFunctions, there are as many NoOp merge expressions (that do nothing and do not change a value) as there are aggBufferAttributes in the AggregateFunction
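The selection rule can be sketched as follows. Expressions are modeled as plain strings and the functions and modes as simple case classes and objects; all names in MergeExpressions (DeclarativeAgg, OtherAgg and so on) are hypothetical stand-ins for the Spark types, chosen so the block is self-contained.

```scala
// Hypothetical model of how merge expressions are chosen per aggregate function.
object MergeExpressions {
  sealed trait AggregateMode
  case object Partial extends AggregateMode
  case object PartialMerge extends AggregateMode
  case object Final extends AggregateMode
  case object Complete extends AggregateMode

  // Expressions are plain strings for illustration.
  sealed trait AggFunction { def bufferSize: Int }
  final case class DeclarativeAgg(
      bufferSize: Int,
      updateExpressions: Seq[String],
      mergeExpressions: Seq[String]) extends AggFunction
  final case class OtherAgg(bufferSize: Int) extends AggFunction

  val NoOp = "NoOp"

  def mergeExpressionsFor(pairs: Seq[(AggFunction, AggregateMode)]): Seq[String] =
    pairs.flatMap {
      case (d: DeclarativeAgg, Partial | Complete)   => d.updateExpressions
      case (d: DeclarativeAgg, PartialMerge | Final) => d.mergeExpressions
      // Any other function: one NoOp per aggregation-buffer attribute.
      case (f: OtherAgg, _)                          => Seq.fill(f.bufferSize)(NoOp)
    }
}
```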
Initialize Predicates
generateProcessRow finds AggregateExpressions with filters specified.
When in Partial or Complete aggregate modes, generateProcessRow...FIXME
Update Functions
generateProcessRow determines so-called "update functions" (updateFunctions) among ImperativeAggregate functions (in the given AggregateFunctions) to be as follows:
- FIXME
Update Projection
generateProcessRow uses the newMutableProjection generator function to create a MutableProjection based on the mergeExpressions and the aggBufferAttributes of the given AggregateFunctions with the given inputAttributes.
Process Row Function
In the end, generateProcessRow creates a procedure that accepts two InternalRows (currentBuffer and row) and does the following:
- Processes all expression-based aggregate functions (using updateProjection). generateProcessRow requests the MutableProjection to store the output in the currentBuffer. The output is created based on the currentBuffer and the row.
- Processes all imperative aggregate functions. generateProcessRow requests every "update function" (in updateFunctions) to execute with the given currentBuffer and the row.
No Aggregate Expressions
With no AggregateExpressions (expressions), generateProcessRow creates a function that does nothing ("swallows" the input).
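The procedure created by generateProcessRow (and its do-nothing variant) can be modeled as below. Assumptions: rows are Array[Long], the i-th expression computes the new value of buffer slot i from a joined row whose indices cover the buffer first and then the input row, and the no-aggregate-expressions branch is modeled as both sequences being empty. JoinedRow here is a hypothetical stand-in that only borrows the name of Spark's class; ProcessRowDemo and its type aliases are also hypothetical.

```scala
// Hypothetical model of generateProcessRow over Array[Long] rows.
object ProcessRowDemo {
  // Stand-in for JoinedRow: index i reads the left row first, then the right row.
  final class JoinedRow(var left: Array[Long], var right: Array[Long]) {
    def apply(i: Int): Long = if (i < left.length) left(i) else right(i - left.length)
  }

  // An expression computes one new buffer value from the joined row.
  type Expression = JoinedRow => Long
  // An imperative update function mutates the buffer directly.
  type UpdateFunction = (Array[Long], Array[Long]) => Unit

  def generateProcessRow(
      expressions: Seq[Expression],
      updateFunctions: Seq[UpdateFunction]): (Array[Long], Array[Long]) => Unit = {
    if (expressions.isEmpty && updateFunctions.isEmpty) {
      // No aggregate expressions: the procedure swallows the input.
      (_: Array[Long], _: Array[Long]) => ()
    } else {
      val joinedRow = new JoinedRow(Array.empty[Long], Array.empty[Long]) // reused across calls
      (currentBuffer: Array[Long], row: Array[Long]) => {
        joinedRow.left = currentBuffer
        joinedRow.right = row
        // Update projection: evaluate all expressions first, then write them back,
        // so every expression sees the buffer as it was before this row.
        val newValues = expressions.map(e => e(joinedRow))
        newValues.zipWithIndex.foreach { case (v, i) => currentBuffer(i) = v }
        // Imperative update functions then update their own slots.
        updateFunctions.foreach(f => f(currentBuffer, row))
      }
    }
  }
}
```

For example, with a 3-slot buffer (sum, count, max), input column 0 sits at joined index 3, so sum and count become j => j(0) + j(3) and j => j(1) + 1, while max is an update function on slot 2.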