StreamingAggregationStateManager

StreamingAggregationStateManager is an abstraction of state managers for the physical operators used in Streaming Aggregation (e.g., StateStoreSaveExec and StateStoreRestoreExec).

Contract

Committing State Changes

commit(
  store: StateStore): Long

Commits all state changes (updates) to the given StateStore and returns a new version

See StreamingAggregationStateManagerBaseImpl

Used when:

  • StateStoreSaveExec physical operator is executed
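
The "commit, then return a new version" behaviour can be pictured with a toy versioned store. This is only a sketch: ToyVersionedStore is a hypothetical class (not Spark's StateStore), keys and values are simplified to String and Long, and the rule that the new version equals the loaded version plus one is an assumption of the example.

```scala
import scala.collection.mutable

// Toy versioned store for illustration; NOT Spark's StateStore.
final class ToyVersionedStore(val loadedVersion: Long) {
  // Updates staged since the store was loaded.
  private val pending = mutable.Map.empty[String, Long]
  // Data that has already been committed.
  private var committed = Map.empty[String, Long]

  def put(key: String, value: Long): Unit = pending.update(key, value)

  // Only committed data is visible here.
  def snapshot: Map[String, Long] = committed

  // Applies all staged updates and returns the new version.
  // Assumption of this sketch: new version = loaded version + 1.
  def commit(): Long = {
    committed = committed ++ pending
    pending.clear()
    loadedVersion + 1
  }
}
```

In this model, staged updates stay invisible until commit is called, and the returned number identifies the state that now includes them.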

Retrieving Value for Key (from ReadStateStore)

get(
  store: ReadStateStore,
  key: UnsafeRow): UnsafeRow

Retrieves (gets) the current value for a given non-null key from ReadStateStore

See StreamingAggregationStateManagerImplV2

Used when:

  • StateStoreRestoreExec physical operator is executed

Extracting Key

getKey(
  row: UnsafeRow): UnsafeRow

Extracts the columns of a key from the given row

See StreamingAggregationStateManagerBaseImpl

Used when:

  • StateStoreRestoreExec physical operator is executed

All Aggregates by Grouping Keys

iterator(
  store: ReadStateStore): Iterator[UnsafeRowPair]

UnsafeRowPair is a pair of a key and a value: UnsafeRowPair(var key: UnsafeRow = null, var value: UnsafeRow = null)

Lazy collection (Iterator) of all the aggregates by grouping keys (state pairs) in the given ReadStateStore

See StreamingAggregationStateManagerImplV2

Used when:

  • StateStoreSaveExec physical operator is executed (for Append output mode)
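
To illustrate what "lazy" means for such an iterator, here is a minimal sketch that uses a plain Scala Map as a stand-in for a ReadStateStore and a hypothetical StatePair case class in place of UnsafeRowPair. None of these names are Spark APIs. The counter exists only to show when pairs are actually built.

```scala
// Hypothetical stand-in for UnsafeRowPair (not Spark's class).
final case class StatePair(key: String, value: Long)

object LazyStateIterator {
  // Counts how many pairs have actually been built so far.
  var materialized = 0

  // Returns a lazy Iterator: a pair is built only when the caller pulls it.
  def iterator(store: Map[String, Long]): Iterator[StatePair] =
    store.iterator.map { case (k, v) =>
      materialized += 1
      StatePair(k, v)
    }
}
```

Calling LazyStateIterator.iterator does no work by itself. Each call to next() builds exactly one pair, so a consumer that stops early never pays for the remaining state.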

Storing New Value for Key

put(
  store: StateStore,
  row: UnsafeRow): Unit

Stores (puts) a new value for a non-null key to the StateStore. The key and the value are part of the given row. The key is extracted using getKey.

See StreamingAggregationStateManagerImplV2

Used when:

  • StateStoreSaveExec physical operator is executed
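
How put, getKey and get relate to each other can be sketched with a toy model. Everything below is an assumption made for illustration: rows are plain Vector[Any] values, the store is a mutable Map, the key is taken to be the first numKeyCols columns of a row, and the whole row is stored as the value. Actual implementations work on UnsafeRow and StateStore, and the value layout is implementation-specific.

```scala
import scala.collection.mutable

// Toy model only; NOT Spark's StreamingAggregationStateManager.
object ToyStateManager {
  type Row = Vector[Any]
  type Store = mutable.Map[Row, Row]

  // Assumption: the grouping key is the first `numKeyCols` columns of a row.
  def getKey(row: Row, numKeyCols: Int): Row = row.take(numKeyCols)

  // Stores the row under its extracted key (here the whole row is the value).
  def put(store: Store, row: Row, numKeyCols: Int): Unit =
    store.update(getKey(row, numKeyCols), row)

  // Looks up the current value for a key; None when the key has no state.
  def get(store: Store, key: Row): Option[Row] = store.get(key)
}
```

Putting two rows with the same key columns overwrites the earlier value, which is the behaviour an aggregation needs when a group's running result is updated.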

Removing Key

remove(
  store: StateStore,
  key: UnsafeRow): Unit

Removes a non-null key from the StateStore

See StreamingAggregationStateManagerBaseImpl

Used when:

  • StateStoreSaveExec physical operator is executed

Implementations

  • StreamingAggregationStateManagerBaseImpl

Sealed Trait

StreamingAggregationStateManager is a Scala sealed trait, which means that all of its direct implementations are in the same compilation unit (a single file).

Learn more in the Scala Language Specification.
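
One practical effect of sealing is that the compiler knows every direct subtype and can check pattern matches for exhaustiveness. The sketch below uses hypothetical names (StateManager, ManagerV1, ManagerV2) purely to show the mechanics; it does not mirror the actual class hierarchy.

```scala
// Hypothetical names; only the `sealed` mechanics matter here.
sealed trait StateManager
final class ManagerV1 extends StateManager
final class ManagerV2 extends StateManager

object Describe {
  // Because StateManager is sealed, the compiler can warn
  // when a match misses one of its direct subtypes.
  def formatVersion(m: StateManager): Int = m match {
    case _: ManagerV1 => 1
    case _: ManagerV2 => 2
  }
}
```

Removing one of the two cases would make the compiler report a non-exhaustive match, which it could not do for an open (non-sealed) trait.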

Creating StreamingAggregationStateManager

createStateManager(
  keyExpressions: Seq[Attribute],
  inputRowAttributes: Seq[Attribute],
  stateFormatVersion: Int): StreamingAggregationStateManager

createStateManager creates a StreamingAggregationStateManager for the given stateFormatVersion (the value of the spark.sql.streaming.aggregation.stateFormatVersion configuration property).

stateFormatVersion    StreamingAggregationStateManager
1                     StreamingAggregationStateManagerImplV1
2                     StreamingAggregationStateManagerImplV2

createStateManager is used when:

  • StateStoreRestoreExec physical operator is created
  • StateStoreSaveExec physical operator is created
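
The version-based selection described in this section can be sketched as a pattern match. The types below (ToyAggStateManager, ToyImplV1, ToyImplV2) are simplified placeholders, the key and input attributes are left out, and rejecting an unsupported version with an IllegalArgumentException is an assumption of this sketch.

```scala
// Simplified placeholders for illustration; not Spark's classes.
sealed trait ToyAggStateManager { def formatVersion: Int }
case object ToyImplV1 extends ToyAggStateManager { val formatVersion = 1 }
case object ToyImplV2 extends ToyAggStateManager { val formatVersion = 2 }

object ToyAggStateManager {
  // Picks an implementation based on the state format version.
  def createStateManager(stateFormatVersion: Int): ToyAggStateManager =
    stateFormatVersion match {
      case 1 => ToyImplV1
      case 2 => ToyImplV2
      // Assumption of this sketch: unknown versions are rejected.
      case other =>
        throw new IllegalArgumentException(
          s"Unsupported state format version: $other")
    }
}
```

Keeping the selection in a single factory means the operators that need a state manager only pass the version and never name a concrete implementation.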