
AdaptiveSparkPlanExec Leaf Physical Operator

AdaptiveSparkPlanExec is a leaf physical operator for Adaptive Query Execution.

Creating Instance

AdaptiveSparkPlanExec takes the following to be created:

AdaptiveSparkPlanExec is created when:

Input Physical Plan

AdaptiveSparkPlanExec is given a SparkPlan when created.

The SparkPlan is determined when PlanAdaptiveDynamicPruningFilters adaptive physical optimization is executed and can be one of the following:

The SparkPlan is used for the following:

AdaptiveExecutionContext

AdaptiveSparkPlanExec is given an AdaptiveExecutionContext when created.

Adaptive Logical Optimizer

optimizer: AQEOptimizer

AdaptiveSparkPlanExec creates an AQEOptimizer (when created) that is used when requested to re-optimize a logical query plan.

AQE Cost Evaluator

costEvaluator: CostEvaluator

AdaptiveSparkPlanExec creates a CostEvaluator (when created) based on spark.sql.adaptive.customCostEvaluatorClass configuration property.

Unless configured, AdaptiveSparkPlanExec uses SimpleCostEvaluator (with spark.sql.adaptive.forceOptimizeSkewedJoin configuration property).

AdaptiveSparkPlanExec uses the CostEvaluator to evaluate cost (of a candidate for a new SparkPlan) when requested for the adaptively-optimized physical query plan.
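
The idea of a pluggable cost evaluator can be sketched in plain Scala. This is a simplified model, not Spark's code: `Plan`, `Scan`, `Shuffle`, `Join`, `Evaluator` and `ShuffleCountEvaluator` are hypothetical names, and the cost function (the number of shuffle nodes in a plan tree) is an assumption made for illustration only. Only the `evaluateCost` name mirrors the operation described above.

```scala
object CostEvaluatorSketch {
  // Hypothetical toy plan tree (not Spark's SparkPlan).
  sealed trait Plan { def children: Seq[Plan] }
  final case class Scan(table: String) extends Plan {
    val children: Seq[Plan] = Nil
  }
  final case class Shuffle(child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
  }
  final case class Join(left: Plan, right: Plan) extends Plan {
    val children: Seq[Plan] = Seq(left, right)
  }

  // A pluggable evaluator: anything that maps a plan to a comparable cost.
  trait Evaluator {
    def evaluateCost(plan: Plan): Long
  }

  // Assumption for illustration: the cost is the number of shuffle nodes.
  object ShuffleCountEvaluator extends Evaluator {
    def evaluateCost(plan: Plan): Long = {
      val self = plan match {
        case _: Shuffle => 1L
        case _          => 0L
      }
      self + plan.children.map(evaluateCost).sum
    }
  }
}
```

Under this model, a join with a shuffle on each side costs more than the same join without shuffles, so a candidate plan that removes shuffles would compare as cheaper.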

Preprocessing Physical Optimizations

preprocessingRules: Seq[Rule[SparkPlan]]

AdaptiveSparkPlanExec is given a collection of Rules to pre-process SparkPlans (before the QueryStage Physical Preparation Rules) when executing physical optimizations to reOptimize a logical query plan.

The rules are just a single physical optimization:

Adaptive Query Stage Physical Preparation Rules

queryStagePreparationRules: Seq[Rule[SparkPlan]]

AdaptiveSparkPlanExec creates a collection of physical preparation rules (Rule[SparkPlan]s) when created (in the following order):

  1. RemoveRedundantProjects
  2. EnsureRequirements (based on the requiredDistribution)
  3. AdjustShuffleExchangePosition
  4. ValidateSparkPlan
  5. ReplaceHashWithSortAgg
  6. RemoveRedundantSorts
  7. DisableUnnecessaryBucketedScan
  8. OptimizeSkewedJoin (with the EnsureRequirements)
  9. queryStagePrepRules

queryStagePreparationRules is used for the initial plan and reOptimize.

Distribution Requirement

requiredDistribution: Option[Distribution]

AdaptiveSparkPlanExec creates requiredDistribution value when created:

requiredDistribution is used for the following:

Adaptive Query Stage Physical Optimizations

queryStageOptimizerRules: Seq[Rule[SparkPlan]]

AdaptiveSparkPlanExec creates a collection of physical optimization rules (Rule[SparkPlan]s) when created (in the following order):

  1. PlanAdaptiveDynamicPruningFilters
  2. ReuseAdaptiveSubquery
  3. OptimizeSkewInRebalancePartitions
  4. CoalesceShufflePartitions
  5. OptimizeShuffleWithLocalRead

queryStageOptimizerRules is used to optimizeQueryStage.

Executing Physical Operator

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan abstraction.


doExecute requests the final physical plan to execute (which generates the RDD[InternalRow] that doExecute returns).

doExecute triggers finalPlanUpdate (unless done already).

doExecuteColumnar

doExecuteColumnar(): RDD[ColumnarBatch]

doExecuteColumnar is part of the SparkPlan abstraction.


doExecuteColumnar uses withFinalPlanUpdate to request the final physical plan to executeColumnar.

doExecuteBroadcast

doExecuteBroadcast[T](): broadcast.Broadcast[T]

doExecuteBroadcast is part of the SparkPlan abstraction.


doExecuteBroadcast uses withFinalPlanUpdate to request the final physical plan to doExecuteBroadcast.

doExecuteBroadcast asserts that the final physical plan is a BroadcastQueryStageExec.

Specialized Execution Paths

collect

executeCollect(): Array[InternalRow]

executeCollect is part of the SparkPlan abstraction.


executeCollect...FIXME

tail

executeTail(
  n: Int): Array[InternalRow]

executeTail is part of the SparkPlan abstraction.


executeTail...FIXME

take

executeTake(
  n: Int): Array[InternalRow]

executeTake is part of the SparkPlan abstraction.


executeTake...FIXME

Adaptively-Optimized Physical Query Plan

getFinalPhysicalPlan(): SparkPlan

Note

getFinalPhysicalPlan uses the isFinalPlan internal flag (and an optimized physical query plan) to short-circuit (skip) the whole expensive computation.

getFinalPhysicalPlan is used when:

Step 1. createQueryStages

getFinalPhysicalPlan runs createQueryStages with the currentPhysicalPlan.

Step 2. Until allChildStagesMaterialized

getFinalPhysicalPlan executes the following until allChildStagesMaterialized.

Step 2.1 New QueryStageExecs

getFinalPhysicalPlan does the following when there are new stages to be processed:

  • FIXME

Step 2.2 StageMaterializationEvents

getFinalPhysicalPlan executes the following until allChildStagesMaterialized:

  • FIXME

Step 2.3 Errors

In case of errors, getFinalPhysicalPlan runs cleanUpAndThrowException.

Step 2.4 replaceWithQueryStagesInLogicalPlan

getFinalPhysicalPlan runs replaceWithQueryStagesInLogicalPlan with the currentLogicalPlan and the stagesToReplace (which gives a new logical plan).

Step 2.5 reOptimize

getFinalPhysicalPlan runs reOptimize on the new logical plan (which gives a new physical plan and a new logical plan).

Step 2.6 Evaluating Cost

getFinalPhysicalPlan requests the CostEvaluator to evaluateCost of the currentPhysicalPlan and of the newPhysicalPlan.

Step 2.7 Adopting New Physical Plan

getFinalPhysicalPlan adopts the new plan if its cost is less than the cost of the currentPhysicalPlan, or if the costs are equal but the physical plans are different (the new plan is then likely better).

getFinalPhysicalPlan prints out the following message to the logs (using the logOnLevel):

Plan changed from [currentPhysicalPlan] to [newPhysicalPlan]

getFinalPhysicalPlan runs cleanUpTempTags with the newPhysicalPlan.

getFinalPhysicalPlan saves the newPhysicalPlan as the currentPhysicalPlan (alongside the currentLogicalPlan with the newLogicalPlan).

getFinalPhysicalPlan resets the stagesToReplace.
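
The adoption rule of this step can be written as a single predicate. The sketch below is a plain-Scala model under stated assumptions: `shouldAdopt` is a hypothetical helper (not part of Spark), plans are represented by strings and costs by `Long` values to keep the example self-contained.

```scala
object PlanAdoptionSketch {
  // Hypothetical helper: decides whether a re-optimized plan replaces the current one.
  // Adopt when the new plan is strictly cheaper, or equally cheap but different.
  def shouldAdopt(
      currentPlan: String,
      currentCost: Long,
      newPlan: String,
      newCost: Long): Boolean =
    newCost < currentCost || (newCost == currentCost && newPlan != currentPlan)
}
```

Note that an identical plan with an equal cost is not adopted, so the "Plan changed" message would not be logged in that case.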

Step 2.8 createQueryStages

getFinalPhysicalPlan runs createQueryStages for the currentPhysicalPlan (that may have changed).

Step 3. applyPhysicalRules

getFinalPhysicalPlan runs applyPhysicalRules on the final plan (with the finalStageOptimizerRules, the planChangeLogger and the AQE Final Query Stage Optimization batch name).

getFinalPhysicalPlan turns the isFinalPlan internal flag on.

finalStageOptimizerRules

finalStageOptimizerRules: Seq[Rule[SparkPlan]]

finalStageOptimizerRules...FIXME

createQueryStages

createQueryStages(
  plan: SparkPlan): CreateStageResult

createQueryStages checks if the given SparkPlan is one of the following:

  1. Exchange unary physical operator
  2. QueryStageExec leaf physical operator
  3. Others

The most interesting case is when the given SparkPlan is an Exchange unary physical operator.

reuseQueryStage

reuseQueryStage(
  existing: QueryStageExec,
  exchange: Exchange): QueryStageExec

reuseQueryStage requests the given QueryStageExec to newReuseInstance (with the currentStageId).

reuseQueryStage increments the currentStageId.

reuseQueryStage runs setLogicalLinkForNewQueryStage.

Creating QueryStageExec for Exchange

newQueryStage(
  e: Exchange): QueryStageExec

newQueryStage creates a new QueryStageExec physical operator based on the type of the given Exchange physical operator.

| Exchange | QueryStageExec |
|---|---|
| ShuffleExchangeLike | ShuffleQueryStageExec |
| BroadcastExchangeLike | BroadcastQueryStageExec |

newQueryStage creates an optimized physical query plan for the child physical plan of the given Exchange. newQueryStage uses the adaptive optimizations, the PlanChangeLogger and AQE Query Stage Optimization batch name.

newQueryStage creates a new QueryStageExec physical operator for the given Exchange operator (using the currentStageId for the ID).

After applyPhysicalRules for the child operator, newQueryStage creates an optimized physical query plan for the Exchange itself (with the new optimized physical query plan for the child). newQueryStage uses the post-stage-creation optimizations, the PlanChangeLogger and AQE Post Stage Creation batch name.

newQueryStage increments the currentStageId counter.

newQueryStage associates the new query stage operator with the Exchange physical operator.

In the end, newQueryStage returns the QueryStageExec physical operator.
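
The type-based dispatch and the stage ID counter described above can be sketched as follows. This is a simplified model, not Spark's implementation: all types (`ShuffleExchange`, `BroadcastExchange`, `ShuffleQueryStage`, `BroadcastQueryStage`, `StageFactory`) are hypothetical stand-ins, and the optimization steps applied to the child plan and to the exchange are left out.

```scala
object QueryStageSketch {
  // Hypothetical stand-ins for the two kinds of exchanges.
  sealed trait Exchange
  final case class ShuffleExchange(name: String) extends Exchange
  final case class BroadcastExchange(name: String) extends Exchange

  // Hypothetical stand-ins for the two kinds of query stages.
  sealed trait QueryStage { def id: Int }
  final case class ShuffleQueryStage(id: Int, exchange: Exchange) extends QueryStage
  final case class BroadcastQueryStage(id: Int, exchange: Exchange) extends QueryStage

  final class StageFactory {
    private var currentStageId = 0

    // Picks the stage type from the exchange type, then bumps the ID counter.
    def newQueryStage(e: Exchange): QueryStage = {
      val stage = e match {
        case s: ShuffleExchange   => ShuffleQueryStage(currentStageId, s)
        case b: BroadcastExchange => BroadcastQueryStage(currentStageId, b)
      }
      currentStageId += 1
      stage
    }
  }
}
```

Because `Exchange` is sealed in this model, the compiler can check that the match covers every exchange type.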

Adaptively-Optimized Physical Query Plan

currentPhysicalPlan: SparkPlan

AdaptiveSparkPlanExec defines currentPhysicalPlan variable for an adaptively-optimized SparkPlan.

currentPhysicalPlan is the initialPlan when AdaptiveSparkPlanExec is created.

currentPhysicalPlan can only change in getFinalPhysicalPlan and only until the isFinalPlan internal flag is enabled.

While computing the final physical plan (getFinalPhysicalPlan), AdaptiveSparkPlanExec uses currentPhysicalPlan as an input argument for createQueryStages that gives a candidate for a new currentPhysicalPlan. AdaptiveSparkPlanExec replaces currentPhysicalPlan when the costEvaluator determines one of the following:

  • A smaller Cost
  • A different SparkPlan (than the currentPhysicalPlan)

If a SparkPlan switch happens, AdaptiveSparkPlanExec prints out the following message to the logs:

Plan changed from [currentPhysicalPlan] to [newPhysicalPlan]

AdaptiveSparkPlanExec runs applyPhysicalRules to optimize the new SparkPlan.

currentPhysicalPlan is available using executedPlan.

executedPlan

executedPlan: SparkPlan

executedPlan returns the current physical query plan.


executedPlan is used when:

Resetting Metrics

resetMetrics(): Unit

resetMetrics is part of the SparkPlan abstraction.


resetMetrics requests all the metrics to reset.

In the end, resetMetrics requests the executed query plan to resetMetrics.

optimizeQueryStage

optimizeQueryStage(
  plan: SparkPlan,
  isFinalStage: Boolean): SparkPlan

isFinalStage

The given isFinalStage can be as follows:

optimizeQueryStage executes (applies) the queryStageOptimizerRules to the given SparkPlan. While applying optimizations (executing rules), optimizeQueryStage requests the PlanChangeLogger to log plan changes (by a rule) with the name of the rule that has just been executed.

AQEShuffleReadRule

optimizeQueryStage handles AQEShuffleReadRule physical optimizations specially: it validates the plan produced by such a rule so that the rule does not break the distribution requirement of the query plan.

optimizeQueryStage requests the PlanChangeLogger to log plan changes by the entire rule batch with the following batch name:

AQE Query Stage Optimization

optimizeQueryStage is used when:

Post-Stage-Creation Adaptive Optimizations

postStageCreationRules: Seq[Rule[SparkPlan]]

postStageCreationRules is the following adaptive optimizations (physical optimization rules):

postStageCreationRules is used when:

Generating Text Representation

generateTreeString(
  depth: Int,
  lastChildren: Seq[Boolean],
  append: String => Unit,
  verbose: Boolean,
  prefix: String = "",
  addSuffix: Boolean = false,
  maxFields: Int,
  printNodeId: Boolean): Unit

generateTreeString is part of the TreeNode abstraction.


generateTreeString...FIXME

cleanUpAndThrowException

cleanUpAndThrowException(
  errors: Seq[Throwable],
  earlyFailedStage: Option[Int]): Unit

cleanUpAndThrowException...FIXME

cleanUpAndThrowException is used when AdaptiveSparkPlanExec physical operator is requested to getFinalPhysicalPlan (and materialization of new stages fails).

Replanning Logical Query Plan

reOptimize(
  logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan)

reOptimize returns a newly-optimized physical query plan with a newly-optimized logical query plan for the given logical query plan.


reOptimize requests the given LogicalPlan to invalidate statistics cache.

reOptimize requests the Adaptive Logical Optimizer to execute (and generate an optimized logical query plan).

reOptimize requests the Spark Query Planner (bound to the AdaptiveExecutionContext) to plan the optimized logical query plan (and generate a physical query plan).

reOptimize executes physical optimizations using preprocessing and preparation rules (and generates an optimized physical query plan).


reOptimize is used when:

QueryStageCreator Thread Pool

executionContext: ExecutionContext

executionContext is an ExecutionContext that is used when:

finalPlanUpdate Lazy Value

finalPlanUpdate: Unit
lazy value

finalPlanUpdate is a Scala lazy value which is computed once when accessed and cached afterwards.
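
The compute-once behaviour of a Scala lazy value of type `Unit` can be seen in a minimal, Spark-independent sketch. `LazyValSketch` and its `computations` counter are hypothetical and exist only to make the single evaluation observable.

```scala
object LazyValSketch {
  // Counts how many times the initializer of the lazy value runs.
  var computations = 0

  // The body runs on the first access only; later accesses are no-ops.
  lazy val finalPlanUpdate: Unit = {
    computations += 1
  }
}
```

Referencing `LazyValSketch.finalPlanUpdate` any number of times leaves `computations` at 1, which is why a side-effecting update can be safely triggered from several execution paths.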

finalPlanUpdate...FIXME

In the end, finalPlanUpdate prints out the following message to the logs:

Final plan: [currentPhysicalPlan]

finalPlanUpdate is used when AdaptiveSparkPlanExec physical operator is requested to executeCollect, executeTake, executeTail and doExecute.

isFinalPlan (Available Already)

isFinalPlan: Boolean

isFinalPlan is an internal flag that is used to skip (short-circuit) the expensive process of producing an adaptively-optimized physical query plan (and immediately return the one that has already been prepared).

isFinalPlan is disabled (false) when AdaptiveSparkPlanExec is created. It is enabled right after an adaptively-optimized physical query plan has once been prepared.

isFinalPlan is also used for reporting when AdaptiveSparkPlanExec is requested for the following:

String Arguments

stringArgs: Iterator[Any]

stringArgs is part of the TreeNode abstraction.


stringArgs is the following (with the isFinalPlan flag):

isFinalPlan=[isFinalPlan]

Initial Plan

initialPlan: SparkPlan

AdaptiveSparkPlanExec initializes initialPlan value when created.

initialPlan is a SparkPlan after applying the queryStagePreparationRules to the inputPlan (with the planChangeLogger and AQE Preparations batch name).

initialPlan is the currentPhysicalPlan when AdaptiveSparkPlanExec is created.

initialPlan is used when:

replaceWithQueryStagesInLogicalPlan

replaceWithQueryStagesInLogicalPlan(
  plan: LogicalPlan,
  stagesToReplace: Seq[QueryStageExec]): LogicalPlan

replaceWithQueryStagesInLogicalPlan...FIXME

replaceWithQueryStagesInLogicalPlan is used when AdaptiveSparkPlanExec physical operator is requested for a final physical plan.

Executing AQE Query Post Planner Strategy Rules

applyQueryPostPlannerStrategyRules(
  plan: SparkPlan): SparkPlan

applyQueryPostPlannerStrategyRules runs the queryPostPlannerStrategyRules on the given SparkPlan.

applyQueryPostPlannerStrategyRules uses the planChangeLogger and the batch name as follows:

AQE Query Post Planner Strategy Rules

applyQueryPostPlannerStrategyRules is used when:

Executing Physical Optimizations

applyPhysicalRules(
  plan: SparkPlan,
  rules: Seq[Rule[SparkPlan]],
  loggerAndBatchName: Option[(PlanChangeLogger[SparkPlan], String)] = None): SparkPlan

By default (with no loggerAndBatchName given) applyPhysicalRules applies (executes) the given rules to the given physical query plan.

With loggerAndBatchName specified, applyPhysicalRules executes the rules and, for every rule, requests the PlanChangeLogger to logRule. In the end, applyPhysicalRules requests the PlanChangeLogger to logBatch.
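
The two modes of rule application can be modelled in plain Scala. The sketch below is an illustration under stated assumptions, not Spark's code: a plan is a `String`, `Rule` is a hypothetical named transformation, and the logger is a plain `String => Unit` function standing in for the PlanChangeLogger.

```scala
object ApplyRulesSketch {
  // Hypothetical stand-in for Rule[SparkPlan]: a named plan transformation.
  final case class Rule(name: String, transform: String => String)

  // Applies the rules in order. With a logger given, reports every rule
  // and, at the end, the whole batch (under the given batch name).
  def applyRules(
      plan: String,
      rules: Seq[Rule],
      loggerAndBatchName: Option[(String => Unit, String)] = None): String =
    loggerAndBatchName match {
      case None =>
        rules.foldLeft(plan) { (current, rule) => rule.transform(current) }
      case Some((log, batchName)) =>
        val result = rules.foldLeft(plan) { (current, rule) =>
          val next = rule.transform(current)
          log(s"rule ${rule.name}: [$current] -> [$next]")
          next
        }
        log(s"batch $batchName: [$plan] -> [$result]")
        result
    }
}
```

Both modes return the same plan; the logger only adds observability, which matches the default of no logging when `loggerAndBatchName` is not given.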


applyPhysicalRules is used when:

withFinalPlanUpdate

withFinalPlanUpdate[T](
  fun: SparkPlan => T): T

withFinalPlanUpdate executes the given fun with the adaptively-optimized physical query plan and returns the result (of type T).

withFinalPlanUpdate also triggers finalPlanUpdate.
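
How the short-circuit flag, the lazy update and the helper fit together can be sketched as below. This is a simplified model under assumptions: `AdaptivePlan` is a hypothetical class, plans are strings, the "expensive" optimization is a string rewrite, and running the update after the given function is an assumption of this sketch.

```scala
object FinalPlanSketch {
  final class AdaptivePlan(initialPlan: String) {
    private var isFinalPlan = false
    private var currentPlan = initialPlan
    var optimizations = 0 // how many times the expensive path ran
    var updates = 0       // how many times the final-plan update ran

    // Skips the expensive computation once the final plan is known.
    def getFinalPhysicalPlan(): String = {
      if (!isFinalPlan) {
        optimizations += 1
        currentPlan = s"optimized($currentPlan)"
        isFinalPlan = true
      }
      currentPlan
    }

    // Runs at most once, no matter how many execution paths trigger it.
    private lazy val finalPlanUpdate: Unit = {
      updates += 1
    }

    // Runs the function on the final plan and triggers the one-time update.
    def withFinalPlanUpdate[T](fun: String => T): T = {
      val plan = getFinalPhysicalPlan()
      val result = fun(plan)
      finalPlanUpdate
      result
    }
  }
}
```

Calling `withFinalPlanUpdate` twice on the same instance runs the optimization and the update once each; the second call only applies the function to the already-final plan.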


withFinalPlanUpdate is a helper method for AdaptiveSparkPlanExec when requested for the following:

finalPhysicalPlan

finalPhysicalPlan: SparkPlan

finalPhysicalPlan is withFinalPlanUpdate with the identity function (which returns the plan of getFinalPhysicalPlan unchanged).


finalPhysicalPlan is used when:

Logging

Enable ALL logging level for org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

logger.AdaptiveSparkPlanExec.name = org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
logger.AdaptiveSparkPlanExec.level = all

Refer to Logging.

PlanChangeLogger

AdaptiveSparkPlanExec uses a PlanChangeLogger for the following:

| Operation | Batch Name |
|---|---|
| Executing AQE Query Post Planner Strategy Rules | AQE Query Post Planner Strategy Rules |
| Initial Plan | AQE Preparations |
| Adaptively-Optimized Physical Query Plan | AQE Post Stage Creation |
| Creating QueryStageExec for Exchange | AQE Post Stage Creation |
| Optimizing QueryStage | AQE Query Stage Optimization |
| Replanning Logical Query Plan | AQE Replanning |

logOnLevel

logOnLevel: (=> String) => Unit

logOnLevel uses the internal spark.sql.adaptive.logLevel configuration property for the logging level and prints out the given message to the logs (at the log level).
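
The `(=> String) => Unit` type says that the message is passed by name, so it is only built when the chosen level is actually enabled. A minimal, Spark-independent sketch of that idea follows. `LogOnLevelSketch`, `enabledLevels`, `messages` and `logAt` are hypothetical names, and the set of enabled levels is hard-coded for illustration.

```scala
object LogOnLevelSketch {
  // Collected log lines (stands in for a real logging backend).
  val messages = scala.collection.mutable.ArrayBuffer.empty[String]

  // Assumption for illustration: only these levels are enabled.
  val enabledLevels = Set("INFO", "WARN", "ERROR")

  // The by-name message is evaluated only when the level is enabled.
  private def logAt(level: String)(msg: => String): Unit =
    if (enabledLevels(level)) {
      messages += s"$level $msg"
      ()
    }

  // Picks a logging function for the configured level name.
  def logOnLevel(configuredLevel: String): (=> String) => Unit =
    msg => logAt(configuredLevel.toUpperCase)(msg)
}
```

With `val log = LogOnLevelSketch.logOnLevel("debug")`, a call such as `log(expensiveToString())` (with a hypothetical `expensiveToString` function) never evaluates its argument in this model, which matters when the message contains a large plan string.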

logOnLevel is used when: