== StateStore

StateStore is the <<contract, abstraction>> of <<implementations, state stores>> for managing state in stateful stream processing (e.g. for persisting running aggregates in streaming aggregation).

StateStore supports incremental checkpointing in which only the key-value "Row" pairs that changed are <<commit, committed>> or <<abort, aborted>> (without touching other key-value pairs).

StateStore is identified with the <<id, StateStoreId>> (with the operator id and the partition id, among other properties).

[[implementations]]
NOTE: HDFSBackedStateStore is the default and only known implementation of the <<contract, StateStore contract>> in Spark Structured Streaming.
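
The contract can be summarized as the following simplified Scala sketch (a sketch only, derived from the methods described in the <<contract, contract table>> below; the actual trait in Apache Spark may differ in exact members and modifiers):

[source, scala]
----
// A simplified sketch of the StateStore contract (based on the methods described below;
// not the full trait as defined in Apache Spark).
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.state.{StateStoreId, StateStoreMetrics, UnsafeRowPair}

trait StateStoreSketch {
  def id: StateStoreId                       // identifies the operator, partition and store
  def version: Long                          // version of the state the store was loaded with
  def get(key: UnsafeRow): UnsafeRow         // value for a key (null if absent)
  def put(key: UnsafeRow, value: UnsafeRow): Unit
  def remove(key: UnsafeRow): Unit
  def getRange(start: Option[UnsafeRow], end: Option[UnsafeRow]): Iterator[UnsafeRowPair]
  def iterator(): Iterator[UnsafeRowPair]
  def commit(): Long                         // makes the changes visible as a new version
  def abort(): Unit                          // discards uncommitted changes
  def hasCommitted: Boolean
  def metrics: StateStoreMetrics
}
----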
[[contract]]
.StateStore Contract
[cols="30m,70",options="header",width="100%"]
|===
| Method
| Description
| abort
a| [[abort]]
[source, scala]
----
abort(): Unit
----

Aborts (discards) changes to the state store

Used when:

* StateStoreOps implicit class is requested to mapPartitionsWithStateStore (when the state store has not been <<commit, committed>> for a task that finishes, possibly with an error)

* StateStoreHandler (of SymmetricHashJoinStateManager) is requested to abortIfNeeded (when the state store has not been <<commit, committed>> for a task that finishes, possibly with an error)
| commit
a| [[commit]]
[source, scala]
----
commit(): Long
----

Commits the changes to the state store (and returns the current version)

Used when:

* FlatMapGroupsWithStateExec, StreamingDeduplicateExec and StreamingGlobalLimitExec physical operators are executed (right after all rows in a partition have been processed)

* StreamingAggregationStateManagerBaseImpl is requested to commit (changes to) a state store (when StateStoreSaveExec physical operator is executed)

* StateStoreHandler (of SymmetricHashJoinStateManager) is requested to commit changes to a state store
| get
a| [[get]]
[source, scala]
----
get(
  key: UnsafeRow): UnsafeRow
----

Looks up (gets) the value of the given non-null key

Used when:

* StreamingDeduplicateExec and StreamingGlobalLimitExec physical operators are executed

* StateManagerImplBase (of FlatMapGroupsWithStateExecHelper) is requested to getState

* StreamingAggregationStateManagerImplV1 and StreamingAggregationStateManagerImplV2 are requested to get the value of a non-null key

* KeyToNumValuesStore is requested to get
| getRange
a| [[getRange]]
[source, scala]
----
getRange(
  start: Option[UnsafeRow],
  end: Option[UnsafeRow]): Iterator[UnsafeRowPair]
----

Gets the key-value pairs of UnsafeRows for the specified range (with optional approximate start and end extents)

Used when:

* WatermarkSupport is requested to removeKeysOlderThanWatermark

* StateManagerImplBase is requested to getAllState

* StreamingAggregationStateManagerBaseImpl is requested for keys

* KeyToNumValuesStore and KeyWithIndexToValueStore are requested to iterator

NOTE: All the uses above pass None for both start and end, which is basically <<iterator, iterator>>.
| hasCommitted
a| [[hasCommitted]]
[source, scala]
----
hasCommitted: Boolean
----

Flag to indicate whether state changes have been committed (true) or not (false)

Used when:

* RDD (via StateStoreOps implicit class) is requested to mapPartitionsWithStateStore (and a task finishes and may need to <<abort, abort>> state updates)

* SymmetricHashJoinStateManager is requested to abortIfNeeded (when a task finishes and may need to <<abort, abort>> state updates)
| id
a| [[id]]
[source, scala]
----
id: StateStoreId
----

The StateStoreId of the state store

Used when:

* HDFSBackedStateStore state store is requested for the textual representation

* StateStoreHandler (of SymmetricHashJoinStateManager) is requested to commit changes to and load a state store
| iterator
a| [[iterator]]
[source, scala]
----
iterator(): Iterator[UnsafeRowPair]
----

Returns an iterator with all the key-value pairs in the state store

Used when:

* StateStoreRestoreExec physical operator is requested to execute

* HDFSBackedStateStore state store in particular and any StateStore in general are requested to getRange

* StreamingAggregationStateManagerImplV1 state manager is requested for the iterator and values

* StreamingAggregationStateManagerImplV2 state manager is requested for the iterator and values
| metrics
a| [[metrics]]
[source, scala]
----
metrics: StateStoreMetrics
----

StateStoreMetrics of the state store

Used when:

* StateStoreWriter stateful physical operator is requested to setStoreMetrics

* StateStoreHandler (of SymmetricHashJoinStateManager) is requested to commit and for the metrics
| put
a| [[put]]
[source, scala]
----
put(
  key: UnsafeRow,
  value: UnsafeRow): Unit
----

Stores (puts) the value for the (non-null) key

Used when:

* StreamingDeduplicateExec and StreamingGlobalLimitExec physical operators are executed

* StateManagerImplBase is requested to putState

* StreamingAggregationStateManagerImplV1 and StreamingAggregationStateManagerImplV2 are requested to store a row in a state store

* KeyToNumValuesStore and KeyWithIndexToValueStore are requested to store a new value for a given key
| remove
a| [[remove]]
[source, scala]
----
remove(key: UnsafeRow): Unit
----

Removes the (non-null) key from the state store

Used when:

* Physical operators with WatermarkSupport are requested to removeKeysOlderThanWatermark

* StateManagerImplBase is requested to removeState

* StreamingAggregationStateManagerBaseImpl is requested to remove a key from a state store

* KeyToNumValuesStore is requested to remove a key

* KeyWithIndexToValueStore is requested to remove values for a key
| version
a| [[version]]
[source, scala]
----
version: Long
----

Version of the state store

Used exclusively when HDFSBackedStateStore state store is requested for a new version (that is simply the current version incremented)

|===
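
To see how the methods work together, here is a rough per-task sketch (hypothetical code, not from Apache Spark) that applies a batch of updates and commits them, aborting the uncommitted changes on failure:

[source, scala]
----
// Hypothetical per-task usage of a StateStore (a sketch only):
// update a few keys incrementally and either commit or abort the changes.
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.state.StateStore

def updateState(store: StateStore, updates: Iterator[(UnsafeRow, UnsafeRow)]): Unit = {
  try {
    updates.foreach { case (key, value) =>
      store.put(key, value)            // only changed key-value pairs are part of the delta
    }
    val newVersion = store.commit()    // persists the delta and returns the new version
    println(s"Committed ${store.id} as version $newVersion")
  } catch {
    case e: Throwable =>
      if (!store.hasCommitted) store.abort()   // discard uncommitted changes on failure
      throw e
  }
}
----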
[NOTE]
====
StateStore was introduced in https://github.com/apache/spark/commit/8c826880f5eaa3221c4e9e7d3fece54e821a0b98[[SPARK-13809][SQL] State store for streaming aggregations].

Read the motivation and design in https://docs.google.com/document/d/1-ncawFx8JS5Zyfq1HAEGBx56RDet9wfVp_hDM8ZL254/edit[State Store for Streaming Aggregations].
====
[[logging]]
[TIP]
====
Enable ALL logging level for the org.apache.spark.sql.execution.streaming.state.StateStore$ logger to see what happens inside.

Add the following line to conf/log4j.properties:

----
log4j.logger.org.apache.spark.sql.execution.streaming.state.StateStore$=ALL
----

Refer to Logging.
====
=== [[coordinatorRef]] Creating (and Caching) RPC Endpoint Reference to StateStoreCoordinator for Executors -- coordinatorRef Internal Object Method

[source, scala]
----
coordinatorRef: Option[StateStoreCoordinatorRef]
----

coordinatorRef requests the SparkEnv helper object for the current SparkEnv.

If the SparkEnv is available and the <<_coordRef, _coordRef>> is not assigned yet, coordinatorRef prints out the following DEBUG message to the logs followed by requesting the StateStoreCoordinatorRef for the StateStoreCoordinator endpoint.

Getting StateStoreCoordinatorRef

If the SparkEnv is available, coordinatorRef prints out the following INFO message to the logs:

Retrieved reference to StateStoreCoordinator: [_coordRef]

NOTE: coordinatorRef is used when the StateStore helper object is requested to <<reportActiveStoreInstance, announce a new StateStoreProvider>> and to <<verifyIfStoreInstanceActive, verifyIfStoreInstanceActive>>.
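
The control flow boils down to a lazily created and cached optional reference. Below is a minimal sketch of that flow, assuming it is compiled inside Spark's org.apache.spark.sql.execution.streaming.state package (StateStoreCoordinatorRef and its forExecutor factory are internal to Spark, so this is an assumption about the implementation, not the actual code):

[source, scala]
----
// A sketch of the control flow described above (an assumption, not Spark's actual code).
// It must live inside Spark's state package to see the internal StateStoreCoordinatorRef factory.
package org.apache.spark.sql.execution.streaming.state

import org.apache.spark.SparkEnv

object CoordinatorRefSketch {
  private var _coordRef: StateStoreCoordinatorRef = null

  def coordinatorRef: Option[StateStoreCoordinatorRef] = synchronized {
    Option(SparkEnv.get).map { env =>
      if (_coordRef == null) {
        // "Getting StateStoreCoordinatorRef" (DEBUG)
        _coordRef = StateStoreCoordinatorRef.forExecutor(env)
      }
      // "Retrieved reference to StateStoreCoordinator: [_coordRef]" (INFO)
      _coordRef
    }
  }
}
----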
=== [[unload]] Unloading State Store Provider -- unload Method

[source, scala]
----
unload(storeProviderId: StateStoreProviderId): Unit
----

unload...FIXME

NOTE: unload is used when the StateStore helper object is requested to <<stop, stop>> and to <<doMaintenance, do maintenance of registered state store providers>>.
=== [[stop]] stop Object Method

[source, scala]
----
stop(): Unit
----

stop...FIXME

NOTE: stop seems to be used in tests only.
=== [[reportActiveStoreInstance]] Announcing New StateStoreProvider -- reportActiveStoreInstance Internal Object Method

[source, scala]
----
reportActiveStoreInstance(
  storeProviderId: StateStoreProviderId): Unit
----

reportActiveStoreInstance takes the current host and executorId (from the BlockManager on the Spark executor) and requests the <<coordinatorRef, StateStoreCoordinator>> to report that the state store instance (with the given storeProviderId) is active on that host and executor.

NOTE: reportActiveStoreInstance uses SparkEnv to access the BlockManager.

In the end, reportActiveStoreInstance prints out the following INFO message to the logs:

Reported that the loaded instance [storeProviderId] is active

NOTE: reportActiveStoreInstance is used exclusively when the StateStore utility is requested to look up a StateStore by provider id and version.
=== [[MaintenanceTask]] MaintenanceTask Daemon Thread

MaintenanceTask is a daemon thread that triggers <<doMaintenance, maintenance of registered state store providers>> periodically.

When an error occurs, MaintenanceTask clears the <<loadedProviders, loadedProviders>> internal cache.

MaintenanceTask is scheduled on the state-store-maintenance-task thread pool that runs periodically every spark.sql.streaming.stateStore.maintenanceInterval (default: 60s).
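
For example, the interval can be changed through the spark.sql.streaming.stateStore.maintenanceInterval configuration property when building a SparkSession (a hypothetical value of 30 seconds):

[source, scala]
----
import org.apache.spark.sql.SparkSession

// Hypothetical example: run state store maintenance every 30 seconds instead of the default 60s.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("state-store-maintenance-demo")
  .config("spark.sql.streaming.stateStore.maintenanceInterval", "30s")
  .getOrCreate()
----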
=== Looking Up StateStore by Provider ID -- get Object Method

[source, scala]
----
get(
  storeProviderId: StateStoreProviderId,
  keySchema: StructType,
  valueSchema: StructType,
  indexOrdinal: Option[Int],
  version: Long,
  storeConf: StateStoreConf,
  hadoopConf: Configuration): StateStore
----

get finds the StateStore for the specified StateStoreProviderId and version.

NOTE: The version is the version of the state store to load.

Internally, get looks up the StateStoreProvider (for the given storeProviderId) in the <<loadedProviders, loadedProviders>> internal cache. If not found, get uses the StateStoreProvider utility to create and initialize one (that is then registered in <<loadedProviders, loadedProviders>>).

get will also <<startMaintenanceIfNeeded, start the periodic maintenance task>> (unless already started) and <<reportActiveStoreInstance, announce the new StateStoreProvider>>.

In the end, get requests the StateStoreProvider for the StateStore for the specified version.

get is used when:

* StateStoreRDD is requested to compute a partition

* StateStoreHandler (of SymmetricHashJoinStateManager) is requested to load a state store
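
For illustration only, the following hand-rolled lookup shows how the arguments of get fit together (a sketch; in a real query StateStoreRDD supplies all of these values, and the exact constructor shapes of StateStoreId and StateStoreProviderId are assumptions):

[source, scala]
----
// A hypothetical, hand-rolled lookup with StateStore.get (a sketch; parameter values are
// made up and the case class shapes of StateStoreId/StateStoreProviderId are assumptions).
import java.util.UUID

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProviderId}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val storeId = StateStoreId(
  checkpointRootLocation = "/tmp/checkpoint/state",  // state directory under the query checkpoint
  operatorId = 0L,                                   // stateful operator id
  partitionId = 0)                                   // partition of the state store RDD
val providerId = StateStoreProviderId(storeId, queryRunId = UUID.randomUUID())

val keySchema = StructType(StructField("key", StringType) :: Nil)
val valueSchema = StructType(StructField("count", LongType) :: Nil)

val store = StateStore.get(
  providerId,
  keySchema,
  valueSchema,
  indexOrdinal = None,
  version = 0L,                   // version of state to load (0 for an empty store)
  storeConf = StateStoreConf.empty,
  hadoopConf = new Configuration())
----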
==== [[startMaintenanceIfNeeded]] Starting Periodic Maintenance Task (Unless Already Started) -- startMaintenanceIfNeeded Internal Object Method

[source, scala]
----
startMaintenanceIfNeeded(): Unit
----

startMaintenanceIfNeeded schedules the <<MaintenanceTask, MaintenanceTask>> to run every spark.sql.streaming.stateStore.maintenanceInterval (default: 60s).

NOTE: startMaintenanceIfNeeded does nothing when the maintenance task has already been started and is still running.

NOTE: startMaintenanceIfNeeded is used exclusively when StateStore is requested to look up a StateStore by provider id and version.
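
The scheduling itself is the standard single-threaded scheduled-executor pattern, roughly as in the following generic sketch (illustrative only; Spark wraps it in the <<MaintenanceTask, MaintenanceTask>> daemon thread):

[source, scala]
----
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

// A generic sketch of the periodic scheduling described above (not Spark's actual MaintenanceTask):
// one daemon thread runs the maintenance function at a fixed rate.
val threadFactory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "state-store-maintenance-task")
    t.setDaemon(true)
    t
  }
}
val executor = Executors.newSingleThreadScheduledExecutor(threadFactory)

val periodMs = 60 * 1000L  // spark.sql.streaming.stateStore.maintenanceInterval (default: 60s)
executor.scheduleAtFixedRate(
  new Runnable { override def run(): Unit = println("doing state maintenance") },
  periodMs, periodMs, TimeUnit.MILLISECONDS)
----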
==== [[doMaintenance]] Doing State Maintenance of Registered State Store Providers -- doMaintenance Internal Object Method

[source, scala]
----
doMaintenance(): Unit
----

Internally, doMaintenance prints out the following DEBUG message to the logs:

Doing maintenance

doMaintenance then requests every spark-sql-streaming-StateStoreProvider.md[StateStoreProvider] (registered in the <<loadedProviders, loadedProviders>> internal cache) to do its own internal maintenance.

When a StateStoreProvider is <<verifyIfStoreInstanceActive, no longer active>>, doMaintenance <<unload, unloads>> it and prints out the following INFO message to the logs:

Unloaded [provider]

NOTE: doMaintenance is used exclusively in the <<MaintenanceTask, MaintenanceTask daemon thread>>.
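
A simplified sketch of that maintenance pass as a hypothetical standalone helper (providers, isActive and unload stand in for the <<loadedProviders, loadedProviders>> cache and the <<verifyIfStoreInstanceActive, verifyIfStoreInstanceActive>> and <<unload, unload>> methods described on this page):

[source, scala]
----
import scala.util.control.NonFatal

import org.apache.spark.sql.execution.streaming.state.{StateStoreProvider, StateStoreProviderId}

// A hypothetical standalone helper mirroring the maintenance pass described above
// (a sketch, not Spark's actual doMaintenance).
def maintainProviders(
    providers: Map[StateStoreProviderId, StateStoreProvider],
    isActive: StateStoreProviderId => Boolean,
    unload: StateStoreProviderId => Unit): Unit = {
  providers.foreach { case (id, provider) =>
    try {
      provider.doMaintenance()   // provider-specific maintenance (e.g. snapshots, cleanup)
      if (!isActive(id)) {
        unload(id)
        println(s"Unloaded $provider")   // INFO message
      }
    } catch {
      case NonFatal(e) => println(s"Error managing $provider: $e")
    }
  }
}
----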
==== [[verifyIfStoreInstanceActive]] verifyIfStoreInstanceActive Internal Object Method

[source, scala]
----
verifyIfStoreInstanceActive(storeProviderId: StateStoreProviderId): Boolean
----

verifyIfStoreInstanceActive...FIXME

NOTE: verifyIfStoreInstanceActive is used exclusively when the StateStore helper object is requested to <<doMaintenance, do maintenance of registered state store providers>>.
=== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| loadedProviders
a| [[loadedProviders]] Loaded providers internal cache, i.e. StateStoreProviders by their StateStoreProviderIds

Used in...FIXME

| _coordRef
a| [[_coordRef]] StateStoreCoordinator RPC endpoint (a RpcEndpointRef to StateStoreCoordinator)

Used in...FIXME

|===