StatefulAggregationStrategy Execution Planning Strategy¶
StatefulAggregationStrategy is an execution planning strategy (Spark SQL) to plan streaming queries with EventTimeWatermark and Aggregate (Spark SQL) logical operators.
| Logical Operator | Physical Operator |
|---|---|
| EventTimeWatermark | EventTimeWatermarkExec |
| Aggregate (Spark SQL) | One of the following per selection requirements: HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec |
StatefulAggregationStrategy is used when IncrementalExecution is requested to plan a streaming query.
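The mapping above can be pictured as a pattern match over logical operators. The following is a minimal plain-Scala sketch, not Spark code: the `Toy*` types, the `hasSessionWindow` flag, and the `planFor` helper are all hypothetical names introduced for illustration, and the returned strings only name the planning path that would be taken.

```scala
// Toy stand-ins for logical operators (hypothetical, NOT Spark classes)
sealed trait ToyLogicalOp
case object ToySource extends ToyLogicalOp
final case class ToyEventTimeWatermark(child: ToyLogicalOp) extends ToyLogicalOp
final case class ToyAggregate(hasSessionWindow: Boolean, child: ToyLogicalOp) extends ToyLogicalOp

// Names the planning path this strategy would take for the top operator.
// An empty result means the operator is left to other strategies.
def planFor(op: ToyLogicalOp): Seq[String] = op match {
  case ToyEventTimeWatermark(_) => Seq("EventTimeWatermarkExec")
  case ToyAggregate(true, _)    => Seq("planStreamingAggregationForSession")
  case ToyAggregate(false, _)   => Seq("planStreamingAggregation")
  case _                        => Nil
}
```

For example, `planFor(ToyAggregate(hasSessionWindow = false, ToySource))` selects the `planStreamingAggregation` path, while any operator other than a watermark or an aggregate yields an empty sequence.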
Accessing StatefulAggregationStrategy¶
StatefulAggregationStrategy is available using SessionState.
spark.sessionState.planner.StatefulAggregationStrategy
planStreamingAggregation¶
planStreamingAggregation(
groupingExpressions: Seq[NamedExpression],
functionsWithoutDistinct: Seq[AggregateExpression],
resultExpressions: Seq[NamedExpression],
stateFormatVersion: Int,
child: SparkPlan): Seq[SparkPlan]
planStreamingAggregation creates a streaming aggregate physical operator for Partial aggregation (with the given child physical operator as the child). The given functionsWithoutDistinct expressions are set up to work in Partial execution mode.
planStreamingAggregation creates another streaming aggregate physical operator for PartialMerge aggregation (with the partial aggregate physical operator as the child). The given functionsWithoutDistinct expressions are set up to work in PartialMerge execution mode.
planStreamingAggregation creates a StateStoreRestoreExec physical operator (with the partial-merge aggregate physical operator as the child).
planStreamingAggregation creates another streaming aggregate physical operator for PartialMerge aggregation (with the StateStoreRestoreExec physical operator as the child). The given functionsWithoutDistinct expressions are set up to work in PartialMerge execution mode.
planStreamingAggregation creates a StateStoreSaveExec physical operator (with the last partial-merge aggregate physical operator as the child).
In the end, planStreamingAggregation creates another streaming aggregate physical operator for Final aggregation (with the StateStoreSaveExec physical operator as the child). The given functionsWithoutDistinct expressions are set up to work in Final execution mode.
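The six steps above stack operators on top of one another, each new operator taking the previous one as its child. A minimal plain-Scala sketch of that stacking follows. It is a toy model rather than Spark code: `ToyOp`, `toyPlanStreamingAggregation`, and `names` are hypothetical helpers, and the operator labels are plain strings.

```scala
// A toy physical operator: a label and an optional child (hypothetical, NOT a SparkPlan)
final case class ToyOp(name: String, child: Option[ToyOp] = None)

// Builds the operator chain in the same order as the steps described above
def toyPlanStreamingAggregation(child: ToyOp): ToyOp = {
  val partial       = ToyOp("Aggregate(Partial)", Some(child))
  val partialMerge1 = ToyOp("Aggregate(PartialMerge)", Some(partial))
  val restore       = ToyOp("StateStoreRestoreExec", Some(partialMerge1))
  val partialMerge2 = ToyOp("Aggregate(PartialMerge)", Some(restore))
  val save          = ToyOp("StateStoreSaveExec", Some(partialMerge2))
  ToyOp("Aggregate(Final)", Some(save))
}

// Lists operator names from the top of the plan down to the leaf
def names(op: ToyOp): List[String] =
  op.name :: op.child.map(names).getOrElse(Nil)

val toyPlan = toyPlanStreamingAggregation(ToyOp("child"))
```

Reading `names(toyPlan)` from first to last gives the Final aggregate, StateStoreSaveExec, a PartialMerge aggregate, StateStoreRestoreExec, another PartialMerge aggregate, the Partial aggregate, and the child. The sketch models only the operators this method creates, so an Exchange that shows up in a real physical plan is not part of it.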
planStreamingAggregation is used when:
StatefulAggregationStrategy execution planning strategy is planning a streaming query with Aggregate (Spark SQL) logical operator with no session window
planStreamingAggregationForSession¶
planStreamingAggregationForSession(
groupingExpressions: Seq[NamedExpression],
sessionExpression: NamedExpression,
functionsWithoutDistinct: Seq[AggregateExpression],
resultExpressions: Seq[NamedExpression],
stateFormatVersion: Int,
mergeSessionsInLocalPartition: Boolean,
child: SparkPlan): Seq[SparkPlan]
planStreamingAggregationForSession...FIXME
planStreamingAggregationForSession is used when:
StatefulAggregationStrategy execution planning strategy is planning a streaming query with Aggregate (Spark SQL) logical operator with session window
Creating Streaming Aggregate Physical Operator¶
createStreamingAggregate(
requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
groupingExpressions: Seq[NamedExpression] = Nil,
aggregateExpressions: Seq[AggregateExpression] = Nil,
aggregateAttributes: Seq[Attribute] = Nil,
initialInputBufferOffset: Int = 0,
resultExpressions: Seq[NamedExpression] = Nil,
child: SparkPlan): SparkPlan
createStreamingAggregate creates one of the following physical operators (all Spark SQL): HashAggregateExec, ObjectHashAggregateExec, or SortAggregateExec.
Note
Learn more about the selection requirements in The Internals of Spark SQL.
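As a rough illustration of what "selection requirements" could look like, here is a plain-Scala sketch that picks between hash-based, object-hash-based, and sort-based aggregation. It assumes the order of preference is hash first, object hash second, and sort as the fallback. The three boolean flags and the `Toy*` names are hypothetical simplifications, so treat the internals of Spark SQL as the authority on the actual conditions.

```scala
// Toy stand-ins for the candidate aggregate operators (hypothetical, NOT Spark classes)
sealed trait ToyAggregateExec
case object ToyHashAggregateExec extends ToyAggregateExec
case object ToyObjectHashAggregateExec extends ToyAggregateExec
case object ToySortAggregateExec extends ToyAggregateExec

// Assumed preference: hash aggregation when the aggregation buffer allows it,
// then object-hash aggregation when enabled and applicable, otherwise sort aggregation.
def chooseAggregate(
    bufferSupportsHash: Boolean,    // hypothetical flag
    objectHashEnabled: Boolean,     // hypothetical flag
    supportsObjectHash: Boolean     // hypothetical flag
): ToyAggregateExec =
  if (bufferSupportsHash) ToyHashAggregateExec
  else if (objectHashEnabled && supportsObjectHash) ToyObjectHashAggregateExec
  else ToySortAggregateExec
```

Under these assumptions, a simple `count` aggregation would take the first branch, which is consistent with the HashAggregate operators in the demo's physical plan.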
Demo¶
val counts = spark.
readStream.
format("rate").
load.
groupBy(window($"timestamp", "5 seconds") as "group").
agg(count("value") as "count").
orderBy("group")
scala> counts.explain
== Physical Plan ==
*Sort [group#6 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#6 ASC NULLS FIRST, 200)
   +- *HashAggregate(keys=[window#13], functions=[count(value#1L)])
      +- StateStoreSave [window#13], StatefulOperatorStateInfo(<unknown>,736d67c2-6daa-4c4c-9c4b-c12b15af20f4,0,0), Append, 0
         +- *HashAggregate(keys=[window#13], functions=[merge_count(value#1L)])
            +- StateStoreRestore [window#13], StatefulOperatorStateInfo(<unknown>,736d67c2-6daa-4c4c-9c4b-c12b15af20f4,0,0)
               +- *HashAggregate(keys=[window#13], functions=[merge_count(value#1L)])
                  +- Exchange hashpartitioning(window#13, 200)
                     +- *HashAggregate(keys=[window#13], functions=[partial_count(value#1L)])
                        +- *Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#13, value#1L]
                           +- *Filter isnotnull(timestamp#0)
                              +- StreamingRelation rate, [timestamp#0, value#1L]
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val consoleOutput = counts.
writeStream.
format("console").
option("truncate", false).
trigger(Trigger.ProcessingTime(10.seconds)).
queryName("counts").
outputMode(OutputMode.Complete). // <-- Complete is required because of orderBy (sorting is only supported on a streaming aggregation in Complete mode)
start
// Eventually...
consoleOutput.stop