StateStore
StateStore is an extension of the ReadStateStore abstraction for versioned key-value stores used to write and read state in Stateful Stream Processing (e.g., to persist running aggregates in Streaming Aggregation).
Note
StateStore was introduced in [SPARK-13809][SQL] State store for streaming aggregations.
Read the motivation and design in State Store for Streaming Aggregations.
Contract
Committing State Changes
commit(): Long
Commits all updates (puts and removes) and returns a new version.
Used when:
* FlatMapGroupsWithStateExec physical operator is requested to processDataWithPartition
* SessionWindowStateStoreSaveExec physical operator is executed
* StreamingDeduplicateExec physical operator is executed
* StreamingGlobalLimitExec physical operator is executed
* StreamingAggregationStateManagerBaseImpl is requested to commit
* StreamingSessionWindowStateManagerImplV1 is requested to commit
* StateStoreHandler is requested to commit
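The commit contract can be illustrated with a toy in-memory store. This is a minimal sketch, not Spark's implementation: ToyProvider and ToyStore are hypothetical classes, String keys and values stand in for UnsafeRow, and every committed version is kept as a full snapshot in memory. Puts and removes are staged and only become visible as a new version once commit() returns it.

```scala
import scala.collection.mutable

// Hypothetical provider that keeps every committed version as an immutable snapshot.
// String keys and values stand in for UnsafeRow.
final class ToyProvider {
  private val versions =
    mutable.Map[Long, Map[String, String]](0L -> Map.empty[String, String])

  def latestVersion: Long = versions.keys.max

  // Loads the given version and returns a store that stages updates on top of it
  def getStore(version: Long): ToyStore = {
    val snapshot = versions.getOrElse(
      version, throw new IllegalArgumentException(s"Unknown version $version"))
    new ToyStore(this, version, snapshot)
  }

  // Called by ToyStore.commit to record a new version
  def save(version: Long, data: Map[String, String]): Unit = {
    versions(version) = data
  }
}

// Hypothetical store: puts and removes are staged until commit()
final class ToyStore(provider: ToyProvider, val version: Long, snapshot: Map[String, String]) {
  private val working = mutable.Map[String, String]() ++= snapshot
  private var committed = false

  def get(key: String): Option[String] = working.get(key)

  def put(key: String, value: String): Unit = {
    checkNotCommitted()
    working(key) = value
  }

  def remove(key: String): Unit = {
    checkNotCommitted()
    working -= key
  }

  // Commits all staged updates as version + 1 and returns the new version
  def commit(): Long = {
    checkNotCommitted()
    val newVersion = version + 1
    provider.save(newVersion, working.toMap)
    committed = true
    newVersion
  }

  // A committed store rejects further updates
  private def checkNotCommitted(): Unit =
    require(!committed, s"Store for version $version has already been committed")
}
```

For example, committing two puts on a store loaded at version 0 returns 1, and a store loaded at version 1 then sees both keys while version 0 stays unchanged.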
StateStoreMetrics
metrics: StateStoreMetrics
The StateStoreMetrics of this state store.
Used when:
* StateStoreWriter physical operator is requested to setStoreMetrics
* StateStoreHandler is requested for the metrics
Storing Value for Key
put(
key: UnsafeRow,
value: UnsafeRow): Unit
Stores (puts) a new non-null value for a non-null key.
Used when:
* StreamingDeduplicateExec physical operator is executed
* StreamingGlobalLimitExec physical operator is executed
* StateManagerImplBase is requested to putState
* StreamingAggregationStateManagerImplV2 (and StreamingAggregationStateManagerImplV1) is requested to put a row
* StreamingSessionWindowStateManagerImplV1 is requested to putRows
* KeyToNumValuesStore is requested to put the number of values of a key
* KeyWithIndexToValueStore is requested to put a new value of a key
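The last two call sites hint at a common pattern: keeping several values per key with two plain key-value stores, one holding the number of values of a key and one holding each value under a (key, index) pair, as the names KeyToNumValuesStore and KeyWithIndexToValueStore suggest. A minimal sketch of that idea, assuming String keys and values instead of UnsafeRow and two mutable maps instead of state stores; MultiValueState and append are hypothetical names, not Spark's API.

```scala
import scala.collection.mutable

// Hypothetical sketch: two key-value maps used together to keep multiple
// values per key (String keys and values stand in for UnsafeRow)
final class MultiValueState {
  // key -> number of values stored for that key
  private val keyToNumValues = mutable.Map[String, Long]()
  // (key, index) -> value
  private val keyWithIndexToValue = mutable.Map[(String, Long), String]()

  // Appends a value: puts it under the next free index, then puts the new count
  def append(key: String, value: String): Unit = {
    require(key != null && value != null, "key and value must be non-null")
    val numValues = keyToNumValues.getOrElse(key, 0L)
    keyWithIndexToValue((key, numValues)) = value
    keyToNumValues(key) = numValues + 1
  }

  // Returns all values of a key in insertion order
  def get(key: String): Seq[String] = {
    val numValues = keyToNumValues.getOrElse(key, 0L)
    (0L until numValues).map(i => keyWithIndexToValue((key, i)))
  }
}
```

Splitting the count from the values means appending one value touches exactly two entries, whatever the number of values already stored for the key.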
Removing Key
remove(
key: UnsafeRow): Unit
Removes a non-null key.
Used when:
* WatermarkSupport physical operator is requested to removeKeysOlderThanWatermark
* StateManagerImplBase is requested to removeState
* StreamingAggregationStateManagerBaseImpl is requested to remove a key
* StreamingSessionWindowStateManagerImplV1 is requested to removeByValueCondition and putRows
* KeyToNumValuesStore is requested to remove a key
* KeyWithIndexToValueStore is requested to remove a key
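Watermark-based eviction is a typical use of remove. A minimal sketch of the idea behind removeKeysOlderThanWatermark, assuming a hypothetical TimedKey with an event-time field and a plain mutable map in place of a StateStore; evicting keys at or below the watermark (the `<=` boundary) is an assumption of this sketch.

```scala
import scala.collection.mutable

// Hypothetical key carrying an event time in milliseconds
final case class TimedKey(group: String, eventTimeMs: Long)

// Hypothetical store backed by a plain mutable map
final class EvictingStore {
  private val data = mutable.Map[TimedKey, Long]()

  def put(key: TimedKey, value: Long): Unit = {
    data(key) = value
  }

  def remove(key: TimedKey): Unit = {
    data -= key
  }

  def keys: Set[TimedKey] = data.keySet.toSet

  // Removes every key whose event time is at or below the watermark and
  // returns how many were removed. Expired keys are collected into a List
  // first so the map is not modified while it is being traversed.
  def removeKeysOlderThanWatermark(watermarkMs: Long): Int = {
    val expired = data.keys.filter(_.eventTimeMs <= watermarkMs).toList
    expired.foreach(remove)
    expired.size
  }
}
```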
Implementations
getReadOnly
getReadOnly(
storeProviderId: StateStoreProviderId,
keySchema: StructType,
valueSchema: StructType,
numColsPrefixKey: Int,
version: Long,
storeConf: StateStoreConf,
hadoopConf: Configuration): ReadStateStore
getReadOnly looks up a StateStoreProvider (for the given StateStoreProviderId) and requests it to getReadStore for the given version.
getReadOnly is used when ReadStateStoreRDD is requested to compute a partition.
Looking Up StateStore by Provider ID and Version
get(
storeProviderId: StateStoreProviderId,
keySchema: StructType,
valueSchema: StructType,
numColsPrefixKey: Int,
version: Long,
storeConf: StateStoreConf,
hadoopConf: Configuration): StateStore
get looks up a StateStoreProvider (for the given StateStoreProviderId) to get a StateStore for the given version.
get is used when:
* FlatMapGroupsWithStateExec physical operator is executed
* StateStoreRDD is requested to compute a partition
* SymmetricHashJoinStateManager.StateStoreHandler is requested to look up a StateStore
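Both getReadOnly and get follow the same shape: find (or create) the provider for an ID, then ask it for a store at a version, read-only in one case and writable in the other. A minimal sketch of that shape, assuming hypothetical ToyReadStore, ToyWriteStore, ToyStoreProvider and ToyStateStore types with String IDs, keys and values; the writable trait extends the read-only one in the same way StateStore extends ReadStateStore.

```scala
import scala.collection.mutable

// Hypothetical read-only view of one version of the state
trait ToyReadStore {
  def version: Long
  def get(key: String): Option[String]
}

// Hypothetical writable store, extending the read-only one
trait ToyWriteStore extends ToyReadStore {
  def put(key: String, value: String): Unit
  def commit(): Long
}

// Hypothetical provider keeping committed versions in memory
final class ToyStoreProvider {
  private val versions =
    mutable.Map[Long, Map[String, String]](0L -> Map.empty[String, String])

  private def load(v: Long): Map[String, String] =
    versions.getOrElse(v, throw new IllegalArgumentException(s"Unknown version $v"))

  // Read-only access: no way to stage updates
  def getReadStore(v: Long): ToyReadStore = {
    val snapshot = load(v)
    new ToyReadStore {
      val version: Long = v
      def get(key: String): Option[String] = snapshot.get(key)
    }
  }

  // Writable access: updates are staged and saved as v + 1 on commit
  def getStore(v: Long): ToyWriteStore = {
    val working = mutable.Map[String, String]() ++= load(v)
    new ToyWriteStore {
      val version: Long = v
      def get(key: String): Option[String] = working.get(key)
      def put(key: String, value: String): Unit = { working(key) = value }
      def commit(): Long = { versions(v + 1) = working.toMap; v + 1 }
    }
  }
}

// Hypothetical entry point: both lookups go through one provider registry
object ToyStateStore {
  private val loadedProviders = mutable.Map[String, ToyStoreProvider]()

  // Shared helper: looks up (or creates) the provider for a provider ID
  private def provider(id: String): ToyStoreProvider =
    loadedProviders.getOrElseUpdate(id, new ToyStoreProvider)

  def getReadOnly(id: String, version: Long): ToyReadStore = provider(id).getReadStore(version)

  def get(id: String, version: Long): ToyWriteStore = provider(id).getStore(version)
}
```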
Looking Up StateStoreProvider by Provider ID
getStateStoreProvider(
storeProviderId: StateStoreProviderId,
keySchema: StructType,
valueSchema: StructType,
numColsPrefixKey: Int,
storeConf: StateStoreConf,
hadoopConf: Configuration): StateStoreProvider
getStateStoreProvider starts the periodic maintenance task (unless already started).
Only if the partitionId (of the StateStoreId of the given StateStoreProviderId) is 0, getStateStoreProvider validates the state schema.
FIXME Describe the validation
getStateStoreProvider looks up the StateStoreProvider for the given StateStoreProviderId (in the loadedProviders registry) or creates and initializes a new one.
getStateStoreProvider collects the other StateStoreProviders (in the loadedProviders registry), reports the active store instance (reportActiveStoreInstance) and unloads them.
Note
There can be one active StateStoreProvider in a Spark executor.
getStateStoreProvider is a helper method of getReadOnly and get.
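The steps of getStateStoreProvider can be walked through with a small registry. This is a minimal sketch under stated assumptions, not Spark's code: ProviderId, DummyProvider and ProviderRegistry are hypothetical, the maintenance start and the schema validation are reduced to bookkeeping, and the reportActive callback stands in for reportActiveStoreInstance by returning the IDs of the other providers to unload (an assumption of this sketch).

```scala
import scala.collection.mutable

// Hypothetical provider ID: an operator and one of its partitions
final case class ProviderId(operatorId: Long, partitionId: Int)

// Hypothetical provider; a real one would manage the state of one partition
final class DummyProvider(val id: ProviderId)

// Hypothetical registry walking through the steps of getStateStoreProvider.
// `reportActive` receives the requested ID and the other loaded IDs and
// returns the IDs that should be unloaded.
final class ProviderRegistry(
    reportActive: (ProviderId, Seq[ProviderId]) => Seq[ProviderId]) {
  val loadedProviders = mutable.Map[ProviderId, DummyProvider]()
  val validatedSchemas = mutable.Set[ProviderId]()
  var maintenanceStarted = false

  // Stand-in for startMaintenanceIfNeeded: only flips a flag here
  private def startMaintenanceIfNeeded(): Unit =
    if (!maintenanceStarted) maintenanceStarted = true

  def getStateStoreProvider(id: ProviderId): DummyProvider = loadedProviders.synchronized {
    startMaintenanceIfNeeded()
    // Schema validation for partition 0 only; here it is merely recorded
    if (id.partitionId == 0) validatedSchemas += id
    // Look up the provider or create a new one
    val provider = loadedProviders.getOrElseUpdate(id, new DummyProvider(id))
    // Collect the other providers, report this one as active, unload what comes back
    val others = loadedProviders.keys.filter(_ != id).toList
    reportActive(id, others).foreach(otherId => loadedProviders -= otherId)
    provider
  }
}
```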
Review Me
StateStore supports incremental checkpointing in which only the key-value "Row" pairs that changed are <
StateStore is identified with the <
Tip
Enable ALL logging level for org.apache.spark.sql.execution.streaming.state.StateStore$ logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.streaming.state.StateStore$=ALL
Creating (and Caching) RPC Endpoint Reference to StateStoreCoordinator for Executors
coordinatorRef: Option[StateStoreCoordinatorRef]
coordinatorRef requests the SparkEnv helper object for the current SparkEnv.
If the SparkEnv is available and _coordRef is not assigned yet, coordinatorRef prints out the following DEBUG message to the logs and then requests the StateStoreCoordinatorRef helper object for a reference to the StateStoreCoordinator endpoint.
Getting StateStoreCoordinatorRef
If the SparkEnv is available, coordinatorRef prints out the following INFO message to the logs:
Retrieved reference to StateStoreCoordinator: [_coordRef]
NOTE: coordinatorRef is used when the StateStore helper object is requested to:
* <
* <
* <
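The caching behaviour of coordinatorRef follows a lazily-initialized-reference pattern. A minimal sketch, assuming hypothetical Env and CoordinatorRef classes in place of SparkEnv and StateStoreCoordinatorRef, and a mutable currentEnv field in place of looking up the current SparkEnv; dropping the cached reference when no environment is available is an assumption of this sketch.

```scala
// Hypothetical stand-ins for SparkEnv and StateStoreCoordinatorRef
final class Env(val name: String)
final class CoordinatorRef(val env: Env)

object CoordinatorRefCache {
  // Hypothetical "current environment"; null when none is available
  @volatile var currentEnv: Env = null
  private var cachedRef: CoordinatorRef = null
  // Counts how often a new reference had to be created
  var lookups = 0

  // Returns the cached reference, creating it on first use.
  // Returns None (and forgets the cached reference) without an environment.
  def coordinatorRef: Option[CoordinatorRef] = synchronized {
    val env = currentEnv
    if (env != null) {
      if (cachedRef == null) {
        lookups += 1 // the point where "Getting StateStoreCoordinatorRef" would be logged
        cachedRef = new CoordinatorRef(env)
      }
      Some(cachedRef)
    } else {
      cachedRef = null
      None
    }
  }
}
```

Callers get the same reference on every call as long as an environment is present, so only the first call pays for the lookup.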
Announcing New StateStoreProvider
reportActiveStoreInstance(storeProviderId: StateStoreProviderId): Unit
reportActiveStoreInstance takes the current host and executorId (from the BlockManager on the Spark executor) and requests the <
NOTE: reportActiveStoreInstance uses SparkEnv to access the BlockManager.
In the end, reportActiveStoreInstance prints out the following INFO message to the logs:
Reported that the loaded instance [storeProviderId] is active
NOTE: reportActiveStoreInstance is used exclusively when the StateStore utility is requested to <
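The host and executorId reported here are what allow a coordinator to tell which executor holds the active instance of a store provider. A minimal sketch of that bookkeeping, assuming a hypothetical ToyCoordinator keyed by String provider IDs; recordActive and isActiveOn are illustrative names, not StateStoreCoordinator's API, and the rule that the latest report wins is an assumption.

```scala
import scala.collection.mutable

// Hypothetical location of a loaded store instance
final case class InstanceLocation(host: String, executorId: String)

// Hypothetical coordinator: remembers, per provider ID, where the active instance lives
final class ToyCoordinator {
  private val activeInstances = mutable.Map[String, InstanceLocation]()

  // Records the reporting executor as the active location (the latest report wins)
  def recordActive(providerId: String, host: String, executorId: String): Unit = synchronized {
    activeInstances(providerId) = InstanceLocation(host, executorId)
  }

  // True only if the given executor holds the active instance of the provider
  def isActiveOn(providerId: String, executorId: String): Boolean = synchronized {
    activeInstances.get(providerId).exists(_.executorId == executorId)
  }
}
```

An executor whose instance is no longer reported as active can use such a check to decide that its loaded copy is stale.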
MaintenanceTask Daemon Thread
MaintenanceTask is a daemon thread that <
When an error occurs, MaintenanceTask clears <
MaintenanceTask is scheduled on the state-store-maintenance-task thread pool that runs periodically every spark.sql.streaming.stateStore.maintenanceInterval.
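A periodic task on a daemon thread can be set up with the JDK's scheduled executors. A minimal sketch, assuming a hypothetical ToyMaintenance object and an interval passed in milliseconds (in Spark the interval comes from spark.sql.streaming.stateStore.maintenanceInterval); it is not Spark's own scheduling code.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

object ToyMaintenance {
  // Creates daemon threads so the pool never keeps the JVM alive on its own
  private val daemonFactory: ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "state-store-maintenance-task")
      t.setDaemon(true)
      t
    }
  }

  // Runs `task` every `intervalMs` milliseconds on a single daemon thread,
  // with the first run after one interval
  def schedule(intervalMs: Long)(task: () => Unit): ScheduledExecutorService = {
    val pool = Executors.newSingleThreadScheduledExecutor(daemonFactory)
    pool.scheduleAtFixedRate(
      new Runnable { def run(): Unit = task() },
      intervalMs, intervalMs, TimeUnit.MILLISECONDS)
    pool
  }
}
```

Because the thread is a daemon, a pool that is never shut down does not keep the JVM from exiting, which suits a background housekeeping job.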
Starting Periodic Maintenance Task (Unless Already Started)
startMaintenanceIfNeeded(): Unit
startMaintenanceIfNeeded is an internal method of the StateStore object.
startMaintenanceIfNeeded schedules <60s).
NOTE: startMaintenanceIfNeeded does nothing when the maintenance task has already been started and is still running.
NOTE: startMaintenanceIfNeeded is used exclusively when StateStore is requested to <
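The "unless already started" check boils down to remembering the scheduled task and asking whether it is still running. A minimal sketch, assuming a hypothetical MaintenanceGuard object with a single daemon-threaded scheduler; treating a finished task (for example, one that was cancelled or died with an error) as not running is an assumption of this sketch.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, ThreadFactory, TimeUnit}

// Hypothetical holder of the periodic maintenance task
object MaintenanceGuard {
  private val pool = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "state-store-maintenance-task")
      t.setDaemon(true)
      t
    }
  })
  private var task: ScheduledFuture[_] = null
  // How many times a task was actually scheduled
  var starts = 0

  // A task that was cancelled or died with an error counts as not running
  private def isRunning: Boolean = task != null && !task.isDone

  // Schedules the task unless it has already been started and is still running
  def startMaintenanceIfNeeded(intervalMs: Long)(body: () => Unit): Unit = synchronized {
    if (!isRunning) {
      starts += 1
      task = pool.scheduleAtFixedRate(
        new Runnable { def run(): Unit = body() },
        intervalMs, intervalMs, TimeUnit.MILLISECONDS)
    }
  }

  // Cancels the current task so that a later call can start a fresh one
  def stop(): Unit = synchronized {
    if (task != null) task.cancel(false)
    task = null
  }
}
```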
Doing State Maintenance of Registered State Store Providers
doMaintenance(): Unit
doMaintenance is an internal method of the StateStore object.
Internally, doMaintenance prints the following DEBUG message to the logs:
Doing maintenance
doMaintenance then requests every StateStoreProvider (registered in <StateStoreProvider <
When a StateStoreProvider is <doMaintenance <
Unloaded [provider]
NOTE: doMaintenance is used exclusively in <
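The maintenance pass can be pictured as a loop over a snapshot of the registered providers. A minimal sketch, assuming a hypothetical MaintenanceRunner whose isActive callback decides whether a provider is kept (a stand-in for whatever decides that a provider is no longer needed) and which logs through a plain function; the two log messages mirror the ones quoted in this section.

```scala
import scala.collection.mutable

// Hypothetical provider with its own maintenance hook
final class MaintainedProvider(val id: String) {
  var maintenanceRuns = 0
  def doMaintenance(): Unit = maintenanceRuns += 1
  override def toString: String = s"MaintainedProvider[$id]"
}

// Hypothetical registry; `isActive` decides whether a provider is kept
final class MaintenanceRunner(isActive: String => Boolean, log: String => Unit) {
  val loadedProviders = mutable.Map[String, MaintainedProvider]()

  def doMaintenance(): Unit = {
    log("Doing maintenance")
    // Iterate over a snapshot so unloading does not modify the map being traversed
    val snapshot = loadedProviders.synchronized { loadedProviders.toList }
    snapshot.foreach { case (id, provider) =>
      if (isActive(id)) {
        // Active providers do their own internal maintenance
        provider.doMaintenance()
      } else {
        // Inactive providers are unloaded from the registry
        loadedProviders.synchronized { loadedProviders -= id }
        log(s"Unloaded $provider")
      }
    }
  }
}
```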