# StreamingAggregationStateManager
StreamingAggregationStateManager is an abstraction of state managers for the physical operators used in Streaming Aggregation (e.g., StateStoreSaveExec and StateStoreRestoreExec).
## Contract
### Committing State Changes
```scala
commit(
  store: StateStore): Long
```
Commits all state changes (updates) to the given StateStore and returns the new version.
See StreamingAggregationStateManagerBaseImpl
Used when:
* StateStoreSaveExec physical operator is executed
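The commit contract can be pictured with a toy in-memory store. This is a minimal sketch under stated assumptions. ToyStateStore and ToyStateManager are hypothetical stand-ins, not Spark's StateStore API. The manager simply delegates to the store, and the store returns the version after the one it was opened at.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a versioned state store (NOT Spark's StateStore API).
final class ToyStateStore(val version: Long) {
  private val pending = mutable.Map.empty[String, Long]
  private var committed = false

  // Buffers an update until commit.
  def put(key: String, value: Long): Unit = {
    require(!committed, "cannot update a committed store")
    pending(key) = value
  }

  // Exposes the buffered updates (for inspection only).
  def updates: Map[String, Long] = pending.toMap

  // Seals the pending updates and returns the new version (one past the loaded version).
  def commit(): Long = {
    require(!committed, "store already committed")
    committed = true
    version + 1
  }
}

// Hypothetical state manager: committing state changes is a plain delegation to the store.
object ToyStateManager {
  def commit(store: ToyStateStore): Long = store.commit()
}

val store = new ToyStateStore(version = 4L)
store.put("spark", 3L)
val newVersion = ToyStateManager.commit(store) // 5
```

The snippet is written as a script. You can run it with a Scala script runner, or paste it into the REPL with `:paste`.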
### Retrieving Value for Key (from ReadStateStore)
```scala
get(
  store: ReadStateStore,
  key: UnsafeRow): UnsafeRow
```
Retrieves (gets) the current value for the given non-null key from the given ReadStateStore.
See StreamingAggregationStateManagerImplV2
Used when:
* StateStoreRestoreExec physical operator is executed
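The following is a rough model of get, assuming the V2 layout. In that layout the stored value holds only the non-key columns, and the full input row is rebuilt from the key and the value on read. Rows are modeled as Vector[Any] and the store as a Map. The restoreOriginalRow helper and the column names are hypothetical. The sketch returns an Option rather than a possibly-null row.

```scala
// Rows are modeled as plain vectors of column values (a stand-in for UnsafeRow).
type Row = Vector[Any]

// Hypothetical input schema and grouping-key columns.
val inputSchema = Vector("word", "window", "count")
val keyColumns = Vector("word", "window")
val valueColumns = inputSchema.filterNot(c => keyColumns.contains(c))

// Toy read-only store: key row -> value row holding only the non-key columns (V2-style layout).
val store: Map[Row, Row] = Map(Vector("spark", 0L) -> Vector(3L))

// Hypothetical helper: rebuilds the full input row from its key and value parts, in schema order.
def restoreOriginalRow(key: Row, value: Row): Row = {
  val byName = (keyColumns.zip(key) ++ valueColumns.zip(value)).toMap
  inputSchema.map(byName)
}

// Looks up the key and, if present, returns the restored full row.
def get(store: Map[Row, Row], key: Row): Option[Row] = {
  require(key != null, "key must not be null")
  store.get(key).map(value => restoreOriginalRow(key, value))
}

get(store, Vector("spark", 0L)) // Some(Vector(spark, 0, 3))
get(store, Vector("flink", 0L)) // None
```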
### Extracting Key
```scala
getKey(
  row: UnsafeRow): UnsafeRow
```
Extracts the key columns from the given row.
See StreamingAggregationStateManagerBaseImpl
Used when:
* StateStoreRestoreExec physical operator is executed
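Key extraction is essentially a projection of the key columns out of an input row. This sketch resolves the positions of the key columns once and then picks them from each row. It uses plain column names instead of Spark's Attribute, and an ordinal lookup stands in for the projection. All names here are hypothetical.

```scala
type Row = Vector[Any]

// Hypothetical input schema and grouping-key columns (plain names instead of Spark Attributes).
val inputSchema = Vector("word", "window", "count")
val keyColumns = Vector("word", "window")

// Resolve key column positions once, so that per-row extraction is a cheap lookup.
val keyOrdinals: Vector[Int] = keyColumns.map { name =>
  val i = inputSchema.indexOf(name)
  require(i >= 0, s"unknown key column: $name")
  i
}

// Projects the key columns out of an input row, in key order.
def getKey(row: Row): Row = keyOrdinals.map(row)

getKey(Vector("spark", 0L, 3L)) // Vector(spark, 0)
```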
### All Aggregates by Grouping Keys
```scala
iterator(
  store: ReadStateStore): Iterator[UnsafeRowPair]
```

UnsafeRowPair is defined as follows:

```scala
class UnsafeRowPair(var key: UnsafeRow = null, var value: UnsafeRow = null)
```
Returns a lazy collection (Iterator) of all the aggregates by grouping key (state pairs) in the given ReadStateStore.
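UnsafeRowPair has mutable (var) fields. This hints that a single pair object can be refilled for every element instead of allocating one per entry. The sketch below assumes that reuse pattern. RowPair and withRows are hypothetical mirrors, and the store is a plain Map. Because the pair is reused, a consumer has to read or copy its fields before advancing the iterator.

```scala
// Hypothetical mirror of UnsafeRowPair: mutable fields let one instance be reused for every element.
final class RowPair(var key: Vector[Any] = null, var value: Vector[Any] = null) {
  def withRows(k: Vector[Any], v: Vector[Any]): RowPair = {
    key = k
    value = v
    this
  }
}

// Toy store contents: grouping key -> aggregate value.
val state = Map[Vector[Any], Vector[Any]](
  Vector("spark") -> Vector(3L),
  Vector("flink") -> Vector(1L))

// Lazily walks the store, refilling the same RowPair for each entry (nothing is materialized up front).
def iterator(store: Map[Vector[Any], Vector[Any]]): Iterator[RowPair] = {
  val pair = new RowPair()
  store.iterator.map { case (k, v) => pair.withRows(k, v) }
}

// Read (or copy) each pair before advancing, since the next element overwrites it.
val totals = iterator(state).map(p => (p.key, p.value)).toMap
```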
See StreamingAggregationStateManagerImplV2
Used when:
### Storing New Value for Key
```scala
put(
  store: StateStore,
  row: UnsafeRow): Unit
```
Stores (puts) a new value for a non-null key in the StateStore. Both the key and the value come from the given row, and the key is extracted using getKey.
See StreamingAggregationStateManagerImplV2
Used when:
* StateStoreSaveExec physical operator is executed
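The sketch below shows put cutting both the key and the value out of the same input row. It assumes the V2-style layout, where the value keeps only the non-key columns. The schema, getValue, and the mutable Map standing in for StateStore are all hypothetical.

```scala
import scala.collection.mutable

type Row = Vector[Any]

// Hypothetical input schema and grouping-key columns.
val inputSchema = Vector("word", "window", "count")
val keyColumns = Vector("word", "window")

// Column positions of the key and of the remaining (value) columns.
val keyOrdinals: Vector[Int] = keyColumns.map(c => inputSchema.indexOf(c))
val valueOrdinals: Vector[Int] = inputSchema.indices.filterNot(i => keyOrdinals.contains(i)).toVector

def getKey(row: Row): Row = keyOrdinals.map(row)
// Hypothetical value projection: the non-key columns only (V2-style layout).
def getValue(row: Row): Row = valueOrdinals.map(row)

// Toy mutable store standing in for StateStore.
val store = mutable.Map.empty[Row, Row]

// Both the key and the value are cut out of the same input row.
def put(store: mutable.Map[Row, Row], row: Row): Unit =
  store(getKey(row)) = getValue(row)

put(store, Vector("spark", 0L, 3L))
put(store, Vector("spark", 0L, 5L)) // same key: the newer value replaces the older one
```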
### Removing Key
```scala
remove(
  store: StateStore,
  key: UnsafeRow): Unit
```
Removes the given non-null key (together with its value) from the StateStore.
See StreamingAggregationStateManagerBaseImpl
Used when:
* WatermarkSupport physical operator is requested to removeKeysOlderThanWatermark
* StateStoreSaveExec physical operator is executed
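Here is one way to picture removal driven by a watermark. The keys are scanned, those whose event-time column has fallen behind the watermark are collected, and each one is passed to remove. This is a minimal sketch with several assumptions: the key layout, the event-time ordinal, and the "at or below the watermark" comparison are all made up for illustration. The expired keys are collected first so that the store is not modified while it is being iterated.

```scala
import scala.collection.mutable

type Row = Vector[Any]

// Toy store keyed by (word, windowEndMs); both the layout and the key ordinal are hypothetical.
val store = mutable.Map[Row, Row](
  Vector("spark", 1000L) -> Vector(3L),
  Vector("spark", 2000L) -> Vector(5L),
  Vector("flink", 3000L) -> Vector(1L))
val eventTimeOrdinal = 1

def remove(store: mutable.Map[Row, Row], key: Row): Unit = {
  require(key != null, "key must not be null")
  store -= key
}

// Evicts every key whose event time is at or below the watermark (comparison is an assumption).
def removeKeysOlderThanWatermark(store: mutable.Map[Row, Row], watermarkMs: Long): Unit = {
  val expired = store.keys.filter(key => key(eventTimeOrdinal).asInstanceOf[Long] <= watermarkMs).toList
  expired.foreach(key => remove(store, key))
}

removeKeysOlderThanWatermark(store, watermarkMs = 2000L)
```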
## Implementations
**Sealed Trait**: StreamingAggregationStateManager is a Scala sealed trait, which means that all of its implementations are in the same compilation unit (a single source file). Learn more in the Scala Language Specification.
## Creating StreamingAggregationStateManager
```scala
createStateManager(
  keyExpressions: Seq[Attribute],
  inputRowAttributes: Seq[Attribute],
  stateFormatVersion: Int): StreamingAggregationStateManager
```
createStateManager creates a StreamingAggregationStateManager based on the given stateFormatVersion, which is configured using the spark.sql.streaming.aggregation.stateFormatVersion configuration property.
| stateFormatVersion | StreamingAggregationStateManager |
|---|---|
| 1 | StreamingAggregationStateManagerImplV1 |
| 2 | StreamingAggregationStateManagerImplV2 |
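The version-based dispatch in the table above can be pictured as a pattern match over the state format version. This is a minimal sketch. ManagerV1 and ManagerV2 are hypothetical stand-ins for the two implementations, and plain column names replace Spark's Attribute. Rejecting any other version with an IllegalArgumentException is an assumption of this sketch.

```scala
object StateManagerFactory {
  // Hypothetical stand-ins for the two implementations; a sealed trait keeps the hierarchy closed.
  sealed trait StateManager { def version: Int }
  final case class ManagerV1(keyColumns: Seq[String], inputColumns: Seq[String]) extends StateManager {
    val version = 1
  }
  final case class ManagerV2(keyColumns: Seq[String], inputColumns: Seq[String]) extends StateManager {
    val version = 2
  }

  // Picks the implementation by state format version; other versions are rejected (an assumption here).
  def createStateManager(
      keyExpressions: Seq[String],
      inputRowAttributes: Seq[String],
      stateFormatVersion: Int): StateManager =
    stateFormatVersion match {
      case 1 => ManagerV1(keyExpressions, inputRowAttributes)
      case 2 => ManagerV2(keyExpressions, inputRowAttributes)
      case v => throw new IllegalArgumentException(s"Unsupported state format version: $v")
    }
}

import StateManagerFactory._
val manager = createStateManager(Seq("word"), Seq("word", "count"), stateFormatVersion = 2)
```

Because the trait is sealed, adding a third implementation means editing this one file. The compiler can then check matches over StateManager for exhaustiveness.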
createStateManager is used when: