Aggregate Logical Operator¶
Aggregate is a unary logical operator that represents the following high-level operators in a logical query plan:
- AstBuilder is requested to visitCommonSelectQueryClausePlan (HAVING clause without GROUP BY) and parse GROUP BY clause
- KeyValueGroupedDataset is requested to agg (and aggUntyped)
- RelationalGroupedDataset is requested to toDF
Creating Instance¶
Aggregate takes the following to be created:
- Grouping Expressions
- Aggregate NamedExpressions
- Child LogicalPlan
Aggregate is created when:
- AstBuilder is requested to withSelectQuerySpecification and withAggregationClause
- DslLogicalPlan is used to groupBy
- KeyValueGroupedDataset is requested to aggUntyped
- RelationalGroupedDataset is requested to toDF
- AnalyzeColumnCommand logical command (when CommandUtils is used to computeColumnStats and computePercentiles)
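As a rough illustration of one of the paths above, the following spark-shell style sketch groups a small DataFrame and collects the Aggregate operators from the analyzed plan, printing the three properties listed under Creating Instance. The sample data, the application name and the local master are made up for the example, and the snippet assumes a Spark version in which Dataset exposes queryExecution directly (as the rest of this page does).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.Aggregate

// Assumption: a local session is good enough for inspecting plans
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("aggregate-logical-operator")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample data (not taken from this page)
val visits = Seq(
  (0, "Warsaw", 2015),
  (1, "Boston", 2016),
  (2, "Boston", 2017)).toDF("id", "city", "year")

// groupBy gives a RelationalGroupedDataset; count turns it back into a DataFrame
val q = visits.groupBy("city").count()

// Collect every Aggregate operator in the analyzed logical plan
val aggregates = q.queryExecution.analyzed.collect { case a: Aggregate => a }

aggregates.foreach { a =>
  println(s"Grouping expressions:       ${a.groupingExpressions.mkString(", ")}")
  println(s"Aggregate named expressions: ${a.aggregateExpressions.mkString(", ")}")
  println(s"Child logical plan:          ${a.child.nodeName}")
}
```

Pattern matching on the operator class keeps the sketch independent of the exact text of the plan, which changes between Spark versions.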
Query Planning¶
Aggregate logical operator is planned as one of the aggregate physical operators by the Aggregation execution planning strategy (using the PhysicalAggregation utility).
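One way to see which physical operator a given Aggregate ends up as is to look at the physical plan of a query. The sketch below is only an illustration: the sample data and session settings are made up, it assumes the three aggregate physical operators from the org.apache.spark.sql.execution.aggregate package, and the operator actually chosen depends on the aggregate functions, the Spark version and the configuration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}

// Assumption: a local session is good enough for inspecting plans
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("aggregate-query-planning")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample data (not taken from this page)
val visits = Seq(
  (0, "Warsaw", 2015),
  (1, "Boston", 2016)).toDF("id", "city", "year")

val q = visits.groupBy("city").count()

// sparkPlan is the physical plan as produced by the planning strategies,
// before the physical preparation rules are applied
val physicalAggregates = q.queryExecution.sparkPlan.collect {
  case h: HashAggregateExec       => h.nodeName
  case o: ObjectHashAggregateExec => o.nodeName
  case s: SortAggregateExec       => s.nodeName
}

println(physicalAggregates.mkString(", "))
```

A single Aggregate logical operator may show up as more than one physical operator here, since aggregation is typically planned in partial and final steps.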
Logical Optimization¶
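PushDownPredicate logical plan optimization applies so-called filter pushdown to an Aggregate operator when it is under a Filter operator and all its expressions are deterministic. The example uses a pivot query, which the analyzer resolves to two Aggregate operators.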
import org.apache.spark.sql.catalyst.optimizer.PushDownPredicate
// visits is assumed to be a DataFrame with id, city and year columns
val q = visits
  .groupBy("city")
  .pivot("year")
  .count()
  .where($"city" === "Boston")
val pivotPlanAnalyzed = q.queryExecution.analyzed
scala> println(pivotPlanAnalyzed.numberedTreeString)
00 Filter (city#8 = Boston)
01 +- Project [city#8, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[0] AS 2015#143L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[1] AS 2016#144L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[2] AS 2017#145L]
02    +- Aggregate [city#8], [city#8, pivotfirst(year#9, count(1) AS `count`#134L, 2015, 2016, 2017, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#142]
03       +- Aggregate [city#8, year#9], [city#8, year#9, count(1) AS count(1) AS `count`#134L]
04          +- Project [_1#3 AS id#7, _2#4 AS city#8, _3#5 AS year#9]
05             +- LocalRelation [_1#3, _2#4, _3#5]
val afterPushDown = PushDownPredicate(pivotPlanAnalyzed)
scala> println(afterPushDown.numberedTreeString)
00 Project [city#8, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[0] AS 2015#143L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[1] AS 2016#144L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[2] AS 2017#145L]
01 +- Aggregate [city#8], [city#8, pivotfirst(year#9, count(1) AS `count`#134L, 2015, 2016, 2017, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#142]
02    +- Aggregate [city#8, year#9], [city#8, year#9, count(1) AS count(1) AS `count`#134L]
03       +- Project [_1#3 AS id#7, _2#4 AS city#8, _3#5 AS year#9]
04          +- Filter (_2#4 = Boston)
05             +- LocalRelation [_1#3, _2#4, _3#5]
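The movement of the Filter in the two plans above can be modelled with a few lines of plain Scala. This is a toy sketch only: the Plan, Relation, Filter and Aggregate types below are hypothetical stand-ins rather than Catalyst classes, a filter is reduced to a single column-equals-value check, and the determinism requirement is left out. The rule pushes a Filter below an Aggregate only when the filtered column is one of the grouping columns.

```scala
object PushDownSketch {
  // Hypothetical mini logical plan (not Spark's Catalyst classes)
  sealed trait Plan
  case class Relation(columns: Seq[String]) extends Plan
  case class Filter(column: String, value: String, child: Plan) extends Plan
  case class Aggregate(grouping: Seq[String], aggregates: Seq[String], child: Plan) extends Plan

  // Push a Filter below an Aggregate when it references a grouping column only;
  // otherwise keep the operator where it is and continue with the child
  def pushDown(plan: Plan): Plan = plan match {
    case Filter(column, value, Aggregate(grouping, aggregates, child))
        if grouping.contains(column) =>
      Aggregate(grouping, aggregates, pushDown(Filter(column, value, child)))
    case Filter(column, value, child) =>
      Filter(column, value, pushDown(child))
    case Aggregate(grouping, aggregates, child) =>
      Aggregate(grouping, aggregates, pushDown(child))
    case relation: Relation =>
      relation
  }

  def main(args: Array[String]): Unit = {
    // Same shape as the pivot query above, with the Project operators left out
    val before =
      Filter("city", "Boston",
        Aggregate(Seq("city"), Seq("city", "pivotfirst"),
          Aggregate(Seq("city", "year"), Seq("city", "year", "count"),
            Relation(Seq("id", "city", "year")))))
    println(pushDown(before))
  }
}
```

Restricting the rule to grouping columns matters because a filter on an aggregated value (a count, for example) cannot be evaluated before the aggregation has run.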