StateStoreSaveExec Physical Operator¶
StateStoreSaveExec is a unary physical operator (Spark SQL) that saves (writes) a streaming state (to a state store) with support for streaming watermark.
StateStoreSaveExec is one of the physical operators used for streaming aggregation.
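As a quick illustration, the following minimal Scala snippet (using the built-in rate source; the session and names are arbitrary choices for this sketch) builds a streaming aggregation whose physical plan includes this operator, displayed as StateStoreSave in the explain output:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("StateStoreSaveExecDemo")
  .getOrCreate()
import spark.implicits._

// A streaming aggregation: count events per 5-second event-time window
val counts = spark.readStream
  .format("rate")                           // test source with timestamp and value columns
  .load()
  .withWatermark("timestamp", "10 seconds") // event-time watermark
  .groupBy(window($"timestamp", "5 seconds"))
  .count()

// The physical plan contains StateStoreSave (i.e. StateStoreSaveExec)
counts.explain()
```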

Creating Instance¶
StateStoreSaveExec takes the following to be created:
- Grouping Key Attributes (Spark SQL)
- StatefulOperatorStateInfo
- OutputMode
- Event-time Watermark
- spark.sql.streaming.aggregation.stateFormatVersion
- Child Physical Operator (Spark SQL)
StateStoreSaveExec is created when:
- StatefulAggregationStrategy execution planning strategy is requested to plan a streaming aggregation
- IncrementalExecution is created
OutputMode¶
outputMode: Option[OutputMode]
StateStoreSaveExec can be given an OutputMode when created.
StateStoreSaveExec supports all three OutputModes when executed: Append, Complete, and Update.
For Append and Update output modes with event-time watermark defined, StateStoreSaveExec can run another batch if the watermark has advanced.
OutputMode is undefined (None) by default, i.e. when AggUtils is requested to planStreamingAggregation (which happens when the StatefulAggregationStrategy execution planning strategy is requested to plan a streaming aggregation).
OutputMode is required for executing StateStoreSaveExec. It is specified to be the OutputMode of the IncrementalExecution (when state preparation rule is executed and fills in the execution-specific configuration).
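To see how the output mode reaches the operator in practice, here is a short sketch (reusing the counts streaming Dataset from the earlier example; the checkpoint path is arbitrary): the OutputMode given to DataStreamWriter becomes the OutputMode of the IncrementalExecution and, through the state preparation rule, of StateStoreSaveExec.

```scala
// The sink's output mode becomes the OutputMode of IncrementalExecution
// and is filled in for StateStoreSaveExec by the state preparation rule.
val query = counts.writeStream
  .format("console")
  .outputMode("update") // or "append" / "complete"
  .option("checkpointLocation", "/tmp/state-store-save-demo")
  .start()
```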
Short Name¶
shortName: String
shortName is part of the StateStoreWriter abstraction.
shortName is the following text:
stateStoreSave
Executing Physical Operator¶
doExecute(): RDD[InternalRow]
doExecute is part of the SparkPlan (Spark SQL) abstraction.
doExecute creates the metrics (to happen on the driver).
doExecute executes the child physical operator (that produces an RDD[InternalRow]) and creates a StateStoreRDD with the following:
| mapPartitionsWithStateStore Argument | Value |
|---|---|
| stateInfo | getStateInfo |
| keySchema | Grouping Keys |
| valueSchema | State Value Schema of the StreamingAggregationStateManager |
| numColsPrefixKey | 0 |
| storeCoordinator | StateStoreCoordinator |
| storeUpdateFunction | storeUpdateFunction |
storeUpdateFunction¶
storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]
storeUpdateFunction branches off based on the given OutputMode:
Append¶
storeUpdateFunction drops late rows (from the input partition) using the watermark predicate for data.
For every (remaining, non-late) row, storeUpdateFunction stores the row in the input StateStore (using the StreamingAggregationStateManager) and increments the number of updated state rows metric.
storeUpdateFunction tracks the time taken (to drop late rows and store non-late rows) in the time to update metric.
Performance Metrics
Monitor the following metrics:
- number of rows which are dropped by watermark
- number of updated state rows
- time to update
storeUpdateFunction requests the StreamingAggregationStateManager for all the aggregates by grouping keys.
storeUpdateFunction creates a new NextIterator:

- When requested for the next state value (row), the iterator traverses over the key-value aggregate state pairs until the watermark predicate for keys holds true for a key. If so, the iterator removes the key from the state store (via the StreamingAggregationStateManager) and increments the number of removed state rows and number of output rows metrics. The value of the removed key is returned as the next element.
- When requested to close, the iterator updates the time to remove metric (to be the time to process all the state rows) and time to commit changes (to be the time taken for the StreamingAggregationStateManager to commit state changes).

In the end, the iterator records metrics of the StateStore and this stateful operator.
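To make the Append-mode flow concrete, here is a simplified, self-contained Scala model: a plain mutable Map stands in for the StateStore and the StreamingAggregationStateManager, and keys are assumed to be event-time window starts in milliseconds. This is an illustration of the technique, not Spark's internal code.

```scala
import scala.collection.mutable

// Simplified model of the Append-mode storeUpdateFunction:
// keys are window start timestamps (ms), values are running counts.
object AppendModeSketch {
  def storeUpdate(
      store: mutable.Map[Long, Long],   // stands in for the StateStore
      input: Iterator[(Long, Long)],    // (windowStartMs, count) rows from the child operator
      watermarkMs: Long): Iterator[(Long, Long)] = {

    // 1. Drop late rows (watermark predicate for data) and store the rest
    //    ("number of updated state rows").
    input
      .filter { case (windowStartMs, _) => windowStartMs >= watermarkMs }
      .foreach { case (windowStartMs, count) => store(windowStartMs) = count }

    // 2. Emit and remove the aggregates whose keys fall below the watermark
    //    (watermark predicate for keys): the removed values become the output rows
    //    ("number of removed state rows" / "number of output rows").
    val expired = store.keys.filter(_ < watermarkMs).toSeq
    val output  = expired.map(key => (key, store(key)))
    expired.foreach(store.remove)
    output.iterator
  }
}
```

Spark performs the removal lazily through the NextIterator described above; the sketch does it eagerly for brevity.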
Complete¶
FIXME
Update¶
FIXME
AssertionError: outputMode has not been set¶
doExecute makes sure that outputMode is specified or throws an AssertionError:
Incorrect planning in IncrementalExecution, outputMode has not been set
StreamingAggregationStateManager¶
StateStoreSaveExec creates a StreamingAggregationStateManager when created.
The StreamingAggregationStateManager is created using the following:
- Grouping Keys
- Output schema of the child physical operator
The StreamingAggregationStateManager is used in doExecute for the following (summarized in the illustrative sketch after these lists):
For Complete:
- Store all input rows
- Commit the state changes
- Fetch the values
For Append:
- Store all updated rows
- Fetch all the key-value pairs
- Remove old "watermarked" aggregates for all removed pairs
- Commit the state changes
For Update:
- Store all updated rows
- Remove keys older than the watermark (removeKeysOlderThanWatermark) for old "watermarked" aggregates
- Commit the state changes
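The per-mode operations above can be summarized with the following illustrative Scala interface (simplified method names of my own choosing; consult the StreamingAggregationStateManager contract for the actual API):

```scala
// Illustrative only: a simplified view of what StateStoreSaveExec needs from
// a streaming aggregation state manager (not the actual Spark trait).
trait AggregationStateOps[Store, Key, Row] {
  def put(store: Store, row: Row): Unit              // store an input/updated row
  def values(store: Store): Iterator[Row]            // fetch all aggregate values (Complete)
  def iterator(store: Store): Iterator[(Key, Row)]   // fetch all key-value pairs (Append)
  def remove(store: Store, key: Key): Unit           // remove an old "watermarked" aggregate
  def removeKeysOlderThanWatermark(
      store: Store, isExpired: Key => Boolean): Unit // Update-mode cleanup
  def commit(store: Store): Long                     // commit the state changes
}
```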
Required Child Output Distribution¶
requiredChildDistribution: Seq[Distribution]
requiredChildDistribution is part of the SparkPlan (Spark SQL) abstraction.
requiredChildDistribution...FIXME
Performance Metrics¶
StateStoreSaveExec is a StateStoreWriter that defines the performance metrics for writes to a state store.
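These metrics surface in a streaming query's progress reports. A minimal sketch (assuming query is a started StreamingQuery on a recent Spark version, where StateOperatorProgress exposes the fields below):

```scala
// Assuming `query` is a started StreamingQuery with a streaming aggregation.
Option(query.lastProgress).foreach { progress =>
  progress.stateOperators.foreach { op =>
    println(s"operator=${op.operatorName}")               // e.g. stateStoreSave
    println(s"numRowsTotal=${op.numRowsTotal}")           // number of total state rows
    println(s"numRowsUpdated=${op.numRowsUpdated}")       // number of updated state rows
    println(s"numRowsRemoved=${op.numRowsRemoved}")       // number of removed state rows
    println(s"numRowsDroppedByWatermark=${op.numRowsDroppedByWatermark}")
    println(s"memoryUsedBytes=${op.memoryUsedBytes}")     // memory used by state
    println(s"allUpdatesTimeMs=${op.allUpdatesTimeMs}")   // time to update
    println(s"allRemovalsTimeMs=${op.allRemovalsTimeMs}") // time to remove
    println(s"commitTimeMs=${op.commitTimeMs}")           // time to commit changes
  }
}
```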

memory used by state¶
Estimated memory used by a StateStore (aka stateMemory) after StateStoreSaveExec finished execution (per the StateStoreMetrics of the StateStore)
number of output rows¶
The metric is computed differently based on the given OutputMode:

- Append: Number of aggregates (state rows) that were removed from a state store because the watermark predicate for keys held true (i.e., the watermark threshold for the keys was reached). Equivalent to the number of removed state rows metric.
- Complete: Number of rows in a StateStore (i.e., all the values in a StateStore in the StreamingAggregationStateManager). Equivalent to the number of total state rows metric.
- Update: Number of rows that the StreamingAggregationStateManager was requested to store in a state store (that did not expire per the optional watermarkPredicateForData predicate). Equivalent to the number of updated state rows metric.
number of removed state rows¶
The metric is computed differently based on the given OutputMode:

- Append: Number of aggregates (state rows) that were removed from a state store because the watermark predicate for keys held true (i.e., the watermark threshold for the keys was reached). Equivalent to the number of output rows metric.
- Complete: Not used
- Update: Not used

See also number of removed state rows in StateStoreWriter.
number of rows which are dropped by watermark¶
See number of rows which are dropped by watermark in StateStoreWriter.
number of updated state rows¶
The metric is computed differently based on the given OutputMode:

- Append: Number of input rows that have not expired yet (per the required watermarkPredicateForData predicate) and that the StreamingAggregationStateManager was requested to store in a state store (the time taken is the time to update metric). Equivalent to the number of input rows (which should be exactly the number of output rows from the child operator) minus the late rows dropped (the number of rows which are dropped by watermark metric).
- Complete: Number of input rows (which should be exactly the number of output rows from the child operator)
- Update: Number of rows that the StreamingAggregationStateManager was requested to store in a state store (that did not expire per the optional watermarkPredicateForData predicate). The watermark is optional; if defined, watermarkPredicateForData is used to applyRemovingRowsOlderThanWatermark. Equivalent to the number of output rows metric.
StateStoreSaveExec is a StateStoreWriter so consult number of updated state rows, too.
time to commit changes¶
Time taken for the StreamingAggregationStateManager to commit changes to a state store
When requested to commit changes to a state store, a StreamingAggregationStateManager (as StreamingAggregationStateManagerBaseImpl) requests the given StateStore to commit state changes.
For RocksDBStateStore, that means committing the state changes in RocksDB, which is made up of time-tracked phases that are available as performance metrics.
In other words, the total of the time-tracked phases is a close approximation of this metric (there are some file-specific operations that contribute to the metric but are not part of the phases).
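For example, switching to the RocksDB-backed state store provider (available as a built-in provider since Spark 3.2) is a matter of setting the provider class before the query starts; the phases mentioned above then apply:

```scala
// Use the RocksDB state store provider so that RocksDB commit phases
// contribute to (and explain most of) the "time to commit changes" metric.
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
```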
time to remove¶
The metric is computed differently based on the given OutputMode:

- Append: time taken for the entire input row and state processing (except time to commit changes)
- Complete: always 0
- Update: time taken to removeKeysOlderThanWatermark (using the StreamingAggregationStateManager)
StateStoreSaveExec is a StateStoreWriter so consult time to remove, too.
time to update¶
The metric is computed differently based on the given OutputMode:

- Append: Time taken to drop (filter out) expired rows (per the required watermarkPredicateForData predicate) and then store the remaining ones in a state store (using the StreamingAggregationStateManager)
- Update: Time taken to filter out expired rows (per the optional watermarkPredicateForData predicate) and store the remaining ones in a state store (using the StreamingAggregationStateManager). The number of output rows and number of updated state rows metrics are the same and exactly the number of rows that were stored in a state store.

Tip
Consult the correlated metrics (e.g., number of rows which are dropped by watermark and number of updated state rows).
Note
Consult StateStoreWriter to learn more about correlated metrics.
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.streaming.StateStoreSaveExec logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.streaming.StateStoreSaveExec=ALL
Refer to Logging.