Adaptive Query Execution (AQE)¶
Adaptive Query Execution (aka Adaptive Query Optimization, Adaptive Optimization, or AQE for short) is an optimization of a physical query plan that happens in the middle of query execution and can switch to an alternative execution plan at runtime.
Adaptive Query Execution can only be used for queries with exchanges or sub-queries, as these use Exchange physical operators that provide extra data statistics that become available only once the queries are executed and process data.
Adaptive Query Execution re-optimizes the query plan based on runtime statistics.
Adaptive Query Execution is controlled by the spark.sql.adaptive.enabled configuration property and is enabled by default (since Spark 3.2 and SPARK-33679).
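As a minimal sketch, the property can be inspected and toggled at runtime through the session configuration. The object name, application name, and local master below are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession

object AqeToggle {
  def main(args: Array[String]): Unit = {
    // Local session for illustration only (master and appName are assumptions)
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("aqe-toggle")
      .getOrCreate()

    // Read the current value of the property
    println(spark.conf.get("spark.sql.adaptive.enabled"))

    // Disable AQE for subsequent queries in this session, then re-enable it
    spark.conf.set("spark.sql.adaptive.enabled", "false")
    spark.conf.set("spark.sql.adaptive.enabled", "true")

    spark.stop()
  }
}
```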
Quoting the description of a talk by the authors of Adaptive Query Execution:
At runtime, the adaptive execution mode can change shuffle join to broadcast join if it finds the size of one table is less than the broadcast threshold. It can also handle skewed input data for join and change the partition number of the next stage to better fit the data scale. In general, adaptive execution decreases the effort involved in tuning SQL query parameters and improves the execution performance by choosing a better execution plan and parallelism at runtime.
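The three behaviours mentioned in the quote are usually tuned through separate configuration properties. The sketch below sets them when the session is built. The property names are taken from the Spark SQL configuration and are not named in the text above, so check them against the Spark version in use. The object name, application name, and values are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession

object AqeFeatureFlags {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")          // assumption: local run for illustration
      .appName("aqe-features")     // assumption: any name works
      .config("spark.sql.adaptive.enabled", "true")
      // Let AQE coalesce small shuffle partitions of the next stage
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      // Let AQE split skewed shuffle partitions in joins
      .config("spark.sql.adaptive.skewJoin.enabled", "true")
      // Broadcast threshold that a join side has to stay under
      .config("spark.sql.autoBroadcastJoinThreshold", "10MB")
      .getOrCreate()

    // Print the AQE-related properties that are explicitly set for this session
    spark.conf.getAll
      .filter { case (key, _) => key.startsWith("spark.sql.adaptive") }
      .foreach { case (key, value) => println(s"$key = $value") }

    spark.stop()
  }
}
```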
InsertAdaptiveSparkPlan Physical Optimization¶
Adaptive Query Execution is applied to a physical query plan by the InsertAdaptiveSparkPlan physical optimization, which inserts AdaptiveSparkPlanExec physical operators.
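One way to see the inserted operator is to look at the executed plan of a query that contains a shuffle. The following is a sketch only. The query, object name, and session settings are made up for illustration, and the exact plan shape may differ between Spark versions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.functions.col

object AqePlanInspection {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")           // assumption: local run for illustration
      .appName("aqe-plan-inspection")
      .config("spark.sql.adaptive.enabled", "true")
      .getOrCreate()

    // groupBy introduces a shuffle exchange, which makes the query a candidate for AQE
    val counts = spark.range(0, 1000)
      .groupBy((col("id") % 10).as("bucket"))
      .count()

    // Look for an AdaptiveSparkPlanExec operator in the executed plan
    val plan = counts.queryExecution.executedPlan
    val adaptive = plan.collectFirst { case a: AdaptiveSparkPlanExec => a }
    println(s"AdaptiveSparkPlanExec present: ${adaptive.isDefined}")

    // Print the physical plan for a visual check
    counts.explain()

    spark.stop()
  }
}
```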
AQE Logical Optimizer¶
Adaptive Query Execution uses the AQEOptimizer logical optimizer to re-optimize logical plans.
AQE Cost Evaluator¶
Adaptive Query Execution uses a CostEvaluator to evaluate the cost of a candidate for an Adaptively-Optimized Physical Query Plan.
If a SparkPlan change happens, AdaptiveSparkPlanExec prints out the following message to the logs:
Plan changed from [currentPhysicalPlan] to [newPhysicalPlan]
Adaptive Query Execution uses the cost evaluator class given by the spark.sql.adaptive.customCostEvaluatorClass configuration property, or defaults to SimpleCostEvaluator when the property is not set.
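A custom cost evaluator could look roughly like the sketch below, which ranks plans by the number of shuffle exchanges they contain. ShuffleCountCost and ShuffleCountCostEvaluator are hypothetical names. The sketch assumes the CostEvaluator and Cost developer APIs in org.apache.spark.sql.execution.adaptive, which are marked unstable and may change between releases.

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.{Cost, CostEvaluator}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike

// Hypothetical cost: the number of shuffle exchanges in a plan
case class ShuffleCountCost(shuffles: Long) extends Cost {
  override def compare(that: Cost): Int = that match {
    case ShuffleCountCost(other) => java.lang.Long.compare(shuffles, other)
    case other =>
      throw new IllegalArgumentException(s"Cannot compare ShuffleCountCost with $other")
  }
}

// Hypothetical evaluator: fewer shuffle exchanges means a cheaper plan
class ShuffleCountCostEvaluator extends CostEvaluator {
  override def evaluateCost(plan: SparkPlan): Cost = {
    val shuffles = plan.collect { case s: ShuffleExchangeLike => s }.size
    ShuffleCountCost(shuffles.toLong)
  }
}
```

To try it, the fully-qualified name of the evaluator class (whatever package it ends up in) would be set as the value of spark.sql.adaptive.customCostEvaluatorClass, with the class available on the application classpath.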
AQE QueryStage Physical Preparation Rules¶
Adaptive Query Execution uses QueryStage Physical Preparation Rules that can be extended using SparkSessionExtensions.
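As a sketch of the extension point, a rule can be registered through SparkSessionExtensions.injectQueryStagePrepRule when the session is created. NoopQueryStagePrepRule is a hypothetical rule that returns the plan unchanged. A real rule would transform the SparkPlan it receives.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical preparation rule that leaves the plan as it is
object NoopQueryStagePrepRule extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan
}

object AqePrepRuleDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")            // assumption: local run for illustration
      .appName("aqe-prep-rule")
      // Register the rule builder (SparkSession => Rule[SparkPlan])
      .withExtensions { extensions =>
        extensions.injectQueryStagePrepRule(_ => NoopQueryStagePrepRule)
      }
      .getOrCreate()

    // A query with a shuffle, so that AQE has a plan to prepare
    spark.range(0, 100).repartition(4).count()

    spark.stop()
  }
}
```

Extensions take effect when a new session is built, so this is written as a standalone application rather than as a snippet for an already-running shell session.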
SparkListenerSQLAdaptiveExecutionUpdates¶
Adaptive Query Execution notifies Spark listeners about a physical plan change using SparkListenerSQLAdaptiveExecutionUpdate and SparkListenerSQLAdaptiveSQLMetricUpdates events.
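A listener can pick these events up in onOtherEvent. The sketch below handles only SparkListenerSQLAdaptiveExecutionUpdate. AqeUpdateListener and the sample query are illustrative assumptions, and whether an event is posted at all depends on AQE actually updating the plan of the query.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
import org.apache.spark.sql.functions.col

// Hypothetical listener that reports AQE plan updates
class AqeUpdateListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case update: SparkListenerSQLAdaptiveExecutionUpdate =>
      println(s"AQE updated the plan of SQL execution ${update.executionId}")
    case _ => // ignore all other events
  }
}

object AqeListenerDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")            // assumption: local run for illustration
      .appName("aqe-listener")
      .config("spark.sql.adaptive.enabled", "true")
      .getOrCreate()

    spark.sparkContext.addSparkListener(new AqeUpdateListener)

    // A query with a shuffle so that AQE has something to re-optimize
    spark.range(0, 1000)
      .groupBy((col("id") % 10).as("bucket"))
      .count()
      .collect()

    // Stopping the session also stops the listener bus
    spark.stop()
  }
}
```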
Logging¶
Adaptive Query Execution uses logOnLevel to print out diagnostic messages to the log.
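The level used for these messages is assumed here to come from the internal spark.sql.adaptive.logLevel configuration property, which is not named in the text above. A minimal sketch that raises it to INFO, with an illustrative object name and local session:

```scala
import org.apache.spark.sql.SparkSession

object AqeLogLevel {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")            // assumption: local run for illustration
      .appName("aqe-log-level")
      .getOrCreate()

    // Assumption: internal property that selects the level AQE logs at
    spark.conf.set("spark.sql.adaptive.logLevel", "INFO")

    // A query with a shuffle, so that AQE has something to log about
    spark.range(0, 100).repartition(4).count()

    spark.stop()
  }
}
```

Whether the messages actually show up still depends on the logging configuration of the application.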
Demo¶
Demo: Adaptive Query Execution
Unsupported¶
CacheManager¶
Adaptive Query Execution can change the number of shuffle partitions, so CacheManager makes sure that this configuration is disabled (for cacheQuery and recacheByCondition).
Structured Streaming¶
Adaptive Query Execution can change the number of shuffle partitions and so is not supported for streaming queries (Spark Structured Streaming).