Standard Aggregate Functions¶
any¶
any(e: Column): Column
any ...FIXME
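A minimal usage sketch of my own (assuming a Spark release that exposes any in the Scala API): any is true for a group when at least one value is true.
import org.apache.spark.sql.functions.any
val flags = Seq(
  ("a", true),
  ("a", false),
  ("b", false)).toDF("group", "flag")
// expected: true for group "a" (one true value), false for group "b"
flags.groupBy($"group").agg(any($"flag")).show()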
any_value¶
any_value(e: Column): Column
any_value(e: Column, ignoreNulls: Column): Column
any_value ...FIXME
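A usage sketch of my own (assuming a Spark release that exposes any_value in the Scala API): any_value returns an arbitrary value per group, and the optional ignoreNulls column tells it to skip nulls.
import org.apache.spark.sql.functions.{any_value, lit}
val sales = Seq(
  ("books", Some("alice")),
  ("books", None),
  ("games", Some("bob"))).toDF("category", "seller")
// one (arbitrary) non-null seller per category thanks to ignoreNulls = true
sales.groupBy($"category").agg(any_value($"seller", lit(true))).show()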
bool_and¶
bool_and(e: Column): Column
bool_and ...FIXME
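A usage sketch of my own (assuming a Spark release that exposes bool_and in the Scala API): bool_and is true for a group only when every value is true.
import org.apache.spark.sql.functions.bool_and
val checks = Seq(
  ("batch-1", true),
  ("batch-1", true),
  ("batch-2", false)).toDF("batch", "passed")
// expected: true for batch-1, false for batch-2
checks.groupBy($"batch").agg(bool_and($"passed")).show()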
bool_or¶
bool_or(e: Column): Column
bool_or ...FIXME
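A usage sketch of my own (assuming a Spark release that exposes bool_or in the Scala API): bool_or is true for a group when at least one value is true.
import org.apache.spark.sql.functions.bool_or
val alerts = Seq(
  ("node-1", false),
  ("node-1", true),
  ("node-2", false)).toDF("node", "alerted")
// expected: true for node-1, false for node-2
alerts.groupBy($"node").agg(bool_or($"alerted")).show()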
collect_set¶
collect_set(e: Column): Column
collect_set(columnName: String): Column
collect_set creates a CollectSet expression (for the expr of the given Column) and requests it to toAggregateExpression. In the end, collect_set wraps the AggregateExpression up in a Column.
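You can see that for yourself with a quick sketch of my own (the same approach as the max_by internals below):
import org.apache.spark.sql.functions.collect_set
val c = collect_set($"id")
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, CollectSet}
// the Column wraps an AggregateExpression over a CollectSet aggregate function
val ae = c.expr.asInstanceOf[AggregateExpression]
val collectSetFn = ae.aggregateFunction.asInstanceOf[CollectSet]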
count_if¶
count_if(e: Column): Column
count_if ...FIXME
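A usage sketch of my own (assuming a Spark release that exposes count_if in the Scala API): count_if counts the rows for which the given boolean column is true.
import org.apache.spark.sql.functions.count_if
val orders = Seq(
  ("alice", 100.0),
  ("alice", 15.0),
  ("bob", 20.0)).toDF("customer", "amount")
// expected: 1 order above 50 for alice, 0 for bob
orders.groupBy($"customer").agg(count_if($"amount" > 50)).show()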
every¶
every(e: Column): Column
every ...FIXME
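A usage sketch of my own (assuming a Spark release that exposes every in the Scala API): every is true only when the column is true for all rows in a group (the same behaviour as bool_and above).
import org.apache.spark.sql.functions.every
val suites = Seq(
  ("suite-1", true),
  ("suite-1", false),
  ("suite-2", true)).toDF("suite", "passed")
// expected: false for suite-1, true for suite-2
suites.groupBy($"suite").agg(every($"passed")).show()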
max_by¶
max_by(e: Column, ord: Column): Column
scala> sql("DESC FUNCTION max_by").show(truncate=false)
+----------------------------------------------------------------------------------------+
|function_desc |
+----------------------------------------------------------------------------------------+
|Function: max_by |
|Class: org.apache.spark.sql.catalyst.expressions.aggregate.MaxBy |
|Usage: max_by(x, y) - Returns the value of `x` associated with the maximum value of `y`.|
+----------------------------------------------------------------------------------------+
Learn More
Learn more in this post on LinkedIn that drew my attention to max_by.
Demo¶
Problem: Find the most expensive vegetable
val veggies = Seq(
  ("broccoli", 10.15),
  ("cabbage", 5.25),
  ("arugula", 1.05),
  ("beetroot", 10.15),
).toDF("vegetable", "price")
scala> veggies.show()
+---------+-----+
|vegetable|price|
+---------+-----+
| broccoli|10.15|
| cabbage| 5.25|
| arugula| 1.05|
| beetroot|10.15|
+---------+-----+
val the_most_expensive_veggie_q = veggies.select(max_by($"vegetable", $"price"))
scala> the_most_expensive_veggie_q.show()
+------------------------+
|max_by(vegetable, price)|
+------------------------+
| beetroot|
+------------------------+
Closing questions:
- Is the answer correct?
- How is this different from groupBy or rank?
Internals¶
Internally, max_by creates a MaxBy aggregate function that is then wrapped into a Column (as an AggregateExpression).
import org.apache.spark.sql.functions.max_by
val c = max_by($"id", $"ord")
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
val ae = c.expr.asInstanceOf[AggregateExpression]
import org.apache.spark.sql.catalyst.expressions.aggregate.MaxBy
val maxByFn = ae.aggregateFunction.asInstanceOf[MaxBy]
scala> print(maxByFn.numberedTreeString)
00 max_by('id, 'ord)
01 :- 'id
02 +- 'ord
For the query plans below, q is assumed to be a similar query over a two-column Dataset of (x, y) pairs, e.g. Seq(("a", "b")).toDF("x", "y").select(max_by($"x", $"y")).
scala> q.explain(extended=true)
== Parsed Logical Plan ==
'Project [unresolvedalias(max_by('x, 'y), Some(org.apache.spark.sql.Column$$Lambda$2839/0x0000000129e57160@5e3ea926))]
+- Project [_1#14 AS x#19, _2#15 AS y#20]
+- LocalRelation [_1#14, _2#15]
== Analyzed Logical Plan ==
max_by(x, y): string
Aggregate [max_by(x#19, y#20) AS max_by(x, y)#97]
+- Project [_1#14 AS x#19, _2#15 AS y#20]
+- LocalRelation [_1#14, _2#15]
== Optimized Logical Plan ==
Aggregate [max_by(x#19, y#20) AS max_by(x, y)#97]
+- LocalRelation [x#19, y#20]
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortAggregate(key=[], functions=[max_by(x#19, y#20)], output=[max_by(x, y)#97])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=86]
+- SortAggregate(key=[], functions=[partial_max_by(x#19, y#20)], output=[valueWithExtremumOrdering#101, extremumOrdering#102])
+- LocalTableScan [x#19, y#20]
some¶
some(e: Column): Column
some ...FIXME
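A usage sketch of my own (assuming a Spark release that exposes some in the Scala API): some is true when the column is true for at least one row in a group (the same behaviour as bool_or above).
import org.apache.spark.sql.functions.some
val answers = Seq(
  ("q1", true),
  ("q1", false),
  ("q2", false)).toDF("question", "correct")
// expected: true for q1 (at least one correct answer), false for q2
answers.groupBy($"question").agg(some($"correct")).show()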