StateStoreSaveExec Unary Physical Operator
StateStoreSaveExec is a unary physical operator that saves a streaming state to a state store with support for streaming watermark.
Note
A unary physical operator (UnaryExecNode) is a physical operator with a single child physical operator.
Read up on UnaryExecNode and physical operators in general (https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkPlan.html) in The Internals of Spark SQL book (https://bit.ly/spark-sql-internals).
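StateStoreSaveExec is created when the StatefulAggregationStrategy execution planning strategy is requested to plan a streaming aggregation for execution (i.e. Aggregate logical operators in the logical plan of a streaming query).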
The optional properties (the StatefulOperatorStateInfo, the output mode and the event-time watermark) are undefined when StateStoreSaveExec is created.
StateStoreSaveExec is updated to hold execution-specific configuration when IncrementalExecution is requested to prepare the logical plan (of a streaming query) for execution, i.e. when the state preparation rule is executed.
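The following minimal plain-Scala model illustrates the lifecycle: the operator is created with undefined optional properties and later copied with execution-specific configuration. It is not Spark code. SaveExecModel, StateInfo, prepareForExecution and the OutputMode objects are hypothetical stand-ins for StateStoreSaveExec, StatefulOperatorStateInfo, the state preparation rule and Spark's output modes. The child operator is omitted.

```scala
// Hypothetical, simplified model; these are NOT Spark's classes.
sealed trait OutputMode
case object Append extends OutputMode
case object Complete extends OutputMode
case object Update extends OutputMode

// Hypothetical stand-in for StatefulOperatorStateInfo.
final case class StateInfo(checkpointLocation: String, operatorId: Long, storeVersion: Long)

// Mirrors the constructor parameters of StateStoreSaveExec (child omitted):
// the execution-specific properties default to None.
final case class SaveExecModel(
    keyExpressions: Seq[String],
    stateInfo: Option[StateInfo] = None,
    outputMode: Option[OutputMode] = None,
    eventTimeWatermark: Option[Long] = None,
    stateFormatVersion: Int)

// At planning time only the grouping keys and the state format version are known.
val planned = SaveExecModel(keyExpressions = Seq("window"), stateFormatVersion = 2)

// Hypothetical equivalent of the state preparation rule: copy the operator and
// fill in the execution-specific properties for the current micro-batch.
def prepareForExecution(
    op: SaveExecModel,
    info: StateInfo,
    mode: OutputMode,
    watermarkMs: Long): SaveExecModel =
  op.copy(
    stateInfo = Some(info),
    outputMode = Some(mode),
    eventTimeWatermark = Some(watermarkMs))

val prepared =
  prepareForExecution(planned, StateInfo("/tmp/checkpoint/state", 0L, 1L), Append, 5000L)
println(prepared)
```

Using an immutable case class with `copy` keeps the planned operator untouched. Each micro-batch then gets its own fully configured instance.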
Note
Unlike the StateStoreRestoreExec operator, StateStoreSaveExec takes an output mode and an event-time watermark when created.
When executed, StateStoreSaveExec creates a StateStoreRDD to map over partitions with a storeUpdateFunction that manages the StateStore.
Note
The number of partitions of StateStoreRDD (and hence the number of Spark tasks) is what was defined for the child physical operator.
There will be as many StateStores as there are partitions in StateStoreRDD.
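Here is a plain-Scala sketch of the one-store-per-partition relationship. It assumes the child's output is already split into partitions. StoreId and the in-memory maps are hypothetical and are not Spark's StateStoreId or StateStore.

```scala
import scala.collection.mutable

// Hypothetical store identifier: one store per (operator, partition) pair.
final case class StoreId(operatorId: Long, partitionId: Int)

// The child's output, already split into 3 partitions of (key, aggregate) pairs.
val childPartitions: Seq[Seq[(String, Long)]] = Seq(
  Seq("a" -> 1L, "b" -> 2L),
  Seq("c" -> 3L),
  Seq("d" -> 4L, "e" -> 5L))

// One task per partition; each task only touches the store of its own partition.
val stores = mutable.Map.empty[StoreId, mutable.Map[String, Long]]
childPartitions.zipWithIndex.foreach { case (rows, partitionId) =>
  val store = stores.getOrElseUpdate(StoreId(0L, partitionId), mutable.Map.empty[String, Long])
  rows.foreach { case (key, value) => store(key) = value }
}

println(s"partitions = ${childPartitions.size}, state stores = ${stores.size}")
```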
NOTE: StateStoreSaveExec behaves differently per output mode.
When executed, StateStoreSaveExec executes the child physical operator and creates a StateStoreRDD (with a storeUpdateFunction specific to the output mode).
The output schema of StateStoreSaveExec is exactly the child's output schema.
The output partitioning of StateStoreSaveExec is exactly the child's output partitioning.
StateStoreSaveExec uses a StreamingAggregationStateManager (that is created for the keyExpressions, the output of the child physical operator and the stateFormatVersion).
TIP: Enable ALL logging level for org.apache.spark.sql.execution.streaming.StateStoreSaveExec logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.streaming.StateStoreSaveExec=ALL
Performance Metrics (SQLMetrics)
StateStoreSaveExec uses the same performance metrics as other stateful physical operators that write to a state store.
The following list shows how the performance metrics are computed (and so their exact meaning). Each entry starts with the name of the metric in web UI followed by its internal name.

total time to update rows (allUpdatesTimeMs)
Time taken to read the input rows and store them in a state store (possibly filtering out expired rows per the watermarkPredicateForData predicate). The number of rows stored is the number of updated state rows metric.
- For Append output mode, the time taken to filter out expired rows (per the required watermarkPredicateForData predicate) and for the StreamingAggregationStateManager to store rows in a state store
- For Complete output mode, the time taken to go over all the input rows and request the StreamingAggregationStateManager to store rows in a state store
- For Update output mode, the time taken to filter out expired rows (per the optional watermarkPredicateForData predicate) and for the StreamingAggregationStateManager to store rows in a state store

total time to remove rows (allRemovalsTimeMs)
- For Append output mode, the time taken for the StreamingAggregationStateManager to remove all expired entries from a state store (per the watermarkPredicateForKeys predicate), i.e. the total time of iterating over all entries in the state store. The number of entries removed from a state store is the difference between the number of output rows of the child operator and the number of total state rows metric.
- For Complete output mode, always 0
- For Update output mode, the time taken for the StreamingAggregationStateManager to remove all expired entries from a state store (per the watermarkPredicateForKeys predicate)

time to commit changes (commitTimeMs)
Time taken for the StreamingAggregationStateManager to commit changes to a state store

number of output rows (numOutputRows)
- For Append output mode, the metric does not seem to be used
- For Complete output mode, the number of rows in a StateStore (i.e. all values in a StateStore in the StreamingAggregationStateManager), which should be equivalent to the number of total state rows metric
- For Update output mode, the number of rows that the StreamingAggregationStateManager was requested to store in a state store (that did not expire per the optional watermarkPredicateForData predicate), which is equivalent to the number of updated state rows metric

number of total state rows (numTotalStateRows)
Number of entries in a state store at the very end of execution.
Corresponds to the numRowsTotal attribute in stateOperators in StreamingQueryProgress (and is available as sq.lastProgress.stateOperators for an operator).

number of updated state rows (numUpdatedStateRows)
Number of the entries that were stored as updates in a state store in a trigger and for the keys in the result rows of the upstream physical operator.
- For Append output mode, the number of input rows that have not expired yet (per the required watermarkPredicateForData predicate) and that the StreamingAggregationStateManager was requested to store in a state store (the time taken is the total time to update rows metric)
- For Complete output mode, the number of input rows (which should be exactly the number of output rows from the child operator)
- For Update output mode, the number of rows that the StreamingAggregationStateManager was requested to store in a state store (that did not expire per the optional watermarkPredicateForData predicate), which is equivalent to the number of output rows metric
Corresponds to the numRowsUpdated attribute in stateOperators in StreamingQueryProgress (and is available as sq.lastProgress.stateOperators for an operator).

memory used by state (stateMemory)
Estimated memory used by a StateStore after StateStoreSaveExec finished execution.
Creating Instance
StateStoreSaveExec takes the following to be created:
- Key expressions (Catalyst attributes for the grouping keys)
- Execution-specific StatefulOperatorStateInfo (default: None)
- Execution-specific OutputMode (default: None)
- Event-time watermark (default: None)
- Version of the state format (based on the spark.sql.streaming.aggregation.stateFormatVersion configuration property)
- Child physical operator (SparkPlan)
Executing Physical Operator
doExecute(): RDD[InternalRow]
doExecute is part of the SparkPlan abstraction (Spark SQL).
Internally, doExecute initializes metrics.
NOTE: doExecute requires that the optional output mode is defined at this point (which should be the case once IncrementalExecution has prepared a streaming aggregation for execution).
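doExecute executes the child physical operator and creates a StateStoreRDD with a storeUpdateFunction that:
- Generates an unsafe projection to access the key field (using the key expressions and the output schema of the child operator).
- Branches off per output mode: Append, Complete and Update.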
doExecute throws an UnsupportedOperationException when executed with an invalid output mode:
Invalid output mode: [outputMode]
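The output-mode dispatch can be modeled as a match on `Option[OutputMode]`. This is a hypothetical sketch: `storeUpdateFunctionFor`, the `Unsupported` mode and the returned descriptions are made up for illustration. The `assert` stands in for the requirement that the output mode is defined before execution.

```scala
// Hypothetical output modes; Unsupported stands for any mode the operator cannot handle.
sealed trait OutputMode
case object Append extends OutputMode
case object Complete extends OutputMode
case object Update extends OutputMode
case object Unsupported extends OutputMode

// Picks a (described) store update function per output mode.
def storeUpdateFunctionFor(outputMode: Option[OutputMode]): String = {
  // The output mode must have been set before execution.
  assert(outputMode.nonEmpty, "outputMode has not been set")
  outputMode match {
    case Some(Append)   => "append: store non-expired rows, emit evicted rows"
    case Some(Complete) => "complete: store all rows, emit the whole state"
    case Some(Update)   => "update: store and emit non-expired rows"
    case _ =>
      throw new UnsupportedOperationException(s"Invalid output mode: $outputMode")
  }
}

println(storeUpdateFunctionFor(Some(Update)))
```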
Append Output Mode
NOTE: Append is the default output mode when not specified explicitly.
NOTE: Append output mode requires that a streaming query defines an event-time watermark (e.g. using the withWatermark operator) on the event-time column that is used in aggregation (directly or using the window standard function).
For Append output mode, doExecute does the following:
- Filters out late (aggregate) rows from the child physical operator (those that have expired per watermark)
- Stores the remaining rows in the state store and increments the number of updated state rows metric
- Creates an iterator that removes expired rows from the state store (and returns them as output) when requested the next row, and in the end commits the state updates
TIP: Refer to the demo of StateStoreSaveExec with Append output mode.
CAUTION: FIXME When is "Filtering state store on:" printed out?
In more detail, for Append output mode doExecute:
- Uses the watermarkPredicateForData predicate to exclude matching rows and (like in Complete output mode) stores all the remaining rows in StateStore.
- (Like in Complete output mode) While storing the rows, increments the number of updated state rows metric (for every row) and records the total time in the total time to update rows metric.
- Takes all the rows from StateStore and returns a NextIterator that:
  - In getNext, finds the first row that matches the watermarkPredicateForKeys predicate, removes it from StateStore, and returns it back. If no row was found, getNext also marks the iterator as finished.
  - In close, records the time to iterate over all the rows in the total time to remove rows metric, commits the updates to StateStore followed by recording the time in the time to commit changes metric, and records StateStore metrics.
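The Append flow above can be modeled in plain Scala, with a mutable map acting as the state store. This sketch rests on several assumptions. `Agg`, `expiredRow` and `expiredKey` are hypothetical. A row or key counts as expired when its window end is at or below the watermark. Setting `committed` stands in for committing the StateStore.

```scala
import scala.collection.mutable

// Hypothetical aggregate row: a window end (event time in ms) and a running count.
final case class Agg(windowEnd: Long, count: Long)

val watermarkMs = 10000L

// Stand-ins for watermarkPredicateForData and watermarkPredicateForKeys.
def expiredRow(row: Agg): Boolean = row.windowEnd <= watermarkMs
def expiredKey(key: Long): Boolean = key <= watermarkMs

// State left over from earlier micro-batches, keyed by window end.
val store = mutable.LinkedHashMap[Long, Agg](5000L -> Agg(5000L, 3L), 15000L -> Agg(15000L, 1L))
var numUpdatedStateRows = 0L
var committed = false

// Aggregated input rows of the current micro-batch.
val input = Iterator(Agg(8000L, 7L), Agg(15000L, 4L), Agg(20000L, 2L))

// Step 1: drop expired rows and eagerly store the remaining ones.
input.filterNot(expiredRow).foreach { row =>
  store(row.windowEnd) = row
  numUpdatedStateRows += 1
}

// Step 2: evict expired keys one entry at a time; evicted rows are the output.
val rangeIter = store.toList.iterator // snapshot, so removals do not disturb iteration
val output: Iterator[Agg] = new Iterator[Agg] {
  private var nextRow: Option[Agg] = fetch()

  // Like getNext: find the next expired entry, remove it and return its value.
  private def fetch(): Option[Agg] = {
    var found: Option[Agg] = None
    while (rangeIter.hasNext && found.isEmpty) {
      val (key, value) = rangeIter.next()
      if (expiredKey(key)) {
        store.remove(key)
        found = Some(value)
      }
    }
    // Like close: once nothing is left, "commit" the state updates.
    if (found.isEmpty) committed = true
    found
  }

  def hasNext: Boolean = nextRow.isDefined
  def next(): Agg = {
    val row = nextRow.get
    nextRow = fetch()
    row
  }
}

val emitted = output.toList
println(s"emitted = $emitted, remaining keys = ${store.keys.toList}")
```

Only the window that fell below the watermark (ending at 5000) is emitted. The late input row (window end 8000) is never stored.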
Complete Output Mode
For Complete output mode, doExecute does the following:
- Takes all UnsafeRow rows (from the parent iterator)
- Stores the rows by key in the state store eagerly (i.e. all rows that are available in the parent iterator before proceeding)
- In the end, reads the key-row pairs from the state store and passes the rows along (i.e. to the following physical operator)
The number of keys stored in the state store is recorded in the number of updated state rows metric.
NOTE: In Complete output mode the number of output rows metric is exactly the number of total state rows metric.
TIP: Refer to the demo of StateStoreSaveExec with Complete output mode.
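In more detail, for Complete output mode doExecute:
- Stores all rows (as UnsafeRow) in StateStore.
- While storing the rows, increments the number of updated state rows metric (for every row) and records the total time in the total time to update rows metric.
- Records 0 in the total time to remove rows metric.
- Commits the state updates to StateStore and records the time in the time to commit changes metric.
- In the end, takes all the rows stored in StateStore and increments the numOutputRows metric (for every row).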
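Here is a plain-Scala model of the Complete flow, with a mutable map as a hypothetical state store and local variables standing in for the metrics. It is not Spark code. It shows why the number of output rows ends up equal to the number of total state rows.

```scala
import scala.collection.mutable

// Hypothetical state: grouping key -> aggregate value (e.g. a count).
val store = mutable.LinkedHashMap[String, Long]("a" -> 1L, "b" -> 5L)
var numUpdatedStateRows = 0L
var numOutputRows = 0L

// Aggregated input rows of the current micro-batch.
val input = Iterator("a" -> 3L, "c" -> 2L)

// Eagerly store every input row by key before producing any output.
while (input.hasNext) {
  val (key, value) = input.next()
  store(key) = value
  numUpdatedStateRows += 1
}
val allRemovalsTimeMs = 0L // nothing is ever removed in Complete mode
val committed = true       // stands in for committing the state store

// Emit the whole state, counting every output row.
val output = store.iterator.map { kv => numOutputRows += 1; kv }.toList
println(output)
```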
Update Output Mode
For Update output mode, doExecute returns an iterator that filters out late aggregate rows (per watermark if defined) and stores the "young" rows in the state store one by one, i.e. on every next call.
With no more rows available, the iterator removes the late rows from the state store (all at once) and commits the state updates.
TIP: Refer to the demo of StateStoreSaveExec with Update output mode.
doExecute returns an Iterator of rows that uses the watermarkPredicateForData predicate to filter out late rows.
In hasNext, when rows are no longer available:
- Records the total time to iterate over all the rows in the total time to update rows metric.
- Removes the keys older than the watermark (removeKeysOlderThanWatermark) and records the time in the total time to remove rows metric.
- Commits the updates to StateStore and records the time in the time to commit changes metric.
In next, stores a row in StateStore and increments the numOutputRows and numUpdatedStateRows metrics.
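The Update flow can be modeled in plain Scala as a lazy iterator. This is a sketch under assumptions. `Agg` and the mutable map store are hypothetical. Keys at or below the watermark count as old. The cleanup in `hasNext` stands in for removeKeysOlderThanWatermark plus the commit.

```scala
import scala.collection.mutable

// Hypothetical aggregate row: a window end (event time in ms) and a running count.
final case class Agg(windowEnd: Long, count: Long)

val watermark: Option[Long] = Some(10000L)
val store = mutable.LinkedHashMap[Long, Agg](5000L -> Agg(5000L, 3L))
var numOutputRows = 0L
var numUpdatedStateRows = 0L
var committed = false

val input = Iterator(Agg(8000L, 1L), Agg(12000L, 4L), Agg(30000L, 2L))

// Filter out late rows only when a watermark is defined.
val base: Iterator[Agg] = watermark match {
  case Some(wm) => input.filterNot(_.windowEnd <= wm)
  case None     => input
}

val output: Iterator[Agg] = new Iterator[Agg] {
  private var closed = false

  def hasNext: Boolean = {
    val more = base.hasNext
    if (!more && !closed) {
      closed = true
      // Stand-in for removeKeysOlderThanWatermark: drop keys at or below the watermark.
      watermark.foreach { wm =>
        store.keys.toList.filter(_ <= wm).foreach(k => store.remove(k))
      }
      committed = true // stand-in for committing the state store
    }
    more
  }

  // Every row handed out is also stored and counted, one by one.
  def next(): Agg = {
    val row = base.next()
    store(row.windowEnd) = row
    numOutputRows += 1
    numUpdatedStateRows += 1
    row
  }
}

val emitted = output.toList
println(s"emitted = $emitted, remaining keys = ${store.keys.toList}")
```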
Checking Whether Last Batch Execution Requires Another Non-Data Batch (shouldRunAnotherBatch Method)
shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean
shouldRunAnotherBatch is positive (true) when all of the following are met:
- Output mode is either Append or Update
- Event-time watermark is defined and is older than (below) the current event-time watermark (of the given OffsetSeqMetadata)
Otherwise, shouldRunAnotherBatch is negative (false).
shouldRunAnotherBatch is part of the StateStoreWriter abstraction.
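Here is a minimal plain-Scala sketch of the check. It assumes the conditions are an output mode of Append or Update, together with an event-time watermark that is defined and below the batch watermark of the new metadata. BatchMetadata and the OutputMode objects are hypothetical stand-ins for OffsetSeqMetadata and Spark's output modes.

```scala
// Hypothetical output modes (not Spark's classes).
sealed trait OutputMode
case object Append extends OutputMode
case object Complete extends OutputMode
case object Update extends OutputMode

// Hypothetical stand-in for OffsetSeqMetadata: only the batch watermark matters here.
final case class BatchMetadata(batchWatermarkMs: Long)

// True when another (possibly no-data) batch could evict or finalize state.
def shouldRunAnotherBatch(
    outputMode: Option[OutputMode],
    eventTimeWatermark: Option[Long],
    newMetadata: BatchMetadata): Boolean =
  (outputMode.contains(Append) || outputMode.contains(Update)) &&
    eventTimeWatermark.exists(_ < newMetadata.batchWatermarkMs)

println(shouldRunAnotherBatch(Some(Append), Some(5000L), BatchMetadata(10000L)))
```

The watermark has advanced past the one the last batch ran with, so a no-data batch can emit (Append) or clean up (Update) state. In Complete mode state is never evicted, so there is nothing to gain.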