Aggregator Expressions

Aggregator is an abstraction of typed user-defined aggregate functions (typed UDAFs).

abstract class Aggregator[-IN, BUF, OUT]

Aggregator is java.io.Serializable.

An Aggregator can be registered as a user-defined aggregate function using the udaf standard function.
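As a sketch of that registration step (the Aggregator instance and the function name are hypothetical; functions.udaf wraps a typed Aggregator as an untyped UserDefinedFunction):

```scala
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions

// Some typed Aggregator (hypothetical)
val myAgg: Aggregator[Long, Long, Long] = ???

// Wrap the Aggregator as an untyped UserDefinedFunction
val myUdaf = functions.udaf(myAgg)

// Register it for use in SQL (the name "my_agg" is an assumption)
spark.udf.register("my_agg", myUdaf)
```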

Contract

bufferEncoder

bufferEncoder: Encoder[BUF]

Used when:

finish

finish(
  reduction: BUF): OUT

Used when:

  • ComplexTypedAggregateExpression is requested to eval
  • ScalaAggregator is requested to eval

merge

merge(
  b1: BUF,
  b2: BUF): BUF

Used when:

  • ComplexTypedAggregateExpression is requested to merge
  • ScalaAggregator is requested to merge

outputEncoder

outputEncoder: Encoder[OUT]

Used when:

reduce

reduce(
  b: BUF,
  a: IN): BUF

Used when:

  • ComplexTypedAggregateExpression is requested to update
  • ScalaAggregator is requested to update

zero

zero: BUF

Used when:

  • SimpleTypedAggregateExpression is requested for initialValues
  • ComplexTypedAggregateExpression is requested to createAggregationBuffer
  • ScalaAggregator is requested to createAggregationBuffer
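A minimal sketch that implements the whole contract above (MyAverage is a hypothetical typed average; only the Aggregator contract itself is taken from this page):

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// IN = Double, BUF = (sum, count), OUT = Double
object MyAverage extends Aggregator[Double, (Double, Long), Double] {
  // Initial (empty) aggregation buffer
  def zero: (Double, Long) = (0.0, 0L)
  // Fold one input value into the buffer
  def reduce(b: (Double, Long), a: Double): (Double, Long) =
    (b._1 + a, b._2 + 1)
  // Combine two partial buffers (e.g. from different partitions)
  def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) =
    (b1._1 + b2._1, b1._2 + b2._2)
  // Produce the final result from the buffer
  def finish(reduction: (Double, Long)): Double =
    reduction._1 / reduction._2
  // Encoders for the intermediate buffer and the output
  def bufferEncoder: Encoder[(Double, Long)] =
    Encoders.tuple(Encoders.scalaDouble, Encoders.scalaLong)
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
```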

Implementations

  • ReduceAggregator
  • TypedAverage
  • TypedCount
  • TypedSumDouble
  • TypedSumLong

Converting to TypedColumn

toColumn: TypedColumn[IN, OUT]

toColumn converts the Aggregator to a TypedColumn that can then be used with the Dataset.select and KeyValueGroupedDataset.agg typed operators.

Demo

// From Spark MLlib's org.apache.spark.ml.recommendation.ALSModel
// Step 1. Create Aggregator
val topKAggregator: Aggregator[Int, Int, Float] = ???
val recs = ratings
  .as[(Int, Int, Float)]
  .groupByKey(_._1)
  .agg(topKAggregator.toColumn) // <-- use the custom Aggregator
  .toDF("id", "recommendations")

Use org.apache.spark.sql.expressions.scalalang.typed object to access the type-safe aggregate functions (i.e. avg, count, sum and sumLong).

import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_._1).agg(typed.sum(_._2))
ds.select(typed.sum((i: Int) => i))