Arbitrary Stateful Streaming Aggregation
Arbitrary Stateful Streaming Aggregation is a streaming aggregation query that uses the following high-level operators of KeyValueGroupedDataset (Spark SQL):
- flatMapGroupsWithState for explicit state logic
- mapGroupsWithState for implicit state logic
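One practical difference between the two operators is how many rows the user function may emit per group. The plain-Scala sketch below has no Spark dependency and models only that output contract on a single micro-batch. The helpers groupByKeyModel, mapGroupsModel and flatMapGroupsModel are hypothetical names, not Spark APIs, and state handling is deliberately left out.

```scala
// Hypothetical, simplified model (not Spark code) of the per-group output contract.
// A "micro-batch" is just a Seq of (key, value) pairs.

// Like Dataset.groupByKey: gathers the values of every key (sorted by key for stable output).
def groupByKeyModel[K: Ordering, V](batch: Seq[(K, V)]): Seq[(K, Seq[V])] =
  batch.groupBy(_._1).toSeq.sortBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }

// mapGroupsWithState-like: the user function returns exactly one value per group.
def mapGroupsModel[K: Ordering, V, U](batch: Seq[(K, V)])(f: (K, Iterator[V]) => U): Seq[U] =
  groupByKeyModel(batch).map { case (k, vs) => f(k, vs.iterator) }

// flatMapGroupsWithState-like: the user function returns zero or more values per group.
def flatMapGroupsModel[K: Ordering, V, U](batch: Seq[(K, V)])(
    f: (K, Iterator[V]) => Iterator[U]): Seq[U] =
  groupByKeyModel(batch).flatMap { case (k, vs) => f(k, vs.iterator) }

val batch = Seq("a" -> 1, "b" -> 2, "a" -> 3)
// One row per key.
val sums = mapGroupsModel(batch)((k, vs) => s"$k:${vs.sum}")
// Zero or more rows per key (only values greater than 1 are emitted).
val bigOnes = flatMapGroupsModel(batch)((k, vs) => vs.filter(_ > 1).map(v => s"$k$v"))
```

Here sums has one entry for each of the keys a and b, while bigOnes keeps only the values that pass the filter.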
KeyValueGroupedDataset represents a grouped dataset that is the result of the Dataset.groupByKey operator.
The mapGroupsWithState and flatMapGroupsWithState operators use GroupState as the per-group streaming aggregation state. A GroupState is created separately for every aggregation key and holds an aggregation state value of a user-defined type.
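Because a key's state value outlives a single micro-batch, a user function can keep, for example, a running total per key. The sketch below models this in plain Scala. SimpleGroupState is a hypothetical stand-in whose exists/getOption/update/remove methods are named after GroupState's, and a mutable Map plays the role of the state store. Neither is Spark's implementation.

```scala
import scala.collection.mutable

// Hypothetical stand-in for GroupState[S]: wraps one key's current value (if any).
final class SimpleGroupState[S](private var value: Option[S]) {
  def exists: Boolean = value.isDefined
  def getOption: Option[S] = value
  def update(newValue: S): Unit = value = Some(newValue)
  def remove(): Unit = value = None
}

// Runs one micro-batch: builds one state wrapper per key from what the store holds,
// calls the user function, then writes the (possibly changed) value back so the
// next micro-batch sees it.
def runBatch[K, V, S, U](store: mutable.Map[K, S], batch: Seq[(K, V)])(
    f: (K, Iterator[V], SimpleGroupState[S]) => U): Map[K, U] =
  batch.groupBy(_._1).map { case (k, kvs) =>
    val state = new SimpleGroupState(store.get(k))
    val out = f(k, kvs.iterator.map(_._2), state)
    state.getOption match {
      case Some(s) => store.update(k, s)
      case None    => store.remove(k)
    }
    k -> out
  }

// Example user function: running click count per user across micro-batches.
val runningCount = (user: String, clicks: Iterator[Int], state: SimpleGroupState[Int]) => {
  val total = state.getOption.getOrElse(0) + clicks.sum
  state.update(total)
  total
}
```

Running two micro-batches against the same store shows the per-key value carrying over from the first batch to the second.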
The mapGroupsWithState and flatMapGroupsWithState operators use GroupStateTimeout as the aggregation state timeout, which defines when a GroupState is considered timed out (expired).
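The following plain-Scala sketch illustrates what "timed out" can mean for each kind of timeout. It assumes, as a simplification, that a key counts as timed out once its timeout timestamp falls strictly below a threshold: the batch timestamp for processing-time timeouts, or the event-time watermark for event-time timeouts. TimeoutConf, its case objects, NoTimestamp and the helper functions are hypothetical names, not Spark classes.

```scala
// Hypothetical model of GroupStateTimeout semantics (not Spark's classes).
sealed trait TimeoutConf
case object NoTimeoutConf extends TimeoutConf
case object ProcessingTimeTimeoutConf extends TimeoutConf
case object EventTimeTimeoutConf extends TimeoutConf

// Marker for "no timeout timestamp set for this key".
val NoTimestamp: Long = -1L

// Processing-time flavour: the deadline is the current batch timestamp plus a duration.
def processingTimeDeadline(batchTimestampMs: Long, durationMs: Long): Long =
  batchTimestampMs + durationMs

// A key counts as timed out once its timeout timestamp is strictly below the
// threshold: the batch timestamp (processing time) or the watermark (event time).
def isTimedOut(conf: TimeoutConf, timeoutTimestampMs: Long,
               batchTimestampMs: Long, watermarkMs: Long): Boolean = {
  val isSet = timeoutTimestampMs != NoTimestamp
  conf match {
    case NoTimeoutConf             => false
    case ProcessingTimeTimeoutConf => isSet && timeoutTimestampMs < batchTimestampMs
    case EventTimeTimeoutConf      => isSet && timeoutTimestampMs < watermarkMs
  }
}
```

With a 500 ms duration set in a batch at time 1000, the key is still alive in a batch at 1500 and times out in a later batch at 1501.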
Demos
Use the following demos and complete applications to learn more:
Performance Metrics
Arbitrary Stateful Streaming Aggregation uses the performance metrics of StateStoreWriter, which are reported through the FlatMapGroupsWithStateExec physical operator (a StateStoreWriter).
Internals
One of the most important internal execution components of Arbitrary Stateful Streaming Aggregation is the FlatMapGroupsWithStateExec physical operator.
When executed, FlatMapGroupsWithStateExec first validates the selected GroupStateTimeout:
- For ProcessingTimeTimeout, the batch timeout threshold has to be defined
- For EventTimeTimeout, the event-time watermark has to be defined and the input schema has to have the watermark attribute
Note: FIXME When are the above requirements met?
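The validation step can be pictured as a pair of precondition checks. The sketch below is a plain-Scala model under stated assumptions: TimeoutConf, its case objects and ExecInfo (including its field names) are hypothetical stand-ins for what the operator knows at execution time, and require is used to fail fast with an IllegalArgumentException.

```scala
// Hypothetical model of the timeout validation (not Spark's classes).
sealed trait TimeoutConf
case object NoTimeoutConf extends TimeoutConf
case object ProcessingTimeTimeoutConf extends TimeoutConf
case object EventTimeTimeoutConf extends TimeoutConf

// What the operator knows when it is executed (all fields hypothetical).
final case class ExecInfo(
    batchTimeoutThresholdMs: Option[Long],
    eventTimeWatermarkMs: Option[Long],
    inputHasWatermarkAttribute: Boolean)

// Fails fast (require throws IllegalArgumentException) when a requirement is not met.
def validateTimeout(conf: TimeoutConf, info: ExecInfo): Unit = conf match {
  case ProcessingTimeTimeoutConf =>
    require(info.batchTimeoutThresholdMs.isDefined,
      "ProcessingTimeTimeout needs a batch timeout threshold")
  case EventTimeTimeoutConf =>
    require(info.eventTimeWatermarkMs.isDefined,
      "EventTimeTimeout needs an event-time watermark")
    require(info.inputHasWatermarkAttribute,
      "EventTimeTimeout needs a watermark attribute in the input schema")
  case NoTimeoutConf => ()
}
```

In this model, a query without any timeout skips both checks.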
The FlatMapGroupsWithStateExec physical operator then uses mapPartitionsWithStateStore with a custom storeUpdateFunction of the following signature:
(StateStore, Iterator[T]) => Iterator[U]
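A minimal plain-Scala sketch of a function with that shape follows. It assumes a hypothetical InMemoryStore in place of StateStore and a hypothetical mapPartitionsWithStoreModel in place of mapPartitionsWithStateStore. Each partition's rows are processed with that partition's own store, and the store is committed once the output has been fully consumed. Spark's actual commit point is not claimed to match this exactly.

```scala
import scala.collection.mutable

// Hypothetical, minimal key-value store standing in for StateStore.
final class InMemoryStore {
  private val data = mutable.Map.empty[String, Long]
  private var committed = false
  def get(key: String): Option[Long] = data.get(key)
  def put(key: String, value: Long): Unit = data.update(key, value)
  def commit(): Unit = committed = true
  def hasCommitted: Boolean = committed
}

// Same shape as storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U].
// Here T = (key, increment) and U = (key, running total).
val storeUpdateFunction: (InMemoryStore, Iterator[(String, Long)]) => Iterator[(String, Long)] =
  (store, rows) => rows.map { case (key, increment) =>
    val total = store.get(key).getOrElse(0L) + increment
    store.put(key, total)
    key -> total
  }

// Applies the function to each partition with that partition's own store.
def mapPartitionsWithStoreModel[T, U](partitions: Seq[Seq[T]], stores: Seq[InMemoryStore])(
    f: (InMemoryStore, Iterator[T]) => Iterator[U]): Seq[List[U]] =
  partitions.zip(stores).map { case (rows, store) =>
    // Forcing the lazy iterator is what actually applies the state updates.
    val out = f(store, rows.iterator).toList
    store.commit()
    out
  }
```

Because the output iterator is lazy, the updates only happen as it is consumed. For that reason the sketch commits after the toList call and not before it.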
While generating the RDD recipe, FlatMapGroupsWithStateExec uses the StateStoreOps extension methods to register a listener that is executed on task completion. The listener makes sure that the given StateStore has all state changes either committed or aborted.
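The commit-or-abort guarantee can be modelled with a completion hook. In the sketch below, TaskContextModel and StoreModel are hypothetical stand-ins for TaskContext and StateStore, and runTask is a hypothetical driver for one task. The listener aborts the store whenever the task body finished without committing, for example because it threw.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for TaskContext: collects listeners and runs them at the end.
final class TaskContextModel {
  private val listeners = ListBuffer.empty[() => Unit]
  def addTaskCompletionListener(f: () => Unit): Unit = listeners += f
  // Runs every registered listener once the task finishes, successfully or not.
  def markTaskCompleted(): Unit = listeners.foreach(listener => listener())
}

// Hypothetical stand-in for StateStore, tracking only commit/abort.
final class StoreModel {
  private var committed = false
  private var aborted = false
  def commit(): Unit = committed = true
  def abort(): Unit = aborted = true
  def hasCommitted: Boolean = committed
  def hasAborted: Boolean = aborted
}

// Runs the task body; the listener aborts the store if the body did not commit,
// so no half-applied state changes are left behind.
def runTask(store: StoreModel)(body: StoreModel => Unit): Unit = {
  val ctx = new TaskContextModel
  ctx.addTaskCompletionListener(() => if (!store.hasCommitted) store.abort())
  try body(store)
  finally ctx.markTaskCompleted()
}
```

A successful task commits and the listener does nothing. A failing task propagates its exception, but the store still ends up aborted.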
In the end, FlatMapGroupsWithStateExec creates a new StateStoreRDD and adds it to the RDD lineage.
StateStoreRDD distributes tasks across executors according to their preferred locations, with the help of the StateStoreCoordinator that runs on the driver.
StateStoreRDD uses the StateStore helper object to look up a StateStore by StateStoreProviderId and store version.
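The sketch below models the two lookups with hypothetical classes. CoordinatorModel records which executor last hosted a provider and turns that into preferred locations. StoreRegistryModel returns a provider's state at a requested version. ProviderId is a simplified stand-in for StateStoreProviderId, and its two fields are assumptions chosen for illustration only.

```scala
import scala.collection.mutable

// Simplified, hypothetical stand-in for StateStoreProviderId.
final case class ProviderId(operatorId: Long, partitionId: Int)

// Driver-side model: remembers the executor that last reported a loaded provider.
final class CoordinatorModel {
  private val locations = mutable.Map.empty[ProviderId, String]
  def recordLocation(id: ProviderId, executorId: String): Unit = locations.update(id, executorId)
  // Preferred locations for the partition's task: empty if nothing is known yet.
  def preferredLocations(id: ProviderId): Seq[String] = locations.get(id).toList
}

// Executor-side model: state snapshots per provider, addressed by version.
final class StoreRegistryModel {
  private val versions = mutable.Map.empty[ProviderId, mutable.Map[Long, Map[String, Long]]]
  def save(id: ProviderId, version: Long, state: Map[String, Long]): Unit =
    versions.getOrElseUpdate(id, mutable.Map.empty[Long, Map[String, Long]]).update(version, state)
  def get(id: ProviderId, version: Long): Map[String, Long] =
    versions.get(id).flatMap(_.get(version)).getOrElse(
      throw new IllegalStateException(s"no version $version for $id"))
}
```

Keying the lookup by both provider and version lets a task load exactly the state snapshot its micro-batch expects.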
The FlatMapGroupsWithStateExec physical operator uses state managers that are different from the state managers used for Streaming Aggregation. The StateStore abstraction, however, is the same as in Streaming Aggregation.
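The split between "same store, different state manager" can be sketched as one generic store with two different value layouts. This rests on a simplifying assumption: the flatMapGroupsWithState state manager keeps the user-defined state object together with the key's timeout timestamp, while the streaming aggregation one keeps only the aggregation values. KeyValueStore, AggregationRow and GroupStateRow are hypothetical names.

```scala
import scala.collection.mutable

// One store abstraction used by both kinds of operators (stand-in for StateStore).
final class KeyValueStore[V] {
  private val rows = mutable.Map.empty[String, V]
  def get(key: String): Option[V] = rows.get(key)
  def put(key: String, value: V): Unit = rows.update(key, value)
  def remove(key: String): Unit = rows.remove(key)
}

// What differs is the state manager, i.e. the layout of a key's state in the store.
// Streaming aggregation: only the aggregation values for the key.
final case class AggregationRow(sum: Long, count: Long)
// flatMapGroupsWithState: the user-defined state object plus its timeout timestamp.
final case class GroupStateRow[S](state: S, timeoutTimestampMs: Long)

val aggregationStore = new KeyValueStore[AggregationRow]
val groupStateStore = new KeyValueStore[GroupStateRow[List[String]]]
```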
One of the important execution steps is when the InputProcessor (of the FlatMapGroupsWithStateExec physical operator) is requested to callFunctionAndUpdateState, which executes the user-defined state function with a per-group key object, its value objects, and a GroupStateImpl.
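The following plain-Scala sketch models callFunctionAndUpdateState for a single key. GroupStateModel is a hypothetical stand-in for GroupStateImpl, with hasTimedOut and hasRemoved flags named after GroupState/GroupStateImpl methods, and a mutable Map stands in for the state store. For simplicity the user function is run eagerly. The sketch does not claim to reproduce when Spark actually persists the state relative to emitting output.

```scala
import scala.collection.mutable

// Hypothetical stand-in for GroupStateImpl: current value plus removal/timeout flags.
final class GroupStateModel[S](initial: Option[S], val hasTimedOut: Boolean) {
  private var value: Option[S] = initial
  private var removed = false
  def getOption: Option[S] = value
  def update(newValue: S): Unit = { value = Some(newValue); removed = false }
  def remove(): Unit = { value = None; removed = true }
  def hasRemoved: Boolean = removed
}

// Calls the user function for one key, then applies its state changes to the store.
def callFunctionAndUpdateState[K, V, S, U](
    store: mutable.Map[K, S], key: K, values: Iterator[V], hasTimedOut: Boolean)(
    f: (K, Iterator[V], GroupStateModel[S]) => Iterator[U]): Iterator[U] = {
  val state = new GroupStateModel(store.get(key), hasTimedOut)
  val output = f(key, values, state).toList // eager, for simplicity
  if (state.hasRemoved) store.remove(key)
  else state.getOption.foreach(s => store.update(key, s))
  output.iterator
}

// Example state function: counts values per key; on timeout, drops the state.
val countOrExpire = (key: String, values: Iterator[Int], state: GroupStateModel[Int]) => {
  if (state.hasTimedOut) {
    state.remove()
    Iterator(s"$key expired")
  } else {
    val count = state.getOption.getOrElse(0) + values.size
    state.update(count)
    Iterator(s"$key=$count")
  }
}
```

A timed-out key is modelled here as a call with no values and hasTimedOut set, which lets the same user function decide whether to emit a final row and drop the state.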