Adaptive Query Execution (AQE)

Adaptive Query Execution (aka Adaptive Query Optimization, Adaptive Optimization, or AQE for short) is the re-optimization of a physical query plan at runtime, in the middle of query execution, so that a better alternative execution plan can be chosen.

Adaptive Query Execution can only be used for queries with exchanges or sub-queries, as they use Exchange physical operators, which expose extra data statistics that become available only once these queries are executed and process data.

Adaptive Query Execution re-optimizes the query plan based on runtime statistics.

Adaptive Query Execution is controlled by the spark.sql.adaptive.enabled configuration property and has been enabled by default since Spark 3.2 (SPARK-33679).
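Because the default depends on the Spark version, it can be safer to set the property explicitly. The following is a minimal sketch that renders the property as spark-submit --conf arguments; the helper name conf_args and the application file my_app.py are hypothetical placeholders, not part of Spark.

```python
import shlex


def conf_args(settings):
    """Render a dict of Spark properties as spark-submit --conf arguments."""
    args = []
    for key, value in sorted(settings.items()):
        # spark-submit accepts configuration properties as: --conf key=value
        args += ["--conf", f"{key}={value}"]
    return args


# Pin AQE on explicitly instead of relying on the version-dependent default.
aqe_settings = {"spark.sql.adaptive.enabled": "true"}

# "my_app.py" is a placeholder application file (an assumption for illustration).
command = ["spark-submit", *conf_args(aqe_settings), "my_app.py"]
print(shlex.join(command))
```

Setting the value to "false" in the same way would turn Adaptive Query Execution off for that application.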

Quoting the description of a talk by the authors of Adaptive Query Execution:

At runtime, the adaptive execution mode can change shuffle join to broadcast join if it finds the size of one table is less than the broadcast threshold. It can also handle skewed input data for join and change the partition number of the next stage to better fit the data scale. In general, adaptive execution decreases the effort involved in tuning SQL query parameters and improves the execution performance by choosing a better execution plan and parallelism at runtime.
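The first decision in the quote can be illustrated with a toy model. This is only a sketch of the idea, not Spark's implementation: the function choose_join and the table sizes are made up for illustration, and the cutoff uses 10 MB, the documented default of spark.sql.autoBroadcastJoinThreshold.

```python
# 10 MB, the documented default of spark.sql.autoBroadcastJoinThreshold.
BROADCAST_THRESHOLD_BYTES = 10 * 1024 * 1024


def choose_join(left_bytes, right_bytes, threshold=BROADCAST_THRESHOLD_BYTES):
    """Pick a join strategy from the sizes of the two join sides (toy model)."""
    # Following the quote: broadcast when one side is below the threshold.
    if min(left_bytes, right_bytes) < threshold:
        return "broadcast"
    return "shuffle"


# Before execution, only an estimate is known (hypothetical: 2 GB after a filter).
estimated = choose_join(left_bytes=5 * 1024**3, right_bytes=2 * 1024**3)

# At runtime, the shuffle stage reports the real size (hypothetical: 4 MB).
observed = choose_join(left_bytes=5 * 1024**3, right_bytes=4 * 1024**2)

print(estimated, "->", observed)
```

The point of the sketch is that the same rule gives a different answer once the estimated size is replaced by the size measured at runtime.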

InsertAdaptiveSparkPlan Physical Optimization

Adaptive Query Execution is applied to a physical query plan by the InsertAdaptiveSparkPlan physical optimization, which inserts AdaptiveSparkPlanExec physical operators.

AQE Logical Optimizer

Adaptive Query Execution uses the AQEOptimizer logical optimizer to re-optimize logical plans.

AQE Cost Evaluator

Adaptive Query Execution uses a CostEvaluator to evaluate the cost of a candidate for an Adaptively-Optimized Physical Query Plan.

When the physical plan (SparkPlan) changes, AdaptiveSparkPlanExec prints out the following message to the logs:

Plan changed from [currentPhysicalPlan] to [newPhysicalPlan]

The cost evaluator is the class named by the spark.sql.adaptive.customCostEvaluatorClass configuration property, if set, and SimpleCostEvaluator otherwise.
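The role of a cost evaluator in the plan-change decision can be sketched with a toy model. Everything here is an assumption made for illustration: plans are nested tuples rather than SparkPlan trees, cost is taken to be the number of shuffle nodes (a simplification of how a simple evaluator might rank plans), and the replacement rule (cheaper, or equally cheap but different) is a guess at a reasonable policy, not a statement of what AdaptiveSparkPlanExec does.

```python
def count_shuffles(plan):
    """Toy cost: the number of "Shuffle" nodes in a (name, children) tree."""
    name, children = plan
    own = 1 if name == "Shuffle" else 0
    return own + sum(count_shuffles(child) for child in children)


def should_replace(current, candidate, cost=count_shuffles):
    """Adopt the candidate if it is cheaper, or equally cheap but different."""
    new_cost, old_cost = cost(candidate), cost(current)
    return new_cost < old_cost or (new_cost == old_cost and candidate != current)


# Hypothetical current plan: a sort-merge join with a shuffle on each side.
current = (
    "SortMergeJoin",
    [("Shuffle", [("Scan a", [])]), ("Shuffle", [("Scan b", [])])],
)

# Hypothetical candidate: a broadcast join with no shuffle nodes.
candidate = (
    "BroadcastHashJoin",
    [("Scan a", []), ("Broadcast", [("Scan b", [])])],
)

if should_replace(current, candidate):
    # Mirrors the shape of the log message quoted above.
    print(f"Plan changed from {current[0]} to {candidate[0]}")
```

A custom evaluator would correspond to passing a different cost function to should_replace in this sketch.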

AQE QueryStage Physical Preparation Rules

Adaptive Query Execution uses QueryStage Physical Preparation Rules that can be extended using SparkSessionExtensions.

SparkListenerSQLAdaptiveExecutionUpdates

Adaptive Query Execution notifies Spark listeners about a physical plan change using SparkListenerSQLAdaptiveExecutionUpdate and SparkListenerSQLAdaptiveSQLMetricUpdates events.

Logging

Adaptive Query Execution uses logOnLevel to print out diagnostic messages to the log.

Demo

Demo: Adaptive Query Execution

Unsupported

CacheManager

Adaptive Query Execution can change the number of shuffle partitions, so CacheManager makes sure that Adaptive Query Execution is disabled (for cacheQuery and recacheByCondition).

Structured Streaming

Adaptive Query Execution can change the number of shuffle partitions and so is not supported for streaming queries (Spark Structured Streaming).
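Both notes above hinge on the number of shuffle partitions changing at runtime. A toy sketch of why the count depends on the data: small adjacent partitions are greedily merged until a group would exceed a target size. The function coalesce, the partition sizes, and the 64 MB target are illustrative assumptions, not Spark's actual algorithm or API.

```python
def coalesce(partition_sizes, target_size):
    """Greedily merge adjacent partitions while a group stays within target_size.

    Returns a list of groups, each a list of original partition indices.
    """
    groups, current, current_size = [], [], 0
    for index, size in enumerate(partition_sizes):
        # Close the current group if adding this partition would overshoot.
        if current and current_size + size > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Hypothetical per-partition sizes (in MB) reported after a shuffle.
sizes_mb = [10, 5, 40, 30, 2, 1, 70, 3]
groups = coalesce(sizes_mb, target_size=64)

print(len(sizes_mb), "->", len(groups), groups)
# prints: 8 -> 4 [[0, 1, 2], [3, 4, 5], [6], [7]]
```

A different batch of data with different sizes would yield a different number of groups, which is the instability that the cache and streaming cases avoid.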
