FlatMapGroupsWithStateExec Physical Operator¶
FlatMapGroupsWithStateExec
is a binary physical operator (Spark SQL) that represents FlatMapGroupsWithState logical operator at execution time.
FlatMapGroupsWithStateExec
is an ObjectProducerExec
(Spark SQL) physical operator that produces a single output object.
Creating Instance¶
FlatMapGroupsWithStateExec
takes the following to be created:
- User-defined state function that is applied to every group (of type (Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any])
- Deserializer expression for keys
- Deserializer expression for values
- Initial State Deserializer Expression
- Grouping attributes
- Initial State Group Attributes
- Data attributes
- Initial State Data Attributes
- Output object attribute (that is the reference to the single object field this operator outputs)
- Optional StatefulOperatorStateInfo
- State encoder (ExpressionEncoder[Any])
- State format version
- OutputMode
- GroupStateTimeout
- Batch Processing Time
- Event-Time Watermark
- SparkPlan of the initial state
- hasInitialState flag
- Child physical operator
FlatMapGroupsWithStateExec
is created when:
- FlatMapGroupsWithStateStrategy execution planning strategy is executed (and plans a FlatMapGroupsWithState logical operator for execution)
OutputMode¶
FlatMapGroupsWithStateExec
is given an OutputMode when created.
The OutputMode
does not seem to be used at all, yet (according to the scaladoc) it is supposed to be the output mode of the func, which itself knows nothing about the output mode. Interesting.
StackOverflow
Check out the question What's the purpose of OutputMode in flatMapGroupsWithState? How/where is it used? on StackOverflow.
Short Name¶
shortName: String
shortName
is part of the StateStoreWriter abstraction.
shortName
is the following text:
flatMapGroupsWithState
Performance Metrics¶
FlatMapGroupsWithStateExec
uses the performance metrics of StateStoreWriter.
Executing Physical Operator¶
doExecute(): RDD[InternalRow]
doExecute
is part of SparkPlan
(Spark SQL) abstraction.
Initializing Metrics¶
doExecute
first initializes the metrics (that are accumulators under the covers so it is supposed to happen on the driver first before updates from tasks can have any effect).
Requirements¶
doExecute
makes sure that the parameters are as expected based on the GroupStateTimeout (and throws an IllegalArgumentException
otherwise):
GroupStateTimeout | Requirements |
---|---|
ProcessingTimeTimeout | Batch Processing Time must be non-empty |
EventTimeTimeout | Event-Time Watermark and Watermark Expression must be non-empty |
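The requirements in the table above can be sketched in plain Scala as follows. This is a minimal illustration, not the operator's actual code: the `Timeout` hierarchy, the `checkRequirements` name and its parameters are hypothetical stand-ins for the GroupStateTimeout and the operator's properties. The sketch relies on `require`, which throws an `IllegalArgumentException` when the condition does not hold.

```scala
object TimeoutRequirementsSketch {
  // Hypothetical stand-ins for the GroupStateTimeout values (not Spark's classes)
  sealed trait Timeout
  case object NoTimeout extends Timeout
  case object ProcessingTimeTimeout extends Timeout
  case object EventTimeTimeout extends Timeout

  // Validates the parameters against the timeout configuration.
  // `require` throws IllegalArgumentException when a condition is false.
  def checkRequirements(
      timeout: Timeout,
      batchTimestampMs: Option[Long],
      eventTimeWatermark: Option[Long],
      watermarkExpressionDefined: Boolean): Unit = timeout match {
    case ProcessingTimeTimeout =>
      require(batchTimestampMs.nonEmpty, "Batch processing time must be defined")
    case EventTimeTimeout =>
      require(eventTimeWatermark.nonEmpty, "Event-time watermark must be defined")
      require(watermarkExpressionDefined, "Watermark expression must be defined")
    case NoTimeout =>
      () // no timeout, nothing to check
  }
}
```

With `ProcessingTimeTimeout` and an empty batch processing time, `checkRequirements` fails fast instead of letting a task fail later.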
Processing Partition¶
With Initial State¶
Review Me
doExecute
then requests the child physical operator to execute (and generate an RDD[InternalRow]
).
doExecute
uses StateStoreOps to create a StateStoreRDD with a storeUpdateFunction
that does the following (for a partition):
- Creates an InputProcessor for a given StateStore
- (only when the GroupStateTimeout is EventTimeTimeout) Filters out late data based on the event-time watermark, i.e. rows from a given Iterator[InternalRow] that are older than the event-time watermark are excluded from the steps that follow
- Requests the InputProcessor to create an iterator of new data processed from the (possibly filtered) iterator
- Requests the InputProcessor to create an iterator of timed-out state data
- Creates an iterator by concatenating the above iterators (with the new data processed first)
- In the end, creates a CompletionIterator that executes a completion function (completionFunction) after it has successfully iterated through all the elements (i.e. when a client has consumed all the rows). The completion function requests the given StateStore to commit changes and then sets the store-specific metrics
No Initial State¶
With the hasInitialState flag disabled, doExecute requests the child physical operator to execute (and generate an RDD[InternalRow]) and then calls mapPartitionsWithStateStore with the following:
- StatefulOperatorStateInfo
- groupingAttributes
- State Schema of the StateManager
- 0 for the numColsPrefixKey
- storeUpdateFunction (as below)
storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]
storeUpdateFunction
creates a new InputProcessor with the current partition's StateStore and processes the partition.
Processing Partition¶
processDataWithPartition(
iter: Iterator[InternalRow],
store: StateStore,
processor: InputProcessor,
initialStateIterOption: Option[Iterator[InternalRow]] = None
): CompletionIterator[InternalRow, Iterator[InternalRow]]
Performance Metrics¶
processDataWithPartition
uses the following metrics:
filteredIter¶
With the timeout based on event time (when the GroupStateTimeout is EventTimeTimeout
), processDataWithPartition
drops late rows.
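A minimal sketch of the late-row filtering, in plain Scala. The `Row` class, the `dropLateRows` name and the millisecond fields are hypothetical. Treating a row whose event time equals the watermark as late is an assumption of this sketch, not something the text above states.

```scala
object LateRowFilterSketch {
  // Hypothetical row with an event-time column (milliseconds since epoch)
  final case class Row(key: String, eventTimeMs: Long)

  // Keeps only the rows that are newer than the watermark.
  // Filtering applies only with an event-time timeout and a defined watermark;
  // otherwise the input iterator is returned untouched.
  def dropLateRows(
      rows: Iterator[Row],
      watermarkMs: Option[Long],
      eventTimeTimeout: Boolean): Iterator[Row] =
    watermarkMs match {
      // Assumption: rows at or below the watermark count as late
      case Some(wm) if eventTimeTimeout => rows.filter(_.eventTimeMs > wm)
      case _ => rows
    }
}
```

The filter is lazy (it wraps the iterator), so late rows are skipped while the partition is being consumed rather than in a separate pass.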
processedOutputIterator¶
With the initial state specified, processDataWithPartition
...FIXME
newDataProcessorIter¶
processDataWithPartition
...FIXME
timeoutProcessorIter¶
With GroupStateTimeout enabled, processDataWithPartition
...FIXME
Output Rows¶
In the end, processDataWithPartition
creates an iterator that returns the rows from newDataProcessorIter followed by timeoutProcessorIter.
processDataWithPartition
creates a (completion) iterator that does the following after all rows have been fully consumed (processed):
- Requests the given StateStore to commit all the state changes (and measures the time taken, for the time to commit changes metric)
- Sets the StateStore metrics (e.g. number of total state rows, stateMemory and the custom metrics)
- Sets operator metrics (e.g. numShufflePartitions and number of state store instances)
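The "concatenate, then run a completion function once everything is consumed" pattern can be sketched in plain Scala. The `OnCompletion` class and `FakeStore` below are hypothetical stand-ins (for the CompletionIterator and the StateStore, respectively). The sketch only shows the ordering of events, not the metrics.

```scala
object CompletionSketch {
  // Iterator wrapper that runs `completion` exactly once,
  // the first time the underlying iterator reports no more elements
  final class OnCompletion[A](underlying: Iterator[A], completion: () => Unit)
      extends Iterator[A] {
    private var completed = false

    override def hasNext: Boolean = {
      val more = underlying.hasNext
      if (!more && !completed) {
        completed = true
        completion()
      }
      more
    }

    override def next(): A = underlying.next()
  }

  // Hypothetical stand-in for a state store that counts commits
  final class FakeStore {
    var commits: Int = 0
    def commit(): Unit = commits += 1
  }

  // New data first, timed-out state second, commit after the last row
  def outputRows[A](
      newData: Iterator[A],
      timedOut: Iterator[A],
      store: FakeStore): Iterator[A] =
    new OnCompletion(newData ++ timedOut, () => store.commit())
}
```

Nothing is committed until the consumer has drained the whole iterator, which is why the commit (and the metrics that depend on it) can only be observed after the last output row.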
StateStoreWriter¶
FlatMapGroupsWithStateExec
is a stateful physical operator that can write to a state store (and that MicroBatchExecution
asks whether to run another batch or not, based on the GroupStateTimeout).
FlatMapGroupsWithStateExec
uses the GroupStateTimeout (and possibly the updated metadata) when asked whether to run another batch or not (when MicroBatchExecution
is requested to construct the next streaming micro-batch while running the activated streaming query).
WatermarkSupport¶
FlatMapGroupsWithStateExec
is a physical operator that supports streaming event-time watermark.
FlatMapGroupsWithStateExec
is given the optional event time watermark when created.
The event-time watermark is initially undefined (None
) when planned for execution (in FlatMapGroupsWithStateStrategy execution planning strategy).
Note
FlatMapGroupsWithStateStrategy
converts FlatMapGroupsWithState logical operator to FlatMapGroupsWithStateExec
physical operator with undefined StatefulOperatorStateInfo, batchTimestampMs, and eventTimeWatermark.
The event-time watermark (with the StatefulOperatorStateInfo and the batchTimestampMs) is only defined (as the current event-time watermark of the given OffsetSeqMetadata) when IncrementalExecution
query execution pipeline is requested to apply the state preparation rule (as part of the preparations rules).
Note
The preparations rules are executed (applied to a physical query plan) at the executedPlan
phase of Structured Query Execution Pipeline to generate an optimized physical query plan ready for execution.
Read up on Structured Query Execution Pipeline in The Internals of Spark SQL online book.
IncrementalExecution
is used as the lastExecution of the available streaming query execution engines. It is created in the queryPlanning phase (of the MicroBatchExecution and ContinuousExecution execution engines) based on the current OffsetSeqMetadata.
Note
The optional event-time watermark can only be defined when the state preparation rule is executed which is at the executedPlan
phase of Structured Query Execution Pipeline which is also part of the queryPlanning phase.
StateManager¶
stateManager: StateManager
While being created, FlatMapGroupsWithStateExec
creates a StateManager (with the state encoder and the isTimeoutEnabled flag).
A StateManager
is created per state format version that is given while creating a FlatMapGroupsWithStateExec
(to choose between the available implementations).
The state format version is controlled by spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion internal configuration property.
The StateManager
is used exclusively when FlatMapGroupsWithStateExec
physical operator is executed for the following:
-
State schema (for the value schema of a StateStoreRDD)
-
State data for a key in a StateStore while processing new data
-
All state data (for all keys) in a StateStore while processing timed-out state data
-
Removing the state for a key from a StateStore when all rows have been processed
-
Persisting the state for a key in a StateStore when all rows have been processed
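The responsibilities listed above can be pictured with a small in-memory model in plain Scala. Everything here is hypothetical: `InMemoryStore` stands in for a StateStore, `StateData` for the stored state, and `NoTimestamp` for "no timeout set". The `timedOut` helper (states with a timeout timestamp below a threshold) is an assumption about how the "all state data" iterator is used for timed-out state.

```scala
object StateManagerSketch {
  // Hypothetical marker for "no timeout set"
  val NoTimestamp: Long = -1L

  // Hypothetical state record: a key, its user state and a timeout timestamp
  final case class StateData(
      key: String,
      state: Int,
      timeoutTimestampMs: Long = NoTimestamp)

  // Stand-in for a state store: a mutable map that keeps insertion order
  final class InMemoryStore {
    private val rows =
      scala.collection.mutable.LinkedHashMap.empty[String, StateData]
    def get(key: String): Option[StateData] = rows.get(key)
    def put(data: StateData): Unit = rows.update(data.key, data)
    def remove(key: String): Unit = { rows.remove(key); () }
    def iterator: Iterator[StateData] = rows.valuesIterator
  }

  // State data for a key (while processing new data)
  def getState(store: InMemoryStore, key: String): Option[StateData] =
    store.get(key)

  // All state data for all keys (while processing timed-out state)
  def getAllState(store: InMemoryStore): Iterator[StateData] = store.iterator

  // Persisting and removing the state for a key
  def putState(store: InMemoryStore, data: StateData): Unit = store.put(data)
  def removeState(store: InMemoryStore, key: String): Unit = store.remove(key)

  // Assumption: a state is timed out when it has a timeout timestamp
  // that is below the given threshold
  def timedOut(store: InMemoryStore, thresholdMs: Long): List[StateData] =
    getAllState(store)
      .filter(s => s.timeoutTimestampMs != NoTimestamp && s.timeoutTimestampMs < thresholdMs)
      .toList
}
```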
keyExpressions Method¶
keyExpressions: Seq[Attribute]
keyExpressions
simply returns the grouping attributes.
keyExpressions
is part of the WatermarkSupport abstraction.
Checking Out Whether Last Batch Execution Requires Another Non-Data Batch or Not¶
shouldRunAnotherBatch(
newMetadata: OffsetSeqMetadata): Boolean
shouldRunAnotherBatch
uses the GroupStateTimeout as follows:
- With EventTimeTimeout, shouldRunAnotherBatch is true only when the event-time watermark is defined and is older than (below) the event-time watermark of the given OffsetSeqMetadata
- With NoTimeout (and other GroupStateTimeouts if there were any), shouldRunAnotherBatch is always false
- With ProcessingTimeTimeout, shouldRunAnotherBatch is always true
shouldRunAnotherBatch
is part of the StateStoreWriter abstraction.
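The decision described in this section can be written down as a small pure function. This is a sketch under stated assumptions: the `Timeout` hierarchy and the parameter names (`currentWatermarkMs` for the operator's own event-time watermark, `newBatchWatermarkMs` for the watermark of the given OffsetSeqMetadata) are hypothetical, not Spark's API.

```scala
object ShouldRunAnotherBatchSketch {
  // Hypothetical stand-ins for the GroupStateTimeout values
  sealed trait Timeout
  case object NoTimeout extends Timeout
  case object ProcessingTimeTimeout extends Timeout
  case object EventTimeTimeout extends Timeout

  def shouldRunAnotherBatch(
      timeout: Timeout,
      currentWatermarkMs: Option[Long],
      newBatchWatermarkMs: Long): Boolean = timeout match {
    // Processing-time timeouts can fire without new data
    case ProcessingTimeTimeout => true
    // Only when the watermark is defined and has since advanced
    case EventTimeTimeout => currentWatermarkMs.exists(_ < newBatchWatermarkMs)
    // No timeout: no reason for another (non-data) batch
    case NoTimeout => false
  }
}
```

For example, with `EventTimeTimeout`, a current watermark of `Some(100L)` and a new batch watermark of `200L`, the function returns `true`, since state that timed out between the two watermarks still has to be processed.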
Internal Properties¶
isTimeoutEnabled Flag¶
Flag that says whether the GroupStateTimeout is not NoTimeout
Used when:
- FlatMapGroupsWithStateExec is created (and creates the internal StateManager)
- InputProcessor is requested to processTimedOutState
watermarkPresent Flag¶
Flag that says whether the child physical operator has a watermark attribute (among the output attributes).
Used when:
InputProcessor
is requested to callFunctionAndUpdateState
Required Child Output Distribution¶
requiredChildDistribution: Seq[Distribution]
requiredChildDistribution
is part of the SparkPlan
(Spark SQL) abstraction.
requiredChildDistribution
...FIXME
Demo¶
Demo: Internals of FlatMapGroupsWithStateExec Physical Operator
Logging¶
Enable ALL
logging level for org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec
logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec=ALL
Refer to Logging.