Basic Aggregation

Basic Aggregation calculates aggregates over a group of rows in a Dataset using aggregate operators (possibly with aggregate functions).

Aggregate Operators

agg

Aggregates over (applies an aggregate function to) the rows of a Dataset, treating the entire Dataset as a single group

Returns a DataFrame (internally, through a RelationalGroupedDataset; see the note below)

Note

Dataset.agg is simply a shortcut for Dataset.groupBy().agg.
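
For example, the following two queries are equivalent (a minimal sketch, assuming a dataset with a numeric score column):

import org.apache.spark.sql.functions.max
// Dataset.agg aggregates over the entire Dataset...
dataset.agg(max("score"))
// ...exactly like an empty groupBy followed by agg
dataset.groupBy().agg(max("score"))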

groupBy

Groups the rows in a Dataset by columns (as Column expressions or names).

Creates a RelationalGroupedDataset

Used for untyped aggregates over DataFrames, with grouping described using column expressions or column names.

Internally, groupBy resolves column names and creates a RelationalGroupedDataset (with groupType as GroupByType).
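
A minimal untyped sketch (assuming a DataFrame with productId and score columns):

import org.apache.spark.sql.functions.sum
val sums = dataset
  .groupBy("productId")        // groupBy creates a RelationalGroupedDataset...
  .agg(sum("score") as "sum")  // ...and agg turns it back into a DataFrame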

groupByKey

Groups records (of type T) by the given func and creates a KeyValueGroupedDataset to apply aggregation to.

Used for typed aggregates over Datasets, with records grouped by a key-defining discriminator function, as in the example below.

// Typed aggregation; assumes records of a Token(productId, score) case class
import org.apache.spark.sql.expressions.scalalang._
import spark.implicits._
case class Token(productId: String, score: Double)
val dataset = Seq(Token("a", 1.0), Token("a", 2.0), Token("b", 0.5)).toDS
val q = dataset
  .groupByKey(_.productId)          // key-defining discriminator function
  .agg(typed.sum[Token](_.score))   // typed aggregate over each group
  .toDF("productId", "sum")
  .orderBy('productId)
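
groupByKey works for streaming Datasets, too. The sketch below groups the rows of the built-in rate source by even/odd value; note that KeyValueGroupedDataset.agg requires at least one typed column, so a typed count is used here: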
import java.sql.Timestamp
import scala.concurrent.duration._
import org.apache.spark.sql.expressions.scalalang._
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
spark
  .readStream
  .format("rate")          // rate source generates (timestamp, value) rows
  .load
  .as[(Timestamp, Long)]
  .groupByKey { case (ts, value) => value % 2 }  // group rows by even/odd value
  .agg(typed.count[(Timestamp, Long)](_._2))     // agg needs at least one typed column
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime(5.seconds))
  .outputMode("complete")
  .start