StateStore
StateStore is an extension of the ReadStateStore abstraction for versioned key-value stores that can both read and write state in Stateful Stream Processing (e.g., to persist running aggregates in Streaming Aggregation).
Note
StateStore was introduced in [SPARK-13809][SQL] State store for streaming aggregations.
Read the motivation and design in State Store for Streaming Aggregations.
Contract
Committing State Changes
commit(): Long
Commits all updates (puts and removes) and returns the new version.
Used when:
* FlatMapGroupsWithStateExec physical operator is requested to processDataWithPartition
* SessionWindowStateStoreSaveExec physical operator is executed
* StreamingDeduplicateExec physical operator is executed
* StreamingGlobalLimitExec physical operator is executed
* StreamingAggregationStateManagerBaseImpl is requested to commit
* StreamingSessionWindowStateManagerImplV1 is requested to commit
* StateStoreHandler is requested to commit
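To make the commit contract concrete, here is a minimal sketch of a versioned key-value store in plain Scala. It assumes String keys and values in place of UnsafeRow. SnapshotProvider and SketchStore are hypothetical names, not Spark classes. Uncommitted puts and removes stay in a private working copy until commit publishes them as version + 1, so the snapshot at version 0 is left untouched.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of a versioned store (not Spark's API).
// String keys and values stand in for UnsafeRow.
final class SnapshotProvider {
  // One immutable snapshot per committed version; version 0 is empty
  private val snapshots = mutable.Map[Long, Map[String, String]](0L -> Map.empty)

  def getStore(version: Long): SketchStore = {
    require(snapshots.contains(version), s"Unknown version $version")
    new SketchStore(version, snapshots(version), snapshots)
  }
}

final class SketchStore(
    val version: Long,
    base: Map[String, String],
    snapshots: mutable.Map[Long, Map[String, String]]) {
  // Uncommitted changes live only in this working copy
  private var working: Map[String, String] = base

  def get(key: String): Option[String] = working.get(key)
  def put(key: String, value: String): Unit = { working = working.updated(key, value) }
  def remove(key: String): Unit = { working = working - key }

  // Publishes all puts and removes as the next version and returns that version
  def commit(): Long = {
    val newVersion = version + 1
    snapshots(newVersion) = working
    newVersion
  }
}

val provider = new SnapshotProvider
val store = provider.getStore(0L)
store.put("a", "1")
store.put("b", "2")
store.remove("b")
val v1 = store.commit()
```

In this sketch, committed versions are immutable. That is what lets a reader open an older version while a writer prepares the next one.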
StateStoreMetrics
metrics: StateStoreMetrics
StateStoreMetrics of this state store
Used when:
* StateStoreWriter physical operator is requested to setStoreMetrics
* StateStoreHandler is requested for the metrics
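The sketch below illustrates, under assumptions, how a store could expose a metrics snapshot and how an operator might copy it into its own metrics. SketchMetrics, MeteredStore, setStoreMetrics and the metric keys are all hypothetical. The memory figure is only a rough byte count of keys and values.

```scala
import scala.collection.mutable

// Hypothetical metrics snapshot; Spark's StateStoreMetrics carries more detail
final case class SketchMetrics(numKeys: Long, memoryUsedBytes: Long)

final class MeteredStore {
  private val data = mutable.HashMap.empty[String, String]
  def put(key: String, value: String): Unit = data.update(key, value)
  def remove(key: String): Unit = data -= key
  // Rough estimate: UTF-8 bytes of keys and values only, ignoring object overhead
  def metrics: SketchMetrics = SketchMetrics(
    numKeys = data.size.toLong,
    memoryUsedBytes = data.iterator.map { case (k, v) =>
      (k.getBytes("UTF-8").length + v.getBytes("UTF-8").length).toLong
    }.sum)
}

// Hypothetical stand-in for an operator copying store metrics into its own metrics
def setStoreMetrics(store: MeteredStore, operatorMetrics: mutable.Map[String, Long]): Unit = {
  val m = store.metrics
  operatorMetrics("numKeys") = m.numKeys
  operatorMetrics("memoryUsedBytes") = m.memoryUsedBytes
}

val metered = new MeteredStore
metered.put("k1", "abc")
metered.put("k2", "de")
metered.remove("k2")
val operatorMetrics = mutable.Map.empty[String, Long]
setStoreMetrics(metered, operatorMetrics)
```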
Storing Value for Key
put(
key: UnsafeRow,
value: UnsafeRow): Unit
Stores (puts) a new non-null value for a non-null key.
Used when:
* StreamingDeduplicateExec physical operator is executed
* StreamingGlobalLimitExec physical operator is executed
* StateManagerImplBase is requested to putState
* StreamingAggregationStateManagerImplV2 (and StreamingAggregationStateManagerImplV1) is requested to put a row
* StreamingSessionWindowStateManagerImplV1 is requested to putRows
* KeyToNumValuesStore is requested to put the number of values of a key
* KeyWithIndexToValueStore is requested to put a new value of a key
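Here is a minimal sketch of the put contract, assuming String keys and values. Both the key and the value must be non-null. The hypothetical NonNullStore checks this with require, so a null value fails with an IllegalArgumentException. Overwriting the value of an existing key is behavior chosen for this sketch.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical store that enforces the put contract (non-null key and value)
final class NonNullStore {
  private val data = mutable.HashMap.empty[String, String]

  def put(key: String, value: String): Unit = {
    require(key != null, "Cannot put a null key")
    require(value != null, "Cannot put a null value")
    data.update(key, value) // in this sketch, a later put replaces the earlier value
  }

  def get(key: String): Option[String] = data.get(key)
}

val kv = new NonNullStore
kv.put("user-1", "count=1")
kv.put("user-1", "count=2")
val nullValueRejected = Try(kv.put("user-2", null)).isFailure
```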
Removing Key
remove(
key: UnsafeRow): Unit
Removes a non-null key.
Used when:
* WatermarkSupport physical operator is requested to removeKeysOlderThanWatermark
* StateManagerImplBase is requested to removeState
* StreamingAggregationStateManagerBaseImpl is requested to remove a key
* StreamingSessionWindowStateManagerImplV1 is requested to removeByValueCondition and putRows
* KeyToNumValuesStore is requested to remove a key
* KeyWithIndexToValueStore is requested to remove a key
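WatermarkSupport relies on remove to evict expired state. The sketch below uses hypothetical EventKey, RemovableStore and removeKeysOlderThanWatermark definitions. It removes every key whose event time is at or below the watermark, iterating over a copy of the keys so that removing entries does not disturb the iteration. The exact eviction boundary is an assumption of this sketch.

```scala
import scala.collection.mutable

// Hypothetical state key: a grouping key plus an event time in milliseconds
final case class EventKey(group: String, eventTimeMs: Long)

final class RemovableStore {
  private val data = mutable.LinkedHashMap.empty[EventKey, Long]
  def put(key: EventKey, value: Long): Unit = data.update(key, value)
  def remove(key: EventKey): Unit = {
    require(key != null, "Cannot remove a null key")
    data -= key
  }
  // Copy of the current keys (in insertion order), so callers can remove while iterating
  def keys: List[EventKey] = data.keys.toList
}

// Hypothetical eviction helper: removes keys at or below the watermark, returns how many
def removeKeysOlderThanWatermark(store: RemovableStore, watermarkMs: Long): Int = {
  val expired = store.keys.filter(_.eventTimeMs <= watermarkMs)
  expired.foreach(k => store.remove(k))
  expired.size
}

val events = new RemovableStore
events.put(EventKey("a", 1000L), 3L)
events.put(EventKey("b", 5000L), 7L)
events.put(EventKey("c", 9000L), 1L)
val evicted = removeKeysOlderThanWatermark(events, watermarkMs = 5000L)
```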
Implementations
getReadOnly
getReadOnly(
storeProviderId: StateStoreProviderId,
keySchema: StructType,
valueSchema: StructType,
numColsPrefixKey: Int,
version: Long,
storeConf: StateStoreConf,
hadoopConf: Configuration): ReadStateStore
getReadOnly looks up the StateStoreProvider (for the given StateStoreProviderId) and requests it to getReadStore for the given version.
getReadOnly is used when:
* ReadStateStoreRDD is requested to compute a partition
Looking Up StateStore by Provider ID and Version
get(
storeProviderId: StateStoreProviderId,
keySchema: StructType,
valueSchema: StructType,
numColsPrefixKey: Int,
version: Long,
storeConf: StateStoreConf,
hadoopConf: Configuration): StateStore
get looks up the StateStoreProvider (for the given StateStoreProviderId) and requests it for a StateStore for the given version.
get is used when:
* FlatMapGroupsWithStateExec is executed
* StateStoreRDD is requested to compute a partition
* SymmetricHashJoinStateManager.StateStoreHandler is requested to look up a StateStore
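getReadOnly and get follow the same two-step pattern: look up (or create) the provider for an ID, then ask it for a store at a version. The sketch below models that pattern with hypothetical ReadView and WriteView traits, which play the roles of ReadStateStore and StateStore, and a String provider ID. It is not Spark's implementation.

```scala
import scala.collection.mutable

// Hypothetical read-only view (plays the role of ReadStateStore)
trait ReadView {
  def version: Long
  def get(key: String): Option[String]
}

// Hypothetical writable view (plays the role of StateStore, which extends ReadStateStore)
trait WriteView extends ReadView {
  def put(key: String, value: String): Unit
  def commit(): Long
}

// Hypothetical provider keeping one snapshot per committed version
final class VersionedProvider {
  private val versions = mutable.Map[Long, Map[String, String]](0L -> Map.empty)

  def getReadStore(v: Long): ReadView = {
    val snapshot = versions(v)
    new ReadView {
      val version: Long = v
      def get(key: String): Option[String] = snapshot.get(key)
    }
  }

  def getStore(v: Long): WriteView = {
    var working = versions(v)
    new WriteView {
      val version: Long = v
      def get(key: String): Option[String] = working.get(key)
      def put(key: String, value: String): Unit = { working = working.updated(key, value) }
      def commit(): Long = { versions(v + 1) = working; v + 1 }
    }
  }
}

// Hypothetical registry of loaded providers, keyed by a provider ID
val loadedProviders = mutable.Map.empty[String, VersionedProvider]

// Both helpers look up (or create) the provider, then ask it for a store at a version
def getReadOnly(providerId: String, version: Long): ReadView =
  loadedProviders.getOrElseUpdate(providerId, new VersionedProvider).getReadStore(version)

def get(providerId: String, version: Long): WriteView =
  loadedProviders.getOrElseUpdate(providerId, new VersionedProvider).getStore(version)

val writable = get("agg-0", 0L)
writable.put("a", "1")
val committed = writable.commit()
```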
Looking Up StateStoreProvider by Provider ID
getStateStoreProvider(
storeProviderId: StateStoreProviderId,
keySchema: StructType,
valueSchema: StructType,
numColsPrefixKey: Int,
storeConf: StateStoreConf,
hadoopConf: Configuration): StateStoreProvider
getStateStoreProvider starts the periodic maintenance task (unless it is already running).
Only if the partitionId (of the StateStoreId of the given StateStoreProviderId) is 0, getStateStoreProvider validates the state schema.
FIXME Describe the validation
getStateStoreProvider looks up the StateStoreProvider for the given StateStoreProviderId (in the loadedProviders registry) or creates and initializes a new one.
getStateStoreProvider collects the other StateStoreProviders (in the loadedProviders registry), announces the requested provider using reportActiveStoreInstance, and unloads the other providers.
Note
There can be one active StateStoreProvider in a Spark executor.
getStateStoreProvider is a helper method of getReadOnly and get.
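The steps of getStateStoreProvider can be sketched as a synchronized lookup in a registry. Everything here is hypothetical: ProviderId, SketchProvider, and especially the shouldUnload predicate. That predicate stands in for whatever reportActiveStoreInstance learns from the coordinator about the other providers, so in this sketch only the other providers it selects are unloaded. The schema check is reduced to recording the ID for partition 0.

```scala
import scala.collection.mutable

// Hypothetical provider ID: operator ID plus partition ID
final case class ProviderId(operatorId: Long, partitionId: Int)
final class SketchProvider(val id: ProviderId)

object SketchStateStore {
  val loadedProviders = mutable.HashMap.empty[ProviderId, SketchProvider]
  val schemaValidated = mutable.Set.empty[ProviderId]
  val unloaded = mutable.ArrayBuffer.empty[ProviderId]
  var maintenanceStarted = false

  // Assumption: stands in for the coordinator's answer to reportActiveStoreInstance;
  // true means "this other provider should be unloaded from this executor"
  var shouldUnload: ProviderId => Boolean = _ => false

  def getStateStoreProvider(id: ProviderId): SketchProvider = loadedProviders.synchronized {
    maintenanceStarted = true // stands in for starting the maintenance task if needed
    if (id.partitionId == 0) schemaValidated += id // schema check only for partition 0
    val provider = loadedProviders.getOrElseUpdate(id, new SketchProvider(id))
    val others = loadedProviders.keys.filter(_ != id).toList
    others.filter(shouldUnload).foreach { other =>
      loadedProviders -= other
      unloaded += other
    }
    provider
  }
}

val first = SketchStateStore.getStateStoreProvider(ProviderId(1L, 0))
val cachedAgain = SketchStateStore.getStateStoreProvider(ProviderId(1L, 0))
SketchStateStore.shouldUnload = _.partitionId == 0
val second = SketchStateStore.getStateStoreProvider(ProviderId(1L, 1))
```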
Review Me
StateStore supports incremental checkpointing, in which only the key-value "Row" pairs that changed are …
StateStore is identified with the …
TIP: Enable ALL logging level for org.apache.spark.sql.execution.streaming.state.StateStore$ logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.streaming.state.StateStore$=ALL
Creating (and Caching) RPC Endpoint Reference to StateStoreCoordinator for Executors
coordinatorRef: Option[StateStoreCoordinatorRef]
coordinatorRef requests the SparkEnv helper object for the current SparkEnv.
If the SparkEnv is available and _coordRef is not assigned yet, coordinatorRef prints out the following DEBUG message to the logs and then requests the StateStoreCoordinatorRef for the StateStoreCoordinator endpoint (caching it as _coordRef).
Getting StateStoreCoordinatorRef
If the SparkEnv is available, coordinatorRef prints out the following INFO message to the logs:
Retrieved reference to StateStoreCoordinator: [_coordRef]
NOTE: coordinatorRef is used when the StateStore helper object is requested to … (when the StateStore object helper is requested to …) and … (when the StateStore object helper is requested to …).
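coordinatorRef is a lazily created, cached reference. The following minimal sketch shows that caching pattern, using hypothetical Env and CoordinatorRef classes in place of SparkEnv and StateStoreCoordinatorRef. The lookup runs only the first time an environment is available, and later calls return the same cached instance. Dropping the cache when no environment is available is an assumption of this sketch.

```scala
// Hypothetical stand-ins for SparkEnv and StateStoreCoordinatorRef
final case class Env(executorId: String)
final class CoordinatorRef(val endpointName: String)

object CoordinatorRefCache {
  // Stands in for the current SparkEnv, which may not be available
  @volatile var currentEnv: Option[Env] = None
  private var cachedRef: Option[CoordinatorRef] = None
  var lookups = 0 // how many times the endpoint lookup actually ran

  def coordinatorRef: Option[CoordinatorRef] = synchronized {
    currentEnv match {
      case Some(_) =>
        if (cachedRef.isEmpty) {
          // DEBUG: Getting StateStoreCoordinatorRef
          lookups += 1
          cachedRef = Some(new CoordinatorRef("StateStoreCoordinator"))
        }
        // INFO: Retrieved reference to StateStoreCoordinator: ...
        cachedRef
      case None =>
        cachedRef = None // assumption: forget the reference when no environment is available
        None
    }
  }
}

val beforeEnv = CoordinatorRefCache.coordinatorRef
CoordinatorRefCache.currentEnv = Some(Env("executor-1"))
val firstRef = CoordinatorRefCache.coordinatorRef
val secondRef = CoordinatorRefCache.coordinatorRef
```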
Announcing New StateStoreProvider
reportActiveStoreInstance(
storeProviderId: StateStoreProviderId): Unit
reportActiveStoreInstance takes the current host and executorId (from the BlockManager on the Spark executor) and requests the …
NOTE: reportActiveStoreInstance uses SparkEnv to access the BlockManager.
In the end, reportActiveStoreInstance prints out the following INFO message to the logs:
Reported that the loaded instance [storeProviderId] is active
NOTE: reportActiveStoreInstance is used exclusively when the StateStore utility is requested to …
MaintenanceTask Daemon Thread
MaintenanceTask is a daemon thread that …
When an error occurs, MaintenanceTask clears …
MaintenanceTask is scheduled on the state-store-maintenance-task thread pool and runs periodically every spark.sql.streaming.stateStore.maintenanceInterval (default: 60s).
Starting Periodic Maintenance Task (Unless Already Started): startMaintenanceIfNeeded Internal Object Method
startMaintenanceIfNeeded(): Unit
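startMaintenanceIfNeeded schedules the MaintenanceTask to run periodically every spark.sql.streaming.stateStore.maintenanceInterval (default: 60s).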
NOTE: startMaintenanceIfNeeded does nothing when the maintenance task has already been started and is still running.
NOTE: startMaintenanceIfNeeded is used exclusively when StateStore is requested to …
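Here is a sketch of MaintenanceTask and startMaintenanceIfNeeded built on the JDK's ScheduledExecutorService. A single daemon thread (named after the state-store-maintenance-task pool) runs a maintenance body at a fixed rate, and a second start request is a no-op while the task is still running. The Maintenance object, its counters and the millisecond interval are hypothetical. In Spark the interval comes from spark.sql.streaming.stateStore.maintenanceInterval.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch: one daemon thread that runs a maintenance body at a fixed rate
object Maintenance {
  private val threadFactory: ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "state-store-maintenance-task")
      t.setDaemon(true) // does not keep the JVM alive
      t
    }
  }
  private val pool: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(threadFactory)
  private var task: ScheduledFuture[_] = null

  val schedules = new AtomicInteger(0) // how many times a task was scheduled
  val runs = new AtomicInteger(0)      // how many times the body completed

  // A task that threw (or was cancelled) is done, so it counts as not running
  def isRunning: Boolean = synchronized { task != null && !task.isDone }

  def startMaintenanceIfNeeded(intervalMs: Long)(body: () => Unit): Unit = synchronized {
    if (!isRunning) {
      schedules.incrementAndGet()
      task = pool.scheduleAtFixedRate(new Runnable {
        def run(): Unit = { body(); runs.incrementAndGet() }
      }, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
    }
  }

  def stop(): Unit = synchronized {
    if (task != null) task.cancel(false)
    task = null
  }
}

Maintenance.startMaintenanceIfNeeded(10L)(() => ())
Maintenance.startMaintenanceIfNeeded(10L)(() => ()) // no-op: already running
Thread.sleep(200)
val scheduledOnce = Maintenance.schedules.get
val ranAtLeastOnce = Maintenance.runs.get > 0
Maintenance.stop()
```

With scheduleAtFixedRate, a body that throws suppresses later runs and the future becomes done. In this sketch, the next startMaintenanceIfNeeded call would therefore schedule a fresh task.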
Doing State Maintenance of Registered State Store Providers: doMaintenance Internal Object Method
doMaintenance(): Unit
Internally, doMaintenance prints the following DEBUG message to the logs:
Doing maintenance
doMaintenance then requests every StateStoreProvider (registered in …) …
When a StateStoreProvider is …, doMaintenance … and prints out the following message to the logs:
Unloaded [provider]
NOTE: doMaintenance is used exclusively in …
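The following sketches the doMaintenance loop. It assumes each provider is either maintained or, if it is no longer active on this executor, unloaded. ProviderRegistry, MaintainedProvider and the isActive predicate are hypothetical, and the two log lines mirror the messages quoted above.

```scala
import scala.collection.mutable

// Hypothetical provider with a maintenance hook (e.g. cleaning up old versions)
final class MaintainedProvider(val id: String) {
  var maintenanceRuns = 0
  def doMaintenance(): Unit = { maintenanceRuns += 1 }
  def close(): Unit = ()
  override def toString: String = s"MaintainedProvider($id)"
}

object ProviderRegistry {
  val loadedProviders = mutable.LinkedHashMap.empty[String, MaintainedProvider]
  val logLines = mutable.ArrayBuffer.empty[String]

  // Assumption: stands in for asking the coordinator whether this executor still owns the provider
  var isActive: String => Boolean = _ => true

  def doMaintenance(): Unit = {
    logLines += "Doing maintenance"
    // Work on a copy so that unloading does not modify the map being iterated
    val providers = loadedProviders.synchronized { loadedProviders.toList }
    providers.foreach { case (id, provider) =>
      if (isActive(id)) {
        provider.doMaintenance()
      } else {
        loadedProviders.synchronized { loadedProviders -= id }
        provider.close()
        logLines += s"Unloaded $provider"
      }
    }
  }
}

ProviderRegistry.loadedProviders("p0") = new MaintainedProvider("p0")
ProviderRegistry.loadedProviders("p1") = new MaintainedProvider("p1")
ProviderRegistry.isActive = id => id != "p1"
ProviderRegistry.doMaintenance()
```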