SortAggregateExec Unary Physical Operator
SortAggregateExec is an AggregateCodegenSupport physical operator for Sort-Based Aggregation that uses SortBasedAggregationIterator (to iterate over UnsafeRows in partitions) when executed.
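The core idea of sort-based aggregation can be sketched in plain Scala, without Spark. This is a simplified model and not SortBasedAggregationIterator itself. It assumes the input rows arrive already sorted by the grouping key, so all rows of a group are adjacent and only one aggregation buffer has to be kept at a time. The function name sortBasedAggregate and the (key, value) row shape are hypothetical.

```scala
// Hypothetical, simplified model of sort-based aggregation (not Spark's SortBasedAggregationIterator).
// Assumes `rows` is already sorted by key, so rows with the same key are adjacent.
def sortBasedAggregate[K, V, B](
    rows: Iterator[(K, V)],
    zero: => B,
    update: (B, V) => B): Iterator[(K, B)] = new Iterator[(K, B)] {
  // A buffered iterator lets us peek at the next row's key without consuming it.
  private val input = rows.buffered

  def hasNext: Boolean = input.hasNext

  def next(): (K, B) = {
    if (!hasNext) throw new NoSuchElementException("no more groups")
    val currentKey = input.head._1
    var buffer = zero
    // Consume every adjacent row of the current group; only one buffer is alive at a time.
    while (input.hasNext && input.head._1 == currentKey) {
      buffer = update(buffer, input.next()._2)
    }
    (currentKey, buffer)
  }
}

// Rows sorted by key, as a sorted child would deliver them.
val sorted = Iterator(("o", 1), ("t", 2), ("t", 3), ("z", 0))
val result = sortBasedAggregate[String, Int, Set[Int]](sorted, Set.empty, _ + _).toList
// result == List(("o", Set(1)), ("t", Set(2, 3)), ("z", Set(0)))
```

If the input were not sorted, the same key could show up as two separate groups (for example, keys a, b, a yield three groups), which is why the operator depends on its child being sorted by the grouping keys.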
SortAggregateExec is an OrderPreservingUnaryExecNode.
Creating Instance
SortAggregateExec takes the following to be created:
- Required Child Distribution
- isStreaming flag
- Number of Shuffle Partitions
- Grouping Keys (NamedExpressions)
- Aggregates (AggregateExpressions)
- Aggregate Attributes
- Initial Input Buffer Offset
- Result (NamedExpressions)
- Child Physical Operator
SortAggregateExec is created when:
- AggUtils utility is used to create a physical operator for aggregation
- BaseAggregateExec is requested to toSortAggregate
Performance Metrics
number of output rows
Whole-Stage Code Generation
As an AggregateCodegenSupport physical operator, SortAggregateExec supports Whole-Stage Code Generation only when the supportCodegen flag is enabled.
Executing Physical Operator
doExecute requests the child physical operator to execute (which triggers physical query planning and generates an RDD[InternalRow]).
doExecute then transforms the RDD[InternalRow] with the transformation function f for every partition (using the RDD.mapPartitionsWithIndex transformation).
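The shape of this per-partition transformation can be modeled on local collections. In the following hypothetical sketch, a Seq of Seqs stands in for the partitions of an RDD[InternalRow], and mapPartitionsWithIndexModel (an illustrative name, not a Spark API) mimics what RDD.mapPartitionsWithIndex does: it calls f once per partition with the partition index and an iterator over that partition's rows.

```scala
// Hypothetical local model of RDD.mapPartitionsWithIndex (not Spark code).
// Each inner Seq plays the role of one partition.
def mapPartitionsWithIndexModel[T, U](
    partitions: Seq[Seq[T]])(f: (Int, Iterator[T]) => Iterator[U]): Seq[List[U]] =
  partitions.zipWithIndex.map { case (rows, index) =>
    // f sees only the rows of a single partition, plus that partition's index.
    f(index, rows.iterator).toList
  }

val partitions = Seq(Seq("zero", "one"), Seq("two"))
val tagged = mapPartitionsWithIndexModel(partitions) { (index, rows) =>
  rows.map(row => s"$index:$row")
}
// tagged == Seq(List("0:zero", "0:one"), List("1:two"))
```

Because f receives a whole partition as an iterator, a per-partition operator can set up state once (for example an aggregation iterator) and stream through the rows.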
Note
This is exactly as in HashAggregateExec.
Transformation Function
doExecute uses RDD.mapPartitionsWithIndex to process the InternalRows of every partition (together with the partition ID).
// (partition ID, partition rows)
f: (Int, Iterator[T]) => Iterator[U]
For every partition, mapPartitionsWithIndex...FIXME
Note
This is exactly as in HashAggregateExec except that SortAggregateExec uses SortBasedAggregationIterator.
supportCodegen
AggregateCodegenSupport
supportCodegen: Boolean
supportCodegen is part of the AggregateCodegenSupport abstraction.
supportCodegen is enabled (true) when all of the following hold:
- supportCodegen inherited from AggregateCodegenSupport is enabled
- spark.sql.codegen.aggregate.sortAggregate.enabled is enabled
- There are no grouping keys
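The three conditions above combine with a logical AND. The following hypothetical sketch models the check as a plain function; the function and parameter names are illustrative and not part of Spark.

```scala
// Hypothetical model of the supportCodegen condition described above (not Spark's code).
def sortAggregateSupportsCodegen(
    inheritedSupportCodegen: Boolean,      // supportCodegen of AggregateCodegenSupport
    sortAggregateCodegenEnabled: Boolean,  // spark.sql.codegen.aggregate.sortAggregate.enabled
    groupingKeys: Seq[String]): Boolean =
  inheritedSupportCodegen && sortAggregateCodegenEnabled && groupingKeys.isEmpty

// A global aggregation (no grouping keys) can take part in whole-stage code generation...
val global = sortAggregateSupportsCodegen(true, true, Seq.empty)
// ...but a grouped one cannot, whatever the configuration says.
val grouped = sortAggregateSupportsCodegen(true, true, Seq("initial"))
```

This agrees with the Demo below: its query groups by initial, and neither SortAggregate operator in the physical plan carries a *(n) whole-stage codegen prefix.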
Required Child Ordering
SparkPlan
requiredChildOrdering: Seq[Seq[SortOrder]]
requiredChildOrdering is part of the SparkPlan abstraction.
requiredChildOrdering is a single sequence (for the only child) with a SortOrder in Ascending direction for every grouping key.
Ordering Expressions
AliasAwareQueryOutputOrdering
orderingExpressions: Seq[SortOrder]
orderingExpressions is part of the AliasAwareQueryOutputOrdering abstraction.
orderingExpressions is a SortOrder in Ascending direction for every grouping key.
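The required child ordering and the ordering expressions are built the same way from the grouping keys. The following sketch uses hypothetical stand-ins for Catalyst's SortOrder and sort direction (they are not the Spark classes) and omits null ordering for simplicity (in Spark, Ascending defaults to NULLS FIRST, as the Sort operators in the Demo show).

```scala
// Hypothetical stand-ins for Catalyst's sort direction and SortOrder (not Spark classes).
sealed trait SortDirection
case object Ascending extends SortDirection
case object Descending extends SortDirection
final case class SortOrder(key: String, direction: SortDirection)

// One ascending SortOrder per grouping key, in grouping-key order.
def orderingExpressions(groupingKeys: Seq[String]): Seq[SortOrder] =
  groupingKeys.map(SortOrder(_, Ascending))

// The operator has exactly one child, hence exactly one inner Seq.
def requiredChildOrdering(groupingKeys: Seq[String]): Seq[Seq[SortOrder]] =
  Seq(orderingExpressions(groupingKeys))

val keys = Seq("initial")
val required = requiredChildOrdering(keys)
val provided = orderingExpressions(keys)
```

Since the ordering the operator provides matches the ordering it requires from its child, a parent operator that needs the same ascending ordering on the grouping keys should not need an extra Sort.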
Demo
Let's disable the preference for the ObjectHashAggregateExec physical operator (using the spark.sql.execution.useObjectHashAggregateExec configuration property).
spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", false)
assert(spark.sessionState.conf.useObjectHashAggregation == false)
val names = Seq(
(0, "zero"),
(1, "one"),
(2, "two")).toDF("num", "name")
Let's use an aggregate function (collect_set) whose aggregateBufferAttributes use immutable data types (so the HashAggregateExec physical operator will not be selected).
val q = names
.withColumn("initial", substring('name, 0, 1))
.groupBy('initial)
.agg(collect_set('initial))
q.explain
== Physical Plan ==
SortAggregate(key=[initial#160], functions=[collect_set(initial#160, 0, 0)])
+- *(2) Sort [initial#160 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(initial#160, 200), ENSURE_REQUIREMENTS, [id=#122]
      +- SortAggregate(key=[initial#160], functions=[partial_collect_set(initial#160, 0, 0)])
         +- *(1) Sort [initial#160 ASC NULLS FIRST], false, 0
            +- *(1) LocalTableScan [initial#160]
import q.queryExecution.optimizedPlan
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
val aggLog = optimizedPlan.asInstanceOf[Aggregate]
import org.apache.spark.sql.catalyst.planning.PhysicalAggregation
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
val (_, aggregateExpressions: Seq[AggregateExpression], _, _) = PhysicalAggregation.unapply(aggLog).get
val aggregateBufferAttributes =
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
assert(HashAggregateExec.supportsAggregate(aggregateBufferAttributes) == false)
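Why the check above fails can be modeled without Spark. Hash-based aggregation keeps its aggregation buffers in UnsafeRows and updates them in place, which works only for fixed-width types. collect_set is a TypedImperativeAggregate, whose single aggregation buffer attribute is of BinaryType (serialized bytes), so it does not qualify. The following sketch uses a hypothetical, deliberately incomplete list of type names and an illustrative function name; it is not the exact rule Spark applies.

```scala
// Hypothetical, simplified model of the hash-aggregation eligibility check (not Spark's code).
// Only buffers made entirely of in-place-updatable, fixed-width types qualify.
// The list below is partial and for illustration only.
val mutableTypes: Set[String] =
  Set("BooleanType", "ByteType", "ShortType", "IntegerType", "LongType",
    "FloatType", "DoubleType", "DateType", "TimestampType")

def supportsHashAggregate(aggBufferTypes: Seq[String]): Boolean =
  aggBufferTypes.forall(mutableTypes.contains)

// count keeps a single LongType buffer...
val countBuffer = Seq("LongType")
// ...while collect_set (a TypedImperativeAggregate) keeps a single BinaryType buffer.
val collectSetBuffer = Seq("BinaryType")
```

With ObjectHashAggregateExec disabled and HashAggregateExec ruled out, the sort-based SortAggregateExec is what remains, which is the operator the physical plan above shows.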