
StateStore

StateStore is an extension of the ReadStateStore abstraction for versioned key-value stores that can both read and write state in Stateful Stream Processing (e.g., to persist running aggregates in Streaming Aggregation).

Note

StateStore was introduced in [SPARK-13809][SQL] State store for streaming aggregations.

Read the motivation and design in State Store for Streaming Aggregations.

Contract

Committing State Changes

commit(): Long

Commits all updates (puts and removes) and returns a new version

Used when:

  • FlatMapGroupsWithStateExec physical operator is requested to processDataWithPartition
  • SessionWindowStateStoreSaveExec physical operator is executed
  • StreamingDeduplicateExec physical operator is executed
  • StreamingGlobalLimitExec physical operator is executed
  • StreamingAggregationStateManagerBaseImpl is requested to commit
  • StreamingSessionWindowStateManagerImplV1 is requested to commit
  • StateStoreHandler is requested to commit

StateStoreMetrics

metrics: StateStoreMetrics

StateStoreMetrics of this state store

Used when:

  • StateStoreWriter physical operator is requested to setStoreMetrics
  • StateStoreHandler is requested for the metrics

Storing Value for Key

put(
  key: UnsafeRow,
  value: UnsafeRow): Unit

Stores (puts) a new non-null value for a non-null key

Used when:

  • StreamingDeduplicateExec physical operator is executed
  • StreamingGlobalLimitExec physical operator is executed
  • StateManagerImplBase is requested to putState
  • StreamingAggregationStateManagerImplV2 (and StreamingAggregationStateManagerImplV1) is requested to put a row
  • StreamingSessionWindowStateManagerImplV1 is requested to putRows
  • KeyToNumValuesStore is requested to put the number of values of a key
  • KeyWithIndexToValueStore is requested to put a new value of a key

Removing Key

remove(
  key: UnsafeRow): Unit

Removes a non-null key

Used when:

  • WatermarkSupport physical operator is requested to removeKeysOlderThanWatermark
  • StateManagerImplBase is requested to removeState
  • StreamingAggregationStateManagerBaseImpl is requested to remove a key
  • StreamingSessionWindowStateManagerImplV1 is requested to removeByValueCondition and putRows
  • KeyToNumValuesStore is requested to remove a key
  • KeyWithIndexToValueStore is requested to remove a key
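To make the contract concrete, here is a minimal in-memory sketch. It is not Spark's implementation: String keys and values stand in for UnsafeRow, and ToyProvider and ToyStateStore are hypothetical names. It only shows how put and remove change a working copy that commit turns into the next version.

```scala
import scala.collection.mutable

// Hypothetical provider that keeps one immutable snapshot per committed version.
// Assumption: String keys and values stand in for UnsafeRow.
final class ToyProvider {
  private val snapshots = mutable.Map[Long, Map[String, String]](0L -> Map.empty)

  def getStore(version: Long): ToyStateStore =
    new ToyStateStore(version, snapshots(version), (v, d) => snapshots.update(v, d))
}

// Hypothetical, simplified model of the StateStore contract
final class ToyStateStore(
    val version: Long,
    base: Map[String, String],
    save: (Long, Map[String, String]) => Unit) {
  private val data = mutable.Map.from(base) // working copy with uncommitted updates
  private var committed = false

  def get(key: String): Option[String] = data.get(key)

  // Mirrors put: stores a non-null value for a non-null key
  def put(key: String, value: String): Unit = {
    require(key != null && value != null, "key and value must be non-null")
    require(!committed, "cannot update a committed store")
    data(key) = value
  }

  // Mirrors remove: removes a non-null key
  def remove(key: String): Unit = {
    require(key != null, "key must be non-null")
    require(!committed, "cannot update a committed store")
    data -= key
  }

  // Mirrors commit: all puts and removes become the next version, which is returned
  def commit(): Long = {
    require(!committed, "store already committed")
    committed = true
    save(version + 1, data.toMap)
    version + 1
  }
}
```

Each committed version is an immutable snapshot here, so earlier versions stay readable after later commits.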

Implementations

getReadOnly

getReadOnly(
  storeProviderId: StateStoreProviderId,
  keySchema: StructType,
  valueSchema: StructType,
  numColsPrefixKey: Int,
  version: Long,
  storeConf: StateStoreConf,
  hadoopConf: Configuration): ReadStateStore

getReadOnly looks up the StateStoreProvider for the given StateStoreProviderId and requests it to getReadStore for the given version.


getReadOnly is used when:

Looking Up StateStore by Provider ID and Version

get(
  storeProviderId: StateStoreProviderId,
  keySchema: StructType,
  valueSchema: StructType,
  numColsPrefixKey: Int,
  version: Long,
  storeConf: StateStoreConf,
  hadoopConf: Configuration): StateStore

get looks up the StateStoreProvider for the given StateStoreProviderId and requests it to getStore for the given version.
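The difference between getReadOnly and get can be sketched with a toy model: both look up a provider by ID and ask it for a store at a given version, but only get hands out a store that accepts updates. All Toy* names are hypothetical, provider IDs are plain Strings, and the provider lookup is simplified to a getOrElseUpdate on a map.

```scala
import scala.collection.mutable

// Hypothetical read-only view, mirroring how StateStore extends ReadStateStore
trait ToyReadStore {
  def version: Long
  def get(key: String): Option[String]
}

// Hypothetical writable store: adds updates and commit on top of reads
trait ToyStore extends ToyReadStore {
  def put(key: String, value: String): Unit
  def commit(): Long
}

// Hypothetical provider keeping one immutable snapshot per committed version
final class ToyProvider {
  private val versions = mutable.Map[Long, Map[String, String]](0L -> Map.empty)

  // Stand-in for getReadStore: a fixed snapshot that cannot be changed
  def getReadStore(v: Long): ToyReadStore = {
    val snapshot = versions(v)
    new ToyReadStore {
      val version: Long = v
      def get(key: String): Option[String] = snapshot.get(key)
    }
  }

  // Stand-in for getStore: a working copy whose commit produces version v + 1
  def getStore(v: Long): ToyStore = {
    val data = mutable.Map.from(versions(v))
    new ToyStore {
      val version: Long = v
      def get(key: String): Option[String] = data.get(key)
      def put(key: String, value: String): Unit = { data(key) = value }
      def commit(): Long = { versions(v + 1) = data.toMap; v + 1 }
    }
  }
}

// Hypothetical stand-ins for getReadOnly and get: both first look up (or create)
// the provider for an ID, then ask it for a store at the requested version
object ToyStateStore {
  private val providers = mutable.Map.empty[String, ToyProvider]
  private def provider(id: String): ToyProvider =
    providers.getOrElseUpdate(id, new ToyProvider)

  def getReadOnly(id: String, version: Long): ToyReadStore = provider(id).getReadStore(version)
  def get(id: String, version: Long): ToyStore = provider(id).getStore(version)
}
```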


get is used when:

Looking Up StateStore by Provider ID

getStateStoreProvider(
  storeProviderId: StateStoreProviderId,
  keySchema: StructType,
  valueSchema: StructType,
  numColsPrefixKey: Int,
  storeConf: StateStoreConf,
  hadoopConf: Configuration): StateStoreProvider

getStateStoreProvider starts the periodic maintenance task (startMaintenanceIfNeeded).

Only if the partitionId (of the StateStoreId of the given StateStoreProviderId) is 0, getStateStoreProvider validates the state schema.

FIXME Describe the validation

getStateStoreProvider looks up the StateStoreProvider for the given StateStoreProviderId (in the loadedProviders registry) or creates and initializes a new one.

getStateStoreProvider then collects the other StateStoreProviders (in the loadedProviders registry), reports the given one as active (reportActiveStoreInstance) and unloads the others.

Note

There can be one active StateStoreProvider in a Spark executor.


getStateStoreProvider is a helper method of getReadOnly and get.
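The bookkeeping of getStateStoreProvider can be sketched as follows. This is a toy model, not Spark's code: ToyProviderId, ToyProvider and ToyProviderRegistry are hypothetical, schema validation is reduced to recording the ID, and which of the other providers get unloaded is decided by a hypothetical shouldUnload callback (the test unloads all of them, as the text above describes).

```scala
import scala.collection.mutable

// Hypothetical provider ID: (operator, partition) stands in for StateStoreProviderId
final case class ToyProviderId(operatorId: Long, partitionId: Int)

final class ToyProvider(val id: ToyProviderId) {
  var unloaded = false
}

// Hypothetical model of getStateStoreProvider's bookkeeping
final class ToyProviderRegistry(shouldUnload: ToyProviderId => Boolean) {
  val loadedProviders = mutable.HashMap.empty[ToyProviderId, ToyProvider]
  val schemaValidated = mutable.Set.empty[ToyProviderId]

  def getStateStoreProvider(id: ToyProviderId): ToyProvider = loadedProviders.synchronized {
    // The state schema is validated only for partition 0 (recorded, not checked, here)
    if (id.partitionId == 0) schemaValidated += id
    // Reuse the loaded provider for this ID or create a new one
    val provider = loadedProviders.getOrElseUpdate(id, new ToyProvider(id))
    // Collect the other loaded providers and unload the ones the callback selects
    val others = loadedProviders.keys.filter(_ != id).toList
    others.filter(shouldUnload).foreach { other =>
      loadedProviders.remove(other).foreach(p => p.unloaded = true)
    }
    provider
  }
}
```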

Review Me

StateStore supports incremental checkpointing in which only the key-value "Row" pairs that changed are committed or aborted (without touching other key-value pairs).
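The idea of incremental checkpointing can be illustrated with a toy delta log: each committed version records only the keys that changed (None marks a removal), and the full state of a version is rebuilt by replaying the deltas in order. ToyDeltaLog is hypothetical; real state store providers persist their changes to checkpoint files rather than to an in-memory Vector.

```scala
// Hypothetical model of incremental (delta) checkpointing: each committed
// version stores only the keys that changed, with None marking a removal.
final class ToyDeltaLog {
  private var deltas = Vector.empty[Map[String, Option[String]]] // deltas(i) produces version i + 1

  // Appends the changes of one batch and returns the new version
  def commit(changes: Map[String, Option[String]]): Long = {
    deltas :+= changes
    deltas.size.toLong
  }

  // Rebuilds the full state of a version by replaying deltas from the start
  def load(version: Long): Map[String, String] =
    deltas.take(version.toInt).foldLeft(Map.empty[String, String]) { (state, delta) =>
      delta.foldLeft(state) {
        case (s, (k, Some(v))) => s.updated(k, v)
        case (s, (k, None))    => s - k
      }
    }
}
```

Unchanged keys never appear in a delta, which is what keeps each checkpoint proportional to the number of changes rather than to the size of the state.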

StateStore is identified by its StateStoreId (among other properties used for identification).

Tip

Enable ALL logging level for org.apache.spark.sql.execution.streaming.state.StateStore$ logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.state.StateStore$=ALL

Creating (and Caching) RPC Endpoint Reference to StateStoreCoordinator for Executors

coordinatorRef: Option[StateStoreCoordinatorRef]

coordinatorRef requests the SparkEnv helper object for the current SparkEnv.

If the SparkEnv is available and _coordRef is not assigned yet, coordinatorRef prints out the following DEBUG message to the logs and requests the StateStoreCoordinatorRef for the StateStoreCoordinator endpoint.

Getting StateStoreCoordinatorRef

If the SparkEnv is available, coordinatorRef prints out the following INFO message to the logs:

Retrieved reference to StateStoreCoordinator: [_coordRef]

NOTE: coordinatorRef is used when StateStore helper object is requested to reportActiveStoreInstance (when requested to getStateStoreProvider) and verifyIfStoreInstanceActive (when requested to doMaintenance).
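The create-once-then-cache behavior of coordinatorRef can be sketched with a toy model. ToyCoordinatorRef and ToyStateStoreHelper are hypothetical; the env function stands in for looking up the current SparkEnv, and the log function stands in for the DEBUG and INFO messages quoted above.

```scala
// Hypothetical stand-in for StateStoreCoordinatorRef
final case class ToyCoordinatorRef(endpointName: String)

// Hypothetical model of coordinatorRef: the reference is created on first use,
// only when an environment is available, and cached afterwards.
final class ToyStateStoreHelper(env: () => Option[String], log: String => Unit) {
  private var _coordRef: ToyCoordinatorRef = null

  def coordinatorRef: Option[ToyCoordinatorRef] = synchronized {
    env() match {
      case Some(_) =>
        if (_coordRef == null) {
          log("Getting StateStoreCoordinatorRef") // only on the first lookup
          _coordRef = ToyCoordinatorRef("StateStoreCoordinator")
        }
        log(s"Retrieved reference to StateStoreCoordinator: ${_coordRef}")
        Some(_coordRef)
      case None =>
        None // no environment, no reference
    }
  }
}
```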

Announcing New StateStoreProvider

reportActiveStoreInstance(
  storeProviderId: StateStoreProviderId): Unit

reportActiveStoreInstance takes the current host and executorId (from the BlockManager on the Spark executor) and requests the StateStoreCoordinatorRef (coordinatorRef) to reportActiveInstance.

NOTE: reportActiveStoreInstance uses SparkEnv to access the BlockManager.

In the end, reportActiveStoreInstance prints out the following INFO message to the logs:

Reported that the loaded instance [storeProviderId] is active

NOTE: reportActiveStoreInstance is used exclusively when StateStore utility is requested to getStateStoreProvider.
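The announcement can be sketched as a toy exchange between an executor-side helper and a coordinator that remembers where each provider is active. ToyCoordinator, ToyTaskLocation and ToyExecutorSide are hypothetical; host and executorId are passed in directly instead of being read from the BlockManager, and provider IDs are plain Strings.

```scala
import scala.collection.mutable

// Hypothetical location of an active provider instance
final case class ToyTaskLocation(host: String, executorId: String)

// Hypothetical stand-in for the coordinator: remembers where each provider
// (by ID) was last reported active
final class ToyCoordinator {
  val instances = mutable.HashMap.empty[String, ToyTaskLocation]

  def reportActiveInstance(providerId: String, host: String, executorId: String): Unit =
    instances.synchronized { instances(providerId) = ToyTaskLocation(host, executorId) }
}

// Hypothetical executor-side helper mirroring reportActiveStoreInstance
object ToyExecutorSide {
  def reportActiveStoreInstance(
      coordinator: ToyCoordinator,
      providerId: String,
      host: String,
      executorId: String,
      log: String => Unit): Unit = {
    coordinator.reportActiveInstance(providerId, host, executorId)
    log(s"Reported that the loaded instance $providerId is active")
  }
}
```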

MaintenanceTask Daemon Thread

MaintenanceTask is a daemon thread that does state maintenance of the registered state store providers (doMaintenance).

When an error occurs, MaintenanceTask clears the loadedProviders internal registry.

MaintenanceTask is scheduled on the state-store-maintenance-task thread pool and runs periodically every spark.sql.streaming.stateStore.maintenanceInterval.
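A periodic daemon task of this kind can be sketched with java.util.concurrent directly. ToyMaintenanceTask is hypothetical, and rethrowing after onError is an assumption of this sketch: per the ScheduledExecutorService contract, an exception thrown by a fixed-rate task suppresses all of its later executions, so the task stops after the first failure.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, ThreadFactory, TimeUnit}
import scala.util.control.NonFatal

// Hypothetical model of MaintenanceTask: runs `task` every `periodMs` on a single
// daemon thread named "state-store-maintenance-task". On a non-fatal error it calls
// `onError` and rethrows, which makes the executor cancel all later runs.
final class ToyMaintenanceTask(periodMs: Long, task: () => Unit, onError: () => Unit) {
  private val executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "state-store-maintenance-task")
      t.setDaemon(true) // a daemon thread does not keep the JVM alive
      t
    }
  })

  private val runnable: Runnable = new Runnable {
    def run(): Unit =
      try task()
      catch {
        case NonFatal(e) =>
          onError() // e.g. clear the registry of loaded providers
          throw e
      }
  }

  // First run after periodMs, then every periodMs
  private val future: ScheduledFuture[_] =
    executor.scheduleAtFixedRate(runnable, periodMs, periodMs, TimeUnit.MILLISECONDS)

  def isRunning: Boolean = !future.isDone

  def stop(): Unit = {
    future.cancel(false)
    executor.shutdown()
  }
}
```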

Starting Periodic Maintenance Task (Unless Already Started)

startMaintenanceIfNeeded(): Unit

startMaintenanceIfNeeded schedules MaintenanceTask to start after, and then run every, spark.sql.streaming.stateStore.maintenanceInterval (defaults to 60s).

NOTE: startMaintenanceIfNeeded does nothing when the maintenance task has already been started and is still running.

NOTE: startMaintenanceIfNeeded is used exclusively when StateStore is requested to getStateStoreProvider.
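The "start unless already running" rule of startMaintenanceIfNeeded can be sketched as a small guard. ToyTaskHandle and ToyMaintenanceStarter are hypothetical; the schedule function stands in for creating a new MaintenanceTask with the configured interval.

```scala
// Hypothetical handle for a scheduled task (stands in for MaintenanceTask)
trait ToyTaskHandle {
  def isRunning: Boolean
}

// Hypothetical model of startMaintenanceIfNeeded: `schedule` creates a new periodic
// task for the given interval; nothing new is scheduled while the current one runs.
final class ToyMaintenanceStarter(schedule: Long => ToyTaskHandle) {
  private var maintenanceTask: Option[ToyTaskHandle] = None
  private var scheduledCount = 0

  def startMaintenanceIfNeeded(intervalMs: Long): Unit = synchronized {
    if (!maintenanceTask.exists(_.isRunning)) {
      maintenanceTask = Some(schedule(intervalMs))
      scheduledCount += 1
    }
  }

  def timesScheduled: Int = synchronized(scheduledCount)
}
```

Because the check and the scheduling happen under one lock, concurrent callers cannot start two tasks at once.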

Doing State Maintenance of Registered State Store Providers

doMaintenance(): Unit

Internally, doMaintenance prints the following DEBUG message to the logs:

Doing maintenance

doMaintenance then requests every StateStoreProvider (registered in loadedProviders) to do its own internal maintenance (StateStoreProvider.doMaintenance), but only when the StateStoreProvider is still active.

When a StateStoreProvider is no longer active, doMaintenance unloads it and prints the following INFO message to the logs:

Unloaded [provider]

NOTE: doMaintenance is used exclusively in MaintenanceTask.
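The maintenance loop can be sketched as a toy model. ToyProvider and ToyStateStoreObject are hypothetical, String IDs stand in for StateStoreProviderId, and the isActive callback stands in for however the real code decides that a provider is still active. Active providers do their own maintenance; the rest are removed from loadedProviders and logged as unloaded.

```scala
import scala.collection.mutable

// Hypothetical provider that only counts how often it was maintained
final class ToyProvider(val id: String) {
  var maintenanceRuns = 0
  def doMaintenance(): Unit = maintenanceRuns += 1
  override def toString: String = s"ToyProvider[$id]"
}

// Hypothetical model of doMaintenance over the loadedProviders registry
final class ToyStateStoreObject(isActive: String => Boolean, log: String => Unit) {
  val loadedProviders = mutable.HashMap.empty[String, ToyProvider]

  def doMaintenance(): Unit = {
    log("Doing maintenance")
    // Work on a copy so that unloading does not modify the map being iterated
    val snapshot = loadedProviders.synchronized(loadedProviders.toList)
    snapshot.foreach { case (id, provider) =>
      if (isActive(id)) provider.doMaintenance()
      else {
        loadedProviders.synchronized(loadedProviders.remove(id))
        log(s"Unloaded $provider")
      }
    }
  }
}
```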