StateStoreWriter Physical Operators¶
StateStoreWriter is an extension of the StatefulOperator abstraction for stateful physical operators that write to a state store.
StateStoreWriter operators collect performance metrics for execution progress reporting.
Implementations¶
- FlatMapGroupsWithStateExec
- SessionWindowStateStoreSaveExec
- StateStoreSaveExec
- StreamingDeduplicateExec
- StreamingGlobalLimitExec
- StreamingSymmetricHashJoinExec
Performance Metrics¶
ID | Name |
---|---|
allRemovalsTimeMs | time to remove |
allUpdatesTimeMs | time to update |
commitTimeMs | time to commit changes |
numOutputRows | number of output rows |
numRemovedStateRows | number of removed state rows |
numRowsDroppedByWatermark | number of rows which are dropped by watermark |
numShufflePartitions | number of shuffle partitions |
numStateStoreInstances | number of state store instances |
numTotalStateRows | number of total state rows |
numUpdatedStateRows | number of updated state rows |
statefulOperatorCustomMetrics | |
stateMemory | memory used by state |
stateStoreCustomMetrics | |
number of rows which are dropped by watermark¶
Incremented in applyRemovingRowsOlderThanWatermark (to drop late rows based on a watermark)
Reported in web UI as Aggregated Number Of Rows Dropped By Watermark
Reported as numRowsDroppedByWatermark when reporting progress
number of state store instances¶
Default: 1 (and can only be greater for StreamingSymmetricHashJoinExec)
Updated using setOperatorMetrics
Reported as numStateStoreInstances when reporting progress
number of total state rows¶
Number of the keys across all state stores:
- For unary stateful operators, updated based on the numKeys metric of a StateStore
- For StreamingSymmetricHashJoinExec, updated at onOutputCompletion (of processPartitions)
Reported as numRowsTotal when reporting progress
Can be disabled (for performance reasons) using spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows
number of updated state rows¶
Number of state rows (entries) that were updated or removed (for which StateManager, StreamingAggregationStateManager or StreamingSessionWindowStateManager were requested to put or update a state value)
State rows are stored or updated for the keys in the result rows of an upstream physical operator.
Displayed in Structured Streaming UI as Aggregated Number Of Updated State Rows
Reported as numRowsUpdated when reporting progress
StreamingSymmetricHashJoinExec
For StreamingSymmetricHashJoinExec, the metric is the sum of the left and right sides' numUpdatedStateRows.
time to commit changes¶
Time for a StateStore to commit state changes
Reported as commitTimeMs when reporting progress
time to remove¶
Time taken to...FIXME
Reported as allRemovalsTimeMs when reporting progress
time to update¶
Time taken to read the input rows and store them in a state store (possibly dropping expired rows per watermarkPredicateForData predicate)
Reported as allUpdatesTimeMs when reporting progress
number of rows which are dropped by watermark
Use number of rows which are dropped by watermark for the number of rows dropped (per the watermarkPredicateForData).
number of updated state rows
Use number of updated state rows for the number of rows stored in a state store.
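The following is a minimal Scala sketch of how the update time, the number of updated state rows and the number of rows dropped by watermark relate to each other. It is not Spark's implementation: a plain mutable map stands in for a state store, and the names UpdateTimingSketch, Result, timeTakenMs, storeRows and isExpired are assumptions made for illustration (isExpired plays the role of the watermarkPredicateForData predicate).

```scala
import scala.collection.mutable
import java.util.concurrent.TimeUnit

object UpdateTimingSketch {
  // Values that would feed the three metrics discussed above (hypothetical container).
  final case class Result(
      allUpdatesTimeMs: Long,
      numUpdatedStateRows: Long,
      numRowsDroppedByWatermark: Long)

  // Hypothetical helper: runs the body and returns the elapsed time in milliseconds.
  def timeTakenMs(body: => Unit): Long = {
    val start = System.nanoTime()
    body
    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
  }

  // Reads the input rows and stores them, skipping rows the predicate marks as expired.
  def storeRows(
      input: Iterator[(String, Long)],
      store: mutable.Map[String, Long], // stand-in for a state store
      isExpired: ((String, Long)) => Boolean): Result = {
    var updated = 0L
    var dropped = 0L
    val elapsedMs = timeTakenMs {
      input.foreach { row =>
        if (isExpired(row)) dropped += 1
        else {
          store.update(row._1, row._2)
          updated += 1
        }
      }
    }
    Result(elapsedMs, updated, dropped)
  }
}
```

The timed region covers both reading the input and writing to the store, which is why the measured time also includes the cost of evaluating the predicate for dropped rows.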
Short Name¶
shortName: String
shortName is defaultName (and is expected to be overridden by the implementations).
shortName is used when:
- StateStoreWriter is requested for a StateOperatorProgress
Custom Metrics¶
customStatefulOperatorMetrics: Seq[StatefulOperatorCustomMetric]
customStatefulOperatorMetrics is empty (and is expected to be overridden by the implementations).
StreamingDeduplicateExec
StreamingDeduplicateExec physical operator is the only implementation with custom metrics.
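The two defaults (shortName and customStatefulOperatorMetrics) and how an implementation might override them can be sketched in plain Scala as follows. This is a simplified model without any Spark dependency: CustomMetric is a hypothetical stand-in for StatefulOperatorCustomMetric, and HypotheticalDedupWriter with its metric name is invented for illustration only.

```scala
object OverridesSketch {
  // Hypothetical stand-in for StatefulOperatorCustomMetric (name and description only).
  final case class CustomMetric(name: String, desc: String)

  trait Writer {
    // Default short name, expected to be overridden by implementations.
    def shortName: String = "defaultName"
    // No custom metrics by default.
    def customStatefulOperatorMetrics: Seq[CustomMetric] = Nil
  }

  // An operator that keeps both defaults.
  object PlainWriter extends Writer

  // A hypothetical operator that overrides both members.
  object HypotheticalDedupWriter extends Writer {
    override def shortName: String = "hypotheticalDedup"
    override def customStatefulOperatorMetrics: Seq[CustomMetric] =
      Seq(CustomMetric("hypotheticalDroppedRows", "number of rows dropped (illustrative)"))
  }
}
```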
customStatefulOperatorMetrics is used when:
- StateStoreWriter is requested for the custom metrics of this stateful operator
Reporting Operator Progress¶
getProgress(): StateOperatorProgress
getProgress creates a StateOperatorProgress with the shortName and the following metrics:
Property | Metric |
---|---|
numRowsTotal | numTotalStateRows |
numRowsUpdated | numUpdatedStateRows |
allUpdatesTimeMs | allUpdatesTimeMs |
numRowsRemoved | numRemovedStateRows |
allRemovalsTimeMs | allRemovalsTimeMs |
commitTimeMs | commitTimeMs |
memoryUsedBytes | stateMemory |
numRowsDroppedByWatermark | numRowsDroppedByWatermark |
numShufflePartitions | numShufflePartitions |
numStateStoreInstances | numStateStoreInstances |
customMetrics | Current values of the custom metrics (stateStoreCustomMetrics and statefulOperatorCustomMetrics) |
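The property-to-metric mapping in the table can be illustrated with a small Scala model. This is a sketch under stated assumptions rather than Spark's code: OperatorProgress is a simplified stand-in for StateOperatorProgress, the metrics argument is a plain map from metric ID to its current value, and customMetricNames is a hypothetical parameter naming which IDs count as custom metrics.

```scala
object ProgressSketch {
  // Simplified stand-in for StateOperatorProgress with the properties from the table.
  final case class OperatorProgress(
      shortName: String,
      numRowsTotal: Long,
      numRowsUpdated: Long,
      allUpdatesTimeMs: Long,
      numRowsRemoved: Long,
      allRemovalsTimeMs: Long,
      commitTimeMs: Long,
      memoryUsedBytes: Long,
      numRowsDroppedByWatermark: Long,
      numShufflePartitions: Long,
      numStateStoreInstances: Long,
      customMetrics: Map[String, Long])

  // Builds a progress object from the current metric values (keyed by metric ID).
  def getProgress(
      shortName: String,
      metrics: Map[String, Long],
      customMetricNames: Set[String]): OperatorProgress = {
    // Missing metrics are treated as 0 in this sketch.
    def value(id: String): Long = metrics.getOrElse(id, 0L)
    OperatorProgress(
      shortName = shortName,
      numRowsTotal = value("numTotalStateRows"),
      numRowsUpdated = value("numUpdatedStateRows"),
      allUpdatesTimeMs = value("allUpdatesTimeMs"),
      numRowsRemoved = value("numRemovedStateRows"),
      allRemovalsTimeMs = value("allRemovalsTimeMs"),
      commitTimeMs = value("commitTimeMs"),
      memoryUsedBytes = value("stateMemory"),
      numRowsDroppedByWatermark = value("numRowsDroppedByWatermark"),
      numShufflePartitions = value("numShufflePartitions"),
      numStateStoreInstances = value("numStateStoreInstances"),
      customMetrics = customMetricNames.map(n => n -> value(n)).toMap)
  }
}
```

Note that three properties are renamed relative to their metric IDs (numRowsTotal, numRowsUpdated and numRowsRemoved) and memoryUsedBytes is backed by stateMemory; the rest keep the same name.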
getProgress is used when:
- ProgressReporter is requested to extractStateOperatorMetrics (when MicroBatchExecution is requested to run the activated streaming query)
Does Last Batch Execution Require Extra Non-Data Batch¶
shouldRunAnotherBatch(
newMetadata: OffsetSeqMetadata): Boolean
shouldRunAnotherBatch is false by default (to indicate that another non-data batch is not required given the OffsetSeqMetadata with the event-time watermark and the batch timestamp).
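A minimal Scala sketch of the default and of one possible override is shown here. It is illustrative only: Metadata is a simplified stand-in for OffsetSeqMetadata (the field names batchWatermarkMs and batchTimestampMs are assumptions of this sketch), and WatermarkAwareWriter with its "watermark has advanced" rule is a hypothetical operator, not a description of any particular Spark operator.

```scala
object AnotherBatchSketch {
  // Simplified stand-in for OffsetSeqMetadata: event-time watermark and batch timestamp.
  final case class Metadata(batchWatermarkMs: Long, batchTimestampMs: Long)

  trait Writer {
    // Default: no extra non-data batch is required.
    def shouldRunAnotherBatch(newMetadata: Metadata): Boolean = false
  }

  // Hypothetical operator that asks for another batch once the watermark
  // in the new metadata is past the watermark it was planned with.
  final class WatermarkAwareWriter(currentWatermarkMs: Option[Long]) extends Writer {
    override def shouldRunAnotherBatch(newMetadata: Metadata): Boolean =
      currentWatermarkMs.exists(current => newMetadata.batchWatermarkMs > current)
  }
}
```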
shouldRunAnotherBatch is used when IncrementalExecution is requested to check whether the last batch execution requires another batch (when MicroBatchExecution is requested to run the activated streaming query).
Custom Metrics¶
Stateful Operator¶
statefulOperatorCustomMetrics: Map[String, SQLMetric]
statefulOperatorCustomMetrics converts the customStatefulOperatorMetrics to pairs of names and SQLMetrics (Spark SQL).
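The conversion can be pictured with the following Scala sketch, which avoids a Spark dependency. Metric is a minimal accumulator standing in for SQLMetric and CustomMetric is a hypothetical stand-in for StatefulOperatorCustomMetric; only the shape of the conversion (a sequence of metric definitions to a name-keyed map of metric instances) is taken from the description above.

```scala
object CustomMetricsSketch {
  // Minimal accumulator standing in for a SQLMetric.
  final class Metric(val description: String) {
    private var v = 0L
    def add(n: Long): Unit = v += n
    def value: Long = v
  }

  // Hypothetical stand-in for StatefulOperatorCustomMetric.
  final case class CustomMetric(name: String, desc: String)

  // Turns the metric definitions into (name -> metric instance) pairs.
  def statefulOperatorCustomMetrics(custom: Seq[CustomMetric]): Map[String, Metric] =
    custom.map(m => m.name -> new Metric(m.desc)).toMap
}
```

With the default (empty) customStatefulOperatorMetrics, the resulting map is empty as well.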
statefulOperatorCustomMetrics is used when:
- StateStoreWriter is requested for the performance metrics and a progress
StateStore¶
stateStoreCustomMetrics: Map[String, SQLMetric]
stateStoreCustomMetrics creates a StateStoreProvider (based on spark.sql.streaming.stateStore.providerClass).
stateStoreCustomMetrics requests the StateStoreProvider for supportedCustomMetrics.
stateStoreCustomMetrics is used when:
Recording Metrics¶
Stateful Operator¶
setOperatorMetrics(
numStateStoreInstances: Int = 1): Unit
setOperatorMetrics updates the following metrics:
- Increments numShufflePartitions
- Adds the given numStateStoreInstances to the numStateStoreInstances metric
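The two updates can be modelled in a few lines of Scala. This is a sketch, not Spark's code: Metric is a minimal accumulator standing in for SQLMetric, Writer is a hypothetical holder of the two metrics, and the sketch assumes setOperatorMetrics is called once per processed partition.

```scala
object OperatorMetricsSketch {
  // Minimal accumulator standing in for a SQLMetric.
  final class Metric {
    private var v = 0L
    def add(n: Long): Unit = v += n
    def value: Long = v
  }

  final class Writer {
    val numShufflePartitions = new Metric
    val numStateStoreInstances = new Metric

    // Assumed to be invoked once per partition in this sketch.
    def setOperatorMetrics(numStateStoreInstances: Int = 1): Unit = {
      numShufflePartitions.add(1L)                            // one more partition seen
      this.numStateStoreInstances.add(numStateStoreInstances) // stores used by that partition
    }
  }
}
```

For example, one call with the default followed by one call with numStateStoreInstances = 4 (an illustrative value) leaves numShufflePartitions at 2 and numStateStoreInstances at 5.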
setOperatorMetrics is used when the following physical operators are executed:
- FlatMapGroupsWithStateExec
- StateStoreSaveExec
- SessionWindowStateStoreSaveExec
- StreamingDeduplicateExec
- StreamingGlobalLimitExec
- StreamingSymmetricHashJoinExec
StateStore¶
setStoreMetrics(
store: StateStore): Unit
setStoreMetrics requests the given StateStore for the metrics and records the following metrics:
- Adds the number of keys to numTotalStateRows metric
- Adds the memory used (in bytes) to stateMemory metric
setStoreMetrics records (adds) the values of the custom metrics.
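As a rough illustration of this bookkeeping, the Scala sketch below accumulates the values reported by a store into the operator's metrics. It is a simplified model: StoreMetrics is a hypothetical stand-in for the metrics a StateStore reports, Metric stands in for SQLMetric, and custom metrics that the operator did not register are silently ignored (an assumption of this sketch).

```scala
object StoreMetricsSketch {
  // Minimal accumulator standing in for a SQLMetric.
  final class Metric {
    private var v = 0L
    def add(n: Long): Unit = v += n
    def value: Long = v
  }

  // Hypothetical stand-in for the metrics reported by a state store.
  final case class StoreMetrics(
      numKeys: Long,
      memoryUsedBytes: Long,
      customMetrics: Map[String, Long])

  final class Writer(customMetricNames: Set[String]) {
    val numTotalStateRows = new Metric
    val stateMemory = new Metric
    val custom: Map[String, Metric] =
      customMetricNames.map(name => name -> new Metric).toMap

    def setStoreMetrics(m: StoreMetrics): Unit = {
      numTotalStateRows.add(m.numKeys)   // number of keys
      stateMemory.add(m.memoryUsedBytes) // memory used (in bytes)
      // Add each reported custom value to the matching registered metric, if any.
      m.customMetrics.foreach { case (name, v) =>
        custom.get(name).foreach(_.add(v))
      }
    }
  }
}
```

Because every value is added rather than overwritten, calling setStoreMetrics for several stores yields totals across those stores.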
setStoreMetrics is used when the following physical operators are executed:
- FlatMapGroupsWithStateExec
- StateStoreSaveExec
- SessionWindowStateStoreSaveExec
- StreamingDeduplicateExec
- StreamingGlobalLimitExec
Dropping Late Rows (Older Than Watermark)¶
applyRemovingRowsOlderThanWatermark(
iter: Iterator[InternalRow],
predicateDropRowByWatermark: BasePredicate): Iterator[InternalRow]
applyRemovingRowsOlderThanWatermark filters out (drops) late rows based on the given predicateDropRowByWatermark (i.e., rows for which the predicate holds true).
Every time a row is dropped, the number of rows which are dropped by watermark metric is incremented.
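The filtering and counting described above can be sketched in plain Scala as follows. This is an illustration under stated assumptions, not Spark's implementation: rows are generic values instead of InternalRow, the predicate is an ordinary function instead of a BasePredicate, and Counter is a hypothetical stand-in for the SQLMetric behind the number of rows which are dropped by watermark metric.

```scala
object DropLateRowsSketch {
  // Minimal counter standing in for the numRowsDroppedByWatermark metric.
  final class Counter {
    private var v = 0L
    def add(n: Long): Unit = v += n
    def value: Long = v
  }

  // Keeps only the rows for which the predicate is false and
  // counts every row that is dropped.
  def applyRemovingRowsOlderThanWatermark[T](
      iter: Iterator[T],
      predicateDropRowByWatermark: T => Boolean,
      numRowsDroppedByWatermark: Counter): Iterator[T] =
    iter.filterNot { row =>
      val shouldDrop = predicateDropRowByWatermark(row)
      if (shouldDrop) numRowsDroppedByWatermark.add(1L)
      shouldDrop
    }
}
```

The returned iterator is lazy, so in this sketch the counter only advances as the downstream consumer pulls rows.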
applyRemovingRowsOlderThanWatermark is used when:
- FlatMapGroupsWithStateExec is requested to processDataWithPartition
- StateStoreSaveExec is executed (for Append and Update output modes)
- StreamingDeduplicateExec is executed
- StreamingSymmetricHashJoinExec.OneSideHashJoiner is requested to storeAndJoinWithOtherSide