Demo: Adaptive Query Execution

This demo shows the internals of Adaptive Query Execution.

Before you begin

Enable the following loggers (their messages show up later in the demo):

org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec (DEBUG)
org.apache.spark.sql.execution.adaptive.AQEOptimizer (TRACE)
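
A minimal log4j2 sketch, assuming the standard conf/log4j2.properties setup (the aqe_stage and aqe_optimizer property names are arbitrary):

logger.aqe_stage.name = org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
logger.aqe_stage.level = debug
logger.aqe_optimizer.name = org.apache.spark.sql.execution.adaptive.AQEOptimizer
logger.aqe_optimizer.level = trace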

Query

Create a table.

sql("DROP TABLE IF EXISTS adaptive")
sql("CREATE TABLE adaptive USING parquet AS SELECT * FROM VALUES (1)")

Create a query with an Exchange so Adaptive Query Execution can kick in (otherwise the spark.sql.adaptive.forceApply configuration property would be required).

val q = spark.table("adaptive").repartition(2)
q.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=58]
   +- FileScan parquet default.adaptive[col1#14] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jacek/dev/oss/spark/spark-warehouse/adaptive], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col1:int>

Note that the isFinalPlan flag is false.
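
For comparison, a plain scan without an Exchange should be left alone by Adaptive Query Execution (no AdaptiveSparkPlan node in its physical plan), which is exactly the case spark.sql.adaptive.forceApply is for:

spark.table("adaptive").explain()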

Access AdaptiveSparkPlan

import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

val adaptiveExec = q.queryExecution.executedPlan.collectFirst { case op: AdaptiveSparkPlanExec => op }.get
assert(adaptiveExec.isInstanceOf[AdaptiveSparkPlanExec])
println(adaptiveExec)
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=58]
   +- FileScan parquet default.adaptive[col1#14] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jacek/dev/oss/spark/spark-warehouse/adaptive], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col1:int>
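
Since AdaptiveSparkPlanExec is at the root of the executed plan whenever AQE kicks in (as it does here), a direct cast is an equivalent (if less defensive) alternative to collectFirst:

val adaptiveExec2 = q.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]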

Execute AdaptiveSparkPlan

Execute the query, which in turn executes the AdaptiveSparkPlanExec physical operator (and marks it as adaptively optimized using the isFinalPlan flag).

val rdd = adaptiveExec.execute()

The ShuffleQueryStageExec and AQEOptimizer loggers should print out the following messages:

DEBUG ShuffleQueryStageExec: Materialize query stage ShuffleQueryStageExec: 0
TRACE AQEOptimizer: Fixed point reached for batch Propagate Empty Relations after 1 iterations.
TRACE AQEOptimizer: Fixed point reached for batch Dynamic Join Selection after 1 iterations.
TRACE AQEOptimizer: Fixed point reached for batch Eliminate Limits after 1 iterations.
TRACE AQEOptimizer: Fixed point reached for batch Optimize One Row Plan after 1 iterations.

println(rdd.toDebugString)
(2) ShuffledRowRDD[5] at execute at <console>:1 []
 +-(2) MapPartitionsRDD[4] at execute at <console>:1 []
    |  MapPartitionsRDD[3] at execute at <console>:1 []
    |  MapPartitionsRDD[2] at execute at <console>:1 []
    |  MapPartitionsRDD[1] at execute at <console>:1 []
    |  FileScanRDD[0] at execute at <console>:1 []
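
The lineage shows the two partitions requested by repartition(2); a quick assertion over the returned RDD:

assert(rdd.getNumPartitions == 2)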

Alternatively, you could use one of the high-level operators that trigger execution (e.g. tail).

q.tail(1)
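
Any other action would work just as well (each triggers execution and finalizes the adaptive plan), e.g.:

q.count()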

Explain Query

After execution, AdaptiveSparkPlanExec is final (and will never get re-optimized).

println(adaptiveExec)
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ShuffleQueryStage 0
   +- Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=15]
      +- *(1) ColumnarToRow
         +- FileScan parquet default.adaptive[col1#14] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jacek/dev/oss/spark/spark-warehouse/adaptive], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col1:int>
+- == Initial Plan ==
   Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=6]
   +- FileScan parquet default.adaptive[col1#14] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jacek/dev/oss/spark/spark-warehouse/adaptive], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col1:int>

Note that the isFinalPlan flag is now true.

q.explain()
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ShuffleQueryStage 0
   +- Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=15]
      +- *(1) ColumnarToRow
         +- FileScan parquet default.adaptive[col1#14] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jacek/dev/oss/spark/spark-warehouse/adaptive], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col1:int>
+- == Initial Plan ==
   Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=6]
   +- FileScan parquet default.adaptive[col1#14] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jacek/dev/oss/spark/spark-warehouse/adaptive], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col1:int>
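
For a more readable breakdown of the operators, you could also use the formatted explain mode (available since Spark 3.0):

q.explain("formatted")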

Internals

AdaptiveSparkPlanExec is a leaf physical operator, which is why the following snippet prints exactly one physical operator for the executed query plan.

q.queryExecution.executedPlan.foreach(op => println(op.getClass.getName))
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
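
The same fact as an assertion: being a leaf, the executed plan has no children.

assert(q.queryExecution.executedPlan.children.isEmpty)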

Let's access the underlying AdaptiveSparkPlanExec along with its inputPlan and initialPlan physical query plans.

import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

val adaptiveExec = q.queryExecution.executedPlan.collectFirst { case op: AdaptiveSparkPlanExec => op }.get
assert(adaptiveExec.isInstanceOf[AdaptiveSparkPlanExec])

val inputPlan = adaptiveExec.inputPlan
val initialPlan = adaptiveExec.initialPlan
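
initialPlan is inputPlan with the query-stage preparation rules applied, so printing both is a quick way to see what (if anything) those rules changed:

println(inputPlan.numberedTreeString)
println(initialPlan.numberedTreeString)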

Before execution, AdaptiveSparkPlanExec should not be final.

println(adaptiveExec.numberedTreeString)
00 AdaptiveSparkPlan isFinalPlan=false
01 +- Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=6]
02    +- FileScan parquet default.adaptive[col1#14] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jacek/dev/oss/spark/spark-warehouse/adaptive], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col1:int>

val rdd = adaptiveExec.execute()
DEBUG ShuffleQueryStageExec: Materialize query stage ShuffleQueryStageExec: 0
TRACE AQEOptimizer: Fixed point reached for batch Propagate Empty Relations after 1 iterations.
TRACE AQEOptimizer: Fixed point reached for batch Dynamic Join Selection after 1 iterations.
TRACE AQEOptimizer: Fixed point reached for batch Eliminate Limits after 1 iterations.
TRACE AQEOptimizer: Fixed point reached for batch Optimize One Row Plan after 1 iterations.

println(rdd.toDebugString)
(2) ShuffledRowRDD[5] at execute at <console>:1 []
 +-(2) MapPartitionsRDD[4] at execute at <console>:1 []
    |  MapPartitionsRDD[3] at execute at <console>:1 []
    |  MapPartitionsRDD[2] at execute at <console>:1 []
    |  MapPartitionsRDD[1] at execute at <console>:1 []
    |  FileScanRDD[0] at execute at <console>:1 []

Once executed, AdaptiveSparkPlanExec should be final.

println(adaptiveExec.numberedTreeString)
00 AdaptiveSparkPlan isFinalPlan=true
01 +- == Final Plan ==
02    ShuffleQueryStage 0
03    +- Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=15]
04       +- *(1) ColumnarToRow
05          +- FileScan parquet default.adaptive[col1#14] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jacek/dev/oss/spark/spark-warehouse/adaptive], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col1:int>
06 +- == Initial Plan ==
07    Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=6]
08    +- FileScan parquet default.adaptive[col1#14] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jacek/dev/oss/spark/spark-warehouse/adaptive], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col1:int>

Note

There seems to be no way to dig deeper and access the QueryStageExecs, though. Feeling sad.

Future Work