Demo: Adaptive Query Execution
This demo shows the internals of Adaptive Query Execution.
Before you begin
Enable the following loggers (their messages show up later in this demo): ShuffleQueryStageExec and AQEOptimizer.
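A minimal sketch of what to add to conf/log4j2.properties, assuming a Log4j 2 setup (Spark 3.3 and later); the logger key names (aqe_shuffle, aqe_optimizer) are arbitrary:

# DEBUG for ShuffleQueryStageExec, TRACE for AQEOptimizer
logger.aqe_shuffle.name = org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
logger.aqe_shuffle.level = debug
logger.aqe_optimizer.name = org.apache.spark.sql.execution.adaptive.AQEOptimizer
logger.aqe_optimizer.level = trace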
Query
Create a table.
sql("DROP TABLE IF EXISTS adaptive")
sql("CREATE TABLE adaptive USING parquet AS SELECT * FROM VALUES (1)")
Create a query with an Exchange so Adaptive Query Execution has a chance to kick in (otherwise spark.sql.adaptive.forceApply would be required).
val q = spark.table("adaptive").repartition(2)
q.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=58]
+- FileScan parquet default.adaptive[col1#14] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jacek/dev/oss/spark/spark-warehouse/adaptive], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col1:int>
Note that the isFinalPlan flag is false.
Access AdaptiveSparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
val adaptiveExec = q.queryExecution.executedPlan.collectFirst { case op: AdaptiveSparkPlanExec => op }.get
assert(adaptiveExec.isInstanceOf[AdaptiveSparkPlanExec])
println(adaptiveExec)
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=58]
+- FileScan parquet default.adaptive[col1#14] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jacek/dev/oss/spark/spark-warehouse/adaptive], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col1:int>
Execute AdaptiveSparkPlan
Execute the query, which in turn executes the AdaptiveSparkPlanExec physical operator (and marks it as adaptively optimized using the isFinalPlan flag).
val rdd = adaptiveExec.execute()
The ShuffleQueryStageExec and AQEOptimizer loggers should print out the following messages to the logs:
DEBUG ShuffleQueryStageExec: Materialize query stage ShuffleQueryStageExec: 0
TRACE AQEOptimizer: Fixed point reached for batch Propagate Empty Relations after 1 iterations.
TRACE AQEOptimizer: Fixed point reached for batch Dynamic Join Selection after 1 iterations.
TRACE AQEOptimizer: Fixed point reached for batch Eliminate Limits after 1 iterations.
TRACE AQEOptimizer: Fixed point reached for batch Optimize One Row Plan after 1 iterations.
println(rdd.toDebugString)
(2) ShuffledRowRDD[5] at execute at <console>:1 []
+-(2) MapPartitionsRDD[4] at execute at <console>:1 []
| MapPartitionsRDD[3] at execute at <console>:1 []
| MapPartitionsRDD[2] at execute at <console>:1 []
| MapPartitionsRDD[1] at execute at <console>:1 []
| FileScanRDD[0] at execute at <console>:1 []
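The resulting RDD uses 2 partitions, as requested by repartition(2). A quick check (an extra step, not in the original demo):

// The shuffled RDD should have as many partitions as requested by repartition(2)
assert(rdd.getNumPartitions == 2)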
Alternatively, you could use one of the high-level operators (e.g. tail).
q.tail(1)
Explain Query
After execution, AdaptiveSparkPlanExec is final (and will never get re-optimized).
println(adaptiveExec)
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
ShuffleQueryStage 0
+- Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=15]
+- *(1) ColumnarToRow
+- FileScan parquet default.adaptive[id#0L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jacek/dev/oss/spark/spark-warehouse/adaptive], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- == Initial Plan ==
Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=6]
+- FileScan parquet default.adaptive[id#0L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jacek/dev/oss/spark/spark-warehouse/adaptive], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
Note that the isFinalPlan flag is now true.
q.explain()
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
ShuffleQueryStage 0
+- Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=15]
+- *(1) ColumnarToRow
+- FileScan parquet default.adaptive[id#0L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jacek/dev/oss/spark/spark-warehouse/adaptive], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- == Initial Plan ==
Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=6]
+- FileScan parquet default.adaptive[id#0L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jacek/dev/oss/spark/spark-warehouse/adaptive], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
Internals
AdaptiveSparkPlanExec is a leaf physical operator, which is why the following snippet gives just a single physical operator for the executed query plan.
q.queryExecution.executedPlan.foreach(op => println(op.getClass.getName))
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
Let's access the underlying AdaptiveSparkPlanExec and the inputPlan and initialPlan physical query plans.
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
val adaptiveExec = q.queryExecution.executedPlan.collectFirst { case op: AdaptiveSparkPlanExec => op }.get
assert(adaptiveExec.isInstanceOf[AdaptiveSparkPlanExec])
val inputPlan = adaptiveExec.inputPlan
val initialPlan = adaptiveExec.initialPlan
Before execution, AdaptiveSparkPlanExec should not be final.
println(adaptiveExec.numberedTreeString)
00 AdaptiveSparkPlan isFinalPlan=false
01 +- Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=6]
02 +- FileScan parquet default.adaptive[id#0L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jacek/dev/oss/spark/spark-warehouse/adaptive], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
val rdd = adaptiveExec.execute()
DEBUG ShuffleQueryStageExec: Materialize query stage ShuffleQueryStageExec: 0
TRACE AQEOptimizer: Fixed point reached for batch Propagate Empty Relations after 1 iterations.
TRACE AQEOptimizer: Fixed point reached for batch Dynamic Join Selection after 1 iterations.
TRACE AQEOptimizer: Fixed point reached for batch Eliminate Limits after 1 iterations.
TRACE AQEOptimizer: Fixed point reached for batch Optimize One Row Plan after 1 iterations.
println(rdd.toDebugString)
(2) ShuffledRowRDD[5] at execute at <console>:1 []
+-(2) MapPartitionsRDD[4] at execute at <console>:1 []
| MapPartitionsRDD[3] at execute at <console>:1 []
| MapPartitionsRDD[2] at execute at <console>:1 []
| MapPartitionsRDD[1] at execute at <console>:1 []
| FileScanRDD[0] at execute at <console>:1 []
Once executed, AdaptiveSparkPlanExec should be final.
println(adaptiveExec.numberedTreeString)
00 AdaptiveSparkPlan isFinalPlan=true
01 +- == Final Plan ==
02 ShuffleQueryStage 0
03 +- Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=15]
04 +- *(1) ColumnarToRow
05 +- FileScan parquet default.adaptive[id#0L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jacek/dev/oss/spark/spark-warehouse/adaptive], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
06 +- == Initial Plan ==
07 Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [plan_id=6]
08 +- FileScan parquet default.adaptive[id#0L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jacek/dev/oss/spark/spark-warehouse/adaptive], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
Note
There seems to be no way to dig deeper and access the QueryStageExecs, though. Feeling sad.
Future Work
- Use SparkListenerSQLAdaptiveExecutionUpdate and SparkListenerSQLAdaptiveSQLMetricUpdates to intercept changes in query plans
- Enable ALL for the AdaptiveSparkPlanExec logger with spark.sql.adaptive.logLevel set to TRACE
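For the first item, here is a minimal sketch of a SparkListener that intercepts the two events (the AQEPlanChangeListener name is made up for this example):

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates}

class AQEPlanChangeListener extends SparkListener {
  // AQE-related SQL events are delivered through onOtherEvent
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case u: SparkListenerSQLAdaptiveExecutionUpdate =>
      println(s"AQE plan update for executionId=${u.executionId}")
      println(u.physicalPlanDescription)
    case m: SparkListenerSQLAdaptiveSQLMetricUpdates =>
      println(s"AQE metric updates for executionId=${m.executionId}")
    case _ => // ignore other events
  }
}

spark.sparkContext.addSparkListener(new AQEPlanChangeListener)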