StreamingAggregationStateManager
StreamingAggregationStateManager is an abstraction of state managers for the physical operators used in Streaming Aggregation (e.g., StateStoreSaveExec and StateStoreRestoreExec).
Contract
Committing State Changes
```scala
commit(
  store: StateStore): Long
```
Commits all state changes (updates) to the given StateStore and returns the new version of the store.
See StreamingAggregationStateManagerBaseImpl
Used when:
- StateStoreSaveExec physical operator is executed
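As a rough illustration of the commit contract, the sketch below models a state store as an in-memory map that buffers updates until they are committed. ToyStateStore, ToyStateManager and the Vector-based Row type are hypothetical stand-ins, not Spark classes; the sketch assumes that the manager simply delegates to the store and hands back the store's new version.

```scala
import scala.collection.mutable

// Hypothetical stand-in for UnsafeRow: a row is a vector of column values.
type Row = Vector[Any]

// Hypothetical in-memory state store (not Spark's StateStore API).
// Updates are buffered in `pending` and only become visible in `committed` on commit().
class ToyStateStore(startVersion: Long) {
  private val pending = mutable.Map.empty[Row, Row]
  val committed = mutable.Map.empty[Row, Row]
  private var currentVersion = startVersion

  def put(key: Row, value: Row): Unit = pending.update(key, value)

  // Applies all buffered updates, bumps the version and returns the new version.
  def commit(): Long = {
    committed ++= pending
    pending.clear()
    currentVersion += 1
    currentVersion
  }
}

// Sketch of the commit contract: delegate to the store and return its new version.
object ToyStateManager {
  def commit(store: ToyStateStore): Long = store.commit()
}

val store = new ToyStateStore(startVersion = 5L)
store.put(Vector("a"), Vector("a", 1L))
val newVersion = ToyStateManager.commit(store)
println(s"Committed version $newVersion with ${store.committed.size} key(s)")
```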
Retrieving Value for Key (from ReadStateStore)
```scala
get(
  store: ReadStateStore,
  key: UnsafeRow): UnsafeRow
```
Retrieves (gets) the current value for a given non-null key from the ReadStateStore.
See StreamingAggregationStateManagerImplV2
Used when:
- StateStoreRestoreExec physical operator is executed
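One way to picture get, assuming (in the spirit of the V2 state format) that only the non-key columns are kept as the state value, is to look up the value by key and stitch the key and value columns back into a full row. The ToyManagerV2 class, keyOrdinals, rowWidth and the Vector-based Row type are hypothetical simplifications, and the sketch returns an Option instead of a nullable row.

```scala
// Hypothetical stand-in for UnsafeRow: a row is a vector of column values.
type Row = Vector[Any]

// Hypothetical V2-style manager: key columns sit at keyOrdinals in the full row,
// and only the remaining (non-key) columns are stored as the state value.
class ToyManagerV2(keyOrdinals: Seq[Int], rowWidth: Int) {
  private val valueOrdinals = (0 until rowWidth).filterNot(i => keyOrdinals.contains(i))

  // Rebuilds the full row by putting key and value columns back at their positions.
  private def restoreOriginalRow(key: Row, value: Row): Row = {
    val cells = Array.fill[Any](rowWidth)(null)
    keyOrdinals.zip(key).foreach { case (i, v) => cells(i) = v }
    valueOrdinals.zip(value).foreach { case (i, v) => cells(i) = v }
    cells.toVector
  }

  // Looks up the state for a non-null key and returns the full row, if any.
  def get(store: Map[Row, Row], key: Row): Option[Row] = {
    require(key != null, "key must not be null")
    store.get(key).map(value => restoreOriginalRow(key, value))
  }
}

// Rows are (group, count, sum); only (count, sum) is stored per group key.
val manager = new ToyManagerV2(keyOrdinals = Seq(0), rowWidth = 3)
val store: Map[Row, Row] = Map(Vector("a") -> Vector(2L, 10L))
println(manager.get(store, Vector("a"))) // Some(Vector(a, 2, 10))
println(manager.get(store, Vector("b"))) // None
```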
Extracting Key
```scala
getKey(
  row: UnsafeRow): UnsafeRow
```
Extracts the key (the grouping-key columns) from the given row.
See StreamingAggregationStateManagerBaseImpl
Used when:
- StateStoreRestoreExec physical operator is executed
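Key extraction amounts to projecting the grouping-key columns out of a row. A minimal sketch, assuming a hypothetical keyOrdinals list that stands in for the positions of keyExpressions among the input row's attributes (Spark works with expressions and projections, not plain ordinals):

```scala
// Hypothetical stand-in for UnsafeRow: a row is a vector of column values.
type Row = Vector[Any]

// Projects the grouping-key columns (at keyOrdinals) out of the given row.
def getKey(keyOrdinals: Seq[Int])(row: Row): Row =
  keyOrdinals.map(i => row(i)).toVector

// An input row of a hypothetical aggregation grouped by (country, city).
val row: Row = Vector("PL", "Warsaw", 3L, 42.0)
val key = getKey(Seq(0, 1))(row)
println(key) // Vector(PL, Warsaw)
```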
All Aggregates by Grouping Keys
```scala
iterator(
  store: ReadStateStore): Iterator[UnsafeRowPair]
// UnsafeRowPair(var key: UnsafeRow = null, var value: UnsafeRow = null)
```
Lazy collection (Iterator) of all the aggregates by grouping keys (state pairs) in the given ReadStateStore.
See StreamingAggregationStateManagerImplV2
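The var fields of UnsafeRowPair make it possible to refill a single pair object for every element instead of allocating a new one. The sketch below assumes that reuse pattern to show why a caller should copy the key and value before advancing the iterator. RowPair and the Map-based store are hypothetical stand-ins, not Spark classes.

```scala
// Hypothetical stand-in for UnsafeRow: a row is a vector of column values.
type Row = Vector[Any]

// Hypothetical stand-in for UnsafeRowPair: a mutable key/value holder.
final class RowPair(var key: Row = null, var value: Row = null)

// Lazily walks all (key, value) state pairs. The same RowPair instance is
// refilled on every next() call, so its contents change as the iterator advances.
def iterator(store: Map[Row, Row]): Iterator[RowPair] = {
  val pair = new RowPair()
  store.iterator.map { case (k, v) =>
    pair.key = k
    pair.value = v
    pair
  }
}

val store: Map[Row, Row] = Map(Vector("a") -> Vector(1L), Vector("b") -> Vector(2L))
// Copy key and value out of the pair before moving on to the next element.
val pairs = iterator(store).map(p => (p.key, p.value)).toList
println(pairs.size) // 2
```

Collecting the pairs themselves (for example with iterator(store).toList) would yield the same object repeated, holding only the last key and value.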
Used when:
Storing New Value for Key
```scala
put(
  store: StateStore,
  row: UnsafeRow): Unit
```
Stores (puts) a new value for a non-null key in the StateStore. Both the key and the value are part of the given row; the key is extracted using getKey.
See StreamingAggregationStateManagerImplV2
Used when:
- StateStoreSaveExec physical operator is executed
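Under the same V2-style assumption, put can be sketched as splitting the row into a key (the projection getKey performs) and a value made of the remaining columns, then writing the pair to the store. ToyPutManager, keyOrdinals and rowWidth are hypothetical, and a mutable Map stands in for the StateStore.

```scala
import scala.collection.mutable

// Hypothetical stand-in for UnsafeRow: a row is a vector of column values.
type Row = Vector[Any]

// Hypothetical V2-style manager: keys are the columns at keyOrdinals,
// values are all the other columns of the row.
class ToyPutManager(keyOrdinals: Seq[Int], rowWidth: Int) {
  private val valueOrdinals = (0 until rowWidth).filterNot(i => keyOrdinals.contains(i))

  def getKey(row: Row): Row = keyOrdinals.map(i => row(i)).toVector
  private def getValue(row: Row): Row = valueOrdinals.map(i => row(i)).toVector

  // Splits the row into key and value and stores the pair (last write wins).
  def put(store: mutable.Map[Row, Row], row: Row): Unit = {
    require(row != null, "row must not be null")
    store.update(getKey(row), getValue(row))
  }
}

val store = mutable.Map.empty[Row, Row]
val manager = new ToyPutManager(keyOrdinals = Seq(0), rowWidth = 3)
manager.put(store, Vector("a", 2L, 10L))
manager.put(store, Vector("a", 3L, 15L)) // same key: replaces the previous value
println(store(Vector("a"))) // Vector(3, 15)
```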
Removing Key
```scala
remove(
  store: StateStore,
  key: UnsafeRow): Unit
```
Removes a non-null key from the StateStore.
See StreamingAggregationStateManagerBaseImpl
Used when:
- WatermarkSupport physical operator is requested to removeKeysOlderThanWatermark
- StateStoreSaveExec physical operator is executed
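Removing a key is a single delete, and watermark-based cleanup can be built on top of it by deleting every key whose event time has fallen behind the watermark. A minimal sketch, assuming a hypothetical layout where the event time is a Long column of the key at eventTimeOrdinal; removeKeysOlderThan only approximates the idea behind removeKeysOlderThanWatermark and is not Spark's implementation.

```scala
import scala.collection.mutable

// Hypothetical stand-in for UnsafeRow: a row is a vector of column values.
type Row = Vector[Any]

// Sketch of remove: drops the state for a single non-null key.
def remove(store: mutable.Map[Row, Row], key: Row): Unit = {
  require(key != null, "key must not be null")
  store -= key
}

// Hypothetical watermark-based cleanup: removes every key whose event-time
// column (a Long at eventTimeOrdinal) is older than the watermark.
def removeKeysOlderThan(store: mutable.Map[Row, Row], eventTimeOrdinal: Int, watermarkMs: Long): Unit = {
  // Collect expired keys first so the map is not modified while iterating over it.
  val expired = store.keys.filter(k => k(eventTimeOrdinal).asInstanceOf[Long] < watermarkMs).toList
  expired.foreach(k => remove(store, k))
}

val store = mutable.Map[Row, Row](
  Vector("a", 1000L) -> Vector(1L),
  Vector("b", 5000L) -> Vector(2L)
)
removeKeysOlderThan(store, eventTimeOrdinal = 1, watermarkMs = 3000L)
println(store.keys.toList) // List(Vector(b, 5000))
```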
Implementations
Sealed Trait
StreamingAggregationStateManager is a Scala sealed trait, which means that all of its direct implementations must be defined in the same compilation unit (a single source file).
Learn more in the Scala Language Specification.
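A practical consequence of sealing is that the compiler knows every direct subtype, so it can check pattern matches over them for exhaustiveness. A minimal sketch with a hypothetical toy hierarchy (not Spark's actual classes):

```scala
// Hypothetical toy hierarchy; a sealed trait's direct subtypes must live in this file.
sealed trait ToyStateManager
final case class ToyImplV1() extends ToyStateManager
final case class ToyImplV2() extends ToyStateManager

// The compiler can check this match for exhaustiveness because the trait is sealed:
// deleting one of the cases produces a "match may not be exhaustive" warning.
def stateFormatVersionOf(manager: ToyStateManager): Int = manager match {
  case ToyImplV1() => 1
  case ToyImplV2() => 2
}

println(stateFormatVersionOf(ToyImplV2())) // 2
```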
Creating StreamingAggregationStateManager
```scala
createStateManager(
  keyExpressions: Seq[Attribute],
  inputRowAttributes: Seq[Attribute],
  stateFormatVersion: Int): StreamingAggregationStateManager
```
createStateManager creates a StreamingAggregationStateManager based on the given stateFormatVersion (based on the spark.sql.streaming.aggregation.stateFormatVersion configuration property).
| stateFormatVersion | StreamingAggregationStateManager |
|---|---|
| 1 | StreamingAggregationStateManagerImplV1 |
| 2 | StreamingAggregationStateManagerImplV2 |
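The table boils down to a match on the version number. A hedged sketch with hypothetical toy classes; rejecting any other version with an IllegalArgumentException is an assumption of this sketch, not documented behavior.

```scala
// Hypothetical toy implementations standing in for ImplV1 and ImplV2.
sealed trait ToyStateManager
final case class ToyImplV1(keyColumns: Seq[String]) extends ToyStateManager
final case class ToyImplV2(keyColumns: Seq[String]) extends ToyStateManager

// Picks the implementation for the requested state format version.
def createStateManager(keyColumns: Seq[String], stateFormatVersion: Int): ToyStateManager =
  stateFormatVersion match {
    case 1 => ToyImplV1(keyColumns)
    case 2 => ToyImplV2(keyColumns)
    case other => throw new IllegalArgumentException(s"Unsupported state format version: $other")
  }

println(createStateManager(Seq("country"), stateFormatVersion = 2)) // ToyImplV2(List(country))
```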
createStateManager is used when: