Aggregator Expressions¶
Aggregator is an abstraction of typed user-defined aggregate functions (typed UDAFs).

abstract class Aggregator[-IN, BUF, OUT]

Aggregator is a Serializable (Java).
Contract¶
bufferEncoder¶
bufferEncoder: Encoder[BUF]
Used when:

- Aggregator is requested to toColumn
- UserDefinedAggregator is requested to scalaAggregator
finish¶
finish(
reduction: BUF): OUT
Used when:

- ComplexTypedAggregateExpression is requested to eval
- ScalaAggregator is requested to eval
merge¶
merge(
b1: BUF,
b2: BUF): BUF
Used when:

- ComplexTypedAggregateExpression is requested to merge
- ScalaAggregator is requested to merge
outputEncoder¶
outputEncoder: Encoder[OUT]
Used when:

- ScalaAggregator is requested for the outputEncoder
- Aggregator is requested to toColumn
reduce¶
reduce(
b: BUF,
a: IN): BUF
Used when:

- ComplexTypedAggregateExpression is requested to update
- ScalaAggregator is requested to update
zero¶
zero: BUF
Used when:

- SimpleTypedAggregateExpression is requested for initialValues
- ComplexTypedAggregateExpression is requested to createAggregationBuffer
- ScalaAggregator is requested to createAggregationBuffer
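Putting the contract together, a complete Aggregator might look as follows. This is a minimal sketch: the MyAverage name, the AvgBuffer case class, and the averaging logic are illustrative and not part of Spark itself.

```scala
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

// Hypothetical aggregation buffer: a running sum and a count
case class AvgBuffer(var sum: Double, var count: Long)

// Hypothetical aggregator that averages Double values
// IN = Double, BUF = AvgBuffer, OUT = Double
object MyAverage extends Aggregator[Double, AvgBuffer, Double] {
  // Initial (neutral) value of the aggregation buffer
  def zero: AvgBuffer = AvgBuffer(0.0, 0L)

  // Folds an input value into the buffer
  def reduce(b: AvgBuffer, a: Double): AvgBuffer = {
    b.sum += a
    b.count += 1
    b
  }

  // Combines two partial buffers (e.g. from different partitions)
  def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }

  // Produces the final result from the merged buffer
  def finish(reduction: AvgBuffer): Double =
    if (reduction.count == 0) Double.NaN else reduction.sum / reduction.count

  // Encoders for the intermediate buffer and the output type
  def bufferEncoder: Encoder[AvgBuffer] = Encoders.product[AvgBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
```

Note how zero, reduce, merge and finish mirror a classic fold-combine pattern, while the two encoders let Spark serialize the buffer and the result.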
Implementations¶
- ReduceAggregator
- TypedAverage
- TypedCount
- TypedSumDouble
- TypedSumLong
udaf Standard Function¶
The udaf standard function is used to register an Aggregator (it creates a UserDefinedFunction that wraps the given Aggregator so that it can be used with untyped DataFrames).
udaf[IN: TypeTag, BUF, OUT](
agg: Aggregator[IN, BUF, OUT]): UserDefinedFunction
udaf[IN, BUF, OUT](
agg: Aggregator[IN, BUF, OUT],
inputEncoder: Encoder[IN]): UserDefinedFunction
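As a sketch of registering and using such a function (myAverage is assumed to be an existing Aggregator[Double, _, Double] instance; the my_average, employees and salary names are hypothetical):

```scala
import org.apache.spark.sql.functions.udaf

// Wrap the typed Aggregator as an untyped UserDefinedFunction
val myAverageUdf = udaf(myAverage)

// Register it for use in SQL queries
spark.udf.register("my_average", myAverageUdf)
spark.sql("SELECT my_average(salary) FROM employees")

// Or apply it directly to an untyped DataFrame column
df.select(myAverageUdf($"salary"))
```

The first udaf variant derives the input encoder from the IN type tag; the second accepts an explicit Encoder[IN] for input types without a TypeTag.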
Converting to TypedColumn¶
toColumn: TypedColumn[IN, OUT]

toColumn converts the Aggregator to a TypedColumn (that can be used with the Dataset.select and KeyValueGroupedDataset.agg typed operators).
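As a sketch of both typed operators (avgAge is assumed to be an Aggregator[Person, _, Double], and people a Dataset[Person] with a city field; all names are hypothetical):

```scala
import org.apache.spark.sql.{Dataset, TypedColumn}

// Convert the typed Aggregator to a TypedColumn
val avgAgeCol: TypedColumn[Person, Double] = avgAge.toColumn

// With Dataset.select: aggregates over the whole Dataset
val overall: Dataset[Double] = people.select(avgAgeCol)

// With KeyValueGroupedDataset.agg: aggregates per group
val perCity = people.groupByKey(_.city).agg(avgAgeCol)
```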
Demo¶
// From Spark MLlib's org.apache.spark.ml.recommendation.ALSModel
// Step 1. Create Aggregator
val topKAggregator: Aggregator[Int, Int, Float] = ???
val recs = ratings
.as[(Int, Int, Float)]
.groupByKey(_._1)
.agg(topKAggregator.toColumn) // <-- use the custom Aggregator
.toDF("id", "recommendations")
Use the org.apache.spark.sql.expressions.scalalang.typed object to access the type-safe aggregate functions, i.e. avg, count, sum and sumLong.
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_._1).agg(typed.sum(_._2))
ds.select(typed.sum((i: Int) => i))