
StatefulAggregationStrategy Execution Planning Strategy

StatefulAggregationStrategy is an execution planning strategy (Spark SQL) to plan streaming queries with EventTimeWatermark and Aggregate (Spark SQL) logical operators.

Logical Operator         Physical Operator
EventTimeWatermark       EventTimeWatermarkExec
Aggregate (Spark SQL)    One of the following per selection requirements:
                           • HashAggregateExec
                           • ObjectHashAggregateExec
                           • SortAggregateExec

StatefulAggregationStrategy is used when IncrementalExecution is requested to plan a streaming query.

Accessing StatefulAggregationStrategy

StatefulAggregationStrategy is available using SessionState, as spark.sessionState.planner.StatefulAggregationStrategy.


Planning Streaming Aggregation

planStreamingAggregation(
  groupingExpressions: Seq[NamedExpression],
  functionsWithoutDistinct: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  stateFormatVersion: Int,
  child: SparkPlan): Seq[SparkPlan]

planStreamingAggregation creates a streaming aggregate physical operator for Partial aggregation (with the given child physical operator as the child). The given functionsWithoutDistinct expressions are set up to work in Partial execution mode.

planStreamingAggregation creates another streaming aggregate physical operator for PartialMerge aggregation (with the partial aggregate physical operator as the child). The given functionsWithoutDistinct expressions are set up to work in PartialMerge execution mode.

planStreamingAggregation creates a StateStoreRestoreExec physical operator (with the partial-merge aggregate physical operator as the child).

planStreamingAggregation creates another streaming aggregate physical operator for PartialMerge aggregation (with the StateStoreRestoreExec physical operator as the child). The given functionsWithoutDistinct expressions are set up to work in PartialMerge execution mode.

planStreamingAggregation creates a StateStoreSaveExec physical operator (with the last partial-merge aggregate physical operator as the child).

In the end, planStreamingAggregation creates another streaming aggregate physical operator for Final aggregation (with the StateStoreSaveExec physical operator as the child). The given functionsWithoutDistinct expressions are set up to work in Final execution mode.
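
To make the six-operator chain concrete, here is a toy, single-process Scala model of what each step contributes to a streaming count per key across micro-batches. Everything in it (runMicroBatch, the in-memory stateStore map, plain Scala collections standing in for rows and partitions) is hypothetical. It only mirrors the roles of Partial, PartialMerge, StateStoreRestoreExec, StateStoreSaveExec and Final, and is not Spark code.

```scala
// Toy model (hypothetical, not Spark's API) of the operators that
// planStreamingAggregation chains, specialised to a count per key.
import scala.collection.mutable

type Key = String
type Partition = Seq[Key]

// Stands in for the state store that StateStoreRestoreExec reads
// and StateStoreSaveExec writes between micro-batches.
val stateStore = mutable.Map.empty[Key, Long]

// Returns the counts of the keys seen in this micro-batch.
def runMicroBatch(partitions: Seq[Partition]): Map[Key, Long] = {
  // 1. Partial: count within each input partition (like partial_count)
  val partials: Seq[Map[Key, Long]] =
    partitions.map(_.groupBy(identity).map { case (k, vs) => k -> vs.size.toLong })
  // 2. PartialMerge (after the shuffle): merge per-partition buffers per key (like merge_count)
  val merged: Map[Key, Long] =
    partials.flatten.groupMapReduce(_._1)(_._2)(_ + _)
  // 3. StateStoreRestore: emit each new buffer plus the stored buffer, if any
  val restored: Seq[(Key, Long)] =
    merged.toSeq ++ merged.keys.flatMap(k => stateStore.get(k).map(k -> _))
  // 4. PartialMerge again: combine the new buffer with the restored one
  val combined: Map[Key, Long] = restored.groupMapReduce(_._1)(_._2)(_ + _)
  // 5. StateStoreSave: persist the updated buffers for the next micro-batch
  stateStore ++= combined
  // 6. Final: turn buffers into results (for count the buffer already is the result)
  combined
}
```

Calling runMicroBatch(Seq(Seq("a", "b", "a"), Seq("a"))) should return Map(a -> 3, b -> 1), and a following runMicroBatch(Seq(Seq("b"), Seq("c"))) should return Map(b -> 2, c -> 1): the second PartialMerge folds the count of b restored from the first batch into the new one before the state is saved again.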

planStreamingAggregation is used when:

  • StatefulAggregationStrategy execution planning strategy is planning a streaming query with an Aggregate (Spark SQL) logical operator without a session window

Planning Streaming Aggregation with Session Window

planStreamingAggregationForSession(
  groupingExpressions: Seq[NamedExpression],
  sessionExpression: NamedExpression,
  functionsWithoutDistinct: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  stateFormatVersion: Int,
  mergeSessionsInLocalPartition: Boolean,
  child: SparkPlan): Seq[SparkPlan]


planStreamingAggregationForSession is used when:

  • StatefulAggregationStrategy execution planning strategy is planning a streaming query with an Aggregate (Spark SQL) logical operator with a session window
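
The dispatch StatefulAggregationStrategy performs can be sketched as a pattern match over a few toy logical operators. The case classes and the isSessionWindow flag are hypothetical stand-ins (the flag replaces however the real strategy recognizes a session window grouping column); the function only reports which physical operator or planning helper would handle the top operator.

```scala
// Hypothetical, heavily simplified logical operators (not Spark's classes).
sealed trait LogicalOp
final case class StreamingSource(name: String) extends LogicalOp
final case class EventTimeWatermark(eventTime: String, delay: String, child: LogicalOp) extends LogicalOp
final case class GroupingExpr(name: String, isSessionWindow: Boolean)
final case class Aggregate(grouping: Seq[GroupingExpr], child: LogicalOp) extends LogicalOp

// Which physical operator or planning helper handles the top logical operator.
// Children would be planned separately (planLater in Spark); this sketch ignores them.
def plan(op: LogicalOp): String = op match {
  case EventTimeWatermark(_, _, _) => "EventTimeWatermarkExec"
  case Aggregate(grouping, _) if grouping.exists(_.isSessionWindow) =>
    "planStreamingAggregationForSession"
  case Aggregate(_, _) => "planStreamingAggregation"
  case _ => "not planned by StatefulAggregationStrategy"
}
```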

Creating Streaming Aggregate Physical Operator

createStreamingAggregate(
  requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
  groupingExpressions: Seq[NamedExpression] = Nil,
  aggregateExpressions: Seq[AggregateExpression] = Nil,
  aggregateAttributes: Seq[Attribute] = Nil,
  initialInputBufferOffset: Int = 0,
  resultExpressions: Seq[NamedExpression] = Nil,
  child: SparkPlan): SparkPlan

createStreamingAggregate creates one of the following physical operators:

  • HashAggregateExec
  • ObjectHashAggregateExec
  • SortAggregateExec

Learn more about the selection requirements in The Internals of Spark SQL.
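
The order in which these operators are tried can be sketched as below. This is a simplified, hypothetical model rather than Spark's code: the BufferSlot kinds stand in for the data types of the aggregation buffer, and objectHashEnabled stands in for the spark.sql.execution.useObjectHashAggregateExec setting. A count has a single fixed-width Long buffer, which is consistent with the HashAggregate nodes in the physical plan of the demo.

```scala
// Hypothetical, simplified model of the selection order (not Spark's API).
sealed trait BufferSlot
case object FixedWidth extends BufferSlot    // e.g. a Long counter, updatable in place
case object VariableWidth extends BufferSlot // e.g. a string, not updatable in place
case object JvmObject extends BufferSlot     // state of a typed imperative aggregate function

def selectAggregateOperator(buffer: Seq[BufferSlot], objectHashEnabled: Boolean): String =
  if (buffer.forall(_ == FixedWidth)) "HashAggregateExec"        // everything fits an in-place hash map
  else if (objectHashEnabled && buffer.contains(JvmObject)) "ObjectHashAggregateExec"
  else "SortAggregateExec"                                       // fallback: sort, then aggregate
```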

Demo

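import org.apache.spark.sql.functions.{count, window}
val counts = spark.
  readStream.
  format("rate").
  load().
  groupBy(window($"timestamp", "5 seconds") as "group").
  agg(count("value") as "count").
  orderBy("group")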
scala> counts.explain
== Physical Plan ==
*Sort [group#6 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#6 ASC NULLS FIRST, 200)
   +- *HashAggregate(keys=[window#13], functions=[count(value#1L)])
      +- StateStoreSave [window#13], StatefulOperatorStateInfo(<unknown>,736d67c2-6daa-4c4c-9c4b-c12b15af20f4,0,0), Append, 0
         +- *HashAggregate(keys=[window#13], functions=[merge_count(value#1L)])
            +- StateStoreRestore [window#13], StatefulOperatorStateInfo(<unknown>,736d67c2-6daa-4c4c-9c4b-c12b15af20f4,0,0)
               +- *HashAggregate(keys=[window#13], functions=[merge_count(value#1L)])
                  +- Exchange hashpartitioning(window#13, 200)
                     +- *HashAggregate(keys=[window#13], functions=[partial_count(value#1L)])
                        +- *Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#13, value#1L]
                           +- *Filter isnotnull(timestamp#0)
                              +- StreamingRelation rate, [timestamp#0, value#1L]
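
The long Project node above is the tumbling-window assignment that window($"timestamp", "5 seconds") expands to. The following Scala function re-implements that arithmetic for illustration only (tumblingWindow and Window are hypothetical names). Timestamps are in microseconds, which is why the plan divides by 5000000.0, and a timestamp falling exactly on a boundary starts the next window, so windows are half-open [start, end).

```scala
// Illustrative re-implementation of the window arithmetic in the Project node.
final case class Window(startMicros: Long, endMicros: Long)

def tumblingWindow(tsMicros: Long, windowMicros: Long = 5000000L, startTimeMicros: Long = 0L): Window = {
  val x = (tsMicros - startTimeMicros).toDouble / windowMicros
  val c = math.ceil(x).toLong
  // CASE WHEN ceil(x) = x THEN ceil(x) + 1 ELSE ceil(x) END
  val windowId = if (c.toDouble == x) c + 1 else c
  // ((windowId + 0 - 1) * 5000000) + 0 is the start; the end adds the window duration
  val start = (windowId - 1) * windowMicros + startTimeMicros
  Window(start, start + windowMicros)
}
```

For example, tumblingWindow(7000000L) and tumblingWindow(5000000L) both give Window(5000000, 10000000), while tumblingWindow(4999999L) gives Window(0, 5000000).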

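import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val consoleOutput = counts.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Complete).  // <-- Append is not supported for streaming aggregation without a watermark
  start()

// Eventually...
consoleOutput.stop()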