Demo: ObjectHashAggregateExec and Sort-Based Fallback Tasks
This demo shows when the ObjectHashAggregateExec physical operator falls back to sort-based aggregation (which Spark SQL hoped to avoid when it planned the aggregation).
The ObjectHashAggregateExec physical operator uses the spark.sql.objectHashAggregate.sortBased.fallbackThreshold configuration property to control when to switch to sort-based aggregation.
Configure SparkSession
./bin/spark-shell
import org.apache.spark.sql.internal.SQLConf
// The threshold defaults to 128
assert(SQLConf.get.objectAggSortBasedFallbackThreshold == 128)
// Lower the threshold to 1 so that even a tiny dataset can cross it
spark.sessionState.conf.setConf(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD, 1)
assert(SQLConf.get.objectAggSortBasedFallbackThreshold == 1)
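The same property can also be set by name through the runtime configuration of the session, which avoids the internal SQLConf API. A minimal sketch, assuming a spark-shell session where spark is the active SparkSession:

```scala
// Assumes `spark` is the SparkSession that spark-shell provides
val key = "spark.sql.objectHashAggregate.sortBased.fallbackThreshold"

// Set the threshold by its property name
spark.conf.set(key, "1")

// Read the value back to confirm the change
println(spark.conf.get(key))
```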
No Sort Fallback Tasks
The collect_set standard function (a TypedImperativeAggregate expression) is one of the built-in standard functions that are planned for execution using the ObjectHashAggregateExec physical operator.
The following query over a single-row dataset produces one group (which does not cross the spark.sql.objectHashAggregate.sortBased.fallbackThreshold of 1), and hence there will be no sort fallback tasks.
val oneRowDataset = Seq(
  (0, 0)
).toDF("id", "gid")
val q = oneRowDataset
  .groupBy("gid")
  .agg(collect_set("id") as "ids")
// Execute the query with the noop data source (nothing is written out)
q.write.format("noop").mode("overwrite").save
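To check that the query above is indeed planned with ObjectHashAggregateExec, the physical plan can be inspected. A minimal sketch, assuming a spark-shell session in which q is the query defined above and spark.sql.execution.useObjectHashAggregateExec is left at its default. It looks at the plan before the preparation rules run, so adaptive query execution does not hide the operators:

```scala
import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec

// Assumes `q` is the aggregation query defined above
// Print the physical plan (look for ObjectHashAggregate nodes)
q.explain()

// Collect the ObjectHashAggregateExec operators from the planned (not yet prepared) physical plan
val objectHashAggs = q.queryExecution.sparkPlan.collect {
  case agg: ObjectHashAggregateExec => agg
}
assert(objectHashAggs.nonEmpty, "Expected ObjectHashAggregateExec in the physical plan")
```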
Sort Fallback Tasks
When an aggregation uses more than one group (and so crosses the spark.sql.objectHashAggregate.sortBased.fallbackThreshold of 1), there will be as many sort fallback tasks as there are partitions with a number of groups above the threshold.
val threeRowDataset = Seq(
  (0, 0),
  (1, 1),
  (2, 1),
).toDF("id", "gid")
val q = threeRowDataset
  .groupBy("gid")
  .agg(collect_set("id") as "ids")
q.write.format("noop").mode("overwrite").save
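The rule of "one sort fallback task per partition whose groups cross the threshold" can be illustrated without Spark. The following is a deliberately simplified model in plain Scala, not Spark's implementation: FallbackModel, fallsBack and fallbackTasks are hypothetical names, and the strict "more distinct groups than the threshold" comparison is an assumption taken from the description above (the exact boundary condition in Spark may differ between versions).

```scala
// Hypothetical, simplified model of the fallback decision (not Spark code)
object FallbackModel {
  // A task falls back when its partition holds more distinct
  // grouping keys than the threshold (assumed comparison)
  def fallsBack(groupKeys: Seq[Int], threshold: Int): Boolean =
    groupKeys.distinct.size > threshold

  // The number of sort fallback tasks is the number of such partitions
  def fallbackTasks(partitions: Seq[Seq[Int]], threshold: Int): Int =
    partitions.count(keys => fallsBack(keys, threshold))
}

// gid values per partition: only the last partition has two distinct groups
val partitions = Seq(Seq(0), Seq(1, 1), Seq(0, 1))
println(FallbackModel.fallbackTasks(partitions, threshold = 1))
```

Under this model, how the rows are spread across partitions matters as much as the total number of groups, which is why the same dataset can produce a different number of sort fallback tasks once its partitioning changes.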
Sort Fallback Tasks After Repartition
The following query first coalesces the dataset into a single partition, so that a single task sees the rows of both groups.
val q = threeRowDataset
  .coalesce(1) // all rows in one partition
  .groupBy("gid")
  .agg(collect_set("id") as "ids")
q.write.format("noop").mode("overwrite").save
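To see how the rows are laid out before and after coalesce, the partitions can be inspected directly. A minimal sketch, assuming a spark-shell session with threeRowDataset defined as above; the layout before coalescing depends on the local parallelism, so no output is shown for it:

```scala
import org.apache.spark.sql.functions.spark_partition_id

// Assumes `threeRowDataset` is the dataset defined above
// Rows per partition before coalescing (depends on the local parallelism)
threeRowDataset
  .groupBy(spark_partition_id() as "partition")
  .count()
  .show()

// After coalesce(1) all rows share a single partition
val numPartitions = threeRowDataset.coalesce(1).rdd.getNumPartitions
assert(numPartitions == 1)
```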