Skip to content

KeyValueGroupedDataset

KeyValueGroupedDataset is a high-level API for Typed Grouping (part of Basic Aggregation) to calculate aggregates over groups of objects in a typed Dataset.

KeyValueGroupedDataset represents a grouped dataset as a result of Dataset.groupByKey operator (that aggregates records by a grouping function).

// Dataset[T]
groupByKey(
  func: T => K): KeyValueGroupedDataset[K, T]

KeyValueGroupedDataset works for batch and streaming aggregations.

RelationalGroupedDataset

RelationalGroupedDataset is used for untyped Row-based aggregates.

Creating Instance

KeyValueGroupedDataset takes the following to be created:

KeyValueGroupedDataset is created for the following high-level operators:

agg

agg[U1](
  col1: TypedColumn[V, U1]): Dataset[(K, U1)]
agg[U1, U2](
  col1: TypedColumn[V, U1],
  col2: TypedColumn[V, U2]): Dataset[(K, U1, U2)]
...
agg[U1, U2, U3, U4, U5, U6, U7, U8](
  col1: TypedColumn[V, U1],
  col2: TypedColumn[V, U2],
  col3: TypedColumn[V, U3],
  col4: TypedColumn[V, U4],
  col5: TypedColumn[V, U5],
  col6: TypedColumn[V, U6],
  col7: TypedColumn[V, U7],
  col8: TypedColumn[V, U8]): Dataset[(K, U1, U2, U3, U4, U5, U6, U7, U8)]

agg aggUntyped.

aggUntyped

aggUntyped(
  columns: TypedColumn[_, _]*): Dataset[_]

aggUntyped creates a Dataset with an Aggregate logical operator (with the grouping attributes).

flatMapGroupsWithState

flatMapGroupsWithState[S: Encoder, U: Encoder](
  outputMode: OutputMode,
  timeoutConf: GroupStateTimeout)(
  func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]
flatMapGroupsWithState[S: Encoder, U: Encoder](
  outputMode: OutputMode,
  timeoutConf: GroupStateTimeout,
  initialState: KeyValueGroupedDataset[K, S])(
  func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]

flatMapGroupsWithState creates a Dataset with a FlatMapGroupsWithState logical operator (with the isMapGroupsWithState disabled).

flatMapGroupsWithState accepts Append and Update output modes only, and throws an IllegalArgumentException for the others:

The output mode of function should be append or update

Spark Structured Streaming

KeyValueGroupedDataset can be used in streaming queries in Spark Structured Streaming:

Demo

import java.sql.Timestamp
val numGroups = spark.
  readStream.
  format("rate").
  load.
  as[(Timestamp, Long)].
  groupByKey { case (time, value) => value % 2 }

scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]