StatefulAggregationStrategy Execution Planning Strategy
StatefulAggregationStrategy is an execution planning strategy (Spark SQL) that plans streaming queries with EventTimeWatermark and Aggregate (Spark SQL) logical operators.
Logical Operator | Physical Operator |
---|---|
EventTimeWatermark | EventTimeWatermarkExec |
Aggregate (Spark SQL) | One of the following per selection requirements: HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec |
StatefulAggregationStrategy is used when IncrementalExecution is requested to plan a streaming query.
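Conceptually, an execution planning strategy is a function from a logical operator to zero or more physical operators (Spark's `Strategy` exposes `apply(plan: LogicalPlan): Seq[SparkPlan]`). The following toy model in plain Scala, with no Spark dependency, sketches that shape for the two rows of the table above. All case classes are hypothetical stand-ins for the real operators, and returning `Nil` mimics a strategy that leaves an operator to other strategies.

```scala
// Toy model (plain Scala, no Spark) of an execution planning strategy:
// a function from a logical operator to zero or more physical operators.
// Every class below is a hypothetical stand-in, not a Spark class.
sealed trait Logical
final case class ScanNode(source: String) extends Logical
final case class EventTimeWatermarkNode(delayMs: Long, child: Logical) extends Logical
final case class AggregateNode(groupBy: Seq[String], child: Logical) extends Logical

sealed trait Physical
final case class EventTimeWatermarkExecNode(delayMs: Long) extends Physical
final case class StreamingAggregateNode(groupBy: Seq[String]) extends Physical

object ToyStatefulAggregationStrategy {
  // Handles only the two logical operators from the table above.
  // Returning Nil means "not mine", so another strategy can plan the operator.
  def apply(plan: Logical): Seq[Physical] = plan match {
    case EventTimeWatermarkNode(delayMs, _) => Seq(EventTimeWatermarkExecNode(delayMs))
    case AggregateNode(groupBy, _)          => Seq(StreamingAggregateNode(groupBy))
    case _                                  => Nil
  }
}
```

The real strategy also schedules planning of the child operator (using `planLater`), which this model skips for brevity.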
Accessing StatefulAggregationStrategy
StatefulAggregationStrategy is available through the planner (SparkPlanner) of SessionState:
spark.sessionState.planner.StatefulAggregationStrategy
planStreamingAggregation
planStreamingAggregation(
  groupingExpressions: Seq[NamedExpression],
  functionsWithoutDistinct: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  stateFormatVersion: Int,
  child: SparkPlan): Seq[SparkPlan]
planStreamingAggregation creates a streaming aggregate physical operator for Partial aggregation (with the given child physical operator as the child). The given functionsWithoutDistinct expressions are set up to work in Partial execution mode.
planStreamingAggregation creates another streaming aggregate physical operator for PartialMerge aggregation (with the partial aggregate physical operator as the child). The given functionsWithoutDistinct expressions are set up to work in PartialMerge execution mode.
planStreamingAggregation creates a StateStoreRestoreExec physical operator (with the first partial-merge aggregate physical operator as the child).
planStreamingAggregation creates another streaming aggregate physical operator for PartialMerge aggregation (with the StateStoreRestoreExec physical operator as the child). The given functionsWithoutDistinct expressions are set up to work in PartialMerge execution mode.
planStreamingAggregation creates a StateStoreSaveExec physical operator (with the second partial-merge aggregate physical operator as the child).
In the end, planStreamingAggregation creates another streaming aggregate physical operator for Final aggregation (with the StateStoreSaveExec physical operator as the child). The given functionsWithoutDistinct expressions are set up to work in Final execution mode. This final aggregate is the only element of the returned Seq[SparkPlan].
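The steps above build a single chain of operators, from the Partial aggregate at the bottom to the Final aggregate at the top. The following toy model in plain Scala (no Spark dependency) sketches that chain. The case classes are hypothetical stand-ins for Spark's physical operators, and the model ignores expressions, state store configuration and data distribution.

```scala
// Toy model (plain Scala, no Spark) of the operator chain built by planStreamingAggregation.
// Operator names mirror the text; the case classes themselves are hypothetical.
sealed trait AggMode
case object Partial extends AggMode
case object PartialMerge extends AggMode
case object Final extends AggMode

sealed trait Op { def child: Option[Op] }
final case class Source(name: String) extends Op { val child: Option[Op] = None }
final case class StreamingAgg(mode: AggMode, in: Op) extends Op { val child: Option[Op] = Some(in) }
final case class StateStoreRestore(in: Op) extends Op { val child: Option[Op] = Some(in) }
final case class StateStoreSave(in: Op) extends Op { val child: Option[Op] = Some(in) }

object ToyPlanner {
  def planStreamingAggregation(child: Op): Seq[Op] = {
    val partial        = StreamingAgg(Partial, child)          // 1. partial aggregation
    val partialMerged1 = StreamingAgg(PartialMerge, partial)   // 2. merge partial buffers
    val restored       = StateStoreRestore(partialMerged1)     // 3. bring in saved state
    val partialMerged2 = StreamingAgg(PartialMerge, restored)  // 4. merge new data with state
    val saved          = StateStoreSave(partialMerged2)        // 5. save updated state
    val finalAgg       = StreamingAgg(Final, saved)            // 6. final aggregation
    Seq(finalAgg)                                              // the only element returned
  }

  // Renders the chain top-down, one operator per line, in the style of explain output.
  def render(op: Op, depth: Int = 0): List[String] = {
    val label = op match {
      case Source(n)            => s"Source $n"
      case StreamingAgg(m, _)   => s"StreamingAgg($m)"
      case StateStoreRestore(_) => "StateStoreRestore"
      case StateStoreSave(_)    => "StateStoreSave"
    }
    val line = (if (depth == 0) "" else "   " * (depth - 1) + "+- ") + label
    line :: op.child.map(render(_, depth + 1)).getOrElse(Nil)
  }
}
```

For example, `ToyPlanner.render(ToyPlanner.planStreamingAggregation(Source("rate")).head)` lists the six operators above the source, Final first. The physical plan in the demo also shows an Exchange hashpartitioning between the Partial and the first PartialMerge aggregates, which this model omits.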
planStreamingAggregation is used when StatefulAggregationStrategy execution planning strategy is planning a streaming query with an Aggregate (Spark SQL) logical operator with no session window.
planStreamingAggregationForSession
planStreamingAggregationForSession(
  groupingExpressions: Seq[NamedExpression],
  sessionExpression: NamedExpression,
  functionsWithoutDistinct: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  stateFormatVersion: Int,
  mergeSessionsInLocalPartition: Boolean,
  child: SparkPlan): Seq[SparkPlan]
planStreamingAggregationForSession...FIXME
planStreamingAggregationForSession is used when StatefulAggregationStrategy execution planning strategy is planning a streaming query with an Aggregate (Spark SQL) logical operator with a session window.
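The two "used when" conditions above amount to a single decision on the grouping keys of an Aggregate. The following toy model in plain Scala (no Spark) sketches it. `GroupingKey`, its `isSessionWindow` flag and the `Planner` type are hypothetical; the flag stands in for however Spark actually recognizes a session-window grouping expression.

```scala
// Toy model (plain Scala, no Spark): choosing how to plan a streaming Aggregate.
// GroupingKey, its isSessionWindow flag and the Planner ADT are hypothetical.
final case class GroupingKey(name: String, isSessionWindow: Boolean)

sealed trait Planner
final case class ForSession(sessionKey: String) extends Planner // -> planStreamingAggregationForSession
case object NoSession extends Planner                          // -> planStreamingAggregation

object ToyAggregateDispatch {
  def choose(grouping: Seq[GroupingKey]): Planner =
    grouping.find(_.isSessionWindow) match {
      case Some(session) => ForSession(session.name) // a session window among the grouping keys
      case None          => NoSession                // plain keys or tumbling/sliding windows
    }
}
```

The demo below groups by a tumbling window, so in this model it would take the `NoSession` branch.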
Creating Streaming Aggregate Physical Operator
createStreamingAggregate(
  requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
  groupingExpressions: Seq[NamedExpression] = Nil,
  aggregateExpressions: Seq[AggregateExpression] = Nil,
  aggregateAttributes: Seq[Attribute] = Nil,
  initialInputBufferOffset: Int = 0,
  resultExpressions: Seq[NamedExpression] = Nil,
  child: SparkPlan): SparkPlan
createStreamingAggregate creates one of the following physical operators (per selection requirements): HashAggregateExec, ObjectHashAggregateExec or SortAggregateExec.
Note: Learn more about the selection requirements in The Internals of Spark SQL.
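As a rough illustration of the selection requirements, the following toy model in plain Scala (no Spark) picks an operator name from simplified properties of the aggregate functions. It assumes that hash aggregation requires every aggregation buffer field to be a fixed-width mutable type, and that object hash aggregation requires at least one typed imperative aggregate plus the `spark.sql.execution.useObjectHashAggregateExec` setting. `BufferType`, `AggFunction` and the sample functions are hypothetical; Spark's real rules (in `AggUtils`) cover more cases.

```scala
// Simplified, hypothetical model of choosing a physical aggregate operator.
sealed trait BufferType
case object FixedWidthMutable extends BufferType // e.g. a long or double buffer
case object VariableWidth extends BufferType     // e.g. a string or binary buffer

final case class AggFunction(name: String, bufferTypes: Seq[BufferType], isTypedImperative: Boolean)

object ToyAggregateSelection {
  def select(functions: Seq[AggFunction], objectHashEnabled: Boolean): String = {
    // Hash aggregation: every buffer field of every function is fixed-width and mutable.
    val hashSupported = functions.forall(_.bufferTypes.forall(_ == FixedWidthMutable))
    // Object hash aggregation: at least one typed imperative aggregate function.
    val objectHashSupported = functions.exists(_.isTypedImperative)
    if (hashSupported) "HashAggregateExec"
    else if (objectHashEnabled && objectHashSupported) "ObjectHashAggregateExec"
    else "SortAggregateExec"
  }
}
```

A count keeps a single long in its buffer, so in this model it selects HashAggregateExec, which agrees with the HashAggregate operators in the demo below.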
Demo
val counts = spark.
  readStream.
  format("rate").
  load.
  groupBy(window($"timestamp", "5 seconds") as "group").
  agg(count("value") as "count").
  orderBy("group")
scala> counts.explain
== Physical Plan ==
*Sort [group#6 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#6 ASC NULLS FIRST, 200)
   +- *HashAggregate(keys=[window#13], functions=[count(value#1L)])
      +- StateStoreSave [window#13], StatefulOperatorStateInfo(<unknown>,736d67c2-6daa-4c4c-9c4b-c12b15af20f4,0,0), Append, 0
         +- *HashAggregate(keys=[window#13], functions=[merge_count(value#1L)])
            +- StateStoreRestore [window#13], StatefulOperatorStateInfo(<unknown>,736d67c2-6daa-4c4c-9c4b-c12b15af20f4,0,0)
               +- *HashAggregate(keys=[window#13], functions=[merge_count(value#1L)])
                  +- Exchange hashpartitioning(window#13, 200)
                     +- *HashAggregate(keys=[window#13], functions=[partial_count(value#1L)])
                        +- *Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#13, value#1L]
                           +- *Filter isnotnull(timestamp#0)
                              +- StreamingRelation rate, [timestamp#0, value#1L]
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val consoleOutput = counts.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  queryName("counts").
  outputMode(OutputMode.Complete). // <-- required to sort (orderBy) a streaming aggregation
  start
// Eventually...
consoleOutput.stop
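In the plan above, the partial_count and merge_count aggregates surround StateStoreRestore and StateStoreSave, so the count for a window keeps growing across micro-batches. The following toy simulation in plain Scala (no Spark) sketches that idea. It assumes an immutable Map as the "state store", 5-second tumbling windows keyed by their start time in milliseconds, and made-up input timestamps; it is a conceptual illustration, not how StateStoreRestoreExec or StateStoreSaveExec are implemented.

```scala
// Toy simulation (plain Scala, no Spark) of a running per-window count kept
// across micro-batches. The Map-based "state store" is a hypothetical stand-in.
object ToyRunningCount {
  val windowMs = 5000L

  // Start (in ms) of the 5-second tumbling window a non-negative timestamp falls into.
  def windowStart(ts: Long): Long = ts - (ts % windowMs)

  // One micro-batch: count new rows per window (like partial_count), add them to
  // the restored counts (like merge_count) and return the state to save.
  def runBatch(state: Map[Long, Long], timestamps: Seq[Long]): Map[Long, Long] = {
    val partial: Map[Long, Long] =
      timestamps.groupBy(windowStart).map { case (w, rows) => w -> rows.size.toLong }
    partial.foldLeft(state) { case (acc, (w, c)) =>
      acc.updated(w, acc.getOrElse(w, 0L) + c)
    }
  }
}
```

For example, a first batch with timestamps 1000, 2000 and 6000 yields counts of 2 for the window starting at 0 and 1 for the window starting at 5000; a second batch with 7000 and 11000 raises the 5000 window to 2 and adds the 10000 window with 1. In Complete output mode, as in the demo, every trigger outputs the whole result table, i.e. all windows accumulated so far.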