Arbitrary Stateful Streaming Aggregation
Arbitrary Stateful Streaming Aggregation is a streaming aggregation query that uses the following high-level operators of KeyValueGroupedDataset (Spark SQL):
- flatMapGroupsWithState for explicit state logic
- mapGroupsWithState for implicit state logic
KeyValueGroupedDataset represents a grouped dataset that is the result of the Dataset.groupByKey operator.
The mapGroupsWithState and flatMapGroupsWithState operators use GroupState as the streaming aggregation state of a group. A GroupState is created separately for every aggregation key and holds an aggregation state value (of a user-defined type).
The mapGroupsWithState and flatMapGroupsWithState operators use GroupStateTimeout as the aggregation state timeout that defines when a GroupState is considered timed-out (expired).
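As a rough illustration of how the operators, GroupState and GroupStateTimeout fit together, the sketch below keeps a running total per key with mapGroupsWithState and drops the state of a key once it times out. It assumes Spark SQL is on the classpath. The Event and RunningTotal case classes, the nextTotal helper, the rate source, the console sink and the one-minute timeout are choices made for this example only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical input and output types, used only in this sketch
case class Event(user: String, amount: Long)
case class RunningTotal(user: String, total: Long, expired: Boolean)

object RunningTotals {

  // Pure helper (hypothetical): combines the previous state with new amounts
  def nextTotal(previous: Option[Long], amounts: Iterator[Long]): Long =
    previous.getOrElse(0L) + amounts.sum

  // User-defined state function: called once per key per micro-batch
  def updateTotal(
      user: String,
      events: Iterator[Event],
      state: GroupState[Long]): RunningTotal =
    if (state.hasTimedOut) {
      // No new data arrived before the timeout: emit the last total and drop the state
      val last = state.getOption.getOrElse(0L)
      state.remove()
      RunningTotal(user, last, expired = true)
    } else {
      val total = nextTotal(state.getOption, events.map(_.amount))
      state.update(total)                 // state value kept for this key
      state.setTimeoutDuration("1 minute") // allowed with ProcessingTimeTimeout
      RunningTotal(user, total, expired = false)
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("running-totals-sketch")
      .getOrCreate()
    import spark.implicits._

    // The built-in rate source emits (timestamp, value) rows
    val events = spark.readStream
      .format("rate")
      .load()
      .selectExpr("concat('user-', cast(value % 3 as string)) AS user", "value AS amount")
      .as[Event]

    val totals = events
      .groupByKey(_.user) // KeyValueGroupedDataset[String, Event]
      .mapGroupsWithState[Long, RunningTotal](
        GroupStateTimeout.ProcessingTimeTimeout)(updateTotal _)

    totals.writeStream
      .format("console")
      .outputMode(OutputMode.Update())
      .start()
      .awaitTermination()
  }
}
```

Keeping the arithmetic in a plain helper such as nextTotal makes the state logic easy to check without starting a streaming query.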
Performance Metrics
Arbitrary Stateful Streaming Aggregation reports the performance metrics of StateStoreWriter (through the FlatMapGroupsWithStateExec physical operator).
Internals
One of the most important internal execution components of Arbitrary Stateful Streaming Aggregation is the FlatMapGroupsWithStateExec physical operator.
When executed, FlatMapGroupsWithStateExec first validates the selected GroupStateTimeout:
- For ProcessingTimeTimeout, the batch timeout threshold has to be defined
- For EventTimeTimeout, the event-time watermark has to be defined and the input schema has to include the watermark attribute
Note
FIXME When are the above requirements met?
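The two checks listed above can be modelled in plain Scala as follows. This is only a simplified sketch, not the Spark source: TimeoutConf and its case objects stand in for GroupStateTimeout, and the validate function with its parameter names is hypothetical.

```scala
// Simplified stand-ins (hypothetical) for Spark's GroupStateTimeout values
sealed trait TimeoutConf
case object NoTimeout extends TimeoutConf
case object ProcessingTimeTimeout extends TimeoutConf
case object EventTimeTimeout extends TimeoutConf

object TimeoutValidation {
  // Returns Right(()) when the requirements hold, or Left(reason) otherwise
  def validate(
      timeout: TimeoutConf,
      batchTimeoutThresholdMs: Option[Long],
      eventTimeWatermarkMs: Option[Long],
      inputHasWatermarkAttribute: Boolean): Either[String, Unit] =
    timeout match {
      case ProcessingTimeTimeout if batchTimeoutThresholdMs.isEmpty =>
        Left("batch timeout threshold is not defined")
      case EventTimeTimeout if eventTimeWatermarkMs.isEmpty =>
        Left("event-time watermark is not defined")
      case EventTimeTimeout if !inputHasWatermarkAttribute =>
        Left("input schema has no watermark attribute")
      case _ =>
        Right(()) // nothing to validate, or all requirements met
    }
}
```

For example, `TimeoutValidation.validate(EventTimeTimeout, None, Some(1000L), inputHasWatermarkAttribute = false)` yields a Left, because the watermark value alone is not enough without the attribute in the input schema.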
The FlatMapGroupsWithStateExec physical operator then calls mapPartitionsWithStateStore with a custom storeUpdateFunction of the following signature:
(StateStore, Iterator[T]) => Iterator[U]
While generating the recipe, FlatMapGroupsWithStateExec uses the StateStoreOps extension methods to register a listener that is executed on task completion. The listener makes sure that all state changes of a given StateStore are either committed or aborted.
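The interplay between a storeUpdateFunction and the completion listener can be sketched in plain Scala. Everything here is a hypothetical, much-reduced model: ToyStore is not Spark's StateStore API, and a try/finally block stands in for the task-completion listener.

```scala
import scala.collection.mutable

// Hypothetical, much-reduced stand-in for a state store (not Spark's StateStore)
final class ToyStore {
  private val pending = mutable.Map.empty[String, Long]
  private val committedData = mutable.Map.empty[String, Long]
  private var committed = false
  private var aborted = false

  def get(key: String): Option[Long] = pending.get(key).orElse(committedData.get(key))
  def put(key: String, value: Long): Unit = pending.update(key, value)
  def commit(): Unit = { committedData ++= pending; pending.clear(); committed = true }
  def abort(): Unit = { pending.clear(); aborted = true }
  def hasCommitted: Boolean = committed
  def hasAborted: Boolean = aborted
  def snapshot: Map[String, Long] = committedData.toMap
}

object StoreUpdate {
  // Runs a (store, input) => output function for one partition, then
  // plays the role of the listener executed on task completion.
  def runPartition[T, U](store: ToyStore, input: Iterator[T])(
      storeUpdateFunction: (ToyStore, Iterator[T]) => Iterator[U]): List[U] =
    try {
      storeUpdateFunction(store, input).toList // force the lazy iterator
    } finally {
      // Listener logic: changes that were never committed are rolled back
      if (!store.hasCommitted) store.abort()
    }

  // Example update function: counts words and commits once the input is consumed
  val countWords: (ToyStore, Iterator[String]) => Iterator[(String, Long)] =
    (store, words) => {
      val updated = words.map { w =>
        val n = store.get(w).getOrElse(0L) + 1
        store.put(w, n)
        w -> n
      }.toList
      store.commit()
      updated.iterator
    }
}
```

If the update function throws before it commits, the finally block aborts the store, so the store never ends a task in a half-written state.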
In the end, FlatMapGroupsWithStateExec creates a new StateStoreRDD and adds it to the RDD lineage.
StateStoreRDD is used to properly distribute tasks across executors (per preferred locations) with the help of StateStoreCoordinator (that runs on the driver).
StateStoreRDD uses the StateStore helper to look up a StateStore by StateStoreProviderId and store version.
The FlatMapGroupsWithStateExec physical operator uses state managers that are different from the state managers of Streaming Aggregation. The StateStore abstraction is the same as in Streaming Aggregation.
One of the important execution steps is when InputProcessor (of the FlatMapGroupsWithStateExec physical operator) is requested to callFunctionAndUpdateState. This executes the user-defined state function with a per-group state key object, the value objects, and a GroupStateImpl.
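The general shape of that step can be sketched in plain Scala. This is a simplified model under stated assumptions, not the Spark implementation: ToyGroupState is a hypothetical stand-in for GroupStateImpl, a mutable map stands in for the state store, and timeout timestamps and metrics are left out.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for GroupStateImpl (not Spark's class)
final class ToyGroupState[S](initial: Option[S]) {
  private var value: Option[S] = initial
  private var updatedFlag = false
  private var removedFlag = false

  def getOption: Option[S] = value
  def update(newValue: S): Unit = { value = Some(newValue); updatedFlag = true; removedFlag = false }
  def remove(): Unit = { value = None; removedFlag = true; updatedFlag = false }
  def isUpdated: Boolean = updatedFlag
  def isRemoved: Boolean = removedFlag
}

object InputProcessorSketch {
  // Calls the user function for one key and writes the outcome back to `store`
  def callFunctionAndUpdateState[K, V, S, U](
      store: mutable.Map[K, S],
      key: K,
      values: Iterator[V])(
      func: (K, Iterator[V], ToyGroupState[S]) => Iterator[U]): List[U] = {
    // Wrap the stored value (if any) so the user function can read and change it
    val state = new ToyGroupState[S](store.get(key))
    // Consume the results first: the function may touch the state lazily
    val output = func(key, values, state).toList
    if (state.isRemoved) store.remove(key)
    else if (state.isUpdated) state.getOption.foreach(store.update(key, _))
    output
  }
}
```

The state is written back only after the output has been fully consumed, so that state changes made while the results are being produced are not lost.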