StateStoreSaveExec Physical Operator¶
StateStoreSaveExec is a unary physical operator (Spark SQL) that saves (writes) a streaming state to a state store with support for streaming watermark.
StateStoreSaveExec is one of the physical operators used for streaming aggregation.
Creating Instance¶
StateStoreSaveExec takes the following to be created:
- Grouping Key Attributes (Spark SQL)
- StatefulOperatorStateInfo
- OutputMode
- Event-time Watermark
- spark.sql.streaming.aggregation.stateFormatVersion
- Child Physical Operator (Spark SQL)
StateStoreSaveExec is created when:
- StatefulAggregationStrategy execution planning strategy is requested to plan a streaming aggregation
- IncrementalExecution is created
OutputMode¶
outputMode: Option[OutputMode]
StateStoreSaveExec can be given an OutputMode when created.
StateStoreSaveExec supports all three OutputModes (Append, Complete and Update) when executed.
For Append and Update output modes with an event-time watermark defined, StateStoreSaveExec can run another batch if the watermark has advanced.
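The "run another batch" rule above boils down to a small predicate. The following is a minimal sketch under stated assumptions: Mode, Append, Complete, Update and AnotherBatch are hypothetical stand-ins (not Spark's classes), and the watermarks are plain milliseconds. It only illustrates the three conditions: the output mode is Append or Update, an event-time watermark is defined, and the new batch watermark is strictly ahead of it.

```scala
// Hypothetical stand-ins for Spark's OutputMode values (a model, not Spark's classes)
sealed trait Mode
case object Append extends Mode
case object Complete extends Mode
case object Update extends Mode

object AnotherBatch {
  // Another batch is worth running only when:
  //  1. the output mode is Append or Update,
  //  2. an event-time watermark is defined, and
  //  3. the new batch watermark has advanced past it
  def shouldRunAnotherBatch(
      outputMode: Option[Mode],
      eventTimeWatermarkMs: Option[Long],
      newBatchWatermarkMs: Long): Boolean =
    (outputMode.contains(Append) || outputMode.contains(Update)) &&
      eventTimeWatermarkMs.exists(current => newBatchWatermarkMs > current)
}
```

Complete never triggers an extra batch in this model because it emits the whole state on every batch anyway, so an advancing watermark has nothing extra to flush.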
OutputMode is undefined (None) by default and when AggUtils is requested to planStreamingAggregation (when StatefulAggregationStrategy execution planning strategy is requested to plan a streaming aggregation).
OutputMode is required for executing StateStoreSaveExec. It is specified to be the OutputMode of the IncrementalExecution (when the state preparation rule is executed and fills in the execution-specific configuration).
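The two-phase lifecycle (planned with an undefined OutputMode, then filled in per execution) can be modelled with an immutable case class and copy. This is a hypothetical sketch: SaveNode and StatePreparation.prepare are illustrative names, not Spark APIs, and the real operator carries more fields (state info, watermark, child operator).

```scala
// Hypothetical stand-ins for Spark's OutputMode values
sealed trait Mode
case object Append extends Mode
case object Complete extends Mode
case object Update extends Mode

// Hypothetical, heavily simplified stand-in for the physical operator
final case class SaveNode(
    groupingKeys: Seq[String],          // stand-in for grouping key Attributes
    outputMode: Option[Mode] = None)    // undefined (None) at planning time

object StatePreparation {
  // Execution-specific fill-in: copy the node with the output mode of the
  // current incremental execution; all other fields stay as they were
  def prepare(planned: SaveNode, executionMode: Mode): SaveNode =
    planned.copy(outputMode = Some(executionMode))
}
```

Using copy keeps the planned node untouched, which mirrors how physical plans are treated as immutable trees that rules rewrite rather than mutate.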
Short Name¶
shortName: String
shortName is part of the StateStoreWriter abstraction.
shortName is the following text:
stateStoreSave
Executing Physical Operator¶
doExecute(): RDD[InternalRow]
doExecute is part of the SparkPlan (Spark SQL) abstraction.
doExecute initializes the metrics (so that this happens on the driver).
doExecute executes the child physical operator (that produces an RDD[InternalRow]) and creates a StateStoreRDD (using mapPartitionsWithStateStore) with the following:
Argument | Value |
---|---|
stateInfo | getStateInfo |
keySchema | Grouping Keys |
valueSchema | State Value Schema of the StreamingAggregationStateManager |
numColsPrefixKey | 0 |
storeCoordinator | StateStoreCoordinator |
storeUpdateFunction | storeUpdateFunction |
storeUpdateFunction¶
storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]
storeUpdateFunction branches off based on the given OutputMode:
Append¶
storeUpdateFunction drops late rows (from the input partition) using the watermark predicate for data.
For every remaining (non-late) row, storeUpdateFunction stores the row in the input StateStore (using the StreamingAggregationStateManager) and increments the number of updated state rows metric.
storeUpdateFunction tracks the time taken (to drop late rows and store non-late rows) in the time to update metric.
Performance Metrics
Monitor the related performance metrics (e.g., number of updated state rows and time to update).
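The Append store phase can be sketched with plain Scala collections. This is a minimal model under stated assumptions: Row is a hypothetical (key, event time, value) triple, a mutable Map stands in for the StateStore and StreamingAggregationStateManager, and "late" is modelled as an event time at or below the watermark. It returns the number of rows stored, i.e. the value the number of updated state rows metric would get; timing is omitted.

```scala
import scala.collection.mutable

// Hypothetical simplified row: grouping key, event time (ms), aggregate value
final case class Row(key: String, eventTimeMs: Long, value: Long)

object AppendStore {
  // Drops late rows using a watermark predicate for data, stores the
  // remaining rows by grouping key, and counts how many were stored
  def storeNonLate(
      input: Iterator[Row],
      watermarkMs: Long,
      store: mutable.Map[String, Row]): Int = {
    // a row is late when its event time is not after the watermark
    val isLate: Row => Boolean = _.eventTimeMs <= watermarkMs
    var numUpdatedStateRows = 0
    input.filterNot(isLate).foreach { row =>
      store.update(row.key, row) // stand-in for the state manager's put
      numUpdatedStateRows += 1
    }
    numUpdatedStateRows
  }
}
```

Note that nothing is emitted downstream in this phase: in Append, output only happens when aggregates are evicted by the watermark.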
storeUpdateFunction requests the StreamingAggregationStateManager for all the aggregates by grouping keys.
storeUpdateFunction creates a new NextIterator:
- When requested for the next state value (row), the iterator traverses over the key-value aggregate state pairs until the watermark predicate for keys holds true for a key. If so, the iterator removes the key from the state store (via the StreamingAggregationStateManager) and increments the number of removed state rows and number of output rows metrics. The value of the removed key is returned as the next element.
- When requested to close, the iterator updates the time to remove metric (to be the time to process all the state rows) and time to commit changes (to be the time taken for the StreamingAggregationStateManager to commit state changes). In the end, the iterator records metrics of the StateStore and this stateful operator.
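The eviction iterator can be approximated with a lazy Scala Iterator. This is a hypothetical sketch, not Spark's NextIterator: a mutable Map keyed by window end (ms) stands in for the state store, Metrics is an illustrative counter holder, and the watermark predicate for keys is modelled as "key at or below the watermark". As with the iterator described above, a key is removed (and counted) only when its value is actually pulled by the consumer; the close-time timing and commit are left out.

```scala
import scala.collection.mutable

object AppendEviction {
  // Hypothetical counters standing in for SQL metrics
  final class Metrics {
    var numRemovedStateRows = 0L
    var numOutputRows = 0L
  }

  // Lazily walks the key-value state pairs; every key that satisfies the
  // watermark predicate for keys is removed from the store and its value emitted
  def evict(
      store: mutable.Map[Long, String], // key = window end (ms), value = aggregate
      watermarkMs: Long,
      metrics: Metrics): Iterator[String] = {
    val keyIsExpired: Long => Boolean = _ <= watermarkMs
    store.toList.iterator // snapshot, so removals do not disturb the traversal
      .filter { case (k, _) => keyIsExpired(k) }
      .map { case (k, v) =>
        store.remove(k)
        metrics.numRemovedStateRows += 1
        metrics.numOutputRows += 1
        v
      }
  }
}
```

Because the number of removed state rows and number of output rows counters are incremented together, the model also shows why the two metrics are equal in Append.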
Complete¶
FIXME
Update¶
FIXME
AssertionError: outputMode has not been set¶
doExecute makes sure that outputMode is specified or throws an AssertionError:
Incorrect planning in IncrementalExecution, outputMode has not been set
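A precondition like this is typically written with Scala's assert. The sketch below is hypothetical (OutputModeCheck and the Mode values are illustrative stand-ins) and only shows the fail-fast behaviour. Scala's Predef.assert throws java.lang.AssertionError and prefixes the message with "assertion failed: ".

```scala
// Hypothetical stand-ins for Spark's OutputMode values
sealed trait Mode
case object Append extends Mode
case object Complete extends Mode
case object Update extends Mode

object OutputModeCheck {
  // Fail fast when planning did not fill in the output mode;
  // Predef.assert throws java.lang.AssertionError on failure
  def requireOutputMode(outputMode: Option[Mode]): Mode = {
    assert(outputMode.nonEmpty,
      "Incorrect planning in IncrementalExecution, outputMode has not been set")
    outputMode.get
  }
}
```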
StreamingAggregationStateManager¶
StateStoreSaveExec creates a StreamingAggregationStateManager when created.
The StreamingAggregationStateManager is created using the following:
- Grouping Keys
- Output schema of the child physical operator
The StreamingAggregationStateManager is used in doExecute for the following:
For Complete:
- Store all input rows
- Commit the state changes
- Fetch the values
For Append:
- Store all updated rows
- Fetch all the key-value pairs
- Remove old "watermarked" aggregates for all removed pairs
- Commit the state changes
For Update:
- Store all updated rows
- Remove old "watermarked" aggregates (removeKeysOlderThanWatermark)
- Commit the state changes
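The Complete steps listed above (store all input rows, commit the state changes, fetch the values) can be sketched against a toy store. Everything here is hypothetical: ToyStateStore is an in-memory stand-in with an explicit commit step, not Spark's StateStore API. The point it illustrates is that Complete emits every value in the state, including aggregates the current batch did not touch.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a state store with an explicit commit
final class ToyStateStore {
  private val committed = mutable.Map.empty[String, Long]
  private val pending = mutable.Map.empty[String, Long]
  def put(key: String, value: Long): Unit = pending.update(key, value)
  def commit(): Unit = { committed ++= pending; pending.clear() }
  def values: Iterator[Long] = committed.valuesIterator
}

object CompleteMode {
  // 1. store all input rows, 2. commit the state changes, 3. fetch the values
  def run(input: Iterator[(String, Long)], store: ToyStateStore): List[Long] = {
    input.foreach { case (k, v) => store.put(k, v) }
    store.commit()
    store.values.toList
  }
}
```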
Required Child Output Distribution¶
requiredChildDistribution: Seq[Distribution]
requiredChildDistribution is part of the SparkPlan (Spark SQL) abstraction.
requiredChildDistribution ...FIXME
Performance Metrics¶
StateStoreSaveExec is a StateStoreWriter that defines the performance metrics for writes to a state store.
memory used by state¶
Estimated memory used by a StateStore (aka stateMemory) after StateStoreSaveExec finished execution (per the StateStoreMetrics of the StateStore)
number of output rows¶
The metric is computed differently based on the given OutputMode.
For Append, the number of aggregates (state rows) that were removed from a state store because the watermark predicate for keys held true (i.e., the watermark threshold for the keys was reached). Equivalent to the number of removed state rows metric.
For Complete, the number of rows in a StateStore (i.e., all the values in a StateStore in the StreamingAggregationStateManager). Equivalent to the number of total state rows metric.
For Update, the number of rows that the StreamingAggregationStateManager was requested to store in a state store (that did not expire per the optional watermarkPredicateForData predicate). Equivalent to the number of updated state rows metric.
number of removed state rows¶
For Append, the number of aggregates (state rows) that were removed from a state store because the watermark predicate for keys held true (i.e., the watermark threshold for the keys was reached). Equivalent to the number of output rows metric.
For Complete, not used.
For Update, not used.
See number of removed state rows.
number of rows which are dropped by watermark¶
See number of rows which are dropped by watermark
number of updated state rows¶
The metric is computed differently based on the given OutputMode.
For Append, the number of input rows that have not expired yet (per the required watermarkPredicateForData predicate) and that the StreamingAggregationStateManager was requested to store in a state store (the time taken is tracked by the time to update metric). Equivalent to the number of input rows (which should be exactly the number of output rows from the child operator) minus the late rows counted by the number of rows which are dropped by watermark metric.
For Complete, the number of input rows (which should be exactly the number of output rows from the child operator).
For Update, the number of rows that the StreamingAggregationStateManager was requested to store in a state store (that did not expire per the optional watermarkPredicateForData predicate). Equivalent to the number of output rows metric.
Optional watermark
If defined, watermarkPredicateForData is used to applyRemovingRowsOlderThanWatermark.
StateStoreSaveExec is a StateStoreWriter so consult number of updated state rows, too.
time to commit changes¶
Time taken for the StreamingAggregationStateManager to commit changes to a state store
When requested to commit changes to a state store, a StreamingAggregationStateManager (as StreamingAggregationStateManagerBaseImpl) requests the given StateStore to commit state changes.
For RocksDBStateStore, this means RocksDB committing the state changes, which is made up of time-tracked phases that are available as performance metrics.
In other words, the total of the time-tracked phases is a close approximation of this metric (some file-specific operations also contribute to the metric but are not part of the phases).
time to remove¶
The metric is computed differently based on the given OutputMode.
For Append, time taken for the entire input row and state processing (except time to commit changes)
For Complete, always 0
For Update, time taken to removeKeysOlderThanWatermark (using the StreamingAggregationStateManager)
StateStoreSaveExec is a StateStoreWriter so consult time to remove, too.
time to update¶
The metric is computed differently based on the given OutputMode.
For Append, time taken to drop (filter out) expired rows (per the required watermarkPredicateForData predicate) and then store the remaining ones in a state store (using the StreamingAggregationStateManager).
Tip
Consult the correlated metrics (e.g., number of rows which are dropped by watermark and number of updated state rows).
For Update, time taken to filter out expired rows (per the optional watermarkPredicateForData predicate) and store the remaining ones in a state store (using the StreamingAggregationStateManager). The number of output rows and number of updated state rows metrics are then the same: exactly the number of rows that were stored in a state store.
Note
Consult StateStoreWriter to learn more about correlated metrics.
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.streaming.StateStoreSaveExec logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.streaming.StateStoreSaveExec=ALL
Refer to Logging.