Demo: ObjectHashAggregateExec and Sort-Based Fallback Tasks

This demo shows when the ObjectHashAggregateExec physical operator falls back to sort-based aggregation (which Spark SQL hoped to avoid when it planned the aggregation).

The ObjectHashAggregateExec physical operator uses the spark.sql.objectHashAggregate.sortBased.fallbackThreshold configuration property to control when to switch to sort-based aggregation.

Configure SparkSession

./bin/spark-shell

import org.apache.spark.sql.internal.SQLConf
assert(SQLConf.get.objectAggSortBasedFallbackThreshold == 128)
spark.sessionState.conf.setConf(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD, 1)
assert(SQLConf.get.objectAggSortBasedFallbackThreshold == 1)
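
The property can also be set by its string key through the session's runtime configuration, which avoids the SQLConf import. The following is a minimal sketch, assuming a running spark-shell session where `spark` is the active SparkSession; the `key` value name is only for illustration.

```scala
// Assumes spark-shell, where `spark` (the active SparkSession) is in scope.
val key = "spark.sql.objectHashAggregate.sortBased.fallbackThreshold"

// RuntimeConfig.set has an overload that accepts a Long value.
spark.conf.set(key, 1L)

// RuntimeConfig.get returns the stored value as a String.
assert(spark.conf.get(key) == "1")
```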

No Sort Fallback Tasks

The collect_set standard function (a TypedImperativeAggregate expression) is one of the built-in functions that are planned for execution using the ObjectHashAggregateExec physical operator.
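
One way to check this is to look for the operator in the physical plan of a collect_set query. The following is a minimal sketch, assuming a spark-shell session (so `spark` is in scope) and the default value of spark.sql.execution.useObjectHashAggregateExec; the `sample`, `aggregated` and `objectHashAggs` names are illustrative and not part of the demo.

```scala
import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
import org.apache.spark.sql.functions.collect_set
import spark.implicits._

// Illustrative single-row dataset (hypothetical, for this check only).
val sample = Seq((0, 0)).toDF("id", "gid")
val aggregated = sample.groupBy("gid").agg(collect_set("id") as "ids")

// sparkPlan is the physical plan before the preparation rules are applied,
// so the aggregate operators can be collected from it directly.
val objectHashAggs = aggregated.queryExecution.sparkPlan.collect {
  case agg: ObjectHashAggregateExec => agg
}
assert(objectHashAggs.nonEmpty)
```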

The following query over a single-row dataset produces one group. Even with spark.sql.objectHashAggregate.sortBased.fallbackThreshold set to 1, a single group is not enough to trigger the fallback, and hence there will be no sort fallback tasks.

val oneRowDataset = Seq(
    (0, 0)
  ).toDF("id", "gid")
val q = oneRowDataset
  .groupBy("gid")
  .agg(collect_set("id") as "ids")
q.write.format("noop").mode("overwrite").save

Figure: ObjectHashAggregateExec and No Sort Fallback Tasks

Sort Fallback Tasks

When an aggregation uses more than one group (and so crosses spark.sql.objectHashAggregate.sortBased.fallbackThreshold), there will be as many sort fallback tasks as there are partitions with enough groups to cross the threshold.
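
The counting rule can be illustrated with a small model in plain Scala that does not need Spark. This is only a sketch and not Spark's implementation: it assumes that a task falls back once the number of groups it has seen reaches the threshold while input rows still remain, and it models a single aggregation stage only. The `SortFallbackModel`, `fallsBack` and `fallbackTasks` names are hypothetical.

```scala
import scala.collection.mutable

// Simplified model of the fallback decision; NOT Spark's actual code.
object SortFallbackModel {

  /** True if a task that reads `groupKeys` (one grouping key per input row, in order)
    * would switch to sort-based aggregation under the assumed rule. */
  def fallsBack(groupKeys: Seq[Int], threshold: Int): Boolean = {
    val seen = mutable.HashSet.empty[Int]
    val rows = groupKeys.iterator
    var sortBased = false
    while (rows.hasNext && !sortBased) {
      seen += rows.next()
      // Assumed rule: fall back when the number of groups reaches the
      // threshold and there are still input rows left to process.
      if (seen.size >= threshold && rows.hasNext) sortBased = true
    }
    sortBased
  }

  /** Number of partitions (one task each) that would fall back. */
  def fallbackTasks(partitions: Seq[Seq[Int]], threshold: Int): Int =
    partitions.count(fallsBack(_, threshold))

  def main(args: Array[String]): Unit = {
    // One row, one group: nothing is left to read once the threshold is reached.
    println(fallbackTasks(Seq(Seq(0)), threshold = 1))                 // prints 0
    // Three rows with gid values 0, 1, 1 in a single partition.
    println(fallbackTasks(Seq(Seq(0, 1, 1)), threshold = 1))           // prints 1
    // The same rows spread over three single-row partitions.
    println(fallbackTasks(Seq(Seq(0), Seq(1), Seq(1)), threshold = 1)) // prints 0
  }
}
```

Under this model, how rows are distributed across partitions matters as much as the number of groups: a partition holding a single row never falls back, whatever the threshold.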

val threeRowDataset = Seq(
    (0, 0),
    (1, 1),
    (2, 1),
  ).toDF("id", "gid")
val q = threeRowDataset
  .groupBy("gid")
  .agg(collect_set("id") as "ids")
q.write.format("noop").mode("overwrite").save

Figure: ObjectHashAggregateExec and Sort Fallback Tasks

Sort Fallback Tasks After Repartition

val q = threeRowDataset
  .coalesce(1) // all rows in one partition
  .groupBy("gid")
  .agg(collect_set("id") as "ids")
q.write.format("noop").mode("overwrite").save

Figure: ObjectHashAggregateExec and Sort Fallback Tasks (after Repartition)