StateStoreProvider¶
StateStoreProvider is an abstraction of StateStore providers that manage state stores (state data) in Stateful Stream Processing.
A concrete StateStoreProvider is selected based on the spark.sql.streaming.stateStore.providerClass configuration property.
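As a minimal sketch, the property could be set while building a SparkSession. The choice of RocksDBStateStoreProvider (bundled with Spark 3.2 and later), the local master and the application name are assumptions made for illustration only.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark 3.2+ is on the classpath, so RocksDBStateStoreProvider is available.
val spark = SparkSession.builder()
  .master("local[*]")                    // illustrative only
  .appName("state-store-provider-demo")  // hypothetical application name
  .config(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
  .getOrCreate()
```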
Contract¶
Looking Up StateStore by Version¶
getStore(
  version: Long): StateStore
StateStore for the given version
Used when:
- StateStoreProvider is requested for a ReadStateStore by version
- StateStore utility is used to get a StateStore by provider id and version
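The version-based lookup can be pictured with a toy, in-memory provider. This is only a sketch under stated assumptions: ToyStore and ToyProvider are hypothetical stand-ins with string keys and values, and the rule that a commit publishes the changes as the next version is an assumption of this example, not something this page states.

```scala
import scala.collection.mutable

object VersionedStoreSketch {

  // Hypothetical, simplified stand-in for StateStore: string keys and values only.
  final class ToyStore(
      val version: Long,
      data: mutable.Map[String, String],
      publish: (Long, Map[String, String]) => Unit) {

    def get(key: String): Option[String] = data.get(key)
    def put(key: String, value: String): Unit = data.update(key, value)

    // Assumption: committing publishes the changes as version + 1 and returns that number.
    def commit(): Long = {
      val newVersion = version + 1
      publish(newVersion, data.toMap)
      newVersion
    }
  }

  final class ToyProvider {
    // Version 0 is the empty state; every commit adds one immutable snapshot.
    private val snapshots =
      mutable.Map[Long, Map[String, String]](0L -> Map.empty[String, String])

    def getStore(version: Long): ToyStore = {
      val snapshot = snapshots.getOrElse(
        version, throw new IllegalArgumentException(s"Unknown version: $version"))
      // Work on a private copy so uncommitted changes never leak into the snapshot.
      val working = mutable.Map.empty[String, String]
      working ++= snapshot
      new ToyStore(version, working, (v, d) => snapshots.update(v, d))
    }
  }
}
```

In this sketch, asking for a version that was never committed fails fast, and earlier versions stay readable after later commits.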
Initialization¶
init(
  stateStoreId: StateStoreId,
  keySchema: StructType,
  valueSchema: StructType,
  numColsPrefixKey: Int,
  storeConfs: StateStoreConf,
  hadoopConf: Configuration): Unit
Initializes a StateStoreProvider with the following:
- StateStoreId
- Key and value schema
- StateStoreConf
numColsPrefixKey
The input numColsPrefixKey depends on the stateful operator:
| Stateful Operator | numColsPrefixKey |
|---|---|
| FlatMapGroupsWithStateExec | 0 |
| StateStoreRDD | numColsPrefixKey |
| SymmetricHashJoinStateManager.StateStoreHandler | 0 |
Used when:
- StateStoreProvider utility is used to create and initialize a StateStoreProvider
Supported Custom Metrics¶
supportedCustomMetrics: Seq[StateStoreCustomMetric]
The StateStoreCustomMetrics that the provider supports (e.g., to report in web UI or StateOperatorProgress)
Default: empty
Used when:
- StateStoreWriter is requested for the stateStoreCustomMetrics
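The "empty by default, override to report more" shape of this method can be sketched as follows. ToyMetric, ToyProvider and the metric names are hypothetical simplifications made up for this example; they are not the actual StateStoreCustomMetric types.

```scala
object CustomMetricsSketch {

  // Hypothetical simplification of StateStoreCustomMetric: a name and a description.
  final case class ToyMetric(name: String, desc: String)

  trait ToyProvider {
    // Default: the provider reports no custom metrics.
    def supportedCustomMetrics: Seq[ToyMetric] = Nil
  }

  // Keeps the default, so nothing extra is reported.
  object PlainProvider extends ToyProvider

  // Overrides the default to advertise two made-up metrics.
  object CachingProvider extends ToyProvider {
    override def supportedCustomMetrics: Seq[ToyMetric] = Seq(
      ToyMetric("cacheHits", "number of lookups served from the cache"),
      ToyMetric("cacheMisses", "number of lookups that missed the cache"))
  }
}
```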
Implementations¶
Creating StateStoreProvider¶
create(
  providerClassName: String): StateStoreProvider
create creates a StateStoreProvider based on the fully-qualified class name (providerClassName).
create is used when:
- StateStoreWriter physical operator is requested for the stateStoreCustomMetrics
- StateStoreProvider utility is used to create and initialize a StateStoreProvider
- StreamingQueryStatisticsPage is requested for the supportedCustomMetrics
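Creating an instance from a fully-qualified class name is typically done with reflection. The following is a minimal sketch of that technique, not Spark's own code: ToyProvider and InMemoryToyProvider are hypothetical, and the sketch assumes the named class has a public no-argument constructor.

```scala
object CreateSketch {

  // Hypothetical, simplified provider contract (the real trait has more members).
  trait ToyProvider {
    def name: String
  }

  // Assumption: a provider needs a public no-argument constructor to be created reflectively.
  class InMemoryToyProvider extends ToyProvider {
    val name: String = "in-memory"
  }

  // Loads the class by name, instantiates it and casts it to the provider contract.
  // Throws ClassNotFoundException for an unknown name and
  // ClassCastException if the class is not a ToyProvider.
  def create(providerClassName: String): ToyProvider =
    Class.forName(providerClassName)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[ToyProvider]
}
```

Resolving the class only at runtime is what lets a configuration property choose the implementation without recompiling the caller.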
Creating and Initializing StateStoreProvider¶
createAndInit(
  providerId: StateStoreProviderId,
  keySchema: StructType,
  valueSchema: StructType,
  numColsPrefixKey: Int,
  storeConf: StateStoreConf,
  hadoopConf: Configuration): StateStoreProvider
createAndInit creates a StateStoreProvider based on spark.sql.streaming.stateStore.providerClass.
In the end, createAndInit requests the StateStoreProvider to initialize.
createAndInit is used when:
- StateStore utility is used to look up a StateStore by StateStoreProviderId
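The two steps described above (create the provider named in the configuration, then initialize it) can be sketched with simplified types. ToyStoreId, ToyConf, ToyProvider and RecordingProvider are hypothetical, and the instantiation step is passed in as a function so the example stays self-contained.

```scala
object CreateAndInitSketch {

  // Hypothetical simplifications of StateStoreId and StateStoreConf.
  final case class ToyStoreId(checkpointLocation: String, operatorId: Long, partitionId: Int)
  final case class ToyConf(providerClass: String)

  trait ToyProvider {
    def init(id: ToyStoreId, numColsPrefixKey: Int, conf: ToyConf): Unit
    def isInitialized: Boolean
  }

  // Remembers the id it was initialized with, so the effect of init is observable.
  class RecordingProvider extends ToyProvider {
    private var storeId: Option[ToyStoreId] = None
    def init(id: ToyStoreId, numColsPrefixKey: Int, conf: ToyConf): Unit = {
      storeId = Some(id)
    }
    def isInitialized: Boolean = storeId.isDefined
  }

  def createAndInit(
      id: ToyStoreId,
      numColsPrefixKey: Int,
      conf: ToyConf,
      create: String => ToyProvider): ToyProvider = {
    val provider = create(conf.providerClass)  // step 1: instantiate the configured class
    provider.init(id, numColsPrefixKey, conf)  // step 2: initialize before handing it out
    provider
  }
}
```

Returning the provider only after init has run means callers never see a half-constructed instance.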
Looking Up ReadStateStore by Version¶
getReadStore(
  version: Long): ReadStateStore
getReadStore creates a WrappedReadStateStore for the StateStore for the given version.
getReadStore is used when:
- StateStore utility is used to get a ReadStateStore
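Wrapping a read-write store in a read-only view is a plain delegation pattern. Here is a minimal sketch with simplified, hypothetical types (ToyReadStore, ToyStore, WrappedToyReadStore, MapStore and MapProvider are not Spark classes):

```scala
import scala.collection.mutable

object ReadStoreSketch {

  // Hypothetical, simplified read-only and read-write store contracts.
  trait ToyReadStore {
    def version: Long
    def get(key: String): Option[String]
  }
  trait ToyStore extends ToyReadStore {
    def put(key: String, value: String): Unit
  }

  // Read-only view: delegates reads to the wrapped store and exposes no writes.
  final class WrappedToyReadStore(store: ToyStore) extends ToyReadStore {
    def version: Long = store.version
    def get(key: String): Option[String] = store.get(key)
  }

  trait ToyProvider {
    def getStore(version: Long): ToyStore
    // Mirrors the description above: wrap the store for the given version.
    def getReadStore(version: Long): ToyReadStore =
      new WrappedToyReadStore(getStore(version))
  }

  // Trivial in-memory store and provider, only to exercise the wrapper.
  final class MapStore(val version: Long) extends ToyStore {
    private val data = mutable.Map.empty[String, String]
    def get(key: String): Option[String] = data.get(key)
    def put(key: String, value: String): Unit = data.update(key, value)
  }
  object MapProvider extends ToyProvider {
    def getStore(version: Long): ToyStore = {
      val store = new MapStore(version)
      store.put("greeting", "hello") // sample data for the demo
      store
    }
  }
}
```

Because the wrapper's static type has no put method, a caller holding only the read view cannot modify state without an explicit cast.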