Aggregation Execution Planning Strategy

Aggregation is an execution planning strategy that SparkPlanner uses to plan Aggregate logical operators, selecting one of the following physical operators (in order of preference):

  1. HashAggregateExec
  2. ObjectHashAggregateExec
  3. SortAggregateExec
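The preference order can be sketched as a decision on the aggregation buffer types: HashAggregateExec requires fixed-width, UnsafeRow-compatible buffers; ObjectHashAggregateExec (when enabled) handles object-based buffers; SortAggregateExec is the fallback. The following is a simplified model with illustrative names (`BufferKind`, `chooseAggregateExec`), not Spark's internal API:

```scala
// Simplified model of the preference order among the aggregate physical
// operators. BufferKind and chooseAggregateExec are illustrative names only.
sealed trait BufferKind
case object FixedWidth extends BufferKind   // mutable, UnsafeRow-compatible (e.g. Long)
case object ObjectBased extends BufferKind  // arbitrary objects (e.g. collect_list buffers)

def chooseAggregateExec(
    buffers: Seq[BufferKind],
    objectHashEnabled: Boolean = true): String =
  if (buffers.forall(_ == FixedWidth)) "HashAggregateExec"  // preferred
  else if (objectHashEnabled) "ObjectHashAggregateExec"     // object-based buffers
  else "SortAggregateExec"                                  // fallback
```

The `objectHashEnabled` flag stands in for the `spark.sql.execution.useObjectHashAggregateExec` configuration property; with it disabled, object-based buffers fall back to sort-based aggregation.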

Executing Rule

```scala
apply(
  plan: LogicalPlan): Seq[SparkPlan]
```

apply is part of the GenericStrategy abstraction.

apply plans Aggregate logical operators whose aggregate expressions are all of the same type: either AggregateExpressions or PythonUDFs (with the SQL_GROUPED_AGG_PANDAS_UDF eval type).

AnalysisException

apply throws an AnalysisException when PythonUDFs are mixed with other aggregate functions:

```text
Cannot use a mixture of aggregate function and group aggregate pandas UDF
```
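The all-or-nothing check can be modeled in a few lines of plain Scala (illustrative types, not Spark's source; Spark throws an AnalysisException, modeled here with IllegalArgumentException):

```scala
// Aggregate expressions must be either all AggregateExpressions or all
// group-aggregate pandas UDFs; a mixture is rejected.
sealed trait AggFunction
case class AggregateExpression(name: String) extends AggFunction
case class GroupAggPandasUDF(name: String) extends AggFunction  // SQL_GROUPED_AGG_PANDAS_UDF

def validateAggregates(exprs: Seq[AggFunction]): Seq[AggFunction] = {
  val allAggs = exprs.forall(_.isInstanceOf[AggregateExpression])
  val allUdfs = exprs.forall(_.isInstanceOf[GroupAggPandasUDF])
  if (!allAggs && !allUdfs)
    throw new IllegalArgumentException(
      "Cannot use a mixture of aggregate function and group aggregate pandas UDF")
  exprs
}
```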

apply destructures the Aggregate logical operator into a four-element tuple of the following:

  • Grouping Expressions
  • Aggregation Expressions
  • Result Expressions
  • Child Logical Operator

AggregateExpressions

For Aggregate logical operators with AggregateExpressions, apply splits them based on the isDistinct flag.

With no distinct aggregate functions (expressions), apply uses planAggregateWithoutDistinct. Otherwise, apply uses planAggregateWithOneDistinct.

In the end, apply creates one of the physical operators listed above based on whether there are distinct aggregate functions or not.

Note

It is assumed that all the distinct aggregate functions have the same column expressions, e.g.:

```sql
COUNT(DISTINCT foo), MAX(DISTINCT foo)
```

The following is not valid due to different column expressions:

```sql
COUNT(DISTINCT bar), COUNT(DISTINCT foo)
```
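The dispatch on isDistinct and the note's same-columns assumption can be sketched together as follows (illustrative names and a simplified check, not Spark's internal API):

```scala
// Simplified model of how apply chooses the planning path. The returned
// strings mirror the helper names mentioned above.
case class AggregateExpr(function: String, columns: Seq[String], isDistinct: Boolean)

def planPath(exprs: Seq[AggregateExpr]): String = {
  val (distinct, regular) = exprs.partition(_.isDistinct)
  if (distinct.isEmpty) "planAggregateWithoutDistinct"
  else {
    // "One distinct": all distinct aggregates are assumed to share the
    // same column expressions (the note above).
    require(distinct.map(_.columns).distinct.size == 1,
      "distinct aggregate functions must have the same column expressions")
    "planAggregateWithOneDistinct"
  }
}
```

For example, `COUNT(DISTINCT foo)` together with `MAX(DISTINCT foo)` takes the one-distinct path, while mixing `DISTINCT bar` and `DISTINCT foo` fails the check.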

PythonUDFs

For Aggregate logical operators with PythonUDFs, apply...FIXME

Demo

A structured query with the count aggregate function:

```scala
val q = spark
  .range(5)
  .groupBy($"id" % 2 as "group")
  .agg(count("id") as "count")
val plan = q.queryExecution.optimizedPlan
println(plan.numberedTreeString)
```

```text
00 Aggregate [_groupingexpression#9L], [_groupingexpression#9L AS group#2L, count(1) AS count#6L]
01 +- Project [(id#0L % 2) AS _groupingexpression#9L]
02    +- Range (0, 5, step=1, splits=Some(12))
```

```scala
import spark.sessionState.planner.Aggregation
val physicalPlan = Aggregation.apply(plan)
println(physicalPlan.head.numberedTreeString)
```

HashAggregateExec was selected:

```text
00 HashAggregate(keys=[_groupingexpression#9L], functions=[count(1)], output=[group#2L, count#6L])
01 +- HashAggregate(keys=[_groupingexpression#9L], functions=[partial_count(1)], output=[_groupingexpression#9L, count#11L])
02    +- PlanLater Project [(id#0L % 2) AS _groupingexpression#9L]
```
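The two HashAggregate operators in the physical plan implement two-phase aggregation: a partial count per partition (partial_count) followed by a final merge of the partial results. A minimal, Spark-free sketch of that pattern (illustrative function names):

```scala
// Phase 1: each "partition" computes local counts per grouping key
// (like partial_count in the plan above).
def partialCount[K](partition: Seq[K]): Map[K, Long] =
  partition.groupBy(identity).map { case (k, vs) => k -> vs.size.toLong }

// Phase 2: the partial maps are merged into final counts per key
// (like the final count).
def finalCount[K](partials: Seq[Map[K, Long]]): Map[K, Long] =
  partials.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }
```

For the demo query, the grouping keys are `id % 2` for ids 0 to 4, so merging the per-partition counts yields 3 rows in group 0 and 2 rows in group 1, matching `count` over two partial phases.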