ObjectAggregationIterator
ObjectAggregationIterator is an AggregationIterator for ObjectHashAggregateExec physical operator.
Creating Instance
ObjectAggregationIterator takes the following to be created:
- Partition ID
- Output Attributes (unused)
- Grouping NamedExpressions
- AggregateExpressions
- Aggregate Attributes
- Initial input buffer offset
- Result NamedExpressions
- Function to create a new MutableProjection given expressions and attributes ((Seq[Expression], Seq[Attribute]) => MutableProjection)
- Original Input Attributes
- Input InternalRows
- spark.sql.objectHashAggregate.sortBased.fallbackThreshold
- numOutputRows metric
- spillSize metric
- numTasksFallBacked metric
While being created, ObjectAggregationIterator starts processing input rows.
ObjectAggregationIterator is created when:
- ObjectHashAggregateExec physical operator is requested to doExecute
outputForEmptyGroupingKeyWithoutInput
outputForEmptyGroupingKeyWithoutInput(): UnsafeRow
outputForEmptyGroupingKeyWithoutInput...FIXME
outputForEmptyGroupingKeyWithoutInput is used when:
- ObjectHashAggregateExec physical operator is executed (with no input rows and no groupingExpressions)
Processing Input Rows
processInputs(): Unit
processInputs creates an ObjectAggregationMap.
With no groupingExpressions, processInputs uses the groupingProjection to generate a single grouping key (for a null row) and finds the one aggregation buffer that is used to process all input rows (of a partition).
Otherwise, processInputs uses the sortBased flag to determine whether to use the ObjectAggregationMap or switch to a SortBasedAggregator.
processInputs uses the groupingProjection to generate a grouping key for an input row and finds the aggregation buffer that is used to process the row (of a partition). processInputs continues processing input rows until there are no more rows available or the size of the ObjectAggregationMap reaches spark.sql.objectHashAggregate.sortBased.fallbackThreshold.
When the size of the ObjectAggregationMap reaches spark.sql.objectHashAggregate.sortBased.fallbackThreshold and there are still input rows in the partition, processInputs prints out the following INFO message to the logs, turns the sortBased flag on and increments the numTasksFallBacked metric.
Aggregation hash map size [size] reaches threshold capacity ([fallbackCountThreshold] entries),
spilling and falling back to sort based aggregation.
You may change the threshold by adjusting the option spark.sql.objectHashAggregate.sortBased.fallbackThreshold
For sort-based aggregation (the sortBased flag is enabled), processInputs requests the ObjectAggregationMap to dumpToExternalSorter and create a KVSorterIterator. processInputs creates a SortBasedAggregator, uses the groupingProjection to generate a grouping key for every input row and adds them to the SortBasedAggregator.
In the end, processInputs creates the aggBufferIterator (from the ObjectAggregationMap or SortBasedAggregator based on the sortBased flag).
processInputs is used when:
- ObjectAggregationIterator is created
Logging
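The hash-then-sort fallback described above can be sketched in plain Scala without any Spark dependency. This is only an illustrative model, not Spark's implementation: the FallbackSketch object, its aggregate method, the (String, Int) row shape and the use of a sum as the aggregate are all assumptions made for the example. A LinkedHashMap stands in for the ObjectAggregationMap, and an in-memory sort stands in for the external sorter and the SortBasedAggregator.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the fallback logic (not Spark code).
object FallbackSketch {

  // Sums values per grouping key. Returns the (key, sum) pairs ordered by key
  // and a flag that tells whether the sort-based path was taken.
  def aggregate(
      rows: Iterator[(String, Int)],
      fallbackThreshold: Int): (Vector[(String, Int)], Boolean) = {
    // Stand-in for the ObjectAggregationMap
    val hashMap = mutable.LinkedHashMap.empty[String, Int]
    var sortBased = false

    // Hash-based phase: runs until the input is exhausted or the flag is set
    while (rows.hasNext && !sortBased) {
      val (key, value) = rows.next()
      hashMap.update(key, hashMap.getOrElse(key, 0) + value)
      // Fall back only when the map is "full" AND more input is waiting
      if (hashMap.size >= fallbackThreshold && rows.hasNext) {
        sortBased = true
      }
    }

    if (!sortBased) {
      (hashMap.toVector.sortBy(_._1), false)
    } else {
      // Stand-in for dumping the map to a sorter: partial sums ordered by key
      val spilled = hashMap.toVector.sortBy(_._1)
      // Remaining raw input rows, also ordered by key
      val remaining = rows.toVector.sortBy(_._1)
      // Sort-based phase: merge adjacent entries that share a grouping key
      val sorted = (spilled ++ remaining).sortBy(_._1)
      val out = mutable.ArrayBuffer.empty[(String, Int)]
      for ((key, value) <- sorted) {
        if (out.nonEmpty && out.last._1 == key) {
          out(out.length - 1) = (key, out.last._2 + value)
        } else {
          out += ((key, value))
        }
      }
      (out.toVector, true)
    }
  }

  def main(args: Array[String]): Unit = {
    val input = Seq("a" -> 1, "b" -> 2, "a" -> 3, "c" -> 4, "b" -> 5)
    // Large threshold: everything stays in the hash map
    println(aggregate(input.iterator, fallbackThreshold = 128))
    // Tiny threshold: the sketch switches to the sort-based path
    println(aggregate(input.iterator, fallbackThreshold = 2))
  }
}
```

Both calls in main produce the same per-key sums; only the returned flag differs, which mirrors the idea that the fallback changes how the aggregation buffers are produced, not the result.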
Enable ALL logging level for org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.ObjectAggregationIterator.name = org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator
logger.ObjectAggregationIterator.level = all
Refer to Logging.