IncrementalExecution¶
IncrementalExecution is a QueryExecution (Spark SQL) made for streaming queries with the following customizations:
- Extra handling of CurrentBatchTimestamp and ExpressionWithRandomSeed expressions for the optimizedPlan
- Extra Planning Strategies
- Extra Physical Optimizations
IncrementalExecution is part of StreamExecutions (e.g. MicroBatchExecution and ContinuousExecution).
Creating Instance¶
IncrementalExecution takes the following to be created:
- SparkSession (Spark SQL)
- LogicalPlan (Spark SQL)
- OutputMode
- State Checkpoint Location
- Query ID
- Run ID
- Current Batch ID
- OffsetSeqMetadata
IncrementalExecution is created when:
- MicroBatchExecution is requested to run a single streaming micro-batch (in queryPlanning phase)
- ContinuousExecution is requested to run a streaming query in continuous mode (in queryPlanning phase)
- Dataset.explain operator is executed (on a streaming query)
OutputMode¶
IncrementalExecution is given an OutputMode when created.
The OutputMode is used for the following:
- planner (to create StreamingGlobalLimitStrategy)
- state (to fill in StateStoreSaveExec, SessionWindowStateStoreSaveExec, StreamingGlobalLimitExec with the state of a current microbatch)
State Checkpoint Location¶
IncrementalExecution is given a directory (location) for state checkpointing when created.
The directory is as follows:
- state directory in the checkpoint root directory in MicroBatchExecution and ContinuousExecution
- <unknown> when QueryExecution is requested to explainString (Spark SQL) on a streaming query (since there is no output specified yet)
val queryName = "rate2memory"
val checkpointLocation = s"file:/tmp/checkpoint-$queryName"
val query = spark
  .readStream
  .format("rate")
  .load
  .groupBy($"value" % 2 as "gid")
  .count
scala> query.explain
== Physical Plan ==
*(4) HashAggregate(keys=[_groupingexpression#22L], functions=[count(1)])
+- StateStoreSave [_groupingexpression#22L], state info [ checkpoint = <unknown>, runId = 0ec02b7b-ada5-4a71-970e-145453417319, opId = 0, ver = 0, numPartitions = 200], Append, 0, 2
+- *(3) HashAggregate(keys=[_groupingexpression#22L], functions=[merge_count(1)])
+- StateStoreRestore [_groupingexpression#22L], state info [ checkpoint = <unknown>, runId = 0ec02b7b-ada5-4a71-970e-145453417319, opId = 0, ver = 0, numPartitions = 200], 2
+- *(2) HashAggregate(keys=[_groupingexpression#22L], functions=[merge_count(1)])
+- Exchange hashpartitioning(_groupingexpression#22L, 200), ENSURE_REQUIREMENTS, [plan_id=138]
+- *(1) HashAggregate(keys=[_groupingexpression#22L], functions=[partial_count(1)])
+- *(1) Project [(value#9L % 2) AS _groupingexpression#22L]
+- StreamingRelation rate, [timestamp#8, value#9L]
// Keep a handle to the started StreamingQuery (query itself is only a DataFrame)
val sq = query
  .writeStream
  .format("memory")
  .queryName(queryName)
  .option("checkpointLocation", checkpointLocation)
  .start
// Give the streaming query a moment (one micro-batch)
// so lastExecution is available for the checkpointLocation
import scala.concurrent.duration._
sq.awaitTermination(1.second.toMillis)
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val stateCheckpointDir = sq
  .asInstanceOf[StreamingQueryWrapper]
  .streamingQuery
  .lastExecution
  .checkpointLocation
val stateDir = s"$checkpointLocation/state"
assert(stateCheckpointDir equals stateDir)
The State Checkpoint Location is used when:
IncrementalExecution
is requested for the state info of the next stateful operator (when requested to optimize a streaming physical plan using the state preparation rule that creates a stateful physical operator).
StreamingExplainCommand¶
IncrementalExecution is used to create a StreamingExplainCommand.
statefulOperatorId¶
IncrementalExecution uses the statefulOperatorId internal counter for the IDs of the stateful operators in the optimized logical plan (while applying the preparations rules) when requested to prepare the plan for execution (in executedPlan phase).
Preparing Logical Plan (of Streaming Query) for Execution¶
When requested for an optimized logical plan (of the analyzed logical plan), IncrementalExecution transforms CurrentBatchTimestamp and ExpressionWithRandomSeed expressions with the timestamp literal and new random seeds, respectively. When transforming CurrentBatchTimestamp expressions, IncrementalExecution prints out the following INFO message to the logs:
Current batch timestamp = [timestamp]
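The rewrite described above can be pictured with a small self-contained model. This is only a sketch: the Expr hierarchy and the rewrite function below are hypothetical stand-ins invented for illustration, not Catalyst's expression classes or Spark's actual code.

```scala
// A toy model (NOT Spark's classes) of replacing per-batch placeholders
// with concrete values before a micro-batch is planned.
object BatchRewriteSketch {
  sealed trait Expr
  // Stand-in for a "current batch timestamp" placeholder expression
  case object BatchTimestampPlaceholder extends Expr
  // Stand-in for an expression that carries a random seed
  final case class SeededRandom(seed: Long) extends Expr
  final case class TimestampLiteral(ms: Long) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  // Walks the expression tree, turning the placeholder into a literal
  // and giving every seeded expression a fresh seed.
  def rewrite(expr: Expr, batchTimestampMs: Long, newSeed: () => Long): Expr = expr match {
    case BatchTimestampPlaceholder => TimestampLiteral(batchTimestampMs)
    case SeededRandom(_)           => SeededRandom(newSeed())
    case Add(left, right) =>
      Add(rewrite(left, batchTimestampMs, newSeed), rewrite(right, batchTimestampMs, newSeed))
    case literal: TimestampLiteral => literal
  }
}
```

Because the placeholder is resolved once per batch, every occurrence within the same batch sees the same timestamp in this model.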
Right after being created, IncrementalExecution is executed (in the queryPlanning phase by the MicroBatchExecution and ContinuousExecution stream execution engines), so the entire query execution pipeline is executed up to and including executedPlan. That means that the extra planning strategies and the state preparation rule have been applied at this point and the streaming query is ready for execution.
Physical Optimization (Preparations Rules)¶
preparations: Seq[Rule[SparkPlan]]
preparations is part of the QueryExecution (Spark SQL) abstraction.
preparations is the state preparation rule followed by the parent's preparations rules.
Number of State Stores (spark.sql.shuffle.partitions)¶
numStateStores: Int
numStateStores is the number of state stores, which corresponds to the spark.sql.shuffle.partitions configuration property (default: 200).
Tip
Learn more about spark.sql.shuffle.partitions configuration property in The Internals of Spark SQL online book.
Internally, numStateStores requests the OffsetSeqMetadata for the spark.sql.shuffle.partitions configuration property (using the streaming configuration) or simply takes whatever was defined for the given SparkSession (default: 200).
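The lookup order described above (the value recorded with the streaming metadata first, then the session value) can be sketched as follows. The object, the function and the plain Map used for the recorded configuration are assumptions made for illustration; they are not Spark's API.

```scala
// A simplified model (NOT Spark's code) of resolving the number of state stores.
object NumStateStoresSketch {
  val ShufflePartitionsKey = "spark.sql.shuffle.partitions"

  // metadataConf: hypothetical stand-in for the configuration stored with the offsets
  // sessionValue: hypothetical stand-in for the value in the current session (200 by default)
  def numStateStores(metadataConf: Map[String, String], sessionValue: Int): Int =
    metadataConf
      .get(ShufflePartitionsKey) // prefer the value recorded for the streaming query
      .map(_.toInt)
      .getOrElse(sessionValue)   // otherwise fall back to the session value
}
```

In this model a value recorded with the query wins over the session value, which matches the order in which the text lists the two sources.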
numStateStores is initialized right when IncrementalExecution is created.
numStateStores is used when IncrementalExecution is requested for the state info of the next stateful operator (when requested to optimize a streaming physical plan using the state preparation rule that creates the stateful physical operators: StateStoreSaveExec, StateStoreRestoreExec, StreamingDeduplicateExec, FlatMapGroupsWithStateExec, StreamingSymmetricHashJoinExec, and StreamingGlobalLimitExec).
Extra Planning Strategies¶
IncrementalExecution uses a custom SparkPlanner (Spark SQL) with the following extra planning strategies to plan the streaming query for execution:
- StreamingJoinStrategy
- StatefulAggregationStrategy
- FlatMapGroupsWithStateStrategy
- StreamingRelationStrategy
- StreamingDeduplicationStrategy
- StreamingGlobalLimitStrategy
State Preparation Rule For Execution-Specific Configuration¶
state: Rule[SparkPlan]
state is a custom physical preparation rule (Rule[SparkPlan]) that can transform a streaming physical plan (SparkPlan) with the following physical operators:
- StateStoreSaveExec with any unary physical operator (UnaryExecNode) with a StateStoreRestoreExec
state simply transforms the physical plan with the above physical operators and fills out the execution-specific configuration:
- nextStatefulOperationStateInfo for the state info
- batchWatermarkMs (through the OffsetSeqMetadata) for the event-time watermark
- batchTimestampMs (through the OffsetSeqMetadata) for the current timestamp
- getStateWatermarkPredicates for the state watermark predicates (for StreamingSymmetricHashJoinExec)
The state rule is used (as part of the physical query optimizations) when IncrementalExecution is requested to optimize (prepare) the physical plan of the streaming query (once for ContinuousExecution and every trigger for MicroBatchExecution in queryPlanning phase).
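The idea of a preparation rule that fills in execution-specific configuration can be illustrated with a toy plan tree. Everything below (Plan, StatefulOp, Metadata, prepare) is a hypothetical model for illustration only; the order in which operator IDs are handed out here is a choice of the sketch and is not claimed to match Spark.

```scala
// A toy model (NOT Spark's classes) of a rule that fills in
// per-batch settings on the stateful operators of a plan.
object StateRuleSketch {
  final case class Metadata(batchWatermarkMs: Long, batchTimestampMs: Long)

  sealed trait Plan
  final case class Scan(name: String) extends Plan
  // Stateful operator whose execution-specific fields start out empty
  final case class StatefulOp(
      child: Plan,
      operatorId: Option[Long] = None,
      eventTimeWatermarkMs: Option[Long] = None,
      batchTimestampMs: Option[Long] = None) extends Plan

  def prepare(plan: Plan, metadata: Metadata): Plan = {
    var nextOperatorId = 0L // counter local to a single preparation pass
    def go(p: Plan): Plan = p match {
      case op: StatefulOp =>
        val id = nextOperatorId
        nextOperatorId += 1
        op.copy(
          child = go(op.child),
          operatorId = Some(id),
          eventTimeWatermarkMs = Some(metadata.batchWatermarkMs),
          batchTimestampMs = Some(metadata.batchTimestampMs))
      case scan: Scan => scan // stateless operators are left untouched
    }
    go(plan)
  }
}
```

Operators that carry no state pass through unchanged, which mirrors the rule only matching the stateful physical operators.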
Tip
Learn more about Physical Query Optimizations in The Internals of Spark SQL online book.
Next StatefulOperationStateInfo¶
nextStatefulOperationStateInfo(): StatefulOperatorStateInfo
nextStatefulOperationStateInfo simply creates a new StatefulOperatorStateInfo with the state checkpoint location, the run ID (of the streaming query), the next statefulOperator ID, the current batch ID, and the number of state stores.
Note
The only changing part of StatefulOperatorStateInfo across calls of the nextStatefulOperationStateInfo method is the next statefulOperator ID.
All the other properties (the state checkpoint location, the run ID, the current batch ID, and the number of state stores) are the same within a single IncrementalExecution instance.
Across IncrementalExecution instances, the only two properties that may ever change are the run ID (after a streaming query is restarted from the checkpoint) and the current batch ID (every micro-batch in the MicroBatchExecution execution engine).
nextStatefulOperationStateInfo is used when IncrementalExecution is requested to optimize a streaming physical plan using the state preparation rule (and creates the stateful physical operators: StateStoreSaveExec, StateStoreRestoreExec, StreamingDeduplicateExec, FlatMapGroupsWithStateExec, StreamingSymmetricHashJoinExec, and StreamingGlobalLimitExec).
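The note above (only the operator ID changes between calls on one instance) can be checked with a minimal model. The OperatorStateInfo and Execution classes are hypothetical stand-ins written for this sketch, assuming an atomic counter backs the operator IDs; they are not Spark's classes.

```scala
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger

// A simplified model (NOT Spark's code) of handing out state info per stateful operator.
object StateInfoSketch {
  final case class OperatorStateInfo(
      checkpointLocation: String,
      runId: UUID,
      operatorId: Long,
      batchId: Long,
      numPartitions: Int)

  // One instance corresponds to one planning of one batch in this model
  final class Execution(
      checkpointLocation: String,
      runId: UUID,
      batchId: Long,
      numStateStores: Int) {
    // Internal counter; the only moving part across calls
    private val statefulOperatorId = new AtomicInteger(0)

    def nextStateInfo(): OperatorStateInfo =
      OperatorStateInfo(
        checkpointLocation,
        runId,
        statefulOperatorId.getAndIncrement().toLong,
        batchId,
        numStateStores)
  }
}
```

Two consecutive calls on the same Execution differ only in operatorId, while a new Execution (for a new batch or a restarted run) starts its counter from zero again.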
Checking Out Whether Last Execution Requires Another Non-Data Micro-Batch¶
shouldRunAnotherBatch(
newMetadata: OffsetSeqMetadata): Boolean
shouldRunAnotherBatch is positive (true) if there is at least one StateStoreWriter operator (in the executedPlan physical query plan) that requires another non-data batch (per the given OffsetSeqMetadata with the event-time watermark and the batch timestamp).
Otherwise, shouldRunAnotherBatch is negative (false).
shouldRunAnotherBatch is used when MicroBatchExecution is requested to construct the next streaming micro-batch (and checks out whether the last batch execution requires another non-data batch).
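The "at least one stateful writer asks for another batch" check can be sketched with a toy plan. The PlanNode hierarchy and, in particular, the per-operator criterion (the new watermark being ahead of the watermark the operator last used) are assumptions of this sketch, not a statement of what each Spark operator checks.

```scala
// A toy model (NOT Spark's classes) of deciding whether a no-data batch is needed.
object AnotherBatchSketch {
  final case class BatchMetadata(batchWatermarkMs: Long, batchTimestampMs: Long)

  sealed trait PlanNode { def children: Seq[PlanNode] }
  final case class Stateless(children: Seq[PlanNode]) extends PlanNode
  final case class StatefulWriter(lastWatermarkMs: Long, children: Seq[PlanNode]) extends PlanNode {
    // Hypothetical criterion: the watermark moved past what this operator last saw
    def shouldRunAnotherBatch(metadata: BatchMetadata): Boolean =
      metadata.batchWatermarkMs > lastWatermarkMs
  }

  // Collects all stateful writers anywhere in the plan
  def writers(plan: PlanNode): Seq[StatefulWriter] = {
    val here = plan match {
      case writer: StatefulWriter => Seq(writer)
      case _                      => Seq.empty[StatefulWriter]
    }
    here ++ plan.children.flatMap(writers)
  }

  // True when at least one stateful writer requires another batch
  def shouldRunAnotherBatch(plan: PlanNode, metadata: BatchMetadata): Boolean =
    writers(plan).exists(_.shouldRunAnotherBatch(metadata))
}
```

A plan without any stateful writer never asks for another batch in this model, since exists over an empty collection is false.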
Demo: State Checkpoint Directory¶
Using setConf(SHUFFLE_PARTITIONS, 1) makes debugging and monitoring easier, since the state is then kept in a single partition only.
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 1)
assert(spark.sessionState.conf.numShufflePartitions == 1)
Use the rate source as the input.
val counts = spark
  .readStream
  .format("rate")
  .load
  .groupBy(window($"timestamp", "5 seconds") as "group")
  .agg(count("value") as "value_count") // <-- creates an Aggregate logical operator
  .orderBy("group") // <-- makes for easier checking
assert(counts.isStreaming, "This should be a streaming query")
Look for checkpoint = <unknown> in the following output for the StateStoreSaveExec and StateStoreRestoreExec physical operators.
scala> counts.explain
== Physical Plan ==
*(5) Sort [group#5 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#5 ASC NULLS FIRST, 1)
+- *(4) HashAggregate(keys=[window#11], functions=[count(value#1L)])
+- StateStoreSave [window#11], state info [ checkpoint = <unknown>, runId = 558bf725-accb-487d-97eb-f790fa4a6138, opId = 0, ver = 0, numPartitions = 1], Append, 0, 2
+- *(3) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
+- StateStoreRestore [window#11], state info [ checkpoint = <unknown>, runId = 558bf725-accb-487d-97eb-f790fa4a6138, opId = 0, ver = 0, numPartitions = 1], 2
+- *(2) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
+- Exchange hashpartitioning(window#11, 1)
+- *(1) HashAggregate(keys=[window#11], functions=[partial_count(value#1L)])
+- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#11, value#1L]
+- *(1) Filter isnotnull(timestamp#0)
+- StreamingRelation rate, [timestamp#0, value#1L]
Start the query with the checkpointLocation option.
val checkpointLocation = "/tmp/spark-streams-state-checkpoint-root"
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val t = Trigger.ProcessingTime(1.hour) // should be enough time for exploration
val sq = counts
  .writeStream
  .format("console")
  .option("truncate", false)
  .option("checkpointLocation", checkpointLocation)
  .trigger(t)
  .outputMode(OutputMode.Complete)
  .start
Wait for the first batch (which should happen right after start) and then access lastExecution, which has the checkpoint location resolved.
import org.apache.spark.sql.execution.streaming._
val lastExecution = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
assert(lastExecution.checkpointLocation == s"file:${checkpointLocation}/state")