StateStoreRestoreExec Physical Operator¶
StateStoreRestoreExec is a unary physical operator (Spark SQL) to restore (read) a streaming state (from a state store) (for the keys from the child physical operator).
StateStoreRestoreExec is among the physical operators used to execute streaming aggregations.
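As a rough illustration, the physical plan of a streaming aggregation can be printed to look for this operator. The sketch below assumes a local Spark session and the built-in rate source; the object name, application name, and grouping expression are arbitrary choices for the example and do not come from this page.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object StateStoreRestoreDemo {
  // Builds a streaming aggregation over the rate source (rows of timestamp and value)
  def parityCounts(spark: SparkSession): DataFrame =
    spark.readStream
      .format("rate")
      .load()
      .groupBy((col("value") % 2).as("parity")) // grouping key of the aggregation
      .count()

  def main(args: Array[String]): Unit = {
    // Assumption: a throwaway local session is enough to inspect the plan
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("StateStoreRestoreExec demo")
      .getOrCreate()

    // Prints the physical plan; for a streaming aggregation it is expected
    // to contain a StateStoreRestore node
    parityCounts(spark).explain()

    spark.stop()
  }
}
```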
Creating Instance¶
StateStoreRestoreExec takes the following to be created:
- Grouping Key Attributes (Spark SQL)
- StatefulOperatorStateInfo
- spark.sql.streaming.aggregation.stateFormatVersion
- Child Physical Operator (Spark SQL)
StateStoreRestoreExec is created when:
- StatefulAggregationStrategy execution planning strategy is requested to plan a streaming aggregation
- IncrementalExecution is created
StatefulOperatorStateInfo¶
StateStoreRestoreExec can be given a StatefulOperatorStateInfo when created.
The StatefulOperatorStateInfo is initially undefined when StateStoreRestoreExec is created.
The StatefulOperatorStateInfo is specified (so this StateStoreRestoreExec gets a streaming batch-specific execution property) when IncrementalExecution is requested to prepare a streaming physical plan for execution (and the state preparation rule is executed when StreamExecution plans a streaming query for a streaming batch).
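The "undefined at creation, specified per batch" lifecycle can be pictured with an optional field that a preparation step fills in by copying the operator. This is a minimal sketch with hypothetical, simplified stand-ins (StateInfo, RestoreOperator, StatePreparation), not Spark's own classes or rule.

```scala
// Hypothetical stand-in for the state info (not Spark's StatefulOperatorStateInfo)
final case class StateInfo(checkpointLocation: String, operatorId: Long, batchId: Long)

// Hypothetical stand-in for the operator: state info is undefined (None) at creation
final case class RestoreOperator(
    keyNames: Seq[String],
    stateInfo: Option[StateInfo] = None) {

  // Fails fast if the operator is used before state preparation
  def getStateInfo: StateInfo =
    stateInfo.getOrElse(throw new IllegalStateException("State info is not defined yet"))
}

object StatePreparation {
  // Mimics a per-batch preparation step that fills in the missing state info
  def prepare(op: RestoreOperator, checkpoint: String, batchId: Long): RestoreOperator =
    op.copy(stateInfo = Some(StateInfo(checkpoint, operatorId = 0L, batchId = batchId)))
}
```

For example, `RestoreOperator(Seq("parity"))` starts with no state info, and `StatePreparation.prepare(op, "/tmp/checkpoint", batchId = 1L)` returns a copy that carries the batch-specific value. The checkpoint path here is only a placeholder.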
Performance Metrics¶
number of output rows¶
The number of input rows (from the child physical operator) for which this StateStoreRestoreExec found a state value while executing (doExecute)
FIXME number of output rows
The number of output rows metric seems to always be an even number, as it appears to count both the restoredRow (from a state store) and the row itself (from the child physical operator).
StreamingAggregationStateManager¶
StateStoreRestoreExec creates a StreamingAggregationStateManager when created.
The StreamingAggregationStateManager is created using the grouping key expressions and the output schema of the child physical operator.
The StreamingAggregationStateManager is used in doExecute for the following:
- State Value Schema to mapPartitionsWithReadStateStore
- Extract a key (from an input row) and get the value (for the key) for every input row
Required Child Output Distribution¶
requiredChildDistribution: Seq[Distribution]
requiredChildDistribution is part of the SparkPlan (Spark SQL) abstraction.
requiredChildDistribution ...FIXME
Executing Operator¶
doExecute(): RDD[InternalRow]
doExecute is part of the SparkPlan (Spark SQL) abstraction.
doExecute executes the child operator and creates a StateStoreRDD with storeUpdateFunction that does the following per partition:
- Generates an unsafe projection to access the key field (using keyExpressions and the output schema of the child operator).
- For every input row (as InternalRow):
    - Extracts the key from the row (using the unsafe projection above)
    - Gets the saved state in StateStore for the key if available (it might not be if the key appeared in the input for the first time)
    - Increments numOutputRows metric
    - Generates a collection made up of the current row and possibly the state for the key if available
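The per-partition steps above can be sketched with plain Scala collections. This is an illustration under stated assumptions, not the operator's actual code: Row and the Map-based store are hypothetical stand-ins for InternalRow and StateStore, the metric is assumed to be incremented once per input row, and the ordering of the output (saved state first, then the current row) is also an assumption.

```scala
object RestoreSketch {
  // Hypothetical stand-in for a row: a grouping key with a partial aggregate value
  final case class Row(key: String, value: Long)

  // Returns the output rows and the value a numOutputRows-like counter reaches.
  // The store is modelled as an immutable Map from key to the saved state row.
  def restore(input: Iterator[Row], store: Map[String, Row]): (Seq[Row], Long) = {
    var numOutputRows = 0L
    val output = input.flatMap { row =>
      val key = row.key               // extract the key from the row
      val savedState = store.get(key) // None when the key is seen for the first time
      numOutputRows += 1              // assumption: one increment per input row
      savedState.toSeq :+ row         // saved state (if any) followed by the current row
    }.toList                          // force the lazy iterator before reading the counter
    (output, numOutputRows)
  }
}
```

With a store that holds state for key "a" only, an input of one row for "a" and one row for "b" yields three output rows: the saved state for "a", the new row for "a", and the new row for "b".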