Skip to content

Aggregate Logical Operator

Aggregate is an unary logical operator that represents the following high-level operators in a logical query plan:

Creating Instance

Aggregate takes the following to be created:

Aggregate is created when:

Checking Requirements for HashAggregateExec

supportsHashAggregate(
  aggregateBufferAttributes: Seq[Attribute]): Boolean

supportsHashAggregate builds a StructType for the given aggregateBufferAttributes.

In the end, supportsHashAggregate isAggregateBufferMutable.


supportsHashAggregate is used when:

isAggregateBufferMutable

isAggregateBufferMutable(
  schema: StructType): Boolean

isAggregateBufferMutable is enabled (true) when the type of all the fields (in the given schema) are mutable.


isAggregateBufferMutable is used when:

Query Planning

Aggregate logical operator is planned to one of the physical operators in Aggregation execution planning strategy (using PhysicalAggregation utility):

Logical Optimization

PushDownPredicate logical plan optimization applies so-called filter pushdown to a Pivot operator when under Filter operator and with all expressions deterministic.

import org.apache.spark.sql.catalyst.optimizer.PushDownPredicate

val q = visits
  .groupBy("city")
  .pivot("year")
  .count()
  .where($"city" === "Boston")

val pivotPlanAnalyzed = q.queryExecution.analyzed
scala> println(pivotPlanAnalyzed.numberedTreeString)
00 Filter (city#8 = Boston)
01 +- Project [city#8, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[0] AS 2015#143L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[1] AS 2016#144L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[2] AS 2017#145L]
02    +- Aggregate [city#8], [city#8, pivotfirst(year#9, count(1) AS `count`#134L, 2015, 2016, 2017, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#142]
03       +- Aggregate [city#8, year#9], [city#8, year#9, count(1) AS count(1) AS `count`#134L]
04          +- Project [_1#3 AS id#7, _2#4 AS city#8, _3#5 AS year#9]
05             +- LocalRelation [_1#3, _2#4, _3#5]

val afterPushDown = PushDownPredicate(pivotPlanAnalyzed)
scala> println(afterPushDown.numberedTreeString)
00 Project [city#8, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[0] AS 2015#143L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[1] AS 2016#144L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[2] AS 2017#145L]
01 +- Aggregate [city#8], [city#8, pivotfirst(year#9, count(1) AS `count`#134L, 2015, 2016, 2017, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#142]
02    +- Aggregate [city#8, year#9], [city#8, year#9, count(1) AS count(1) AS `count`#134L]
03       +- Project [_1#3 AS id#7, _2#4 AS city#8, _3#5 AS year#9]
04          +- Filter (_2#4 = Boston)
05             +- LocalRelation [_1#3, _2#4, _3#5]