StateStoreProvider

StateStoreProvider is an abstraction of providers that manage state stores (state data) in Stateful Stream Processing.

The concrete StateStoreProvider to use is selected with the spark.sql.streaming.stateStore.providerClass configuration property.
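As an illustration of the property above, the following sketch selects the RocksDB-based provider when building a SparkSession. It assumes Spark 3.2 or later on the classpath; the application name and the local master are placeholders, not part of the original text.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark 3.2+ is on the classpath (the RocksDB provider ships with it).
val spark = SparkSession.builder()
  .master("local[*]")                    // placeholder master for a local run
  .appName("state-store-provider-demo")  // placeholder application name
  .config(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
  .getOrCreate()

// Read the property back to confirm what streaming queries in this session will use.
println(spark.conf.get("spark.sql.streaming.stateStore.providerClass"))
```

The property is read when a streaming query initializes its state stores, so it should be set before the query starts.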

Contract

Looking Up StateStore by Version

getStore(
  version: Long): StateStore

StateStore for the given version

Initialization

init(
  stateStoreId: StateStoreId,
  keySchema: StructType,
  valueSchema: StructType,
  numColsPrefixKey: Int,
  storeConfs: StateStoreConf,
  hadoopConf: Configuration): Unit

Initializes a StateStoreProvider with the given StateStoreId, key and value schemas, numColsPrefixKey, StateStoreConf and Hadoop Configuration.

numColsPrefixKey

The value of numColsPrefixKey depends on the stateful operator:

Stateful Operator                                 numColsPrefixKey
FlatMapGroupsWithStateExec                        0
StateStoreRDD                                     numColsPrefixKey
SymmetricHashJoinStateManager.StateStoreHandler   0

Supported Custom Metrics

supportedCustomMetrics: Seq[StateStoreCustomMetric]

StateStoreCustomMetrics (e.g., to report in web UI or StateOperatorProgress)

Default: empty
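The "empty by default" contract can be sketched with simplified stand-in types. All names below (ToyCustomMetric, ToySumMetric, ToyMetricsProvider, PlainProvider, CountingProvider) are hypothetical and only mirror the shape of the method; they are not Spark's classes.

```scala
// Hypothetical stand-ins; Spark's real types are StateStoreCustomMetric and friends.
trait ToyCustomMetric {
  def name: String
  def desc: String
}

case class ToySumMetric(name: String, desc: String) extends ToyCustomMetric

trait ToyMetricsProvider {
  // Default: a provider reports no custom metrics unless it overrides this.
  def supportedCustomMetrics: Seq[ToyCustomMetric] = Nil
}

// Relies on the default and so exposes no custom metrics.
class PlainProvider extends ToyMetricsProvider

// Overrides the default to advertise one (made-up) metric.
class CountingProvider extends ToyMetricsProvider {
  override def supportedCustomMetrics: Seq[ToyCustomMetric] =
    Seq(ToySumMetric("cacheHits", "number of cache hits"))
}
```

A provider that overrides the method this way would be expected to also report values for the metrics it advertises.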

Implementations

Creating StateStoreProvider

create(
  providerClassName: String): StateStoreProvider

create creates a StateStoreProvider based on the fully-qualified class name (providerClassName).
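Creating an instance from a fully-qualified class name is typically done with reflection. The following is a minimal sketch of that technique, not Spark's actual code: SketchProvider, SketchInMemoryProvider and CreateDemo are hypothetical names, and the sketch assumes the provider class has a public no-argument constructor.

```scala
// Hypothetical stand-in for the StateStoreProvider trait.
trait SketchProvider {
  def name: String
}

// A class created by name needs a public no-argument constructor.
class SketchInMemoryProvider extends SketchProvider {
  val name: String = "in-memory"
}

object SketchProvider {
  // Loads the class by its fully-qualified name and instantiates it reflectively.
  def create(providerClassName: String): SketchProvider = {
    val clazz = Class.forName(providerClassName)
    clazz.getConstructor().newInstance().asInstanceOf[SketchProvider]
  }
}

object CreateDemo {
  def main(args: Array[String]): Unit = {
    // classOf[...].getName yields the fully-qualified name used for the lookup.
    val provider = SketchProvider.create(classOf[SketchInMemoryProvider].getName)
    println(provider.name)
  }
}
```

Because the class is resolved at runtime, a misspelled name or a class without a no-argument constructor only fails when create is called, not at compile time.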


Creating and Initializing StateStoreProvider

createAndInit(
  providerId: StateStoreProviderId,
  keySchema: StructType,
  valueSchema: StructType,
  numColsPrefixKey: Int,
  storeConf: StateStoreConf,
  hadoopConf: Configuration): StateStoreProvider

createAndInit creates a StateStoreProvider based on the spark.sql.streaming.stateStore.providerClass configuration property.

In the end, createAndInit requests the StateStoreProvider to initialize.


Looking Up ReadStateStore by Version

getReadStore(
  version: Long): ReadStateStore

getReadStore creates a WrappedReadStateStore for the StateStore for the given version.
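The wrapping described above can be sketched as a read-only view over a read-write store. The types below (ToyReadStore, ToyStore, ToyWrappedReadStore, ToyMapStore, ToyStoreProvider) are hypothetical, heavily simplified stand-ins that use String keys and values instead of Spark's row types.

```scala
import scala.collection.mutable

// Hypothetical read-only interface (stand-in for ReadStateStore).
trait ToyReadStore {
  def version: Long
  def get(key: String): Option[String]
}

// Hypothetical read-write interface (stand-in for StateStore).
trait ToyStore extends ToyReadStore {
  def put(key: String, value: String): Unit
}

// Read-only view: delegates reads to the wrapped store and exposes no writes.
class ToyWrappedReadStore(store: ToyStore) extends ToyReadStore {
  def version: Long = store.version
  def get(key: String): Option[String] = store.get(key)
}

// Toy in-memory store backed by a mutable map.
class ToyMapStore(val version: Long) extends ToyStore {
  private val data = mutable.Map.empty[String, String]
  def get(key: String): Option[String] = data.get(key)
  def put(key: String, value: String): Unit = data.update(key, value)
}

class ToyStoreProvider {
  // Toy behavior: every call returns a fresh, empty store for the version.
  def getStore(version: Long): ToyStore = new ToyMapStore(version)

  // Mirrors the description above: wrap whatever getStore returns.
  def getReadStore(version: Long): ToyReadStore =
    new ToyWrappedReadStore(getStore(version))
}
```

Callers that only hold a ToyReadStore cannot call put, which is the point of handing out the wrapper instead of the underlying store.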

