Aggregator Expressions
Aggregator is an abstraction of typed user-defined aggregate functions (user-defined typed aggregations or UDAFs).
abstract class Aggregator[-IN, BUF, OUT]
Aggregator is a java.io.Serializable.
An Aggregator is registered (for use in untyped DataFrame queries and SQL) using the udaf standard function.
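A minimal sketch of the udaf route, assuming Spark 3.0 or later and a local SparkSession. SumLong and sum_long are hypothetical names. functions.udaf wraps the Aggregator in a UserDefinedFunction, which spark.udf.register then exposes to SQL:

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

// Hypothetical aggregator: IN = Long, BUF = Long, OUT = Long
object SumLong extends Aggregator[Long, Long, Long] {
  def zero: Long = 0L                            // empty buffer
  def reduce(b: Long, a: Long): Long = b + a     // fold one input value into the buffer
  def merge(b1: Long, b2: Long): Long = b1 + b2  // combine two partial buffers
  def finish(reduction: Long): Long = reduction  // the buffer already is the result
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

val spark = SparkSession.builder().master("local[1]").appName("udaf-demo").getOrCreate()

// udaf derives the input encoder for IN (Long) from a TypeTag
spark.udf.register("sum_long", udaf(SumLong))

// range(5) yields id = 0, 1, 2, 3, 4
val total = spark.sql("SELECT sum_long(id) FROM range(5)").head().getLong(0)
```

udaf also has an overload that takes an explicit input Encoder instead of relying on a TypeTag.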
Contract
bufferEncoder
bufferEncoder: Encoder[BUF]
Encoder for the intermediate (buffer) value type
Used when:
Aggregator is requested to toColumn
UserDefinedAggregator is requested to scalaAggregator
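For a buffer that is more than a single primitive, bufferEncoder has to describe its structure. A minimal sketch, assuming a hypothetical Average aggregator whose case-class buffer (the hypothetical AvgBuffer) is encoded with Encoders.product. Since the contract methods are plain Scala methods, they can be exercised directly without a SparkSession:

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical intermediate buffer: running sum and count
case class AvgBuffer(sum: Double, count: Long)

object Average extends Aggregator[Double, AvgBuffer, Double] {
  def zero: AvgBuffer = AvgBuffer(0.0, 0L)
  def reduce(b: AvgBuffer, a: Double): AvgBuffer = AvgBuffer(b.sum + a, b.count + 1)
  def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer =
    AvgBuffer(b1.sum + b2.sum, b1.count + b2.count)
  // NaN for an empty group is an assumption of this sketch
  def finish(reduction: AvgBuffer): Double =
    if (reduction.count == 0) Double.NaN else reduction.sum / reduction.count
  // Encoders.product derives a struct encoder from the case class fields
  def bufferEncoder: Encoder[AvgBuffer] = Encoders.product[AvgBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// The buffer is stored as a struct with one field per case class field
val bufferFields = Average.bufferEncoder.schema.fieldNames.toSeq

// Fold a few values by hand: (1 + 2 + 6) / 3
val avg = Average.finish(Seq(1.0, 2.0, 6.0).foldLeft(Average.zero)(Average.reduce))
```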
finish
finish(reduction: BUF): OUT
Transforms the output of the reduction (the final buffer) into the result
Used when:
ComplexTypedAggregateExpression is requested to eval
ScalaAggregator is requested to eval
merge
merge(b1: BUF, b2: BUF): BUF
Merges two intermediate buffers
Used when:
ComplexTypedAggregateExpression is requested to merge
ScalaAggregator is requested to merge
outputEncoder
outputEncoder: Encoder[OUT]
Encoder for the final output value type
Used when:
Aggregator is requested to toColumn
ScalaAggregator is requested for the outputEncoder
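reduce
reduce(b: BUF, a: IN): BUF
Combines an input value a into the buffer b (for performance, reduce may modify b and return it instead of constructing a new object)
Used when:
ComplexTypedAggregateExpression is requested to update
ScalaAggregator is requested to update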
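The in-place update that reduce is allowed to do can be sketched with a mutable case-class buffer (InPlaceSum and SumBuf are hypothetical names). zero is a def that returns a fresh SumBuf on every call: if it handed out one shared instance, in-place updates would leak from one group into the next. merge is kept non-mutating here to stay on the safe side.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical mutable buffer; `var` allows reduce to update it in place
case class SumBuf(var total: Long)

object InPlaceSum extends Aggregator[Long, SumBuf, Long] {
  def zero: SumBuf = SumBuf(0L)  // a fresh buffer on every call
  def reduce(b: SumBuf, a: Long): SumBuf = {
    b.total += a                 // mutate the buffer...
    b                            // ...and return the same instance
  }
  def merge(b1: SumBuf, b2: SumBuf): SumBuf = SumBuf(b1.total + b2.total)
  def finish(reduction: SumBuf): Long = reduction.total
  def bufferEncoder: Encoder[SumBuf] = Encoders.product[SumBuf]
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

val buf = InPlaceSum.zero
val updated = InPlaceSum.reduce(buf, 5L)
```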
zero
zero: BUF
Initial (empty) buffer that should satisfy b + zero = b for any buffer b (with + standing for merge)
Used when:
SimpleTypedAggregateExpression is requested for initialValues
ComplexTypedAggregateExpression is requested to createAggregationBuffer
ScalaAggregator is requested to createAggregationBuffer
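How zero, reduce, merge and finish fit together can be simulated by hand. This is a rough, hypothetical model of partial and final aggregation (the real physical operators also round-trip buffers through bufferEncoder between stages), using a hypothetical MaxLong aggregator whose zero, Long.MinValue, is the identity for merge:

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical aggregator computing the maximum of Long values
object MaxLong extends Aggregator[Long, Long, Long] {
  def zero: Long = Long.MinValue  // identity for max: max(b, zero) == b
  def reduce(b: Long, a: Long): Long = math.max(b, a)
  def merge(b1: Long, b2: Long): Long = math.max(b1, b2)
  def finish(reduction: Long): Long = reduction
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

// Made-up input spread over three partitions, one of them empty
val partitions: Seq[Seq[Long]] = Seq(Seq(3L, 1L), Seq.empty, Seq(7L, 2L))

// Partial aggregation: each partition starts from zero and folds its rows with reduce
val partials = partitions.map(_.foldLeft(MaxLong.zero)(MaxLong.reduce))

// Final aggregation: merge the partial buffers, then call finish once
val result = MaxLong.finish(partials.reduce(MaxLong.merge))
```

An empty partition contributes only zero, which merge absorbs. With no input at all, finish would return Long.MinValue, so a production version might prefer an Option-typed output.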
Implementations
ReduceAggregator
TypedAverage
TypedCount
TypedSumDouble
TypedSumLong
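These implementations are internal, so user code normally reaches them through public operators. For example, KeyValueGroupedDataset.reduceGroups is implemented on top of ReduceAggregator (worth confirming against the Spark version in use). A minimal sketch, assuming a local SparkSession and made-up data:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("reduce-groups").getOrCreate()
import spark.implicits._

// Made-up (key, amount) pairs
val sales = Seq(("a", 1), ("a", 2), ("b", 3)).toDS()

// reduceGroups wraps the function in a ReduceAggregator and aggregates with it
val totals = sales
  .groupByKey(_._1)     // KeyValueGroupedDataset[String, (String, Int)]
  .mapValues(_._2)      // KeyValueGroupedDataset[String, Int]
  .reduceGroups(_ + _)  // Dataset[(String, Int)]
  .collect()
  .sortBy(_._1)
  .toSeq
```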
Converting to TypedColumn
toColumn: TypedColumn[IN, OUT]
toColumn converts the Aggregator to a TypedColumn that can be used with the Dataset.select and KeyValueGroupedDataset.agg typed operators.
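Both typed operators can be sketched with one hypothetical aggregator, SumValues, over (key, value) pairs, assuming a local SparkSession and made-up data. select yields a Dataset[OUT], while groupByKey(...).agg yields a Dataset of (key, OUT) pairs:

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical aggregator that sums the Long values of (key, value) pairs
object SumValues extends Aggregator[(String, Long), Long, Long] {
  def zero: Long = 0L
  def reduce(b: Long, a: (String, Long)): Long = b + a._2
  def merge(b1: Long, b2: Long): Long = b1 + b2
  def finish(reduction: Long): Long = reduction
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

val spark = SparkSession.builder().master("local[1]").appName("to-column").getOrCreate()
import spark.implicits._

val ds = Seq(("a", 1L), ("a", 2L), ("b", 5L)).toDS()

// Dataset.select with a TypedColumn returns a Dataset[Long]
val grandTotal = ds.select(SumValues.toColumn.name("total")).head()

// KeyValueGroupedDataset.agg returns a Dataset[(String, Long)]
val perKey = ds.groupByKey(_._1).agg(SumValues.toColumn).collect().sortBy(_._1).toSeq
```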
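Demo
// From Spark MLlib's org.apache.spark.ml.recommendation.ALSModel
// Assumes a SparkSession spark and a ratings DataFrame of (user, item, rating)
import org.apache.spark.sql.expressions.Aggregator
import spark.implicits._
// Step 1. Create Aggregator (IN is a (user, item, rating) tuple)
val topKAggregator: Aggregator[(Int, Int, Float), _, Array[(Int, Float)]] = ???
// Step 2. Use it in a typed aggregation per user
val recs = ratings
  .as[(Int, Int, Float)]
  .groupByKey(_._1)
  .agg(topKAggregator.toColumn) // <-- use the custom Aggregator
  .toDF("id", "recommendations")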
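The demo leaves the aggregator itself as ???. One possible stand-in is a hypothetical TopKAggregator (not MLlib's actual implementation) that keeps the k highest-rated (item, rating) pairs per user. It returns a Seq rather than an Array and takes the Seq encoder as an implicit parameter so that spark.implicits._ can supply it. A local SparkSession and the sample ratings are assumptions:

```scala
import org.apache.spark.sql.{Encoder, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical top-k aggregator: IN = (user, item, rating),
// BUF and OUT = up to k (item, rating) pairs, highest rating first
class TopKAggregator(k: Int)(implicit enc: Encoder[Seq[(Int, Float)]])
    extends Aggregator[(Int, Int, Float), Seq[(Int, Float)], Seq[(Int, Float)]] {

  // Sort by rating (descending) and keep the first k pairs
  private def topK(xs: Seq[(Int, Float)]): Seq[(Int, Float)] =
    xs.sortWith(_._2 > _._2).take(k)

  def zero: Seq[(Int, Float)] = Seq.empty
  def reduce(b: Seq[(Int, Float)], a: (Int, Int, Float)): Seq[(Int, Float)] =
    topK(b :+ ((a._2, a._3)))
  def merge(b1: Seq[(Int, Float)], b2: Seq[(Int, Float)]): Seq[(Int, Float)] =
    topK(b1 ++ b2)
  def finish(reduction: Seq[(Int, Float)]): Seq[(Int, Float)] = reduction
  def bufferEncoder: Encoder[Seq[(Int, Float)]] = enc
  def outputEncoder: Encoder[Seq[(Int, Float)]] = enc
}

val spark = SparkSession.builder().master("local[1]").appName("top-k").getOrCreate()
import spark.implicits._

// Made-up sample ratings: (user, item, rating)
val ratings = Seq((1, 10, 4.0f), (1, 11, 5.0f), (1, 12, 3.0f), (2, 10, 1.0f))
  .toDF("user", "item", "rating")

// The implicit Encoder[Seq[(Int, Float)]] comes from spark.implicits._
val topKAggregator = new TopKAggregator(2)
val recs = ratings
  .as[(Int, Int, Float)]
  .groupByKey(_._1)
  .agg(topKAggregator.toColumn)
  .toDF("id", "recommendations")
```

Because reduce and merge both trim to k, the buffer never grows beyond k pairs, regardless of how rows are split across partitions.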
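Use the org.apache.spark.sql.expressions.scalalang.typed object (deprecated as of Spark 3.0.0) to access the type-safe aggregate functions, namely avg, count, sum and sumLong.
import org.apache.spark.sql.expressions.scalalang.typed
import spark.implicits._ // assumes a SparkSession named spark
val pairs = Seq(("a", 1), ("a", 2), ("b", 3)).toDS
pairs.groupByKey(_._1).agg(typed.sum(_._2)) // Dataset[(String, Double)]
val ints = Seq(1, 2, 3).toDS
ints.select(typed.sum((i: Int) => i))      // Dataset[Double]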