Aggregator Expressions¶
Aggregator is an abstraction of typed user-defined aggregate functions (user-defined typed aggregations or UDAFs).
abstract class Aggregator[-IN, BUF, OUT]
Aggregator is a Serializable (Java).
An Aggregator is registered as a user-defined aggregate function using the udaf standard function.
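As a minimal sketch of the contract described below (MyAverage, AvgBuffer, the my_avg name and the local SparkSession are illustrative assumptions, not part of Spark):

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

// Intermediate aggregation buffer: running sum and count
case class AvgBuffer(sum: Double, count: Long)

// A typed average over Double inputs (illustrative only)
object MyAverage extends Aggregator[Double, AvgBuffer, Double] {
  // Initial aggregation buffer
  def zero: AvgBuffer = AvgBuffer(0.0, 0L)
  // Fold one input value into the buffer
  def reduce(b: AvgBuffer, a: Double): AvgBuffer = AvgBuffer(b.sum + a, b.count + 1)
  // Combine two partial buffers
  def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer =
    AvgBuffer(b1.sum + b2.sum, b1.count + b2.count)
  // Produce the final result from the buffer
  def finish(reduction: AvgBuffer): Double =
    if (reduction.count == 0) Double.NaN else reduction.sum / reduction.count
  // Encoders for the buffer and output types
  def bufferEncoder: Encoder[AvgBuffer] = Encoders.product[AvgBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val spark = SparkSession.builder.master("local[*]").getOrCreate()
// Register the Aggregator as an (untyped) user-defined aggregate function
spark.udf.register("my_avg", udaf(MyAverage))
spark.sql("SELECT my_avg(value) FROM VALUES (1.0), (2.0), (3.0) AS t(value)").show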
Contract¶
bufferEncoder¶
bufferEncoder: Encoder[BUF]
Used when:
* Aggregator is requested to toColumn
* UserDefinedAggregator is requested to scalaAggregator
finish¶
finish(
reduction: BUF): OUT
Used when:
* ComplexTypedAggregateExpression is requested to eval
* ScalaAggregator is requested to eval
merge¶
merge(
b1: BUF,
b2: BUF): BUF
Used when:
* ComplexTypedAggregateExpression is requested to merge
* ScalaAggregator is requested to merge
outputEncoder¶
outputEncoder: Encoder[OUT]
Used when:
* Aggregator is requested to toColumn
* ScalaAggregator is requested for the outputEncoder
reduce¶
reduce(
b: BUF,
a: IN): BUF
Used when:
* ComplexTypedAggregateExpression is requested to update
* ScalaAggregator is requested to update
zero¶
zero: BUF
Used when:
* SimpleTypedAggregateExpression is requested for initialValues
* ComplexTypedAggregateExpression is requested to createAggregationBuffer
* ScalaAggregator is requested to createAggregationBuffer
Implementations¶
* ReduceAggregator
* TypedAverage
* TypedCount
* TypedSumDouble
* TypedSumLong
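Among them, ReduceAggregator backs the KeyValueGroupedDataset.reduceGroups operator. A minimal sketch (the sample dataset and the local SparkSession are assumptions for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

val ds = Seq(("a", 1), ("a", 2), ("b", 3)).toDS
// reduceGroups wraps the given function in a ReduceAggregator under the covers
ds.groupByKey(_._1).mapValues(_._2).reduceGroups(_ + _).show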
Converting to TypedColumn¶
toColumn: TypedColumn[IN, OUT]
toColumn converts the Aggregator to a TypedColumn (that can be used with Dataset.select and KeyValueGroupedDataset.agg typed operators).
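A minimal sketch of toColumn with the typed Dataset.select operator (the doubleSum Aggregator, the sample dataset and the local SparkSession are illustrative assumptions, not Spark's own code):

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// Illustrative sum-of-doubles Aggregator (not part of Spark)
val doubleSum = new Aggregator[Double, Double, Double] {
  def zero: Double = 0.0
  def reduce(b: Double, a: Double): Double = b + a
  def merge(b1: Double, b2: Double): Double = b1 + b2
  def finish(reduction: Double): Double = reduction
  def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = Seq(1.0, 2.0, 3.0).toDS
// toColumn gives a TypedColumn[Double, Double] usable with Dataset.select
ds.select(doubleSum.toColumn).show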
Demo¶
// From Spark MLlib's org.apache.spark.ml.recommendation.ALSModel
// Step 1. Create Aggregator
val topKAggregator: Aggregator[Int, Int, Float] = ???
val recs = ratings
.as[(Int, Int, Float)]
.groupByKey(_._1)
.agg(topKAggregator.toColumn) // <-- use the custom Aggregator
.toDF("id", "recommendations")
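A runnable variant of the demo, with the elided topKAggregator replaced by an illustrative stand-in (maxRating, the sample ratings and the local SparkSession are assumptions, not the MLlib implementation):

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// Illustrative stand-in: keep the highest rating seen per user
val maxRating = new Aggregator[(Int, Int, Float), Float, Float] {
  def zero: Float = Float.NegativeInfinity
  def reduce(b: Float, a: (Int, Int, Float)): Float = math.max(b, a._3)
  def merge(b1: Float, b2: Float): Float = math.max(b1, b2)
  def finish(reduction: Float): Float = reduction
  def bufferEncoder: Encoder[Float] = Encoders.scalaFloat
  def outputEncoder: Encoder[Float] = Encoders.scalaFloat
}

val ratings = Seq((0, 10, 4.0f), (0, 11, 5.0f), (1, 12, 3.0f)).toDF("user", "item", "rating")
val recs = ratings
  .as[(Int, Int, Float)]
  .groupByKey(_._1)
  .agg(maxRating.toColumn) // <-- use the custom Aggregator
  .toDF("id", "top_rating")
recs.show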
Use the org.apache.spark.sql.expressions.scalalang.typed object to access the type-safe aggregate functions (i.e. avg, count, sum and sumLong).
import org.apache.spark.sql.expressions.scalalang.typed
// e.g. ds: Dataset[(String, Double)] -- sum the second tuple element per key
ds.groupByKey(_._1).agg(typed.sum(_._2))
// e.g. ds: Dataset[Int] -- sum over all rows
ds.select(typed.sum((i: Int) => i))