
StatefulAggregationStrategy Execution Planning Strategy

StatefulAggregationStrategy is an execution planning strategy (Spark SQL) to plan streaming queries with EventTimeWatermark and Aggregate (Spark SQL) logical operators.

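Logical Operator         Physical Operator
EventTimeWatermark       EventTimeWatermarkExec
Aggregate (Spark SQL)    One of the following, per the selection requirements:
                           • HashAggregateExec (Spark SQL)
                           • ObjectHashAggregateExec (Spark SQL)
                           • SortAggregateExec (Spark SQL)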

StatefulAggregationStrategy is used when IncrementalExecution is requested to plan a streaming query.
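A planning strategy maps a logical operator to the physical operators it can plan, or returns Nil so the planner tries the next strategy. The sketch below models this dispatch using hypothetical, simplified case classes that only borrow the names of the Spark operators (they are not Spark's classes). It returns Nil for non-streaming plans. It also collapses the planned Aggregate into a single placeholder operator, whereas the real strategy plans a whole chain of operators.

```scala
// Hypothetical, simplified types named after Spark operators; these are NOT Spark's classes.
sealed trait LogicalOp { def isStreaming: Boolean }
final case class EventTimeWatermark(eventTime: String, delayMs: Long, isStreaming: Boolean) extends LogicalOp
final case class Aggregate(grouping: Seq[String], isStreaming: Boolean) extends LogicalOp
final case class Project(columns: Seq[String], isStreaming: Boolean) extends LogicalOp

sealed trait PhysicalOp
final case class EventTimeWatermarkExec(eventTime: String, delayMs: Long) extends PhysicalOp
final case class StreamingAggregateExec(grouping: Seq[String]) extends PhysicalOp

object StatefulAggregationStrategySketch {
  // Return the physical operators this strategy can plan, or Nil so that
  // the planner falls back to the next strategy.
  def apply(plan: LogicalOp): Seq[PhysicalOp] = plan match {
    case p if !p.isStreaming => Nil // batch plans are left to other strategies
    case EventTimeWatermark(eventTime, delayMs, _) =>
      Seq(EventTimeWatermarkExec(eventTime, delayMs))
    case Aggregate(grouping, _) =>
      // Placeholder: the real strategy plans a chain of operators here.
      Seq(StreamingAggregateExec(grouping))
    case _ => Nil // any other logical operator is not handled by this strategy
  }
}
```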

Accessing StatefulAggregationStrategy

StatefulAggregationStrategy is available through the SparkPlanner of a SessionState.

spark.sessionState.planner.StatefulAggregationStrategy

planStreamingAggregation

planStreamingAggregation(
  groupingExpressions: Seq[NamedExpression],
  functionsWithoutDistinct: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  stateFormatVersion: Int,
  child: SparkPlan): Seq[SparkPlan]

planStreamingAggregation creates a streaming aggregate physical operator for Partial aggregation (with the given child physical operator as the child). The given functionsWithoutDistinct expressions are set up to work in Partial execution mode.

planStreamingAggregation creates another streaming aggregate physical operator for PartialMerge aggregation (with the partial aggregate physical operator as the child). The given functionsWithoutDistinct expressions are set up to work in PartialMerge execution mode.

planStreamingAggregation creates a StateStoreRestoreExec physical operator (with the partial-merge aggregate physical operator as the child).

planStreamingAggregation creates another streaming aggregate physical operator for PartialMerge aggregation (with the StateStoreRestoreExec physical operator as the child). The given functionsWithoutDistinct expressions are set up to work in PartialMerge execution mode.

planStreamingAggregation creates a StateStoreSaveExec physical operator (with the last partial-merge aggregate physical operator as the child).

In the end, planStreamingAggregation creates another streaming aggregate physical operator for Final aggregation (with the StateStoreSaveExec physical operator as the child). The given functionsWithoutDistinct expressions are set up to work in Final execution mode.
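The order in which planStreamingAggregation stacks the operators can be sketched as follows. This is a minimal model with hypothetical case classes (Source, StreamingAggregate, StateStoreRestore, StateStoreSave) that keep only the aggregation mode and the child. Spark's actual method also threads grouping, aggregate and result expressions through every operator, and the sketch leaves those out.

```scala
// Hypothetical, simplified model of the operator chain; these are NOT Spark's classes.
sealed trait AggMode
case object Partial extends AggMode
case object PartialMerge extends AggMode
case object Final extends AggMode

sealed trait Plan
final case class Source(name: String) extends Plan
final case class StreamingAggregate(mode: AggMode, child: Plan) extends Plan
final case class StateStoreRestore(child: Plan) extends Plan
final case class StateStoreSave(child: Plan) extends Plan

object PlanStreamingAggregationSketch {
  // Build the chain bottom-up, in the order described above.
  def planStreamingAggregation(child: Plan): Seq[Plan] = {
    val partialAggregate = StreamingAggregate(Partial, child)
    val partialMerged1 = StreamingAggregate(PartialMerge, partialAggregate)
    val restored = StateStoreRestore(partialMerged1)
    val partialMerged2 = StreamingAggregate(PartialMerge, restored)
    val saved = StateStoreSave(partialMerged2)
    val finalAggregate = StreamingAggregate(Final, saved)
    Seq(finalAggregate) // only the top-most operator is returned
  }

  // Render the operators top-down, the same order explain prints them in.
  def describe(plan: Plan): List[String] = plan match {
    case Source(name)                => List(s"Source($name)")
    case StreamingAggregate(mode, c) => s"StreamingAggregate($mode)" :: describe(c)
    case StateStoreRestore(c)        => "StateStoreRestore" :: describe(c)
    case StateStoreSave(c)           => "StateStoreSave" :: describe(c)
  }
}
```

Reading the output of describe top-down gives the same sequence of aggregate and state store operators as the Demo's explain output (HashAggregate with count, StateStoreSave, HashAggregate with merge_count, StateStoreRestore, HashAggregate with merge_count, HashAggregate with partial_count). The Exchange and Sort operators in that plan are not created by planStreamingAggregation.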


planStreamingAggregation is used when:

  • StatefulAggregationStrategy execution planning strategy is planning a streaming query with an Aggregate (Spark SQL) logical operator without a session window

planStreamingAggregationForSession

planStreamingAggregationForSession(
  groupingExpressions: Seq[NamedExpression],
  sessionExpression: NamedExpression,
  functionsWithoutDistinct: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  stateFormatVersion: Int,
  mergeSessionsInLocalPartition: Boolean,
  child: SparkPlan): Seq[SparkPlan]

planStreamingAggregationForSession...FIXME


planStreamingAggregationForSession is used when:

  • StatefulAggregationStrategy execution planning strategy is planning a streaming query with an Aggregate (Spark SQL) logical operator with a session window
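The choice between planStreamingAggregation and planStreamingAggregationForSession depends only on whether one of the grouping columns is a session window. The sketch below models that choice. GroupingColumn and its isSessionWindow flag are hypothetical stand-ins for however Spark tags the session window grouping expression, and the two results are placeholders for the two planning paths.

```scala
// Hypothetical, simplified types; these are NOT Spark's classes.
final case class GroupingColumn(name: String, isSessionWindow: Boolean = false)

sealed trait PlanningPath
case object RegularAggregation extends PlanningPath                          // planStreamingAggregation
final case class SessionAggregation(sessionColumn: String) extends PlanningPath // planStreamingAggregationForSession

object AggregationDispatchSketch {
  // Pick the planning path based on the presence of a session window grouping column.
  def choosePath(grouping: Seq[GroupingColumn]): PlanningPath =
    grouping.find(_.isSessionWindow) match {
      case Some(session) => SessionAggregation(session.name)
      case None          => RegularAggregation
    }
}
```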

Creating Streaming Aggregate Physical Operator

createStreamingAggregate(
  requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
  groupingExpressions: Seq[NamedExpression] = Nil,
  aggregateExpressions: Seq[AggregateExpression] = Nil,
  aggregateAttributes: Seq[Attribute] = Nil,
  initialInputBufferOffset: Int = 0,
  resultExpressions: Seq[NamedExpression] = Nil,
  child: SparkPlan): SparkPlan

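createStreamingAggregate creates one of the following physical operators:

  • HashAggregateExec (Spark SQL)
  • ObjectHashAggregateExec (Spark SQL)
  • SortAggregateExec (Spark SQL)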

Note

Learn more about the selection requirements in The Internals of Spark SQL.
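The order in which the aggregate physical operators are considered can be sketched as a simple decision. This is a simplified model: the three Boolean inputs are hypothetical stand-ins for Spark's actual checks. bufferSupportsHash stands for whether the aggregation buffer can be kept in a hash map of mutable rows. objectHashEnabled stands for the spark.sql.execution.useObjectHashAggregateExec configuration property. functionsSupportObjectHash stands for whether the aggregate functions can run in ObjectHashAggregateExec. Some internal settings that can force sort-based aggregation are ignored.

```scala
// Hypothetical, simplified model of the selection order; these are NOT Spark's classes.
sealed trait AggregateOperator
case object HashAggregateExec extends AggregateOperator
case object ObjectHashAggregateExec extends AggregateOperator
case object SortAggregateExec extends AggregateOperator

object AggregateSelectionSketch {
  def select(
      bufferSupportsHash: Boolean,
      objectHashEnabled: Boolean,
      functionsSupportObjectHash: Boolean): AggregateOperator =
    if (bufferSupportsHash) HashAggregateExec // preferred when the buffer fits
    else if (objectHashEnabled && functionsSupportObjectHash) ObjectHashAggregateExec
    else SortAggregateExec // fallback that always works
}
```

For example, the aggregation buffer of count is a single LongType value, which is why the Demo's plan uses HashAggregate operators.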

Demo

val counts = spark.
  readStream.
  format("rate").
  load.
  groupBy(window($"timestamp", "5 seconds") as "group").
  agg(count("value") as "count").
  orderBy("group")
scala> counts.explain
== Physical Plan ==
*Sort [group#6 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#6 ASC NULLS FIRST, 200)
   +- *HashAggregate(keys=[window#13], functions=[count(value#1L)])
      +- StateStoreSave [window#13], StatefulOperatorStateInfo(<unknown>,736d67c2-6daa-4c4c-9c4b-c12b15af20f4,0,0), Append, 0
         +- *HashAggregate(keys=[window#13], functions=[merge_count(value#1L)])
            +- StateStoreRestore [window#13], StatefulOperatorStateInfo(<unknown>,736d67c2-6daa-4c4c-9c4b-c12b15af20f4,0,0)
               +- *HashAggregate(keys=[window#13], functions=[merge_count(value#1L)])
                  +- Exchange hashpartitioning(window#13, 200)
                     +- *HashAggregate(keys=[window#13], functions=[partial_count(value#1L)])
                        +- *Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#13, value#1L]
                           +- *Filter isnotnull(timestamp#0)
                              +- StreamingRelation rate, [timestamp#0, value#1L]

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val consoleOutput = counts.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  queryName("counts").
  outputMode(OutputMode.Complete).  // <-- required: sorting (orderBy) a streaming aggregation is supported only in Complete output mode
  start

// Eventually...
consoleOutput.stop
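What the counts query computes across micro-batches can be modelled without Spark. The sketch below is a plain-Scala model, not Spark code, and everything in it is hypothetical: timestamps are epoch milliseconds, windows are 5-second tumbling windows keyed by their start, and the state is an in-memory Map standing in for the state store. Each batch produces partial counts per input partition, merges them, then merges the result with the restored state. The updated state it returns is the full result that Complete output mode would print on every trigger. The sketch ignores how Spark shuffles and partitions state by key.

```scala
// Plain-Scala model of the windowed count; NOT Spark code.
object WindowedCountSketch {
  type State = Map[Long, Long] // window start (ms) -> running count

  // Start of the 5-second tumbling window an event timestamp (ms) falls into.
  def windowStart(tsMs: Long, sizeMs: Long = 5000L): Long =
    tsMs - Math.floorMod(tsMs, sizeMs)

  // Add the counts of b into a, key by key.
  private def mergeCounts(a: State, b: State): State =
    b.foldLeft(a) { case (acc, (w, c)) => acc.updated(w, acc.getOrElse(w, 0L) + c) }

  // One micro-batch: partial counts per partition, merged, then merged with restored state.
  def runBatch(partitions: Seq[Seq[Long]], restored: State): State = {
    val partials: Seq[State] = partitions.map { events =>
      events.groupBy(ts => windowStart(ts)).map { case (w, es) => w -> es.size.toLong }
    }
    val merged = partials.foldLeft(Map.empty[Long, Long])(mergeCounts)
    mergeCounts(restored, merged) // the state to save and, in Complete mode, to output
  }
}
```

Feeding the state returned by one batch into the next call of runBatch mimics StateStoreRestore and StateStoreSave around the merge step.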