StateStoreWriter Physical Operators

StateStoreWriter is an extension of the StatefulOperator abstraction for stateful physical operators that write to a state store.

StateStoreWriter operators collect performance metrics for execution progress reporting.

Implementations

Performance Metrics

| ID | Name |
|----|------|
| allRemovalsTimeMs | time to remove |
| allUpdatesTimeMs | time to update |
| commitTimeMs | time to commit changes |
| numOutputRows | number of output rows |
| numRemovedStateRows | number of removed state rows |
| numRowsDroppedByWatermark | number of rows which are dropped by watermark |
| numShufflePartitions | number of shuffle partitions |
| numStateStoreInstances | number of state store instances |
| numTotalStateRows | number of total state rows |
| numUpdatedStateRows | number of updated state rows |
| statefulOperatorCustomMetrics | |
| stateMemory | memory used by state |
| stateStoreCustomMetrics | |
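The metrics above are regular Spark SQL SQLMetrics. A minimal sketch (a hypothetical helper, not Spark's actual code) of how such metrics are typically declared with the SQLMetrics factory methods:

```scala
// Hypothetical sketch: declaring StateStoreWriter-style performance metrics
// with Spark SQL's SQLMetrics factory methods (names as in the table above).
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

def stateStoreWriterMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
  "numOutputRows"       -> SQLMetrics.createMetric(sc, "number of output rows"),
  "numTotalStateRows"   -> SQLMetrics.createMetric(sc, "number of total state rows"),
  "numUpdatedStateRows" -> SQLMetrics.createMetric(sc, "number of updated state rows"),
  "allUpdatesTimeMs"    -> SQLMetrics.createTimingMetric(sc, "time to update"),
  "allRemovalsTimeMs"   -> SQLMetrics.createTimingMetric(sc, "time to remove"),
  "commitTimeMs"        -> SQLMetrics.createTimingMetric(sc, "time to commit changes"),
  "stateMemory"         -> SQLMetrics.createSizeMetric(sc, "memory used by state"))
```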

number of rows which are dropped by watermark

Incremented in applyRemovingRowsOlderThanWatermark (to drop late rows based on a watermark)

Reported in web UI as Aggregated Number Of Rows Dropped By Watermark

Reported as numRowsDroppedByWatermark when reporting progress

number of state store instances

Default: 1 (and can only be greater for StreamingSymmetricHashJoinExec)

Updated using setOperatorMetrics

Reported as numStateStoreInstances when reporting progress

number of total state rows

Number of keys across all state stores

Reported as numRowsTotal when reporting progress

Can be disabled (for performance reasons) using spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows
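For example, the tracking can be turned off when building the SparkSession (a sketch; the app name and master are illustrative):

```scala
// Disabling tracking of the total number of state rows for the RocksDB
// state store provider (trades the numTotalStateRows / numRowsTotal metric
// for lower state-maintenance overhead).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("no-total-row-tracking")
  .config("spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows", "false")
  .getOrCreate()
```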

number of updated state rows

Number of state rows (entries) that were updated or removed, i.e. for which a StateManager, StreamingAggregationStateManager or StreamingSessionWindowStateManager was requested to put or update a state value

State rows are stored or updated for the keys in the result rows of an upstream physical operator.

Displayed in Structured Streaming UI as Aggregated Number Of Updated State Rows

Reported as numRowsUpdated when reporting progress

StreamingSymmetricHashJoinExec

For StreamingSymmetricHashJoinExec, the metric is a sum of the left and right side's numUpdatedStateRows.

time to commit changes

Time for a StateStore to commit state changes

Reported as commitTimeMs when reporting progress

time to remove

Time taken to...FIXME

Reported as allRemovalsTimeMs when reporting progress

time to update

Time taken to read the input rows and store them in a state store (possibly dropping expired rows per watermarkPredicateForData predicate)

Reported as allUpdatesTimeMs when reporting progress

number of rows which are dropped by watermark

Use the number of rows which are dropped by watermark metric for the number of rows dropped (per the watermarkPredicateForData predicate).

number of updated state rows

Use the number of updated state rows metric for the number of rows stored in a state store.

Short Name

shortName: String

shortName is defaultName (and is expected to be overridden by the implementations).
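By way of illustration (a minimal sketch, not Spark's actual class hierarchy), an implementation overrides shortName with its own identifier:

```scala
// Minimal sketch of the shortName contract: the trait supplies a default
// name and implementations are expected to override it.
trait ShortNamed {
  def shortName: String = "defaultName"
}

// Hypothetical implementation overriding the default
class MyDedupeExec extends ShortNamed {
  override def shortName: String = "dedupe"
}
```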


shortName is used when:

Custom Metrics

customStatefulOperatorMetrics: Seq[StatefulOperatorCustomMetric]

customStatefulOperatorMetrics is empty (and is expected to be overridden by the implementations).

StreamingDeduplicateExec

StreamingDeduplicateExec physical operator is the only implementation with custom metrics.
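A sketch of what such an override could look like (the metric name and the StatefulOperatorCustomSumMetric shape below are assumptions based on Spark's streaming execution package, not copied from this page):

```scala
// Assumed sketch: an implementation contributing a custom sum metric
// to be picked up by statefulOperatorCustomMetrics.
import org.apache.spark.sql.execution.streaming.{StatefulOperatorCustomMetric, StatefulOperatorCustomSumMetric}

trait MyStatefulOperator {
  def customStatefulOperatorMetrics: Seq[StatefulOperatorCustomMetric] =
    Seq(StatefulOperatorCustomSumMetric(
      "numDroppedDuplicateRows", "number of duplicate rows dropped"))
}
```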


customStatefulOperatorMetrics is used when:

Reporting Operator Progress

getProgress(): StateOperatorProgress

getProgress creates a StateOperatorProgress with the shortName and the following metrics:

| Property | Metric |
|----------|--------|
| numRowsTotal | numTotalStateRows |
| numRowsUpdated | numUpdatedStateRows |
| allUpdatesTimeMs | allUpdatesTimeMs |
| numRowsRemoved | numRemovedStateRows |
| allRemovalsTimeMs | allRemovalsTimeMs |
| commitTimeMs | commitTimeMs |
| memoryUsedBytes | stateMemory |
| numRowsDroppedByWatermark | numRowsDroppedByWatermark |
| numShufflePartitions | numShufflePartitions |
| numStateStoreInstances | numStateStoreInstances |
| customMetrics | current values of the custom metrics (stateStoreCustomMetrics and statefulOperatorCustomMetrics) |
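These values surface in StreamingQueryProgress. A hedged usage example reading them off a running query's lastProgress (the query variable and the printed selection of fields are illustrative):

```scala
// Reading StateOperatorProgress (the result of getProgress) from a
// running streaming query's most recent progress report.
import org.apache.spark.sql.streaming.StreamingQuery

def printStateMetrics(query: StreamingQuery): Unit =
  Option(query.lastProgress).toSeq
    .flatMap(_.stateOperators)
    .foreach { s =>
      println(s"${s.operatorName}: rowsTotal=${s.numRowsTotal}, " +
        s"rowsUpdated=${s.numRowsUpdated}, commitTimeMs=${s.commitTimeMs}, " +
        s"memoryUsedBytes=${s.memoryUsedBytes}")
    }
```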

getProgress is used when:

Does Last Batch Execution Require Extra Non-Data Batch

shouldRunAnotherBatch(
  newMetadata: OffsetSeqMetadata): Boolean

shouldRunAnotherBatch is negative (false) by default (to indicate that another non-data batch is not required given the OffsetSeqMetadata with the event-time watermark and the batch timestamp).
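A hypothetical override (a sketch only; currentWatermarkMs is an assumed field standing in for the watermark the operator was planned with) would request an extra batch whenever the event-time watermark has advanced, so that expired state can be evicted or emitted:

```scala
// Hypothetical override: request one more (non-data) batch whenever the
// event-time watermark in the new OffsetSeqMetadata has advanced past the
// watermark this operator was planned with.
import org.apache.spark.sql.execution.streaming.OffsetSeqMetadata

trait MyStatefulOp {
  def currentWatermarkMs: Long // assumed: watermark the operator was planned with

  def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean =
    newMetadata.batchWatermarkMs > currentWatermarkMs
}
```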

shouldRunAnotherBatch is used when IncrementalExecution is requested to check whether the last batch execution requires another (non-data) batch (when MicroBatchExecution is requested to run the activated streaming query).

Custom Metrics

Stateful Operator

statefulOperatorCustomMetrics: Map[String, SQLMetric]

statefulOperatorCustomMetrics converts the customStatefulOperatorMetrics to pairs of names and SQLMetrics (Spark SQL).


statefulOperatorCustomMetrics is used when:

StateStore

stateStoreCustomMetrics: Map[String, SQLMetric]

stateStoreCustomMetrics creates a StateStoreProvider (based on spark.sql.streaming.stateStore.providerClass).

stateStoreCustomMetrics requests the StateStoreProvider for supportedCustomMetrics.


stateStoreCustomMetrics is used when:

Recording Metrics

Stateful Operator

setOperatorMetrics(
  numStateStoreInstances: Int = 1): Unit

setOperatorMetrics updates the following metrics:


setOperatorMetrics is used when the following physical operators are executed:

StateStore

setStoreMetrics(
  store: StateStore): Unit

setStoreMetrics requests the given StateStore for the metrics and records the following metrics:

setStoreMetrics records (adds) the values of the custom metrics.
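The recording logic can be sketched as follows (a hedged approximation: longMetric is assumed to resolve an operator metric by name, as on SparkPlan, and only two of the recorded metrics are shown):

```scala
// Hedged sketch: pull StateStoreMetrics off the store and add its values
// to the operator's SQLMetrics.
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.streaming.state.StateStore

def recordStoreMetrics(store: StateStore, longMetric: String => SQLMetric): Unit = {
  val storeMetrics = store.metrics
  longMetric("numTotalStateRows") += storeMetrics.numKeys
  longMetric("stateMemory") += storeMetrics.memoryUsedBytes
}
```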


setStoreMetrics is used when the following physical operators are executed:

Dropping Late Rows (Older Than Watermark)

applyRemovingRowsOlderThanWatermark(
  iter: Iterator[InternalRow],
  predicateDropRowByWatermark: BasePredicate): Iterator[InternalRow]

applyRemovingRowsOlderThanWatermark filters out (drops) the late rows, i.e. the rows for which the given predicateDropRowByWatermark holds true.

Every time a row is dropped, the number of rows which are dropped by watermark metric is incremented.
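The filtering pattern can be sketched in plain Scala (names are hypothetical; in Spark the rows are InternalRows, the predicate is a BasePredicate, and the counter is the SQLMetric above):

```scala
// Plain-Scala sketch of applyRemovingRowsOlderThanWatermark: lazily drop
// rows matching the "late" predicate, counting each dropped row as a
// side effect while the iterator is consumed.
final class DroppedCounter { var count = 0L }

def removeRowsOlderThanWatermark[T](
    iter: Iterator[T],
    isLate: T => Boolean,
    dropped: DroppedCounter): Iterator[T] =
  iter.filter { row =>
    val late = isLate(row)
    if (late) dropped.count += 1 // "number of rows which are dropped by watermark"
    !late
  }
```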


applyRemovingRowsOlderThanWatermark is used when: