ObjectAggregationIterator
ObjectAggregationIterator is an AggregationIterator for the ObjectHashAggregateExec physical operator.
Creating Instance
ObjectAggregationIterator takes the following to be created:
- Partition ID
- Output Attributes (unused)
- Grouping NamedExpressions
- AggregateExpressions
- Aggregate Attributes
- Initial input buffer offset
- Result NamedExpressions
- Function to create a new MutableProjection given expressions and attributes ((Seq[Expression], Seq[Attribute]) => MutableProjection)
- Original Input Attributes
- Input InternalRows
- Fallback count threshold (spark.sql.objectHashAggregate.sortBased.fallbackThreshold)
- numOutputRows metric
- spillSize metric
- numTasksFallBacked metric
While being created, ObjectAggregationIterator starts processing input rows.
ObjectAggregationIterator is created when:
- ObjectHashAggregateExec physical operator is requested to doExecute
outputForEmptyGroupingKeyWithoutInput
outputForEmptyGroupingKeyWithoutInput(): UnsafeRow
outputForEmptyGroupingKeyWithoutInput...FIXME
outputForEmptyGroupingKeyWithoutInput is used when:
- ObjectHashAggregateExec physical operator is executed (with no input rows and no groupingExpressions)
Processing Input Rows
processInputs(): Unit
processInputs creates an ObjectAggregationMap.
When there are no groupingExpressions, processInputs uses the groupingProjection to generate a grouping key (for a null row) and finds the single aggregation buffer that is used to process all the input rows of the partition.
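A minimal sketch of the no-grouping-key case, in plain Scala without Spark: since every row shares the same (empty) grouping key, the buffer is obtained once and reused for the whole partition. GlobalAggSketch and its count/sum Buffer are hypothetical stand-ins for an aggregation buffer, not Spark classes.

```scala
// Hypothetical sketch, not Spark's API: with no grouping expressions every
// input row shares one (empty) grouping key, so a single buffer suffices.
object GlobalAggSketch {
  // Toy aggregation buffer holding a count and a sum
  final case class Buffer(var count: Long = 0L, var sum: Long = 0L)

  def aggregate(rows: Iterator[Long]): Buffer = {
    // Obtained once and reused for every row of the partition
    val buffer = Buffer()
    while (rows.hasNext) {
      val v = rows.next()
      buffer.count += 1
      buffer.sum += v
    }
    buffer
  }
}
```

There is no per-row key lookup at all here, which is why this path never needs the sort-based fallback.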
Otherwise, processInputs uses the sortBased flag to determine whether to keep using the ObjectAggregationMap or switch to a SortBasedAggregator.
processInputs uses the groupingProjection to generate a grouping key for every input row and finds the aggregation buffer that is used to process the row. processInputs continues processing input rows until there are no more rows available or the size of the ObjectAggregationMap reaches spark.sql.objectHashAggregate.sortBased.fallbackThreshold.
When the size of the ObjectAggregationMap reaches spark.sql.objectHashAggregate.sortBased.fallbackThreshold and there are still input rows in the partition, processInputs prints out the following INFO message to the logs, turns the sortBased flag on and increments the numTasksFallBacked metric:
Aggregation hash map size [size] reaches threshold capacity ([fallbackCountThreshold] entries),
spilling and falling back to sort based aggregation.
You may change the threshold by adjusting the option spark.sql.objectHashAggregate.sortBased.fallbackThreshold
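The hash-based loop and its fallback condition can be sketched as follows. HashPhaseSketch is hypothetical: a LinkedHashMap of running sums stands in for the ObjectAggregationMap, and only the sortBased and fallbackCountThreshold names are borrowed from the text above. Note that the switch happens only when the map is full and input rows remain.

```scala
import scala.collection.mutable

// Hypothetical sketch of the hash-based phase (not Spark's implementation).
// A LinkedHashMap of running sums stands in for the ObjectAggregationMap.
object HashPhaseSketch {
  final case class Result(
      hashMap: mutable.LinkedHashMap[String, Long], // grouping key -> running sum
      sortBased: Boolean,                           // true if the loop fell back
      remaining: Iterator[(String, Long)])          // input rows not consumed yet

  def run(rows: Iterator[(String, Long)], fallbackCountThreshold: Int): Result = {
    val hashMap = mutable.LinkedHashMap.empty[String, Long]
    var sortBased = false
    while (rows.hasNext && !sortBased) {
      val (key, value) = rows.next()
      // Find (or create) the buffer for this key and update it with the row
      hashMap(key) = hashMap.getOrElse(key, 0L) + value
      // Fall back only when the map is full AND more input is still waiting
      if (hashMap.size >= fallbackCountThreshold && rows.hasNext) {
        // Spark logs the INFO message and increments numTasksFallBacked here
        sortBased = true
      }
    }
    Result(hashMap, sortBased, rows)
  }
}
```

With a threshold of 2, the input a, b, a, c falls back right after b because rows remain, whereas a, b alone finishes in hash mode.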
For sort-based aggregation (the sortBased flag is enabled), processInputs requests the ObjectAggregationMap to dumpToExternalSorter and creates a KVSorterIterator from the sorter. processInputs then creates a SortBasedAggregator, uses the groupingProjection to generate a grouping key for every remaining input row and adds the key and the row to the SortBasedAggregator.
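A simplified sketch of the sort-based phase, assuming sum is the only aggregate: an in-memory sortBy stands in for the external sorter, and SortPhaseSketch is a hypothetical name. Once the dumped partial results and the remaining rows are ordered by grouping key, rows of the same group are adjacent, so a single pass aggregates each group.

```scala
// Hypothetical sketch of sort-based aggregation (not Spark's SortBasedAggregator).
object SortPhaseSketch {
  type Row = (String, Long) // (grouping key, partial sum or raw value)

  def run(dumped: Seq[Row], remaining: Iterator[Row]): List[Row] = {
    // In-memory stand-in for the external sort: order everything by grouping key
    val sorted = (dumped ++ remaining).sortBy(_._1)
    // Equal keys are now adjacent, so one pass folds each run into one result
    val out = List.newBuilder[Row]
    var current: Option[Row] = None
    for ((key, v) <- sorted) {
      current match {
        case Some((k, acc)) if k == key => current = Some((k, acc + v))
        case Some(done)                 => out += done; current = Some((key, v))
        case None                       => current = Some((key, v))
      }
    }
    current.foreach(out += _)
    out.result()
  }
}
```

With sum, merging a dumped partial buffer and updating with a raw input row are both plain additions; a general aggregate function would treat the two cases differently.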
In the end, processInputs creates the aggBufferIterator (from the ObjectAggregationMap or the SortBasedAggregator, based on the sortBased flag).
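Putting the pieces together, a toy iterator shows two points from this page: input rows are consumed while the object is being created, and the iterator it exposes comes from either the hash map or the sort-based path depending on the sortBased flag. ToyAggregationIterator is hypothetical and, as before, sum is the only aggregate.

```scala
import scala.collection.mutable

// Hypothetical toy class, not Spark's ObjectAggregationIterator. A LinkedHashMap
// stands in for the ObjectAggregationMap, an in-memory sort for the external sorter.
final class ToyAggregationIterator(
    inputRows: Iterator[(String, Long)],
    fallbackCountThreshold: Int) extends Iterator[(String, Long)] {

  private val hashMap = mutable.LinkedHashMap.empty[String, Long]
  private var sortBased = false

  // Evaluated in the constructor, so input rows are processed on creation
  private val aggBufferIterator: Iterator[(String, Long)] = processInputs()

  private def processInputs(): Iterator[(String, Long)] = {
    while (inputRows.hasNext && !sortBased) {
      val (key, value) = inputRows.next()
      hashMap(key) = hashMap.getOrElse(key, 0L) + value
      if (hashMap.size >= fallbackCountThreshold && inputRows.hasNext) sortBased = true
    }
    if (sortBased) {
      // Sort partial sums and remaining rows by key, then fold adjacent equal keys
      val sorted = (hashMap.toList ++ inputRows).sortBy(_._1)
      sorted.foldLeft(List.empty[(String, Long)]) {
        case ((k, acc) :: tail, (key, v)) if k == key => (k, acc + v) :: tail
        case (out, row)                               => row :: out
      }.reverse.iterator
    } else {
      hashMap.iterator
    }
  }

  override def hasNext: Boolean = aggBufferIterator.hasNext
  override def next(): (String, Long) = aggBufferIterator.next()
}
```

Both paths yield the same per-key totals; the flag only decides where they are read from.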
processInputs is used when:
- ObjectAggregationIterator is created
Logging
Enable ALL logging level for org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.ObjectAggregationIterator.name = org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator
logger.ObjectAggregationIterator.level = all
Refer to Logging.