KeyValueGroupedDataset

KeyValueGroupedDataset is an interface for Typed Grouping to calculate aggregates over groups of objects in a typed Dataset.

KeyValueGroupedDataset represents a grouped dataset that is the result of the Dataset.groupByKey operator (which groups records using a grouping function).

// Dataset[T]
groupByKey(
  func: T => K): KeyValueGroupedDataset[K, T]

KeyValueGroupedDataset works for batch and streaming aggregations.
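As a quick batch illustration, the following sketch (assuming a spark-shell session so spark and its implicits are in scope, and using made-up data) groups a local typed Dataset by a key and counts the rows per group:

// Group a local typed Dataset by fruit name and count the rows per group
val sales = Seq(("apple", 3L), ("banana", 2L), ("apple", 5L)).toDS

val byFruit = sales.groupByKey { case (fruit, _) => fruit }  // KeyValueGroupedDataset[String, (String, Long)]

val counts = byFruit.count()  // Dataset[(String, Long)]
counts.show()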

RelationalGroupedDataset

RelationalGroupedDataset is used for untyped Row-based aggregates.
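For comparison, a sketch of the untyped side (spark-shell assumed as above; the data and column names are made up):

import org.apache.spark.sql.functions.sum

// groupBy gives a RelationalGroupedDataset; agg returns an untyped DataFrame (Dataset[Row])
val salesDF = Seq(("apple", 3L), ("banana", 2L), ("apple", 5L)).toDF("fruit", "quantity")
val totals = salesDF.groupBy("fruit").agg(sum("quantity"))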

Creating Instance

KeyValueGroupedDataset takes the following to be created:

* Encoder of the keys (K)
* Encoder of the values (V)
* QueryExecution
* Data Attributes
* Grouping Attributes

KeyValueGroupedDataset is created for the following high-level operators (sketched below):

* Dataset.groupByKey
* KeyValueGroupedDataset.keyAs
* KeyValueGroupedDataset.mapValues
* RelationalGroupedDataset.as
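A short sketch of those operators (spark-shell assumed; data and names are made up; RelationalGroupedDataset.as is available in Spark 3.0 and later):

val sales = Seq(("apple", 3L), ("banana", 2L)).toDS
val byFruit = sales.groupByKey { case (fruit, _) => fruit }

// mapValues transforms the values while keeping the grouping keys
val quantities = byFruit.mapValues { case (_, qty) => qty }  // KeyValueGroupedDataset[String, Long]

// keyAs re-types the keys
val rekeyed = byFruit.keyAs[String]

// RelationalGroupedDataset.as turns an untyped grouping into a typed one
val typedAgain = sales.toDF("fruit", "quantity")
  .groupBy("fruit")
  .as[String, (String, Long)]  // KeyValueGroupedDataset[String, (String, Long)]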

flatMapGroupsWithState

flatMapGroupsWithState[S: Encoder, U: Encoder](
  outputMode: OutputMode,
  timeoutConf: GroupStateTimeout)(
  func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]
flatMapGroupsWithState[S: Encoder, U: Encoder](
  outputMode: OutputMode,
  timeoutConf: GroupStateTimeout,
  initialState: KeyValueGroupedDataset[K, S])(
  func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]

flatMapGroupsWithState creates a Dataset with a FlatMapGroupsWithState logical operator (with the isMapGroupsWithState flag disabled).

flatMapGroupsWithState accepts Append and Update output modes only, and throws an IllegalArgumentException for the others:

The output mode of function should be append or update
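A minimal sketch of flatMapGroupsWithState (spark-shell assumed; RunningCount and the per-key logic are made up for illustration) that keeps a running count of records per key, using the Update output mode and no timeout:

import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Per-key state kept across micro-batches
case class RunningCount(count: Long)

val events = spark.readStream
  .format("rate")
  .load
  .as[(Timestamp, Long)]
  .groupByKey { case (_, value) => value % 2 }

val counts = events.flatMapGroupsWithState[RunningCount, (Long, Long)](
    OutputMode.Update(),
    GroupStateTimeout.NoTimeout) {
  (key: Long, values: Iterator[(Timestamp, Long)], state: GroupState[RunningCount]) =>
    // Add this micro-batch's records to whatever was remembered so far
    val soFar = state.getOption.map(_.count).getOrElse(0L)
    val updated = RunningCount(soFar + values.size)
    state.update(updated)
    Iterator((key, updated.count))
}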

Spark Structured Streaming

KeyValueGroupedDataset can be used in streaming queries in Spark Structured Streaming:

Demo

import java.sql.Timestamp
val numGroups = spark.
  readStream.
  format("rate").
  load.
  as[(Timestamp, Long)].
  groupByKey { case (time, value) => value % 2 }

scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]
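The grouped dataset can then be aggregated and written out as a streaming query, for example (a sketch; the console sink, complete output mode and 10-second trigger are arbitrary choices):

import scala.concurrent.duration._
import org.apache.spark.sql.streaming.Trigger

// Count the rows per group and print the updated counts every 10 seconds
val query = numGroups.
  count.
  writeStream.
  format("console").
  outputMode("complete").
  trigger(Trigger.ProcessingTime(10.seconds)).
  start

// Stop the query when no longer needed
// query.stop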