
StateStoreSaveExec Physical Operator

StateStoreSaveExec is a unary physical operator (Spark SQL) that saves (writes) a streaming state (to a state store) with support for streaming watermark.

StateStoreSaveExec is one of the physical operators used for streaming aggregation.

StateStoreSaveExec and StatefulAggregationStrategy

Creating Instance

StateStoreSaveExec takes the following to be created:

StateStoreSaveExec is created when AggUtils is requested to planStreamingAggregation.

OutputMode

outputMode: Option[OutputMode]

StateStoreSaveExec can be given an OutputMode when created.

StateStoreSaveExec supports all three OutputModes when executed: Append, Complete, and Update.

For Append and Update output modes with event-time watermark defined, StateStoreSaveExec can run another batch if the watermark has advanced.

OutputMode is undefined (None) by default and when AggUtils is requested to planStreamingAggregation (when StatefulAggregationStrategy execution planning strategy is requested to plan a streaming aggregation).

OutputMode is required for executing StateStoreSaveExec. It is specified to be the OutputMode of the IncrementalExecution (when state preparation rule is executed and fills in the execution-specific configuration).

Short Name

shortName: String

shortName is part of the StateStoreWriter abstraction.


shortName is the following text:

stateStoreSave

Executing Physical Operator

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan (Spark SQL) abstraction.


doExecute initializes the metrics (so that they are created on the driver).

doExecute executes the child physical operator (that produces an RDD[InternalRow]) and creates a StateStoreRDD with the following:

mapPartitionsWithStateStore    Value
stateInfo                      getStateInfo
keySchema                      Grouping Keys
valueSchema                    State Value Schema of the StreamingAggregationStateManager
numColsPrefixKey               0
storeCoordinator               StateStoreCoordinator
storeUpdateFunction            storeUpdateFunction

storeUpdateFunction

storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]

storeUpdateFunction branches off based on the given OutputMode:

Append

storeUpdateFunction drops late rows (from the input partition) using the watermark predicate for data.

For every (remaining, non-late) row, storeUpdateFunction stores the row in the input StateStore (using the StreamingAggregationStateManager) and increments the number of updated state rows metric.

storeUpdateFunction tracks the time taken (to drop late rows and store non-late rows) in time to update metric.

Performance Metrics

Monitor the following metrics:

  1. number of rows which are dropped by watermark
  2. number of updated state rows
  3. time to update
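The Append update phase and its three metrics can be sketched in plain Scala. This is only an illustrative model, not Spark code: Row, Metrics, updateState, and the in-memory map standing in for the state store are hypothetical names, and the sketch assumes a row counts as late when its event time is at or below the watermark.

```scala
import scala.collection.mutable

// Hypothetical stand-ins; none of these types are Spark classes.
object AppendUpdateSketch {
  final case class Row(key: String, eventTimeMs: Long, value: Long)

  final class Metrics {
    var numRowsDroppedByWatermark = 0L
    var numUpdatedStateRows = 0L
    var timeToUpdateMs = 0L
  }

  // Drops late rows, stores the remaining ones, and records the three metrics.
  def updateState(
      input: Iterator[Row],
      watermarkMs: Long,
      store: mutable.Map[String, Row],
      metrics: Metrics): Unit = {
    val start = System.nanoTime()
    input.foreach { row =>
      if (row.eventTimeMs <= watermarkMs) {
        // Late row (assumed rule): counted, never stored.
        metrics.numRowsDroppedByWatermark += 1
      } else {
        store.put(row.key, row)
        metrics.numUpdatedStateRows += 1
      }
    }
    // Both the filtering and the storing are covered by the same timer.
    metrics.timeToUpdateMs += (System.nanoTime() - start) / 1000000L
  }
}
```

In this model, the number of updated state rows always equals the number of input rows minus the rows dropped by the watermark.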

storeUpdateFunction requests the StreamingAggregationStateManager for all the aggregates by grouping keys.

storeUpdateFunction creates a new NextIterator:

Complete

FIXME

Update

FIXME

AssertionError: outputMode has not been set

doExecute makes sure that outputMode is specified or throws an AssertionError:

Incorrect planning in IncrementalExecution, outputMode has not been set
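The check can be illustrated with a minimal sketch in plain Scala. Mode and requireOutputMode are hypothetical names used only for this example; the real operator works with Spark's OutputMode.

```scala
// Hypothetical model of an optional output mode; not Spark's OutputMode.
object OutputModeCheckSketch {
  sealed trait Mode
  case object Append extends Mode
  case object Complete extends Mode
  case object Update extends Mode

  // Fails with an AssertionError when no output mode was filled in.
  def requireOutputMode(outputMode: Option[Mode]): Mode = {
    assert(
      outputMode.nonEmpty,
      "Incorrect planning in IncrementalExecution, outputMode has not been set")
    outputMode.get
  }
}
```

Calling requireOutputMode(None) throws a java.lang.AssertionError whose message ends with the text above (Scala's assert prefixes it with "assertion failed: ").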

StreamingAggregationStateManager

StateStoreSaveExec creates a StreamingAggregationStateManager when created.

The StreamingAggregationStateManager is created using the following:

The StreamingAggregationStateManager is used in doExecute for the following:

For Complete:

  1. Store all input rows
  2. Commit the state changes
  3. Fetch the values

For Append:

  1. Store all updated rows
  2. Fetch all the key-value pairs
  3. Remove old "watermarked" aggregates for all removed pairs
  4. Commit the state changes

For Update:

  1. Store all updated rows
  2. Remove old "watermarked" aggregates (removeKeysOlderThanWatermark)
  3. Commit the state changes
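The three step sequences above can be compared side by side with a toy model. Everything here is an assumption made for illustration: ToyStateManager, Agg, and the three functions are hypothetical, the state lives in an in-memory map, and an aggregate is treated as "watermarked" when its window end is at or below the watermark. What each mode returns (removed aggregates for Append, all values for Complete, updated rows for Update) follows the number of output rows descriptions in Performance Metrics.

```scala
import scala.collection.mutable

object StateManagerFlowSketch {
  final case class Agg(windowEndMs: Long, count: Long)

  // Hypothetical in-memory stand-in for a state store and its manager.
  final class ToyStateManager {
    private val state = mutable.LinkedHashMap.empty[String, Agg]
    var commits = 0
    def put(key: String, value: Agg): Unit = state.update(key, value)
    def remove(key: String): Unit = { state.remove(key); () }
    def pairs: List[(String, Agg)] = state.toList
    def values: List[Agg] = state.values.toList
    def commit(): Unit = commits += 1
  }

  // Complete: store all input rows, commit, then fetch every value.
  def complete(m: ToyStateManager, input: Seq[(String, Agg)]): List[Agg] = {
    input.foreach { case (k, v) => m.put(k, v) }
    m.commit()
    m.values
  }

  // Append: store rows, scan all pairs, remove the watermarked ones, commit.
  def append(m: ToyStateManager, input: Seq[(String, Agg)], watermarkMs: Long): List[Agg] = {
    input.foreach { case (k, v) => m.put(k, v) }
    val expired = m.pairs.filter { case (_, agg) => agg.windowEndMs <= watermarkMs }
    expired.foreach { case (k, _) => m.remove(k) }
    m.commit()
    expired.map(_._2) // the removed aggregates are what gets emitted
  }

  // Update: store rows, remove keys older than the watermark, commit.
  def update(m: ToyStateManager, input: Seq[(String, Agg)], watermarkMs: Long): List[Agg] = {
    input.foreach { case (k, v) => m.put(k, v) }
    m.pairs
      .filter { case (_, agg) => agg.windowEndMs <= watermarkMs }
      .foreach { case (k, _) => m.remove(k) }
    m.commit()
    input.map(_._2).toList // the updated rows are what gets emitted
  }
}
```

Note that each flow commits exactly once per invocation, which matches the single "Commit the state changes" step in every list.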

Required Child Output Distribution

requiredChildDistribution: Seq[Distribution]

requiredChildDistribution is part of the SparkPlan (Spark SQL) abstraction.


requiredChildDistribution...FIXME

Performance Metrics

StateStoreSaveExec is a StateStoreWriter that defines the performance metrics for writes to a state store.

StateStoreSaveExec in web UI (Details for Query)

memory used by state

Estimated memory used by a StateStore (aka stateMemory) after StateStoreSaveExec finished execution (per the StateStoreMetrics of the StateStore)

number of output rows

For Append:

Number of aggregates (state rows) that were removed from a state store because the watermark predicate for keys held true (i.e., the watermark threshold for the keys was reached).

Equivalent to the number of removed state rows metric.

For Complete:

Number of rows in a StateStore (i.e., all the values in a StateStore in the StreamingAggregationStateManager)

Equivalent to the number of total state rows metric

For Update:

Number of rows that the StreamingAggregationStateManager was requested to store in a state store (that did not expire per the optional watermarkPredicateForData predicate)

Equivalent to the number of updated state rows metric

number of removed state rows

For Append:

Number of aggregates (state rows) that were removed from a state store because the watermark predicate for keys held true (i.e., the watermark threshold for the keys was reached).

Equivalent to the number of output rows metric.

For Complete:

Not used

For Update:

Not used

See number of removed state rows (of StateStoreWriter)

number of rows which are dropped by watermark

See number of rows which are dropped by watermark (of StateStoreWriter)

number of updated state rows

The metric is computed differently based on the given OutputMode.

For Append:

Number of input rows that have not expired yet (per the required watermarkPredicateForData predicate) and that the StreamingAggregationStateManager was requested to store in a state store (the time taken is reported in the time to update metric)

Equivalent to the number of input rows (which should be exactly the number of output rows from the child operator) minus the late rows counted by the number of rows which are dropped by watermark metric

For Complete:

Number of input rows (which should be exactly the number of output rows from the child operator)

For Update:

Number of rows that the StreamingAggregationStateManager was requested to store in a state store (that did not expire per the optional watermarkPredicateForData predicate)

Optional watermark

If defined, watermarkPredicateForData is used to applyRemovingRowsOlderThanWatermark.

Equivalent to the number of output rows metric

StateStoreSaveExec is a StateStoreWriter so consult number of updated state rows, too.
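The difference between a required and an optional watermark predicate can be shown with a small sketch. It is a plain-Scala illustration under stated assumptions: Row and rowsToStore are hypothetical names, and the optional predicate is modelled as an Option of a function that returns true for expired rows.

```scala
object UpdateModeFilterSketch {
  final case class Row(key: String, eventTimeMs: Long)

  // Returns the rows that would be stored in the state store.
  // With no predicate defined, every input row is kept.
  def rowsToStore(input: Iterator[Row], isExpired: Option[Row => Boolean]): Iterator[Row] =
    isExpired match {
      case Some(expired) => input.filterNot(expired)
      case None => input
    }
}
```

With the predicate undefined, the number of stored rows equals the number of input rows; with it defined, expired rows are filtered out before anything is stored.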

time to commit changes

Time taken for the StreamingAggregationStateManager to commit changes to a state store


When requested to commit changes to a state store, a StreamingAggregationStateManager (as StreamingAggregationStateManagerBaseImpl) requests the given StateStore to commit state changes.

For RocksDBStateStore, this means that RocksDB commits the state changes, which consists of time-tracked phases that are available through the performance metrics.

In other words, the total of the time-tracked phases is a close approximation of this metric (some file-specific operations contribute to the metric but are not part of the phases).

time to remove

The metric is computed differently based on the given OutputMode.

For Append, time taken for the entire input row and state processing (except time to commit changes)

For Complete, always 0

StateStoreSaveExec is a StateStoreWriter so consult time to remove, too.

time to update

The metric is computed differently based on the given OutputMode.

For Append:

Time taken to drop (filter out) expired rows (per the required watermarkPredicateForData predicate) and then store the remaining ones in a state store (using the StreamingAggregationStateManager)

Tip

Consult the correlated metrics (e.g., number of rows which are dropped by watermark and number of updated state rows).

For Update:

Time taken to filter out expired rows (per the optional watermarkPredicateForData predicate) and store the remaining ones in a state store (using the StreamingAggregationStateManager)

The number of output rows and number of updated state rows metrics are the same: exactly the number of rows that were stored in a state store.

Note

Consult StateStoreWriter to learn more about correlated metrics.

Logging

Enable ALL logging level for org.apache.spark.sql.execution.streaming.StateStoreSaveExec logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.StateStoreSaveExec=ALL

Refer to Logging.