Skip to content

StateStoreRestoreExec Physical Operator

StateStoreRestoreExec is a unary physical operator (Spark SQL) to restore (read) a streaming state (from a state store) (for the keys from the child physical operator).

StateStoreRestoreExec is among the physical operators used to execute streaming aggregations.

StateStoreRestoreExec and StatefulAggregationStrategy

Creating Instance

StateStoreRestoreExec takes the following to be created:

StateStoreRestoreExec is created when:


StateStoreRestoreExec can be given a StatefulOperatorStateInfo when created.

The StatefulOperatorStateInfo is initially undefined when StateStoreRestoreExec is created.

The StatefulOperatorStateInfo is specified (so this StateStoreRestoreExec gets a streaming batch-specific execution property) when IncrementalExecution is requested to prepare a streaming physical plan for execution (and state preparation rule is executed when StreamExecution plans a streaming query for a streaming batch).

StateStoreRestoreExec and IncrementalExecution

Performance Metrics

StateStoreRestoreExec in web UI (Details for Query)

number of output rows

The number of input rows from the child physical operator this StateStoreRestoreExec found the state value for when doExecute

FIXME number of output rows

The number of output rows metric seems to be always an even number of the restoredRow from a state store and the row itself (from the child physical operator).


StateStoreRestoreExec creates a StreamingAggregationStateManager when created.

The StreamingAggregationStateManager is created using the grouping key expressions and the output schema of the child physical operator.

The StreamingAggregationStateManager is used in doExecute for the following:

Required Child Output Distribution

requiredChildDistribution: Seq[Distribution]

requiredChildDistribution is part of the SparkPlan (Spark SQL) abstraction.


Executing Operator

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan (Spark SQL) abstraction.

doExecute executes the child operator and creates a StateStoreRDD with storeUpdateFunction that does the following per partition:

  1. Generates an unsafe projection to access the key field (using keyExpressions and the output schema of child operator).

  2. For every input row (as InternalRow)

    • Extracts the key from the row (using the unsafe projection above)
    • Gets the saved state in StateStore for the key if available (it might not be if the key appeared in the input the first time)
    • Increments numOutputRows metric
    • Generates collection made up of the current row and possibly the state for the key if available