
ObjectAggregationIterator

ObjectAggregationIterator is an AggregationIterator for the ObjectHashAggregateExec physical operator.

Creating Instance

ObjectAggregationIterator takes the following to be created:

While being created, ObjectAggregationIterator starts processing input rows.
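Because processing starts during construction, all of a partition's input is consumed before the caller asks for the first output row. The following minimal Scala sketch (no Spark dependency) illustrates that pattern under simplifying assumptions: EagerSumIterator is a hypothetical class, not part of Spark, that aggregates (String, Long) pairs by key and calls its own processInputs from the class body, so creating it drains the input.

```scala
import scala.collection.mutable

// Hypothetical class (not a Spark API): eagerly aggregates its input while being created.
final class EagerSumIterator(input: Iterator[(String, Long)]) extends Iterator[(String, Long)] {
  // Per-key running sums, kept in first-seen key order.
  private val buffers = mutable.LinkedHashMap.empty[String, Long]

  private def processInputs(): Unit =
    input.foreach { case (key, value) => buffers.update(key, buffers.getOrElse(key, 0L) + value) }

  // A statement in the class body runs as part of the constructor,
  // so the whole input is consumed before next() is ever called.
  processInputs()

  private val output = buffers.iterator

  override def hasNext: Boolean = output.hasNext
  override def next(): (String, Long) = output.next()
}
```

The rows are pulled from the input only once, at construction time; iterating the instance afterwards just walks the already-aggregated buffers.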

ObjectAggregationIterator is created when:

  • ObjectHashAggregateExec physical operator is requested to doExecute

outputForEmptyGroupingKeyWithoutInput

outputForEmptyGroupingKeyWithoutInput(): UnsafeRow

outputForEmptyGroupingKeyWithoutInput...FIXME


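outputForEmptyGroupingKeyWithoutInput is used when:

  • ObjectHashAggregateExec physical operator is requested to doExecute (for a partition with no input rows and no grouping expressions)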

Processing Input Rows

processInputs(): Unit

processInputs creates an ObjectAggregationMap.

When there are no groupingExpressions, processInputs uses the groupingProjection to generate a single grouping key (from a null row) and looks up the one aggregation buffer that is then used to process all the input rows of the partition.
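A single buffer is enough here because every row maps to the same grouping key. As a rough illustration, here is a minimal Scala sketch (no Spark dependency) assuming a hypothetical sum-and-count aggregate over Long values; SumBuffer and aggregateWithoutGrouping are made-up names standing in for the real aggregation buffer and row-processing loop.

```scala
// Hypothetical aggregation buffer for a sum/count aggregate (not a Spark class).
final class SumBuffer(var sum: Long = 0L, var count: Long = 0L)

// Without grouping expressions every row shares the same grouping key, so the
// buffer is looked up once and then updated in place for every input row.
def aggregateWithoutGrouping(rows: Iterator[Long]): SumBuffer = {
  val buffer = new SumBuffer()
  while (rows.hasNext) {
    val value = rows.next()
    buffer.sum += value
    buffer.count += 1
  }
  buffer
}
```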

Otherwise, processInputs starts with hash-based aggregation and uses the sortBased flag to decide whether to keep using the ObjectAggregationMap or switch to a SortBasedAggregator.

For every input row, processInputs uses the groupingProjection to generate a grouping key and looks up the aggregation buffer for that key to process the row. processInputs keeps processing input rows until there are no more rows in the partition or the size of the ObjectAggregationMap reaches spark.sql.objectHashAggregate.sortBased.fallbackThreshold.

When the size of the ObjectAggregationMap reaches spark.sql.objectHashAggregate.sortBased.fallbackThreshold and there are still input rows in the partition, processInputs prints out the following INFO message to the logs, turns the sortBased flag on and increments the numTasksFallBacked metric.

Aggregation hash map size [size] reaches threshold capacity ([fallbackCountThreshold] entries),
spilling and falling back to sort based aggregation.
You may change the threshold by adjusting the option spark.sql.objectHashAggregate.sortBased.fallbackThreshold
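The hash-based loop and its fallback check can be modelled roughly as below. This is a simplified, hypothetical Scala sketch: a mutable HashMap[String, Long] stands in for the ObjectAggregationMap, a running sum stands in for the aggregation buffer, and hashPhase and HashPhaseResult are illustrative names, not Spark APIs. The fallback only happens when the map has reached the threshold and rows are still left.

```scala
import scala.collection.mutable

// Hypothetical result of the hash-based phase (not a Spark type).
final case class HashPhaseResult(
    buffers: mutable.HashMap[String, Long],
    sortBased: Boolean,
    numTasksFallBacked: Int)

// Aggregate (key, value) rows into a hash map until either the input is exhausted
// or the map reaches fallbackCountThreshold while more rows remain.
def hashPhase(rows: Iterator[(String, Long)], fallbackCountThreshold: Int): HashPhaseResult = {
  val buffers = mutable.HashMap.empty[String, Long]
  var sortBased = false
  var numTasksFallBacked = 0
  while (rows.hasNext && !sortBased) {
    val (key, value) = rows.next()
    // Find (or create) the buffer for this grouping key and update it.
    buffers.update(key, buffers.getOrElse(key, 0L) + value)
    if (buffers.size >= fallbackCountThreshold && rows.hasNext) {
      // Stop hash-based aggregation; the remaining rows go to the sort-based path.
      sortBased = true
      numTasksFallBacked += 1
    }
  }
  HashPhaseResult(buffers, sortBased, numTasksFallBacked)
}
```

Checking rows.hasNext before switching avoids a pointless fallback when the threshold is reached on the partition's last row; any rows left in the iterator are what the sort-based path has to handle.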

For sort-based aggregation (when the sortBased flag is on), processInputs requests the ObjectAggregationMap to dumpToExternalSorter and requests the resulting UnsafeKVExternalSorter for a sorted KVSorterIterator. processInputs then creates a SortBasedAggregator over that iterator, uses the groupingProjection to generate a grouping key for every remaining input row and adds each (grouping key, input row) pair to the SortBasedAggregator.
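The sort-based path can be pictured with the following hypothetical, in-memory Scala sketch. The real SortBasedAggregator works on sorted, spillable iterators of UnsafeRows; here sorted Lists stand in for both the KVSorterIterator over the dumped map entries and the sorted remaining input, and a sum stands in for both processing an input row and merging aggregation buffers. sortPhase is an illustrative name only.

```scala
// Hypothetical model of the sort-based fallback (not Spark's implementation).
// `spilled` holds partial sums dumped from the hash map; `remaining` holds
// the raw rows that were not processed by the hash-based phase.
def sortPhase(spilled: Map[String, Long], remaining: Iterator[(String, Long)]): List[(String, Long)] = {
  val fromMap = spilled.toList.sortBy(_._1)       // stands in for the sorted KVSorterIterator
  val fromInput = remaining.toList.sortBy(_._1)   // stands in for the sorted remaining input
  // Concatenating and (stably) re-sorting stands in for a streaming merge of two sorted runs.
  val merged = (fromMap ++ fromInput).sortBy(_._1)
  // Fold each run of equal grouping keys into one aggregation buffer.
  merged.foldLeft(List.empty[(String, Long)]) {
    case ((k, acc) :: tail, (key, v)) if k == key => (k, acc + v) :: tail
    case (out, (key, v))                          => (key, v) :: out
  }.reverse
}
```

Since both inputs are ordered by grouping key, all entries for a key arrive consecutively, so only one buffer needs to be held at a time.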

Finally, processInputs creates the aggBufferIterator from the SortBasedAggregator (when the sortBased flag is on) or from the ObjectAggregationMap (otherwise).


processInputs is used when:

  • ObjectAggregationIterator is created

Logging

Enable ALL logging level for org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

logger.ObjectAggregationIterator.name = org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator
logger.ObjectAggregationIterator.level = all

Refer to Logging.