StateStoreHandler

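StateStoreHandler is the internal <<contract, base>> of the <<extensions, state store handlers>> of SymmetricHashJoinStateManager that manage a <<stateStore, StateStore>> (<<commit, commit>>, <<abortIfNeeded, abortIfNeeded>> and <<metrics, metrics>>).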

[[stateStoreType]] StateStoreHandler takes a single StateStoreType to be created:

* [[KeyToNumValuesType]] KeyToNumValuesType for <<KeyToNumValuesStore, KeyToNumValuesStore>> (alias: keyToNumValues)
* [[KeyWithIndexToValueType]] KeyWithIndexToValueType for <<KeyWithIndexToValueStore, KeyWithIndexToValueStore>> (alias: keyWithIndexToValue)

NOTE: StateStoreHandler is a Scala private abstract class and cannot be instantiated directly. It is created indirectly through its <<extensions, concrete StateStoreHandlers>>.

[[contract]]
.StateStoreHandler Contract
[cols="1m,2",options="header",width="100%"]
|===
| Method
| Description

| stateStore
a| [[stateStore]]

[source, scala]
----
stateStore: StateStore
----

The StateStore that the handler manages
|===

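[[extensions]]
.StateStoreHandlers
[cols="1,2",options="header",width="100%"]
|===
| StateStoreHandler
| Description

| KeyToNumValuesStore
| [[KeyToNumValuesStore]] StateStoreHandler of <<KeyToNumValuesType, KeyToNumValuesType>>

| KeyWithIndexToValueStore
| [[KeyWithIndexToValueStore]] StateStoreHandler of <<KeyWithIndexToValueType, KeyWithIndexToValueType>>
|===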
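The shape of the contract and its two implementations can be sketched in plain Scala. This is a simplified model rather than Spark's source: SimpleStore is a hypothetical stand-in for StateStore that only carries an id, and the concrete handlers use dummy stores instead of loading real ones.

```scala
// Hypothetical stand-in for Spark's StateStore: only an id is modelled.
trait SimpleStore {
  def id: String
}

// Minimal StateStoreType hierarchy.
sealed trait StateStoreType
case object KeyToNumValuesType extends StateStoreType
case object KeyWithIndexToValueType extends StateStoreType

// The contract: a handler is created for one StateStoreType and must provide
// the store it manages.
abstract class StateStoreHandler(val stateStoreType: StateStoreType) {
  protected def stateStore: SimpleStore
  def storeId: String = stateStore.id
}

// The two concrete handlers, backed by dummy stores in this sketch.
class KeyToNumValuesStore extends StateStoreHandler(KeyToNumValuesType) {
  protected val stateStore: SimpleStore = new SimpleStore { val id = "keyToNumValues-store" }
}
class KeyWithIndexToValueStore extends StateStoreHandler(KeyWithIndexToValueType) {
  protected val stateStore: SimpleStore = new SimpleStore { val id = "keyWithIndexToValue-store" }
}

val handlers: Seq[StateStoreHandler] = Seq(new KeyToNumValuesStore, new KeyWithIndexToValueStore)
handlers.foreach(h => println(s"${h.stateStoreType} manages ${h.storeId}"))
```

Keeping stateStore abstract leaves each subclass free to decide how its store is obtained, while the shared operations only rely on the contract.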

[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.StateStoreHandler logger to see what happens inside.

Add the following line to conf/log4j.properties:

----
log4j.logger.org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.StateStoreHandler=ALL
----

Refer to Logging.
====

=== [[metrics]] Performance Metrics -- metrics Method

[source, scala]
----
metrics: StateStoreMetrics
----

metrics simply requests the StateStore for the StateStoreMetrics.

metrics is used when SymmetricHashJoinStateManager is requested for the metrics.
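The delegation can be illustrated with a minimal sketch. SimpleMetrics, SimpleStore and FixedHandler are hypothetical names, and SimpleMetrics only models two counters for illustration.

```scala
// Hypothetical, simplified stand-ins for StateStoreMetrics and StateStore.
final case class SimpleMetrics(numKeys: Long, memoryUsedBytes: Long)

trait SimpleStore {
  def metrics: SimpleMetrics
}

abstract class StateStoreHandler {
  protected def stateStore: SimpleStore
  // metrics does no work of its own: it delegates to the managed store.
  def metrics: SimpleMetrics = stateStore.metrics
}

// A handler backed by a store that reports fixed, made-up numbers.
class FixedHandler(numKeys: Long, bytes: Long) extends StateStoreHandler {
  protected val stateStore: SimpleStore = new SimpleStore {
    def metrics: SimpleMetrics = SimpleMetrics(numKeys, bytes)
  }
}

val handler = new FixedHandler(numKeys = 3, bytes = 1024)
println(handler.metrics) // SimpleMetrics(3,1024)
```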

=== [[commit]] Committing State (Changes to State Store) -- commit Method

[source, scala]
----
commit(): Unit
----

commit...FIXME

NOTE: commit is used when...FIXME
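A minimal sketch, assuming that commit delegates to the commit of the managed store. SimpleStore, its version numbering and the Handler class are hypothetical and only loosely modelled on a state store.

```scala
// Hypothetical stand-in for a state store, loosely modelled on Spark's StateStore.
class SimpleStore(val version: Long) {
  private var committedVersion: Option[Long] = None

  def hasCommitted: Boolean = committedVersion.isDefined
  def newVersion: Option[Long] = committedVersion

  // Marks the store as committed at the next version and returns that version.
  def commit(): Long = {
    require(!hasCommitted, "store already committed")
    committedVersion = Some(version + 1)
    version + 1
  }
}

abstract class StateStoreHandler {
  protected def stateStore: SimpleStore

  // Assumption: commit simply delegates to the managed store.
  def commit(): Unit = {
    stateStore.commit()
    ()
  }
}

class Handler(val store: SimpleStore) extends StateStoreHandler {
  protected def stateStore: SimpleStore = store
}

val handler = new Handler(new SimpleStore(version = 0L))
handler.commit()
println(handler.store.newVersion) // Some(1)
```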

=== [[abortIfNeeded]] abortIfNeeded Method

[source, scala]
----
abortIfNeeded(): Unit
----

abortIfNeeded...FIXME

NOTE: abortIfNeeded is used when...FIXME
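A sketch of the behaviour the name suggests, assuming that abortIfNeeded aborts the managed store only when it has not been committed yet. SimpleStore and Handler are hypothetical stand-ins, not Spark classes.

```scala
// Hypothetical stand-in for a state store that only records commit and abort calls.
class SimpleStore {
  private var committed = false
  private var aborted = false

  def hasCommitted: Boolean = committed
  def wasAborted: Boolean = aborted
  def commit(): Unit = { committed = true }
  def abort(): Unit = { aborted = true }
}

abstract class StateStoreHandler {
  protected def stateStore: SimpleStore

  def commit(): Unit = stateStore.commit()

  // Assumption: a store that has already been committed is left untouched.
  def abortIfNeeded(): Unit =
    if (!stateStore.hasCommitted) stateStore.abort()
}

class Handler(val store: SimpleStore) extends StateStoreHandler {
  protected def stateStore: SimpleStore = store
}

// Never committed: abortIfNeeded aborts the store.
val uncommitted = new Handler(new SimpleStore)
uncommitted.abortIfNeeded()

// Committed first: abortIfNeeded does nothing.
val committed = new Handler(new SimpleStore)
committed.commit()
committed.abortIfNeeded()

println(s"${uncommitted.store.wasAborted} ${committed.store.wasAborted}") // true false
```

In this sketch, calling abortIfNeeded unconditionally is safe: it only has an effect when commit was never reached.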

=== [[getStateStore]] Loading State Store (By Key and Value Schemas) -- getStateStore Method

[source, scala]
----
getStateStore(
  keySchema: StructType,
  valueSchema: StructType): StateStore
----

getStateStore creates a new StateStoreProviderId (for the StatefulOperatorStateInfo of the owning SymmetricHashJoinStateManager, the partition ID from the execution context, and the name of the state store for the JoinSide and <<stateStoreType, StateStoreType>>).

getStateStore uses the StateStore utility to look up a StateStore for the StateStoreProviderId.

In the end, getStateStore prints out the following INFO message to the logs:

----
Loaded store [storeId]
----

getStateStore is used when the <<KeyToNumValuesStore, KeyToNumValuesStore>> and <<KeyWithIndexToValueStore, KeyWithIndexToValueStore>> state store handlers are created (for SymmetricHashJoinStateManager).
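The lookup flow can be modelled as follows. OperatorInfo, StoreProviderId, SimpleStore and StoreRegistry are hypothetical stand-ins for StatefulOperatorStateInfo, StateStoreProviderId, StateStore and the StateStore utility; schemas are plain strings instead of StructType, and the store name format (join side and store type joined by a hyphen) is an assumption.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for StatefulOperatorStateInfo,
// StateStoreProviderId and StateStore.
final case class OperatorInfo(checkpointLocation: String, operatorId: Long)
final case class StoreProviderId(operator: OperatorInfo, partitionId: Int, storeName: String)
final class SimpleStore(val id: StoreProviderId, val keySchema: String, val valueSchema: String)

// Hypothetical registry in the role of the StateStore utility: returns the
// store already registered for a provider id, or creates and registers one.
object StoreRegistry {
  private val stores = mutable.Map.empty[StoreProviderId, SimpleStore]

  def get(id: StoreProviderId, keySchema: String, valueSchema: String): SimpleStore =
    stores.getOrElseUpdate(id, new SimpleStore(id, keySchema, valueSchema))
}

def getStateStore(
    operator: OperatorInfo,
    partitionId: Int,
    joinSide: String,
    storeType: String,
    keySchema: String,
    valueSchema: String): SimpleStore = {
  // Assumption: the store name is "<joinSide>-<storeType>".
  val providerId = StoreProviderId(operator, partitionId, s"$joinSide-$storeType")
  val store = StoreRegistry.get(providerId, keySchema, valueSchema)
  println(s"Loaded store ${store.id}")
  store
}

val op = OperatorInfo("/tmp/checkpoint/state", operatorId = 0L)
val s1 = getStateStore(op, 0, "left", "keyToNumValues", "key INT", "count BIGINT")
val s2 = getStateStore(op, 0, "left", "keyToNumValues", "key INT", "count BIGINT")
```

Asking twice for the same provider id returns the same store instance in this sketch, which mirrors the idea of looking up a store rather than always creating one.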

=== [[StateStoreType]] StateStoreType Contract (Sealed Trait)

StateStoreType is required to create a <<contract, StateStoreHandler>>.

[[StateStoreType-implementations]]
.StateStoreTypes
[cols="1m,1m,2",options="header",width="100%"]
|===
| StateStoreType
| toString
| Description

| KeyToNumValuesType
| keyToNumValues
| [[KeyToNumValuesType]] StateStoreType of <<KeyToNumValuesStore, KeyToNumValuesStore>>

| KeyWithIndexToValueType
| keyWithIndexToValue
| [[KeyWithIndexToValueType]] StateStoreType of <<KeyWithIndexToValueStore, KeyWithIndexToValueStore>>
|===

NOTE: StateStoreType is a Scala private sealed trait, which means that all the <<StateStoreType-implementations, StateStoreTypes>> are in the same compilation unit (a single file).
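Declaring the trait sealed is what lets the compiler see every StateStoreType. The sketch below mirrors the two implementations and their toString values from the table; handlerFor is a hypothetical helper added only to show the exhaustiveness check.

```scala
// Simplified model of the StateStoreType sealed trait and its two implementations.
// Being sealed, all subtypes must be defined in the same file.
sealed trait StateStoreType
case object KeyToNumValuesType extends StateStoreType {
  override def toString: String = "keyToNumValues"
}
case object KeyWithIndexToValueType extends StateStoreType {
  override def toString: String = "keyWithIndexToValue"
}

// Because the trait is sealed, the compiler checks this match for exhaustiveness:
// dropping a case produces a warning that the match may not be exhaustive.
def handlerFor(storeType: StateStoreType): String = storeType match {
  case KeyToNumValuesType      => "KeyToNumValuesStore"
  case KeyWithIndexToValueType => "KeyWithIndexToValueStore"
}

val all: Seq[StateStoreType] = Seq(KeyToNumValuesType, KeyWithIndexToValueType)
all.foreach(t => println(s"$t -> ${handlerFor(t)}"))
// keyToNumValues -> KeyToNumValuesStore
// keyWithIndexToValue -> KeyWithIndexToValueStore
```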