
Standard Aggregate Functions

any

any(
  e: Column): Column

any...FIXME

any_value

any_value(
  e: Column): Column
any_value(
  e: Column,
  ignoreNulls: Column): Column

any_value...FIXME

bool_and

bool_and(
  e: Column): Column

bool_and...FIXME

bool_or

bool_or(
  e: Column): Column

bool_or...FIXME
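
A minimal usage sketch of bool_and and bool_or, assuming Spark 3.5 or later (where both are available in org.apache.spark.sql.functions) and a local SparkSession. The flags dataset and the all_true and any_true column names are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{bool_and, bool_or}

// In spark-shell, getOrCreate simply returns the existing session
val spark = SparkSession.builder().master("local[*]").appName("bool-agg-demo").getOrCreate()
import spark.implicits._

// Hypothetical input: a single boolean column
val flags = Seq(true, true, false).toDF("flag")

val row = flags
  .select(bool_and($"flag") as "all_true", bool_or($"flag") as "any_true")
  .head()

val allTrue = row.getBoolean(0) // false: not every flag is true
val anyTrue = row.getBoolean(1) // true: at least one flag is true
```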

collect_set

collect_set(
  e: Column): Column
collect_set(
  columnName: String): Column

collect_set creates a CollectSet expression (for the expr of the given Column) and requests it to toAggregateExpression.

In the end, collect_set wraps the AggregateExpression up in a Column.
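
The steps above can be observed with a short sketch that mirrors the max_by Internals code further down. It assumes a Spark version where Column.expr is accessible (as in Spark 3.x) and a local SparkSession; the nums dataset and the ns column name are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_set
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, CollectSet}

// In spark-shell, getOrCreate simply returns the existing session
val spark = SparkSession.builder().master("local[*]").appName("collect_set-demo").getOrCreate()
import spark.implicits._

// The Column wraps an AggregateExpression whose aggregate function is CollectSet
// (assumption: Column.expr is available, as in Spark 3.x)
val c = collect_set($"n")
val ae = c.expr.asInstanceOf[AggregateExpression]
val isCollectSet = ae.aggregateFunction.isInstanceOf[CollectSet]

// Usage: duplicates are dropped; the order of the elements is not guaranteed,
// so the result is compared as a Set
val nums = Seq(1, 2, 2, 3, 3, 3).toDF("n")
val distinctNums = nums.select(collect_set($"n") as "ns").as[Seq[Int]].head().toSet
```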

count_if

count_if(
  e: Column): Column

count_if...FIXME
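
A minimal usage sketch of count_if, assuming Spark 3.5 or later (where count_if is available in org.apache.spark.sql.functions) and a local SparkSession. The nums dataset is made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count_if

// In spark-shell, getOrCreate simply returns the existing session
val spark = SparkSession.builder().master("local[*]").appName("count_if-demo").getOrCreate()
import spark.implicits._

// Hypothetical input: the numbers 1 to 5
val nums = Seq(1, 2, 3, 4, 5).toDF("n")

// count_if counts the rows for which the boolean expression is true
val evens = nums.select(count_if($"n" % 2 === 0)).as[Long].head() // 2 (the rows 2 and 4)
```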

every

every(
  e: Column): Column

every...FIXME

max_by

max_by(
  e: Column,
  ord: Column): Column

scala> sql("DESC FUNCTION max_by").show(truncate=false)
+----------------------------------------------------------------------------------------+
|function_desc                                                                           |
+----------------------------------------------------------------------------------------+
|Function: max_by                                                                        |
|Class: org.apache.spark.sql.catalyst.expressions.aggregate.MaxBy                        |
|Usage: max_by(x, y) - Returns the value of `x` associated with the maximum value of `y`.|
+----------------------------------------------------------------------------------------+

Learn More

Learn more in this post on LinkedIn that drew my attention to max_by.

Demo

Problem: Find the most expensive vegetable

val veggies = Seq(
  ("broccoli", 10.15),
  ("cabbage", 5.25),
  ("arugula", 1.05),
  ("beetroot", 10.15)
).toDF("vegetable", "price")
scala> veggies.show()
+---------+-----+
|vegetable|price|
+---------+-----+
| broccoli|10.15|
|  cabbage| 5.25|
|  arugula| 1.05|
| beetroot|10.15|
+---------+-----+
val the_most_expensive_veggie_q = veggies.select(max_by($"vegetable", $"price"))
scala> the_most_expensive_veggie_q.show()
+------------------------+
|max_by(vegetable, price)|
+------------------------+
|                beetroot|
+------------------------+

Closing questions:

  1. Is the answer correct?
  2. How is this different from groupBy or rank?
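
One way to explore the second question is to put max_by next to a rank-based query over the same data. This is a sketch, not the only way to do it; it assumes a local SparkSession, and the names byPriceDesc, viaMaxBy and viaRank are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{max_by, rank}

// In spark-shell, getOrCreate simply returns the existing session
val spark = SparkSession.builder().master("local[*]").appName("max_by-vs-rank").getOrCreate()
import spark.implicits._

val veggies = Seq(
  ("broccoli", 10.15),
  ("cabbage", 5.25),
  ("arugula", 1.05),
  ("beetroot", 10.15)
).toDF("vegetable", "price")

// max_by is an aggregate: one row with one vegetable, even though two prices tie
val viaMaxBy = veggies
  .select(max_by($"vegetable", $"price"))
  .as[String]
  .collect()
  .toSeq

// rank over a window ordered by price keeps every row that shares the top price
// (a window without partitionBy moves all rows to a single partition)
val byPriceDesc = Window.orderBy($"price".desc)
val viaRank = veggies
  .withColumn("rank", rank().over(byPriceDesc))
  .where($"rank" === 1)
  .select("vegetable")
  .as[String]
  .collect()
  .toSet
```

With this data, viaMaxBy holds a single vegetable while viaRank holds both vegetables priced at 10.15, which is worth keeping in mind when answering the first question.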

Internals

Internally, max_by creates a MaxBy aggregate function that is then wrapped into a Column (as an AggregateExpression).

import org.apache.spark.sql.functions.max_by
val c = max_by($"id", $"ord")
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
val ae = c.expr.asInstanceOf[AggregateExpression]
import org.apache.spark.sql.catalyst.expressions.aggregate.MaxBy
val maxByFn = ae.aggregateFunction.asInstanceOf[MaxBy]
scala> print(maxByFn.numberedTreeString)
00 max_by('id, 'ord)
01 :- 'id
02 +- 'ord
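// q is not defined above; the plans below correspond to a query like
// Seq(...).toDF("x", "y").select(max_by($"x", $"y"))
scala> q.explain(extended=true)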
== Parsed Logical Plan ==
'Project [unresolvedalias(max_by('x, 'y), Some(org.apache.spark.sql.Column$$Lambda$2839/0x0000000129e57160@5e3ea926))]
+- Project [_1#14 AS x#19, _2#15 AS y#20]
   +- LocalRelation [_1#14, _2#15]

== Analyzed Logical Plan ==
max_by(x, y): string
Aggregate [max_by(x#19, y#20) AS max_by(x, y)#97]
+- Project [_1#14 AS x#19, _2#15 AS y#20]
   +- LocalRelation [_1#14, _2#15]

== Optimized Logical Plan ==
Aggregate [max_by(x#19, y#20) AS max_by(x, y)#97]
+- LocalRelation [x#19, y#20]

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortAggregate(key=[], functions=[max_by(x#19, y#20)], output=[max_by(x, y)#97])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=86]
      +- SortAggregate(key=[], functions=[partial_max_by(x#19, y#20)], output=[valueWithExtremumOrdering#101, extremumOrdering#102])
         +- LocalTableScan [x#19, y#20]

some

some(
  e: Column): Column

some...FIXME