Aggregation Execution Planning Strategy¶
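Aggregation is an execution planning strategy that SparkPlanner uses for planning Aggregate logical operators as one of the following physical operators (in the order of preference):
- HashAggregateExec
- ObjectHashAggregateExec
- SortAggregateExec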
Executing Rule¶
apply(plan: LogicalPlan): Seq[SparkPlan]
apply is part of the GenericStrategy abstraction.
apply plans Aggregate logical operators with the aggregate expressions all being of the same type, either AggregateExpressions or PythonUDFs (with SQL_GROUPED_AGG_PANDAS_UDF eval type).
AnalysisException
apply throws an AnalysisException for misplaced PythonUDFs:
Cannot use a mixture of aggregate function and group aggregate pandas UDF
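The same-type requirement can be modelled without Spark. The following is a minimal sketch, not Spark's implementation: AggExpr, RegularAgg, PandasGroupedAgg and planKind are hypothetical names, and IllegalArgumentException merely stands in for AnalysisException.

```scala
object AggregationKindSketch {
  // Hypothetical, simplified stand-ins for Catalyst's AggregateExpression and PythonUDF.
  sealed trait AggExpr
  final case class RegularAgg(name: String) extends AggExpr
  final case class PandasGroupedAgg(name: String) extends AggExpr

  // Decides which planning path applies, or rejects a mixture of both kinds.
  def planKind(aggExprs: Seq[AggExpr]): String =
    if (aggExprs.forall(_.isInstanceOf[RegularAgg])) "AggregateExpressions"
    else if (aggExprs.forall(_.isInstanceOf[PandasGroupedAgg])) "PythonUDFs"
    else throw new IllegalArgumentException(
      "Cannot use a mixture of aggregate function and group aggregate pandas UDF")
}
```

A list with only RegularAgg values takes the first branch, a list with only PandasGroupedAgg values takes the second, and any mixture fails with the message quoted above.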
apply destructures the Aggregate logical operator (into a four-element tuple) with the following:
- Grouping Expressions
- Aggregate Expressions
- Result Expressions
- Child Logical Operator
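The destructuring can be tried out directly. The sketch below assumes the four-element tuple comes from the PhysicalAggregation extractor in org.apache.spark.sql.catalyst.planning (the text above does not name it); DestructureAggregateSketch and describe are hypothetical names used for illustration only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.planning.PhysicalAggregation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.functions.count

object DestructureAggregateSketch {
  // Sizes of the three expression lists plus the child's node name,
  // or None when the plan is not an Aggregate logical operator.
  def describe(plan: LogicalPlan): Option[(Int, Int, Int, String)] = plan match {
    case PhysicalAggregation(grouping, aggregates, results, child) =>
      Some((grouping.size, aggregates.size, results.size, child.nodeName))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("destructure-sketch")
      .getOrCreate()
    import spark.implicits._

    val q = spark.range(5).groupBy($"id" % 2 as "group").agg(count("id") as "count")
    println(describe(q.queryExecution.optimizedPlan))
    spark.stop()
  }
}
```

Returning an Option keeps the non-Aggregate case explicit, much like a strategy that returns no physical plans for operators it does not handle.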
AggregateExpressions¶
For Aggregate logical operators with AggregateExpressions, apply splits them based on the isDistinct flag.
Without distinct aggregate functions (expressions), apply uses planAggregateWithoutDistinct. Otherwise, apply uses planAggregateWithOneDistinct.
In the end, apply creates the physical operators based on whether there is a distinct aggregate function or not.
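The split on the isDistinct flag can be reproduced on a logical plan. This is a rough sketch rather than the strategy's own code: SplitByDistinctSketch and split are hypothetical names, and the aggregate expressions are collected from the plan's top operator only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.functions.{countDistinct, sum}

object SplitByDistinctSketch {
  // Collects the AggregateExpressions of the top operator and partitions them
  // into (with DISTINCT, without DISTINCT).
  def split(plan: LogicalPlan): (Seq[AggregateExpression], Seq[AggregateExpression]) = {
    val aggExprs = plan.expressions.flatMap(_.collect { case ae: AggregateExpression => ae })
    aggExprs.partition(_.isDistinct)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("split-by-distinct-sketch")
      .getOrCreate()
    import spark.implicits._

    val q = spark.range(5)
      .groupBy($"id" % 2 as "group")
      .agg(countDistinct("id") as "distinct_ids", sum("id") as "total")
    val (withDistinct, withoutDistinct) = split(q.queryExecution.optimizedPlan)
    println(s"with DISTINCT: $withDistinct")
    println(s"without DISTINCT: $withoutDistinct")
    spark.stop()
  }
}
```

An empty first element of the pair corresponds to the case without distinct aggregate functions described above.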
Note
It is assumed that all the distinct aggregate functions have the same column expressions. For example, the following is valid:
COUNT(DISTINCT foo), MAX(DISTINCT foo)
The following is not valid due to different column expressions:
COUNT(DISTINCT bar), COUNT(DISTINCT foo)
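The assumption in the note can be expressed as a small check. This is a simplified model in plain Scala, not Spark code: DistinctAgg and sameColumnExpressions are hypothetical names, and column expressions are reduced to sets of column names.

```scala
object DistinctColumnsSketch {
  // Hypothetical model: a distinct aggregate function and the columns it is applied to.
  final case class DistinctAgg(function: String, columns: Set[String])

  // True when every distinct aggregate function uses the same set of columns.
  def sameColumnExpressions(aggs: Seq[DistinctAgg]): Boolean =
    aggs.map(_.columns).distinct.size <= 1
}
```

With this model, COUNT(DISTINCT foo) together with MAX(DISTINCT foo) passes the check, while COUNT(DISTINCT bar) together with COUNT(DISTINCT foo) does not.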
PythonUDFs¶
For Aggregate logical operators with PythonUDFs, apply ...FIXME
Demo¶
A structured query with count aggregate function.
val q = spark
  .range(5)
  .groupBy($"id" % 2 as "group")
  .agg(count("id") as "count")
val plan = q.queryExecution.optimizedPlan
println(plan.numberedTreeString)
00 Aggregate [_groupingexpression#9L], [_groupingexpression#9L AS group#2L, count(1) AS count#6L]
01 +- Project [(id#0L % 2) AS _groupingexpression#9L]
02    +- Range (0, 5, step=1, splits=Some(12))
import spark.sessionState.planner.Aggregation
val physicalPlan = Aggregation.apply(plan)
println(physicalPlan.head.numberedTreeString)
// HashAggregateExec selected
00 HashAggregate(keys=[_groupingexpression#9L], functions=[count(1)], output=[group#2L, count#6L])
01 +- HashAggregate(keys=[_groupingexpression#9L], functions=[partial_count(1)], output=[_groupingexpression#9L, count#11L])
02    +- PlanLater Project [(id#0L % 2) AS _groupingexpression#9L]
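Outside spark-shell, the same selection can be checked programmatically. The sketch below is one possible way to do it, assuming a local SparkSession: it inspects queryExecution.sparkPlan (the planner's output before the preparation rules) and collects the HashAggregateExec operators. AggregationDemoSketch and hashAggregates are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.functions.count

object AggregationDemoSketch {
  // HashAggregateExec operators found in the physical plan chosen by the planner.
  def hashAggregates(q: DataFrame): Seq[HashAggregateExec] =
    q.queryExecution.sparkPlan.collect { case h: HashAggregateExec => h }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("aggregation-demo-sketch")
      .getOrCreate()
    import spark.implicits._

    val q = spark.range(5).groupBy($"id" % 2 as "group").agg(count("id") as "count")
    // Print each operator with the modes of its aggregate expressions.
    hashAggregates(q).foreach { h =>
      println(s"${h.nodeName}: ${h.aggregateExpressions.map(_.mode).mkString(", ")}")
    }
    spark.stop()
  }
}
```

Collecting from sparkPlan rather than executedPlan avoids having to look inside adaptive query execution wrappers, which may hide the aggregate operators from a plain collect.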