AggregationIterator
AggregationIterator is an abstraction of aggregation iterators (of UnsafeRows) that aggregate physical operators use to process rows in a partition.
```scala
abstract class AggregationIterator(...)
  extends Iterator[UnsafeRow]
```
From scala.collection.Iterator:
Iterators are data structures that allow to iterate over a sequence of elements. They have a hasNext method for checking if there is a next element available, and a next method which returns the next element and discards it from the iterator.
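Here is a small sketch of the hasNext/next contract quoted above, in plain Scala. The drain helper is hypothetical and not part of Spark. It shows the loop that every consumer of an AggregationIterator follows. It also shows that reading an iterator consumes it.

```scala
object IteratorContractSketch {
  // Drains any iterator using only hasNext and next, the two methods of the
  // scala.collection.Iterator contract that AggregationIterator inherits.
  def drain[A](it: Iterator[A]): List[A] = {
    val out = List.newBuilder[A]
    while (it.hasNext) {
      out += it.next() // next returns the current element and moves past it
    }
    out.result()
  }
}
```

After drain returns, the same iterator reports hasNext as false, because next has discarded every element.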
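Implementations
- MergingSessionsIterator
- ObjectAggregationIterator
- SortBasedAggregationIterator
- TungstenAggregationIterator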
Creating Instance
AggregationIterator takes the following to be created:
- Partition ID
- Grouping NamedExpressions
- Input Attributes
- AggregateExpressions
- Aggregate Attributes
- Initial input buffer offset
- Result NamedExpressions
- Function to create a new MutableProjection given expressions and attributes ((Seq[Expression], Seq[Attribute]) => MutableProjection)
Abstract Class
AggregationIterator is an abstract class and cannot be created directly. It is created indirectly, as part of creating one of the concrete AggregationIterators.
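The sketch below illustrates the abstract/concrete split in plain Scala. All names are hypothetical (SimpleAggregationIterator, SortedSumIterator), and a single-key SUM stands in for Spark's expressions and UnsafeRows. The abstract base class builds its shared functions once, in its constructor. Only a concrete subclass can be instantiated.

```scala
object AbstractIteratorSketch {
  type Row = Seq[Any]

  // Hypothetical, much simplified base class. Like AggregationIterator, it is
  // abstract (cannot be instantiated) and builds shared helpers in its
  // constructor for the concrete subclasses to use.
  abstract class SimpleAggregationIterator(
      val partIndex: Int,
      keyOrdinal: Int,  // assumption: a single grouping column
      valueOrdinal: Int // assumption: a single SUM aggregate over one column
  ) extends Iterator[Row] {
    protected val groupingKey: Row => Any = row => row(keyOrdinal)
    protected val processRow: (Array[Long], Row) => Unit =
      (buffer, row) => buffer(0) += row(valueOrdinal).asInstanceOf[Long]
    protected val generateOutput: (Any, Array[Long]) => Row =
      (key, buffer) => Seq(key, buffer(0))
  }

  // Concrete subclass in the spirit of a sort-based iterator: assumes its
  // input is already sorted (clustered) by the grouping key.
  final class SortedSumIterator(index: Int, rows: Iterator[Row])
      extends SimpleAggregationIterator(index, 0, 1) {
    private val in = rows.buffered

    override def hasNext: Boolean = in.hasNext

    override def next(): Row = {
      val key = groupingKey(in.head)
      val buffer = Array(0L) // fresh aggregation buffer for this group
      while (in.hasNext && groupingKey(in.head) == key) processRow(buffer, in.next())
      generateOutput(key, buffer)
    }
  }
}
```

Writing new SimpleAggregationIterator(...) directly does not compile. new SortedSumIterator(0, rows) does.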
AggregateModes
When created, AggregationIterator makes sure that there are at most 2 distinct AggregateModes of the AggregateExpressions.
The AggregateModes have to be a subset of one of the following mode pairs:
- Partial and PartialMerge
- Final and Complete
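The two checks can be sketched in plain Scala as follows. The Mode ADT is a hypothetical stand-in for Spark's AggregateMode. checkModes is an illustrative helper, not Spark's code. It only mirrors the rules stated above, using require.

```scala
object AggregateModeCheckSketch {
  // Hypothetical stand-ins for the four aggregate modes.
  sealed trait Mode
  case object Partial extends Mode
  case object PartialMerge extends Mode
  case object Final extends Mode
  case object Complete extends Mode

  // Fails (IllegalArgumentException) unless the modes are at most 2 distinct
  // values drawn from either {Partial, PartialMerge} or {Final, Complete}.
  def checkModes(modes: Seq[Mode]): Unit = {
    val distinct: Set[Mode] = modes.toSet
    require(distinct.size <= 2, s"More than 2 distinct modes: $distinct")
    require(
      distinct.subsetOf(Set[Mode](Partial, PartialMerge)) ||
        distinct.subsetOf(Set[Mode](Final, Complete)),
      s"Cannot mix Partial/PartialMerge with Final/Complete: $distinct")
  }
}
```

The second check already implies the first, since each allowed pair contains only two modes.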
Process Row Function
```scala
processRow: (InternalRow, InternalRow) => Unit
```
AggregationIterator generates a processRow function when created.
processRow is a procedure that takes two InternalRows and produces no output (returns Unit).
processRow is similar to the following definition:
```scala
def processRow(currentBuffer: InternalRow, row: InternalRow): Unit = {
  ...
}
```
AggregationIterator uses the aggregateExpressions, the aggregateFunctions and the inputAttributes to generate the processRow procedure.
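To make "generating a procedure" concrete, here is a minimal sketch in plain Scala. It assumes two hypothetical aggregate functions (Sum and Count) that each own one slot of a mutable Long buffer and read one input column by ordinal. None of these names are Spark's API. The function list is inspected once; the returned closure is then applied to every input row.

```scala
object ProcessRowSketch {
  type Row = IndexedSeq[Any]

  // Hypothetical, simplified aggregate functions: the i-th function owns slot i
  // of the mutable buffer and reads one input column (already bound to an ordinal).
  sealed trait AggFunc { def inputOrdinal: Int }
  final case class Sum(inputOrdinal: Int) extends AggFunc
  final case class Count(inputOrdinal: Int) extends AggFunc

  // Generates a processRow-like procedure once, up front. The returned closure is
  // called for every input row and updates the buffer in place (returns Unit).
  def generateProcessRow(functions: Seq[AggFunc]): (Array[Long], Row) => Unit = {
    val fs = functions.toArray // plain array: cheap indexed access per row
    (buffer, row) => {
      var i = 0
      while (i < fs.length) {
        fs(i) match {
          case Sum(ord) => row(ord) match {
            case v: Long => buffer(i) += v // nulls (and non-Longs) are skipped
            case _       => ()
          }
          case Count(ord) => if (row(ord) != null) buffer(i) += 1L
        }
        i += 1
      }
    }
  }
}
```

A caller keeps one buffer per group and feeds it every row of that group. This is the role processRow plays for the iterators listed next.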
processRow is used when:
- MergingSessionsIterator is requested to processCurrentSortedGroup
- ObjectAggregationIterator is requested to process input rows
- SortBasedAggregationIterator is requested to processCurrentSortedGroup
- TungstenAggregationIterator is requested to process input rows
AggregateFunctions
```scala
aggregateFunctions: Array[AggregateFunction]
```
When created, AggregationIterator initializes the AggregateFunctions in the aggregateExpressions (with the initialInputBufferOffset).
initializeAggregateFunctions
```scala
initializeAggregateFunctions(
  expressions: Seq[AggregateExpression],
  startingInputBufferOffset: Int): Array[AggregateFunction]
```
initializeAggregateFunctions ...FIXME
initializeAggregateFunctions is used when:
- AggregationIterator is requested for the aggregateFunctions
- ObjectAggregationIterator is requested for the mergeAggregationBuffers
- TungstenAggregationIterator is requested to switchToSortBasedAggregation
Generate Output Function
```scala
generateOutput: (UnsafeRow, InternalRow) => UnsafeRow
```
When created, AggregationIterator creates the generateOutput function using generateResultProjection.
The aggregate iterators use generateOutput when they are requested for the next element (an aggregate result) and when they generate an output for an empty grouping key with no input.
The following aggregate iterators use generateOutput:
- ObjectAggregationIterator
- SortBasedAggregationIterator
- TungstenAggregationIterator
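The "empty grouping key with no input" case is easy to miss, so here is a sketch in plain Scala. It assumes a single hypothetical COUNT(*) aggregate with a one-slot buffer. countBy and the local generateOutput are illustrative, not Spark's code. It mirrors SQL semantics: a global aggregate over no rows still yields one row, built by passing a freshly initialized buffer to generateOutput. A grouped aggregate over no rows yields nothing.

```scala
object EmptyInputSketch {
  // Hypothetical single COUNT(*) aggregate: the buffer is one Long slot.
  private def initialBuffer(): Array[Long] = Array(0L)

  // generateOutput-like function: grouping key (possibly empty) ++ buffer values.
  private val generateOutput: (Seq[Any], Array[Long]) => Seq[Any] =
    (key, buffer) => key ++ buffer.toSeq

  // Counts rows per grouping key. With no grouping columns and no input, it
  // still emits one row from a freshly initialized buffer.
  def countBy(keyOrdinals: Seq[Int], rows: Seq[Seq[Any]]): Seq[Seq[Any]] =
    if (rows.isEmpty) {
      if (keyOrdinals.isEmpty) Seq(generateOutput(Seq.empty, initialBuffer()))
      else Seq.empty
    } else {
      rows.groupBy(r => keyOrdinals.map(r)).toSeq.map { case (key, group) =>
        val buffer = initialBuffer()
        group.foreach(_ => buffer(0) += 1L)
        generateOutput(key, buffer)
      }
    }
}
```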
Generating Result Projection
```scala
generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow
```
TungstenAggregationIterator overrides generateResultProjection for partial aggregation (non-Final and non-Complete aggregate modes).
generateResultProjection branches off based on the aggregate modes of the aggregates:
Main Differences between Aggregate Modes
Final and Complete | Partial and PartialMerge |
---|---|
Focus on DeclarativeAggregates to execute the evaluateExpressions (while the allImperativeAggregateFunctions simply eval) | Focus on TypedImperativeAggregates so they can serializeAggregateBufferInPlace |
An UnsafeProjection binds the resultExpressions to the groupingAttributes and the aggregateAttributes | An UnsafeProjection binds the groupingAttributes and bufferAttributes to the same groupingAttributes and bufferAttributes (the attributes are repeated twice) |
Uses the UnsafeProjection to generate an UnsafeRow from the current grouping key and the aggregate results | Uses the UnsafeProjection to generate an UnsafeRow from the current grouping key and the aggregation buffer |
Final and Complete
For Final or Complete modes, generateResultProjection does the following:
- Collects expressions to evaluate the final values of the DeclarativeAggregates, and NoOps for the other AggregateFunctions among the aggregateFunctions. generateResultProjection preserves the order of the evaluate expressions and NoOps (so the i-th aggregate function uses the i-th evaluation expressions)
- Executes the newMutableProjection with the evaluation expressions and the aggBufferAttributes of the aggregateFunctions to create a MutableProjection
- Requests the MutableProjection to store the aggregate results (of all the DeclarativeAggregates) in a SpecificInternalRow
- Creates an UnsafeProjection for the resultExpressions and the groupingAttributes with the aggregateAttributes (for the input schema)
- Initializes the UnsafeProjection with the partIndex
In the end, generateResultProjection creates a result projection function that does the following:
- Generates results for all expression-based aggregate functions (using the MutableProjection with the given currentBuffer)
- Generates results for all imperative aggregate functions
- Uses the UnsafeProjection to generate an UnsafeRow from the current grouping key and the aggregate results
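The Final/Complete result projection can be sketched in plain Scala as below. The assumptions: every aggregate is a hypothetical "declarative" DeclAgg that owns a contiguous slice of the buffer and turns that slice into its final value. Imperative aggregates and their NoOp slots are left out for brevity. The names are illustrative, not Spark's API. The AVG example shows why a final evaluation step is needed: its buffer holds (sum, count), and the division happens only here.

```scala
object FinalResultSketch {
  // Hypothetical simplified "declarative" aggregate: it owns `bufferSize`
  // slots of the aggregation buffer and evaluates its final value from them.
  final case class DeclAgg(bufferSize: Int, evaluate: Seq[Any] => Any)

  val sumAgg = DeclAgg(1, b => b(0))
  // AVG keeps (sum, count) in its buffer and divides only at the end.
  val avgAgg = DeclAgg(2, b => {
    val count = b(1).asInstanceOf[Long]
    if (count == 0L) null else b(0).asInstanceOf[Long].toDouble / count
  })

  // Builds a Final/Complete-style result projection: evaluate each aggregate
  // from its slice of the buffer (in order), then append results to the key.
  def generateResultProjection(aggs: Seq[DeclAgg]): (Seq[Any], Seq[Any]) => Seq[Any] = {
    // Precompute buffer offsets so the i-th aggregate reads its own slots.
    val offsets = aggs.scanLeft(0)(_ + _.bufferSize)
    (groupingKey, buffer) => {
      val results = aggs.zip(offsets).map { case (agg, off) =>
        agg.evaluate(buffer.slice(off, off + agg.bufferSize))
      }
      groupingKey ++ results
    }
  }
}
```

As in the steps above, the offsets are computed once, when the projection is created. The returned function only does per-row work.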
Partial and PartialMerge
For Partial or PartialMerge modes, generateResultProjection does the following:
- Creates an UnsafeProjection for the groupingAttributes with the aggBufferAttributes of the aggregateFunctions
- Initializes the UnsafeProjection with the partIndex
- Collects the TypedImperativeAggregates from the aggregateFunctions (as they store a generic object in an aggregation buffer, and require calling serialization before shuffling)
In the end, generateResultProjection creates a result projection function that does the following:
- Requests the TypedImperativeAggregates (from the aggregateFunctions) to serializeAggregateBufferInPlace with the given currentBuffer
- Uses the UnsafeProjection to generate an UnsafeRow with the current grouping key and buffer
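The Partial/PartialMerge path can be sketched in plain Scala as below, under these assumptions. TypedAgg is a hypothetical collect_set-like aggregate whose buffer slot holds a Set[String] while aggregating. Its serializeAggregateBufferInPlace is named after the Spark method but is not that API; here it "serializes" into a sorted, comma-joined String. The order matters: serialization happens in place first, then the whole buffer is emitted next to the grouping key.

```scala
object PartialResultSketch {
  // Hypothetical typed-imperative aggregate (like collect_set) whose buffer slot
  // holds an arbitrary JVM object while aggregating.
  final case class TypedAgg(slot: Int) {
    // Replaces the object in the buffer with a serialized form, in place, so the
    // row can be written out. Here: a sorted, comma-joined string.
    def serializeAggregateBufferInPlace(buffer: Array[Any]): Unit =
      buffer(slot) = buffer(slot).asInstanceOf[Set[String]].toSeq.sorted.mkString(",")
  }

  // Builds a Partial/PartialMerge-style result projection: serialize typed
  // buffers first, then emit the grouping key followed by the raw buffer.
  def generateResultProjection(typedAggs: Seq[TypedAgg]): (Seq[Any], Array[Any]) => Seq[Any] =
    (groupingKey, currentBuffer) => {
      typedAggs.foreach(_.serializeAggregateBufferInPlace(currentBuffer))
      groupingKey ++ currentBuffer.toSeq
    }
}
```

No final values are computed here. The buffer is passed on as-is, apart from the in-place serialization, for a later PartialMerge or Final step to merge.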
No Modes
For no aggregate modes, generateResultProjection ...FIXME
Initializing Aggregation Buffer
```scala
initializeBuffer(
  buffer: InternalRow): Unit
```
initializeBuffer requests the expressionAggInitialProjection to evaluate against an empty row and store the result in the given InternalRow (buffer).
initializeBuffer then requests all the ImperativeAggregate functions to initialize with the buffer internal row.
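The two steps can be sketched in plain Scala as follows. The sketch assumes that the initial projection has one entry per buffer slot, with a NoOp (None here) for the slots owned by imperative aggregates. This mirrors how NoOps are used for merge and evaluate expressions elsewhere on this page. Imperative and initializeBuffer are hypothetical names, not Spark's API.

```scala
object InitializeBufferSketch {
  // Hypothetical stand-in for an imperative aggregate function that owns one
  // buffer slot and knows how to reset it.
  final case class Imperative(slot: Int, initialValue: Any) {
    def initialize(buffer: Array[Any]): Unit = buffer(slot) = initialValue
  }

  // initialProjection: one entry per buffer slot. Some(v) is the initial value of
  // a declarative aggregate's slot; None plays the role of a NoOp (the slot is
  // left for its imperative aggregate to initialize).
  def initializeBuffer(
      buffer: Array[Any],
      initialProjection: Seq[Option[Any]],
      imperativeFunctions: Seq[Imperative]): Unit = {
    // Step 1: store the declarative initial values in the buffer.
    initialProjection.zipWithIndex.foreach {
      case (Some(v), i) => buffer(i) = v
      case (None, _)    => () // NoOp: keep the slot untouched for now
    }
    // Step 2: ask every imperative function to initialize its own slot.
    imperativeFunctions.foreach(_.initialize(buffer))
  }
}
```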
initializeBuffer is used when:
- MergingSessionsIterator is requested to newBuffer, initialize, next and outputForEmptyGroupingKeyWithoutInput
- SortBasedAggregationIterator is requested to newBuffer, initialize, next and outputForEmptyGroupingKeyWithoutInput
Generating Process Row Function
```scala
generateProcessRow(
  expressions: Seq[AggregateExpression],
  functions: Seq[AggregateFunction],
  inputAttributes: Seq[Attribute]): (InternalRow, InternalRow) => Unit
```
generateProcessRow creates a mutable JoinedRow (of two InternalRows).
generateProcessRow branches off based on whether the given AggregateExpressions are specified or not.
Where AggregateExpressions come from
Caller | AggregateExpressions |
---|---|
AggregationIterator | aggregateExpressions |
ObjectAggregationIterator | aggregateExpressions |
TungstenAggregationIterator | aggregateExpressions |
functions Argument
generateProcessRow works differently based on the type of the given AggregateFunctions (DeclarativeAggregate or ImperativeAggregate).
generateProcessRow is used when:
- AggregationIterator is requested for the process row function
- ObjectAggregationIterator is requested for the mergeAggregationBuffers function
- TungstenAggregationIterator is requested to switch to sort-based aggregation
Aggregate Expressions Specified
Merge Expressions
With AggregateExpressions specified, generateProcessRow determines so-called "merge expressions" (mergeExpressions) as follows:
- For DeclarativeAggregate functions, the merge expressions are chosen based on the AggregateMode of the corresponding AggregateExpression:

AggregateMode | Merge Expressions |
---|---|
Partial or Complete | Update Expressions of a DeclarativeAggregate |
PartialMerge or Final | Merge Expressions of a DeclarativeAggregate |

- For other AggregateFunction functions, there are as many NoOp merge expressions (that do nothing and do not change a value) as there are aggBufferAttributes in the AggregateFunction
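The selection rule above can be sketched in plain Scala. Expressions are plain String labels, and Mode, Declarative and Imperative are hypothetical stand-ins rather than Spark's classes. The intuition behind the split: Partial and Complete read raw input rows, so they fold rows in with update expressions. PartialMerge and Final read partial buffers, so they combine buffers with merge expressions.

```scala
object MergeExpressionsSketch {
  sealed trait Mode
  case object Partial extends Mode
  case object PartialMerge extends Mode
  case object Final extends Mode
  case object Complete extends Mode

  // Hypothetical aggregate functions: expressions are only labels here.
  sealed trait Func
  final case class Declarative(updateExpressions: Seq[String], mergeExpressions: Seq[String]) extends Func
  final case class Imperative(bufferSize: Int) extends Func

  // One entry per buffer slot, in function order:
  //  - declarative + Partial/Complete      => its update expressions
  //  - declarative + PartialMerge/Final    => its merge expressions
  //  - any other function (any mode)       => one NoOp per buffer slot
  def mergeExpressions(functions: Seq[(Func, Mode)]): Seq[String] =
    functions.flatMap {
      case (d: Declarative, Partial | Complete)   => d.updateExpressions
      case (d: Declarative, PartialMerge | Final) => d.mergeExpressions
      case (i: Imperative, _)                     => Seq.fill(i.bufferSize)("NoOp")
    }
}
```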
Initialize Predicates
generateProcessRow finds the AggregateExpressions with filters specified.
When in Partial or Complete aggregate modes, generateProcessRow ...FIXME
Update Functions
generateProcessRow determines so-called "update functions" (updateFunctions) among the ImperativeAggregate functions (in the given AggregateFunctions) as follows:
- FIXME
Update Projection
generateProcessRow uses the newMutableProjection generator function to create a MutableProjection based on the mergeExpressions and the aggBufferAttributes of the given AggregateFunctions with the given inputAttributes.
Process Row Function
In the end, generateProcessRow creates a procedure that accepts two InternalRows (currentBuffer and row) and does the following:
- Processes all expression-based aggregate functions (using the updateProjection). The procedure requests the MutableProjection to store the output in the currentBuffer. The output is computed from the currentBuffer and the row, joined together using the JoinedRow.
- Processes all imperative aggregate functions. The procedure requests every "update function" (in updateFunctions) to execute with the given currentBuffer and the row.
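The two-phase procedure can be sketched in plain Scala under these assumptions. Rows are IndexedSeq[Any]. The update projection is a hypothetical per-slot list of functions over the joined row (buffer columns first, then input columns), with None as the NoOp for imperative slots. Update functions mutate the buffer directly. Unlike Spark's JoinedRow, which is a view, the sketch copies both sides into one sequence for simplicity. All slots are evaluated before any is written, so each expression sees the old buffer values.

```scala
object JoinedProcessRowSketch {
  type Row = IndexedSeq[Any]

  // One entry per buffer slot: Some(expr) for a slot of an expression-based
  // aggregate (expr reads the joined row), None as a NoOp for a slot that an
  // imperative aggregate updates itself.
  type UpdateProjection = IndexedSeq[Option[Row => Any]]

  // An imperative "update function": mutates the buffer given the input row.
  type UpdateFunction = (Array[Any], Row) => Unit

  def generateProcessRow(
      updateProjection: UpdateProjection,
      updateFunctions: Seq[UpdateFunction]): (Array[Any], Row) => Unit =
    (currentBuffer, row) => {
      // Join buffer and input: ordinals 0 until currentBuffer.length address the
      // buffer, the following ordinals address the input row's columns.
      val joined: Row = currentBuffer.toIndexedSeq ++ row
      // 1. Expression-based aggregates: evaluate every slot against the joined
      //    row first, then write the new values into the buffer.
      val newValues = updateProjection.map(_.map(expr => expr(joined)))
      newValues.zipWithIndex.foreach {
        case (Some(v), i) => currentBuffer(i) = v
        case (None, _)    => () // NoOp: slot belongs to an imperative aggregate
      }
      // 2. Imperative aggregates: each update function mutates its own slots.
      updateFunctions.foreach(update => update(currentBuffer, row))
    }
}
```

For the no-expressions case below, the equivalent in this sketch would simply be (_, _) => ().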
No Aggregate Expressions
With no AggregateExpressions (expressions), generateProcessRow creates a function that does nothing ("swallows" the input).