
flatMapGroupsWithState Operator

flatMapGroupsWithState is part of the KeyValueGroupedDataset (Spark SQL) API for Arbitrary Stateful Streaming Aggregation with explicit, user-defined state logic.

flatMapGroupsWithState[S: Encoder, U: Encoder](
  outputMode: OutputMode,
  timeoutConf: GroupStateTimeout)(
  func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]
flatMapGroupsWithState[S: Encoder, U: Encoder](
  outputMode: OutputMode,
  timeoutConf: GroupStateTimeout,
  initialState: KeyValueGroupedDataset[K, S])(
  func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U] // Since 3.2.0
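A minimal usage sketch of the first variant follows. It assumes Spark 3.x with Scala 2.12/2.13 and a local SparkSession; the names RunningCountSketch and countPerKey are hypothetical. It runs on a batch Dataset so it can be executed without a streaming source. Without an initial state, every key in a batch query starts with empty state, whereas in a streaming query the GroupState would carry over between micro-batches.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical example object; not part of Spark.
object RunningCountSketch {

  // State function of shape (K, Iterator[V], GroupState[S]) => Iterator[U]
  // with K = String, V = Int, S = Long (running count), U = (String, Long).
  def countPerKey(
      key: String,
      values: Iterator[Int],
      state: GroupState[Long]): Iterator[(String, Long)] = {
    val previous = state.getOption.getOrElse(0L) // no state yet => start at 0
    val updated = previous + values.size
    state.update(updated) // store the new count for this key
    Iterator((key, updated))
  }

  def run(spark: SparkSession): Array[(String, Long)] = {
    import spark.implicits._
    // Batch input keeps the sketch runnable; a readStream source would make
    // the state persist across micro-batches.
    Seq(("a", 1), ("b", 2), ("a", 3)).toDS()
      .groupByKey(_._1) // K = String
      .mapValues(_._2)  // V = Int
      .flatMapGroupsWithState[Long, (String, Long)](
        OutputMode.Update, GroupStateTimeout.NoTimeout)(countPerKey)
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("running-count").getOrCreate()
    try run(spark).sortBy(_._1).foreach(println)
    finally spark.stop()
  }
}
```

The explicit type arguments `[Long, (String, Long)]` fix S and U up front, which lets the method be passed directly as the state function.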

Input Arguments

flatMapGroupsWithState accepts the following:

  • OutputMode
  • GroupStateTimeout
  • A state function that takes a key (K), an iterator over the values (V) for that key, and the current GroupState[S] of that key, and returns an iterator of output objects (U)
  • (optionally) A KeyValueGroupedDataset[K, S] with a user-defined initial state
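The optional initial state can be sketched as follows, assuming Spark 3.2.0 or later and Scala 2.12/2.13. InitialStateSketch, countPerKey and the seeded count of 10 for key "a" are hypothetical. The initial state has to be grouped by the same key type K as the input and carry values of the state type S (here String and Long).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical example object; requires the initialState variant (Spark 3.2.0+).
object InitialStateSketch {

  def countPerKey(
      key: String,
      values: Iterator[Int],
      state: GroupState[Long]): Iterator[(String, Long)] = {
    // For a key seeded through initialState, getOption returns the seeded value.
    val updated = state.getOption.getOrElse(0L) + values.size
    state.update(updated)
    Iterator((key, updated))
  }

  def run(spark: SparkSession): Array[(String, Long)] = {
    import spark.implicits._
    // Initial state as a KeyValueGroupedDataset[K, S] = [String, Long].
    val initialCounts = Seq(("a", 10L)).toDS()
      .groupByKey(_._1)
      .mapValues(_._2)

    Seq(("a", 1), ("b", 2), ("a", 3)).toDS()
      .groupByKey(_._1)
      .mapValues(_._2)
      .flatMapGroupsWithState[Long, (String, Long)](
        OutputMode.Update, GroupStateTimeout.NoTimeout, initialCounts)(countPerKey)
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("initial-state").getOrCreate()
    try run(spark).sortBy(_._1).foreach(println)
    finally spark.stop()
  }
}
```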

FlatMapGroupsWithState Logical Operator

flatMapGroupsWithState creates a Dataset with a FlatMapGroupsWithState logical operator configured with the following:

  • LogicalGroupState
  • groupingAttributes of the KeyValueGroupedDataset
  • dataAttributes of the KeyValueGroupedDataset
  • isMapGroupsWithState flag disabled (false)
  • logicalPlan of the KeyValueGroupedDataset as the child operator
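One way to observe the operator is to search a Dataset's logical plan for a FlatMapGroupsWithState node, as in the following sketch (Spark 3.x and Scala 2.12/2.13 assumed). FlatMapGroupsWithState lives in the internal org.apache.spark.sql.catalyst.plans.logical package, so its fields (e.g. isMapGroupsWithState, groupingAttributes, dataAttributes) may change between releases. LogicalPlanSketch and findOperator are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsWithState
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical example object; relies on internal Catalyst classes.
object LogicalPlanSketch {

  def findOperator(spark: SparkSession): Option[FlatMapGroupsWithState] = {
    import spark.implicits._
    val grouped = Seq(("a", 1), ("b", 2)).toDS().groupByKey(_._1).mapValues(_._2)
    val counts = grouped.flatMapGroupsWithState[Long, (String, Long)](
        OutputMode.Append, GroupStateTimeout.NoTimeout) {
      (key: String, values: Iterator[Int], state: GroupState[Long]) =>
        Iterator((key, values.size.toLong))
    }
    // The operator is not necessarily the root node, so search the whole tree.
    counts.queryExecution.logical.collectFirst { case op: FlatMapGroupsWithState => op }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("logical-plan").getOrCreate()
    try findOperator(spark).foreach { op =>
      println(s"isMapGroupsWithState = ${op.isMapGroupsWithState}")
      println(s"groupingAttributes   = ${op.groupingAttributes.mkString(", ")}")
      println(s"dataAttributes       = ${op.dataAttributes.mkString(", ")}")
    } finally spark.stop()
  }
}
```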

Output Modes

flatMapGroupsWithState supports Append and Update output modes only and throws an IllegalArgumentException otherwise:

The output mode of function should be append or update
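Because the output mode check runs when flatMapGroupsWithState is called, it can be observed without starting a query. A sketch, assuming Spark 3.x and Scala 2.12/2.13; OutputModeCheckSketch and rejectionMessage are hypothetical helpers.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical example object.
object OutputModeCheckSketch {

  // Returns the exception message if flatMapGroupsWithState rejects the mode,
  // or None if the Dataset could be created.
  def rejectionMessage(spark: SparkSession, mode: OutputMode): Option[String] = {
    import spark.implicits._
    val grouped = Seq(("a", 1)).toDS().groupByKey(_._1).mapValues(_._2)
    try {
      grouped.flatMapGroupsWithState[Long, (String, Long)](mode, GroupStateTimeout.NoTimeout) {
        (key: String, values: Iterator[Int], state: GroupState[Long]) =>
          Iterator((key, values.size.toLong))
      }
      None // mode accepted
    } catch {
      case e: IllegalArgumentException => Some(e.getMessage)
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("output-modes").getOrCreate()
    try {
      Seq(OutputMode.Append, OutputMode.Update, OutputMode.Complete).foreach { mode =>
        println(s"$mode -> ${rejectionMessage(spark, mode).getOrElse("accepted")}")
      }
    } finally spark.stop()
  }
}
```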

Note

An OutputMode is a required argument, but it does not seem to be used at all. Check out the StackOverflow question "What's the purpose of OutputMode in flatMapGroupsWithState? How/where is it used?".