Aggregate Logical Operator¶

Aggregate is a unary logical operator that represents the following high-level operators in a logical query plan:

- AstBuilder is requested to visitCommonSelectQueryClausePlan (a HAVING clause without GROUP BY) and to parse a GROUP BY clause
- KeyValueGroupedDataset is requested to agg (and aggUntyped)
- RelationalGroupedDataset is requested to toDF
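The shape of the operator can be sketched with plain Scala case classes. This is a hedged model for illustration only, not Spark's actual Catalyst classes: an Aggregate keeps grouping expressions, aggregate expressions, and exactly one child.

```scala
// Hedged sketch (not Spark's Catalyst classes): Aggregate as a unary node
// holding grouping expressions, aggregate expressions, and a single child.
sealed trait LogicalPlan { def children: Seq[LogicalPlan] }

case class Relation(name: String) extends LogicalPlan {
  val children: Seq[LogicalPlan] = Nil
}

case class Aggregate(
    groupingExpressions: Seq[String],  // modeled as strings for brevity
    aggregateExpressions: Seq[String],
    child: LogicalPlan) extends LogicalPlan {
  val children: Seq[LogicalPlan] = Seq(child)  // unary: exactly one child
}

val plan = Aggregate(Seq("city"), Seq("city", "count(1)"), Relation("visits"))
```

Being unary means the operator transforms the output of a single child plan, which is why it composes naturally under Project and Filter nodes in the plans shown later on this page.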
Creating Instance¶

Aggregate takes the following to be created:

- Grouping Expressions
- Aggregate NamedExpressions
- Child LogicalPlan

Aggregate is created when:

- AstBuilder is requested to withSelectQuerySpecification and withAggregationClause
- DslLogicalPlan is used to groupBy
- KeyValueGroupedDataset is requested to aggUntyped
- RelationalGroupedDataset is requested to toDF
- AnalyzeColumnCommand logical command (when CommandUtils is used to computeColumnStats and computePercentiles)
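As a rough illustration of one creation path, a groupBy-style API conceptually prepends the grouping expressions to the aggregate expressions before constructing the operator, so grouping columns appear in the output. A hedged sketch (expressions modeled as strings, not Catalyst trees; toDF here is a stand-in, not Spark's actual method body):

```scala
// Hedged sketch of how a groupBy-style API could assemble an Aggregate:
// grouping expressions become part of the output, followed by the aggregates.
case class Aggregate(
    groupingExpressions: Seq[String],
    aggregateExpressions: Seq[String],
    child: String)

def toDF(groupingExprs: Seq[String], aggExprs: Seq[String], child: String): Aggregate =
  Aggregate(groupingExprs, groupingExprs ++ aggExprs, child)

val agg = toDF(Seq("city"), Seq("count(1)"), "visits")
```

This matches what the analyzed plans below show: an Aggregate's output lists the grouping columns first, then the aggregate expressions (e.g. `Aggregate [city#8], [city#8, count(1) ...]`).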
Checking Requirements for HashAggregateExec¶

supportsHashAggregate(
  aggregateBufferAttributes: Seq[Attribute]): Boolean

supportsHashAggregate builds a StructType for the given aggregateBufferAttributes.

In the end, supportsHashAggregate is isAggregateBufferMutable with the StructType.

supportsHashAggregate is used when:

- MergeScalarSubqueries is requested to supportedAggregateMerge
- AggUtils is requested to create a physical operator for aggregation
- HashAggregateExec physical operator is created (to assert that the aggregateBufferAttributes are supported)
isAggregateBufferMutable¶

isAggregateBufferMutable(
  schema: StructType): Boolean

isAggregateBufferMutable is enabled (true) when the types of all the fields (in the given schema) are mutable.

isAggregateBufferMutable is used when:

- Aggregate is requested to check the requirements for HashAggregateExec
- UnsafeFixedWidthAggregationMap is requested to supportsAggregationBufferSchema
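The intuition behind the check can be sketched in plain Scala. This is a hedged simplification, not Spark's actual `UnsafeRow` mutability test: an aggregation buffer is mutable when every field is a fixed-width type that can be updated in place, while variable-length types (e.g. strings) make in-place updates impossible.

```scala
// Hedged sketch (not Spark's actual check): a buffer field is "mutable"
// when its type is fixed-width, so UnsafeRow-style storage can overwrite
// it in place during aggregation.
val fixedWidthTypes = Set(
  "BooleanType", "ByteType", "ShortType", "IntegerType",
  "LongType", "FloatType", "DoubleType", "DateType", "TimestampType")

def isAggregateBufferMutable(fieldTypes: Seq[String]): Boolean =
  fieldTypes.forall(fixedWidthTypes.contains)
```

Aggregates like count and sum keep fixed-width buffers (longs, doubles) and so qualify for hash-based aggregation; an aggregate whose buffer holds a string or array does not.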
Query Planning¶

Aggregate logical operator is planned to one of the physical operators in Aggregation execution planning strategy (using PhysicalAggregation utility):

- HashAggregateExec
- ObjectHashAggregateExec
- SortAggregateExec
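The selection among the physical operators can be sketched as follows. This is a hedged, simplified model of the decision, not the actual planner code (the real logic also inspects the aggregate functions and configuration): hash-based aggregation is preferred when the buffer is mutable, object-hash aggregation is a fallback when it is supported and enabled, and sort-based aggregation is the last resort.

```scala
// Hedged sketch: simplified selection of the aggregation physical operator.
// Spark prefers hash-based aggregation when the aggregation buffer is
// mutable, may fall back to object-hash aggregation, and otherwise sorts.
def chooseAggregateExec(
    bufferMutable: Boolean,
    objectHashSupported: Boolean): String =
  if (bufferMutable) "HashAggregateExec"
  else if (objectHashSupported) "ObjectHashAggregateExec"
  else "SortAggregateExec"
```

This ties the planning step back to supportsHashAggregate above: the mutability of the aggregation buffer is exactly what decides whether the fast hash-based operator is eligible.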
Logical Optimization¶

PushDownPredicate logical plan optimization applies the so-called filter pushdown to a Pivot operator when it is under a Filter operator and all the expressions are deterministic.
import org.apache.spark.sql.catalyst.optimizer.PushDownPredicate

// Sample dataset (assumed shape) with columns id, city, year
val visits = Seq((0, "Warsaw", 2015), (1, "Boston", 2016), (2, "Boston", 2017))
  .toDF("id", "city", "year")

val q = visits
  .groupBy("city")
  .pivot("year")
  .count()
  .where($"city" === "Boston")
val pivotPlanAnalyzed = q.queryExecution.analyzed
scala> println(pivotPlanAnalyzed.numberedTreeString)
00 Filter (city#8 = Boston)
01 +- Project [city#8, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[0] AS 2015#143L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[1] AS 2016#144L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[2] AS 2017#145L]
02 +- Aggregate [city#8], [city#8, pivotfirst(year#9, count(1) AS `count`#134L, 2015, 2016, 2017, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#142]
03 +- Aggregate [city#8, year#9], [city#8, year#9, count(1) AS count(1) AS `count`#134L]
04 +- Project [_1#3 AS id#7, _2#4 AS city#8, _3#5 AS year#9]
05 +- LocalRelation [_1#3, _2#4, _3#5]
val afterPushDown = PushDownPredicate(pivotPlanAnalyzed)
scala> println(afterPushDown.numberedTreeString)
00 Project [city#8, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[0] AS 2015#143L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[1] AS 2016#144L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[2] AS 2017#145L]
01 +- Aggregate [city#8], [city#8, pivotfirst(year#9, count(1) AS `count`#134L, 2015, 2016, 2017, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#142]
02 +- Aggregate [city#8, year#9], [city#8, year#9, count(1) AS count(1) AS `count`#134L]
03 +- Project [_1#3 AS id#7, _2#4 AS city#8, _3#5 AS year#9]
04 +- Filter (_2#4 = Boston)
05 +- LocalRelation [_1#3, _2#4, _3#5]
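The condition the example demonstrates can be sketched as a simple predicate check. This is a hedged simplification (Spark reasons over Catalyst expressions and attributes, not column-name strings): a deterministic filter can move below an aggregation only when it references grouping columns exclusively, because only then does it leave the membership of each group, and hence the aggregates, unchanged.

```scala
// Hedged sketch: when can a Filter be pushed below an Aggregate?
// Only a deterministic predicate over grouping columns preserves
// per-group membership, so the aggregate results are unaffected.
def canPushDown(
    predicateRefs: Set[String],
    groupingCols: Set[String],
    deterministic: Boolean): Boolean =
  deterministic && predicateRefs.subsetOf(groupingCols)
```

In the plans above, `city#8 = Boston` references only the grouping column city, so the Filter moves from above the Project all the way below both Aggregate operators, ending up as `Filter (_2#4 = Boston)` right above the LocalRelation.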