Stateful Stream Processing¶
Stateful Stream Processing is stream processing with state (implicit or explicit).
In Spark Structured Streaming, a streaming query is stateful when it uses one of the following operations (each of which makes use of StateStores):

* Streaming aggregation (groupBy or groupByKey followed by an aggregation)
* Streaming deduplication (dropDuplicates)
* Stream-stream join
* Arbitrary stateful processing (mapGroupsWithState and flatMapGroupsWithState)
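The following is a minimal sketch (not from the source) of such a stateful query: a streaming aggregation over the built-in rate source. The groupBy followed by count forces Spark to keep running counts in state stores across micro-batches (the checkpoint path is hypothetical).

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("stateful-query-demo").getOrCreate()
import spark.implicits._

// A streaming aggregation: the running counts per bucket live in state stores
val counts = spark.readStream
  .format("rate")                    // built-in test source emitting (timestamp, value) rows
  .option("rowsPerSecond", 1)
  .load()
  .groupBy(($"value" % 10).as("bucket"))
  .count()

val query = counts.writeStream
  .format("console")
  .outputMode("complete")            // complete mode emits all running counts every micro-batch
  .option("checkpointLocation", "/tmp/stateful-demo-checkpoint")  // hypothetical path
  .start()
```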
Versioned State, StateStores and StateStoreProviders¶
Spark Structured Streaming uses StateStores for versioned and fault-tolerant key-value state stores.
State stores are checkpointed incrementally to avoid state loss and for increased performance.
State stores are managed by StateStoreProviders, with HDFSBackedStateStoreProvider being the default and only known implementation. HDFSBackedStateStoreProvider uses a Hadoop DFS-compatible file system for state checkpointing and fault tolerance.

State store providers manage versioned state per stateful operator (and the partition it operates on).
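Assuming the hypothetical checkpoint location from the sketch above, the incremental checkpoints of the default HDFSBackedStateStoreProvider can be observed directly on the file system (state lives under the state subdirectory, per operator and partition):

```scala
import java.nio.file.{Files, Paths}

// State of operator 0, partition 0 of the (hypothetical) query above:
// <checkpointLocation>/state/<operatorId>/<partitionId>/
val stateDir = Paths.get("/tmp/stateful-demo-checkpoint/state/0/0")
Files.list(stateDir).forEach(p => println(p.getFileName))
// Typically lists incremental delta files (1.delta, 2.delta, ...) and occasional
// snapshot files that compact older deltas
```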
The lifecycle of a StateStoreProvider begins when the StateStore utility (on a Spark executor) is requested for a StateStore (by provider ID and version).
Important
Since the StateStore and StateStoreProvider utilities are Scala objects, there can only be one instance of each on a single JVM. Scala objects are effectively singletons, so there is exactly one instance of each per JVM, and that JVM is the one of a Spark executor. As long as the executor is up and running, state versions are cached and Hadoop DFS is not used (except for the initial load).
When requested for a StateStore, the StateStore utility is given the version of the state store to look up. The version is either the current epoch (in Continuous Stream Processing) or the current batch ID (in Micro-Batch Stream Processing).

The StateStore utility requests the StateStoreProvider utility to create and initialize (createAndInit) the StateStoreProvider implementation (based on the spark.sql.streaming.stateStore.providerClass internal configuration property) and requests it to initialize (init).

The initialized StateStoreProvider is cached in the loadedProviders internal lookup table (for a StateStoreProviderId) for later lookups.

The StateStore utility then requests the StateStoreProvider for the state store for the specified version (getStore).

An instance of StateStoreProvider is also requested to do state maintenance (doMaintenance) at regular intervals.
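The provider implementation is therefore pluggable through that configuration property. A minimal sketch (the custom provider class name is hypothetical; a provider is instantiated reflectively, so it needs a no-argument constructor):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// The default provider, spelled out explicitly
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")

// A hypothetical custom provider: the class must be on the executors' classpath and,
// since it is instantiated reflectively, it needs a no-argument constructor
// spark.conf.set("spark.sql.streaming.stateStore.providerClass", "com.example.MyStateStoreProvider")
```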
IncrementalExecution — QueryExecution of Streaming Queries¶
Regardless of the query language (Dataset API or SQL), any structured query (including streaming queries) becomes a logical query plan.
In Spark Structured Streaming it is IncrementalExecution that plans streaming queries for execution.
While planning a streaming query for execution (aka query planning), IncrementalExecution uses the state preparation rule. The rule fills out the stateful physical operators with the execution-specific configuration, with StatefulOperatorStateInfo being the most important for stateful stream processing.
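One way to see the result of this planning is to inspect the executed (physical) plan of the last micro-batch and look for the state-store operators. The sketch below relies on internal classes and assumes a streamingQuery that is already running (as in the REPL session further down):

```scala
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper

// streamingQuery: an org.apache.spark.sql.streaming.StreamingQuery assumed to be running
val se = streamingQuery.asInstanceOf[StreamingQueryWrapper].streamingQuery

// The executed plan of the last micro-batch, planned by IncrementalExecution
val plan = se.lastExecution.executedPlan

// Stateful operators (e.g. StateStoreRestoreExec, StateStoreSaveExec) carry the
// execution-specific StatefulOperatorStateInfo after the state preparation rule has run
plan.collect { case op if op.nodeName.contains("StateStore") => op }.foreach(println)
```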
Micro-Batch Stream Processing and Extra Non-Data Batch for StateStoreWriter Stateful Operators¶
In Micro-Batch Stream Processing (with the MicroBatchExecution engine), IncrementalExecution uses the shouldRunAnotherBatch flag that allows StateStoreWriter stateful physical operators to indicate whether the last batch execution requires another, non-data, batch (e.g. to clean up expired state when the event-time watermark advances).

The following StateStoreWriters redefine the shouldRunAnotherBatch flag.
| StateStoreWriter | shouldRunAnotherBatch Flag |
|------------------|----------------------------|
| FlatMapGroupsWithStateExec | Based on the GroupStateTimeout |
| StateStoreSaveExec | Based on the OutputMode and the event-time watermark |
| StreamingDeduplicateExec | Based on the event-time watermark |
| StreamingSymmetricHashJoinExec | Based on the event-time watermark |
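Among these, FlatMapGroupsWithStateExec with a GroupStateTimeout is the typical reason for an extra non-data batch: even when no new rows arrive, a micro-batch may have to run just to fire timed-out state. Below is a minimal, illustrative sketch (the session logic, names, and the checkpoint path are assumptions, not from the source):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

val spark = SparkSession.builder().appName("sessions-demo").getOrCreate()
import spark.implicits._

case class Event(user: String, time: Timestamp)
case class UserSession(user: String, events: Long)

// Emit a session once a user has been silent for 30 seconds (processing time)
def updateSession(
    user: String,
    events: Iterator[Event],
    state: GroupState[UserSession]): Iterator[UserSession] = {
  if (state.hasTimedOut) {
    val finished = state.get
    state.remove()
    Iterator(finished)               // the timed-out session is the only output
  } else {
    val current = state.getOption.getOrElse(UserSession(user, 0L))
    state.update(current.copy(events = current.events + events.size))
    state.setTimeoutDuration("30 seconds")
    Iterator.empty                   // nothing to emit until the session times out
  }
}

// A toy stream of events from the built-in rate source
val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 5)
  .load()
  .selectExpr("concat('user', cast(value % 3 as string)) as user", "timestamp as time")
  .as[Event]

val sessions = events
  .groupByKey(_.user)
  .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.ProcessingTimeTimeout())(updateSession)

// Even with no new input rows, timed-out sessions require an extra (non-data) batch to be emitted
val query = sessions.writeStream
  .format("console")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/sessions-checkpoint") // hypothetical path
  .start()
```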
StateStoreRDD¶
Right after query planning, a stateful streaming query (a single micro-batch actually) becomes an RDD with one or more StateStoreRDDs.
You can find the StateStoreRDDs
of a streaming query in the RDD lineage.
```text
scala> :type streamingQuery
org.apache.spark.sql.streaming.StreamingQuery
scala> streamingQuery.explain
== Physical Plan ==
*(4) HashAggregate(keys=[window#13-T0ms, value#3L], functions=[count(1)])
+- StateStoreSave [window#13-T0ms, value#3L], state info [ checkpoint = file:/tmp/checkpoint-counts/state, runId = 1dec2d81-f2d0-45b9-8f16-39ede66e13e7, opId = 0, ver = 1, numPartitions = 1], Append, 10000, 2
+- *(3) HashAggregate(keys=[window#13-T0ms, value#3L], functions=[merge_count(1)])
+- StateStoreRestore [window#13-T0ms, value#3L], state info [ checkpoint = file:/tmp/checkpoint-counts/state, runId = 1dec2d81-f2d0-45b9-8f16-39ede66e13e7, opId = 0, ver = 1, numPartitions = 1], 2
+- *(2) HashAggregate(keys=[window#13-T0ms, value#3L], functions=[merge_count(1)])
+- Exchange hashpartitioning(window#13-T0ms, value#3L, 1)
+- *(1) HashAggregate(keys=[window#13-T0ms, value#3L], functions=[partial_count(1)])
+- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(time#2-T0ms, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(time#2-T0ms, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(time#2-T0ms, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(time#2-T0ms, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(time#2-T0ms, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(time#2-T0ms, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(time#2-T0ms, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(time#2-T0ms, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#13-T0ms, value#3L]
+- *(1) Filter isnotnull(time#2-T0ms)
+- EventTimeWatermark time#2: timestamp, interval
+- LocalTableScan <empty>, [time#2, value#3L]
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
val se = streamingQuery.asInstanceOf[StreamingQueryWrapper].streamingQuery
scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution
scala> :type se.lastExecution
org.apache.spark.sql.execution.streaming.IncrementalExecution
val rdd = se.lastExecution.toRdd
scala> rdd.toDebugString
res3: String =
(1) MapPartitionsRDD[39] at toRdd at <console>:40 []
| StateStoreRDD[38] at toRdd at <console>:40 [] // <-- here
| MapPartitionsRDD[37] at toRdd at <console>:40 []
| StateStoreRDD[36] at toRdd at <console>:40 [] // <-- here
| MapPartitionsRDD[35] at toRdd at <console>:40 []
| ShuffledRowRDD[17] at start at <pastie>:67 []
+-(1) MapPartitionsRDD[16] at start at <pastie>:67 []
| MapPartitionsRDD[15] at start at <pastie>:67 []
| MapPartitionsRDD[14] at start at <pastie>:67 []
| MapPartitionsRDD[13] at start at <pastie>:67 []
| ParallelCollectionRDD[12] at start at <pastie>:67 []
```
StateStoreCoordinator RPC Endpoint, StateStoreRDD and Preferred Locations¶
Since execution of a stateful streaming query happens on Spark executors whereas planning happens on the driver, Spark Structured Streaming uses the RPC environment to track the locations of the state stores in use. That allows the tasks (of a structured query) to be scheduled where the state (of a partition) is.
When planned for execution, the StateStoreRDD
is first asked for the preferred locations of a partition (which happens on the driver) that are later used to compute it (on Spark executors).
Spark Structured Streaming uses the RPC environment to keep track of StateStores (their StateStoreProviders, actually) for RDD planning.

Every time a StateStoreRDD is requested for the preferred locations of a partition, it communicates with the StateStoreCoordinator RPC endpoint that knows the locations of the required StateStores (per host and executor ID).
StateStoreRDD
uses StateStoreProviderId with StateStoreId to uniquely identify the state store to use for (associate with) a stateful operator and a partition.
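To see this locality in action, one can ask the StateStoreRDDs in the lineage for their preferred locations (on the driver). A sketch reusing the rdd value from the REPL session above; the lineage walk is for illustration only:

```scala
import org.apache.spark.rdd.RDD

// Recursively collect the StateStoreRDDs from the RDD lineage
// (rdd is se.lastExecution.toRdd from the session above)
def stateStoreRDDs(r: RDD[_]): Seq[RDD[_]] =
  (if (r.getClass.getSimpleName == "StateStoreRDD") Seq(r) else Seq.empty) ++
    r.dependencies.flatMap(d => stateStoreRDDs(d.rdd))

// Preferred locations are looked up via the StateStoreCoordinator RPC endpoint
stateStoreRDDs(rdd).foreach { s =>
  s.partitions.foreach { p =>
    println(s"RDD ${s.id}, partition ${p.index}: ${s.preferredLocations(p).mkString(", ")}")
  }
}
```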
State Management¶
The state in a stateful streaming query can be implicit or explicit.
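For instance, dropDuplicates manages its state implicitly (Spark decides what to keep in the state store), while mapGroupsWithState exposes the state explicitly through GroupState. A short sketch contrasting the two (clicks is an assumed streaming Dataset and spark an active SparkSession; the names are illustrative):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import spark.implicits._   // spark: an active SparkSession (assumed)

case class Click(user: String, clickTime: Timestamp)

// clicks: a streaming Dataset[Click] (assumed to be defined elsewhere)

// Implicit state: Spark tracks already-seen (user, clickTime) pairs in a state store
val deduped = clicks.dropDuplicates("user", "clickTime")

// Explicit state: the user-defined function owns the per-group state (a running click count)
val clickCounts = clicks
  .groupByKey(_.user)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout()) {
    (user: String, clicksInBatch: Iterator[Click], state: GroupState[Long]) =>
      val newCount = state.getOption.getOrElse(0L) + clicksInBatch.size
      state.update(newCount)
      (user, newCount)
  }
```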