
AdaptiveSparkPlanExec Leaf Physical Operator

AdaptiveSparkPlanExec is a leaf physical operator for Adaptive Query Execution.

Creating Instance

AdaptiveSparkPlanExec takes the following to be created:

AdaptiveSparkPlanExec is created when:

Input Physical Plan

AdaptiveSparkPlanExec is given a SparkPlan when created.

The SparkPlan is determined when PlanAdaptiveDynamicPruningFilters adaptive physical optimization is executed and can be one of the following:

The SparkPlan is used for the following:

Executing Physical Operator

doExecute(): RDD[InternalRow]

doExecute requests the final physical plan to execute (which gives the RDD[InternalRow] that doExecute returns).

doExecute triggers finalPlanUpdate (unless done already).

doExecute is part of the SparkPlan abstraction.

doExecuteColumnar

doExecuteColumnar(): RDD[ColumnarBatch]

doExecuteColumnar uses withFinalPlanUpdate to request the final physical plan to executeColumnar (which gives the RDD of ColumnarBatches that doExecuteColumnar returns).

doExecuteColumnar is part of the SparkPlan abstraction.

doExecuteBroadcast

doExecuteBroadcast[T](): broadcast.Broadcast[T]

doExecuteBroadcast uses withFinalPlanUpdate to request the final physical plan to doExecuteBroadcast (which gives the Broadcast variable that doExecuteBroadcast returns).

doExecuteBroadcast asserts that the final physical plan is a BroadcastQueryStageExec.

doExecuteBroadcast is part of the SparkPlan abstraction.

Specialized Execution Paths

collect

executeCollect(): Array[InternalRow]

executeCollect...FIXME

executeCollect is part of the SparkPlan abstraction.

tail

executeTail(
  n: Int): Array[InternalRow]

executeTail...FIXME

executeTail is part of the SparkPlan abstraction.

take

executeTake(
  n: Int): Array[InternalRow]

executeTake...FIXME

executeTake is part of the SparkPlan abstraction.

Final Physical Query Plan

getFinalPhysicalPlan(): SparkPlan

Note

getFinalPhysicalPlan uses the isFinalPlan internal flag to short-circuit (skip) the whole expensive computation and return the optimized physical query plan right away.

getFinalPhysicalPlan is used when:

Step 1. createQueryStages

getFinalPhysicalPlan executes createQueryStages with the currentPhysicalPlan.

Step 2. Until allChildStagesMaterialized

getFinalPhysicalPlan executes the following until allChildStagesMaterialized.

Step 2.1 New QueryStageExecs

getFinalPhysicalPlan does the following when there are new stages to be processed:

  • FIXME

Step 2.2 StageMaterializationEvents

getFinalPhysicalPlan executes the following until allChildStagesMaterialized:

  • FIXME

Step 2.3 Errors

In case of errors, getFinalPhysicalPlan calls cleanUpAndThrowException.

Step 2.4 replaceWithQueryStagesInLogicalPlan

getFinalPhysicalPlan executes replaceWithQueryStagesInLogicalPlan with the currentLogicalPlan and the stagesToReplace (which gives a new logical plan).

Step 2.5 reOptimize

getFinalPhysicalPlan re-optimizes the new logical plan (using reOptimize).

Step 2.6 Evaluating Cost

getFinalPhysicalPlan requests the SimpleCostEvaluator to evaluateCost of the currentPhysicalPlan and the newPhysicalPlan.

Step 2.7 Adopting New Physical Plan

getFinalPhysicalPlan adopts the new plan if its cost is lower than the cost of the currentPhysicalPlan, or if the costs are equal but the physical plans differ (the new plan is likely better).

getFinalPhysicalPlan prints out the following message to the logs (using the logOnLevel):

Plan changed from [currentPhysicalPlan] to [newPhysicalPlan]

getFinalPhysicalPlan executes cleanUpTempTags with the newPhysicalPlan.

getFinalPhysicalPlan saves the newPhysicalPlan as the currentPhysicalPlan (and the newLogicalPlan as the currentLogicalPlan).

getFinalPhysicalPlan resets the stagesToReplace.
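The adoption rule of Step 2.7 boils down to a small predicate. The following is a minimal sketch, assuming plans are reduced to plain strings and costs to plain Long values; Candidate and shouldAdopt are hypothetical names, not part of Spark.

```scala
// Hypothetical, simplified model of the Step 2.7 decision (not Spark's code).
// Plans are plain Strings and costs are plain Longs.
final case class Candidate(plan: String, cost: Long)

// Adopt the new plan when it is cheaper, or equally cheap but different.
def shouldAdopt(current: Candidate, proposed: Candidate): Boolean =
  proposed.cost < current.cost ||
    (proposed.cost == current.cost && proposed.plan != current.plan)

val current = Candidate("SortMergeJoin", 2L)
println(shouldAdopt(current, Candidate("BroadcastHashJoin", 1L))) // true: cheaper
println(shouldAdopt(current, Candidate("ShuffledHashJoin", 2L)))  // true: same cost, different plan
println(shouldAdopt(current, Candidate("SortMergeJoin", 2L)))     // false: nothing changed
println(shouldAdopt(current, Candidate("CartesianProduct", 3L)))  // false: more expensive
```

The "equal cost but different plan" branch is what lets a re-optimized plan replace the current one even when the cost model cannot tell them apart.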

Step 2.8 createQueryStages

getFinalPhysicalPlan executes createQueryStages with the currentPhysicalPlan (that may have changed).

Step 3. applyPhysicalRules

getFinalPhysicalPlan executes applyPhysicalRules on the final plan (with the finalStageOptimizerRules, the planChangeLogger and AQE Final Query Stage Optimization batch name).

getFinalPhysicalPlan turns the isFinalPlan internal flag on.
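The overall control flow of getFinalPhysicalPlan (loop until every stage is materialized, finish, then short-circuit on later calls thanks to isFinalPlan) can be sketched as a toy model. This is a heavily simplified sketch, assuming a plan is reduced to the names of stages still waiting to be materialized and that one stage materializes per iteration; AdaptivePlanModel and loopIterations are hypothetical names, and re-optimization, cost comparison and thread safety are left out.

```scala
// Hypothetical, heavily simplified model of getFinalPhysicalPlan (not Spark's code).
// The "plan" is reduced to the names of stages still waiting to be materialized.
final class AdaptivePlanModel(stages: Seq[String]) {
  private var isFinalPlan = false
  private var pending: Seq[String] = stages
  private var finalPlan = ""
  var loopIterations = 0 // how many times the Step 2 loop body ran

  def getFinalPhysicalPlan(): String =
    if (isFinalPlan) finalPlan // short-circuit: skip the expensive work
    else {
      // Step 2: loop until all child stages are materialized.
      while (pending.nonEmpty) {
        loopIterations += 1
        // Steps 2.1-2.2, simplified: one stage gets materialized per iteration.
        pending = pending.tail
        // Steps 2.4-2.8 (re-optimizing, comparing costs, creating stages) are left out.
      }
      // Step 3: "apply" the final-stage rules and turn the isFinalPlan flag on.
      finalPlan = s"Final(${stages.mkString(", ")})"
      isFinalPlan = true
      finalPlan
    }
}

val model = new AdaptivePlanModel(Seq("stage0", "stage1"))
println(model.getFinalPhysicalPlan()) // Final(stage0, stage1)
println(model.loopIterations)         // 2
println(model.getFinalPhysicalPlan()) // Final(stage0, stage1), via the short-circuit
println(model.loopIterations)         // still 2
```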

finalStageOptimizerRules

finalStageOptimizerRules: Seq[Rule[SparkPlan]]

finalStageOptimizerRules...FIXME

createQueryStages

createQueryStages(
  plan: SparkPlan): CreateStageResult

createQueryStages checks if the given SparkPlan is one of the following:

  1. Exchange unary physical operator
  2. QueryStageExec leaf physical operator
  3. Others

The most interesting case is when the given SparkPlan is an Exchange unary physical operator.
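The three cases above can be illustrated with a small recursive model that returns a CreateStageResult-like value. This is a sketch under stated assumptions: the plan tree (Node, Op, Exchange, QueryStage) is hypothetical, stage reuse (reuseQueryStage) is left out, and an exchange is assumed to become a new stage only once all its child stages are materialized.

```scala
// Hypothetical, simplified plan tree (not Spark's SparkPlan hierarchy).
sealed trait Node { def children: Seq[Node] }
final case class Op(name: String, children: Seq[Node]) extends Node
final case class Exchange(child: Node) extends Node {
  def children: Seq[Node] = Seq(child)
}
// A query stage is a leaf that wraps the exchange it was created for.
final case class QueryStage(id: Int, exchange: Exchange, materialized: Boolean) extends Node {
  def children: Seq[Node] = Seq.empty
}

final case class CreateStageResult(
    newPlan: Node,
    allChildStagesMaterialized: Boolean,
    newStages: Seq[QueryStage])

var currentStageId = 0

def createQueryStages(plan: Node): CreateStageResult = plan match {
  case e: Exchange =>
    val result = createQueryStages(e.child)
    val newExchange = Exchange(result.newPlan)
    if (result.allChildStagesMaterialized) {
      // An exchange becomes a new stage only when all its child stages are ready.
      val stage = QueryStage(currentStageId, newExchange, materialized = false)
      currentStageId += 1
      CreateStageResult(newPlan = stage, allChildStagesMaterialized = false,
        newStages = result.newStages :+ stage)
    } else {
      CreateStageResult(newPlan = newExchange, allChildStagesMaterialized = false,
        newStages = result.newStages)
    }
  case q: QueryStage =>
    // Existing stages are kept as-is and report whether they are materialized.
    CreateStageResult(newPlan = q, allChildStagesMaterialized = q.materialized,
      newStages = Seq.empty)
  case Op(name, children) =>
    // Other operators: recurse into children (a leaf counts as materialized).
    val results = children.map(createQueryStages)
    CreateStageResult(
      newPlan = Op(name, results.map(_.newPlan)),
      allChildStagesMaterialized = results.forall(_.allChildStagesMaterialized),
      newStages = results.flatMap(_.newStages))
}

// Helper for the usage below: pretend every existing stage got materialized.
def materializeAll(n: Node): Node = n match {
  case q: QueryStage => q.copy(materialized = true)
  case Exchange(c)   => Exchange(materializeAll(c))
  case Op(name, cs)  => Op(name, cs.map(materializeAll))
}

val plan = Op("Join", Seq(Exchange(Exchange(Op("ScanA", Nil))), Exchange(Op("ScanB", Nil))))
val first = createQueryStages(plan)
println(first.newStages.map(_.id))  // the two innermost exchanges: stages 0 and 1
val second = createQueryStages(materializeAll(first.newPlan))
println(second.newStages.map(_.id)) // the outer exchange becomes stage 2
```

Running the model repeatedly mirrors Step 2 of getFinalPhysicalPlan: each pass creates the stages whose inputs are ready, until no new stages appear and allChildStagesMaterialized is true.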

reuseQueryStage

reuseQueryStage(
  existing: QueryStageExec,
  exchange: Exchange): QueryStageExec

reuseQueryStage requests the given QueryStageExec to newReuseInstance (with the currentStageId).

reuseQueryStage increments the currentStageId.

reuseQueryStage executes setLogicalLinkForNewQueryStage.

Creating QueryStageExec for Exchange

newQueryStage(
  e: Exchange): QueryStageExec

newQueryStage creates a new QueryStageExec physical operator based on the type of the given Exchange physical operator.

Exchange              | QueryStageExec
--------------------- | -----------------------
ShuffleExchangeLike   | ShuffleQueryStageExec
BroadcastExchangeLike | BroadcastQueryStageExec

newQueryStage creates an optimized physical query plan for the child physical plan of the given Exchange. newQueryStage uses the adaptive optimizations, the PlanChangeLogger and AQE Query Stage Optimization batch name.

newQueryStage creates a new QueryStageExec physical operator for the given Exchange operator (using the currentStageId for the ID).

After applyPhysicalRules for the child operator, newQueryStage creates an optimized physical query plan for the Exchange itself (with the new optimized physical query plan for the child). newQueryStage uses the post-stage-creation optimizations, the PlanChangeLogger and AQE Post Stage Creation batch name.

newQueryStage increments the currentStageId counter.

newQueryStage associates the new query stage operator with the Exchange physical operator.

In the end, newQueryStage returns the QueryStageExec physical operator.
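The steps of newQueryStage (optimize the child, choose the stage type by exchange type, optimize the exchange itself, bump the stage ID) can be sketched as follows. This is a minimal sketch, assuming plans are strings and the two rule batches are plain functions; ExchangeOp, ShuffleExchange, BroadcastExchange, Stage, ShuffleStage, BroadcastStage and StageFactory are hypothetical names, not Spark classes.

```scala
// Hypothetical, simplified types (not Spark's Exchange or QueryStageExec classes).
sealed trait ExchangeOp { def child: String }
final case class ShuffleExchange(child: String) extends ExchangeOp
final case class BroadcastExchange(child: String) extends ExchangeOp

sealed trait Stage { def id: Int; def exchange: ExchangeOp }
final case class ShuffleStage(id: Int, exchange: ExchangeOp) extends Stage
final case class BroadcastStage(id: Int, exchange: ExchangeOp) extends Stage

final class StageFactory(
    optimizeChild: String => String,               // stands in for "AQE Query Stage Optimization"
    postStageCreation: ExchangeOp => ExchangeOp) { // stands in for "AQE Post Stage Creation"
  private var currentStageId = 0

  def newQueryStage(e: ExchangeOp): Stage = {
    // Optimize the child first, then the exchange with the optimized child.
    val optimizedChild = optimizeChild(e.child)
    val stage = e match {
      case _: ShuffleExchange =>
        ShuffleStage(currentStageId, postStageCreation(ShuffleExchange(optimizedChild)))
      case _: BroadcastExchange =>
        BroadcastStage(currentStageId, postStageCreation(BroadcastExchange(optimizedChild)))
    }
    currentStageId += 1 // every new stage gets the next ID
    stage
  }
}

val factory = new StageFactory(child => s"optimized($child)", identity)
println(factory.newQueryStage(ShuffleExchange("scan")))
// ShuffleStage(0,ShuffleExchange(optimized(scan)))
println(factory.newQueryStage(BroadcastExchange("dim")))
// BroadcastStage(1,BroadcastExchange(optimized(dim)))
```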

Optimized Physical Query Plan

AdaptiveSparkPlanExec uses the currentPhysicalPlan internal registry for an optimized physical query plan (that is available as the executedPlan method).

Initially, when AdaptiveSparkPlanExec operator is created, currentPhysicalPlan is the initialPlan.

currentPhysicalPlan may change in getFinalPhysicalPlan until the isFinalPlan internal flag is on.

QueryStage Preparation Rules

queryStagePreparationRules: Seq[Rule[SparkPlan]]

queryStagePreparationRules is a single-rule collection with the EnsureRequirements physical optimization.

queryStagePreparationRules is used when AdaptiveSparkPlanExec operator is requested for the current physical plan and to reOptimize.

Adaptive Optimizations

queryStageOptimizerRules: Seq[Rule[SparkPlan]]

queryStageOptimizerRules is the following adaptive optimizations (physical optimization rules):

queryStageOptimizerRules is used when:

Post-Stage-Creation Adaptive Optimizations

postStageCreationRules: Seq[Rule[SparkPlan]]

postStageCreationRules is the following adaptive optimizations (physical optimization rules):

postStageCreationRules is used when:

Text Representation

generateTreeString(
  depth: Int,
  lastChildren: Seq[Boolean],
  append: String => Unit,
  verbose: Boolean,
  prefix: String = "",
  addSuffix: Boolean = false,
  maxFields: Int,
  printNodeId: Boolean): Unit

generateTreeString...FIXME

generateTreeString is part of the TreeNode abstraction.

cleanUpAndThrowException

cleanUpAndThrowException(
  errors: Seq[Throwable],
  earlyFailedStage: Option[Int]): Unit

cleanUpAndThrowException...FIXME

cleanUpAndThrowException is used when AdaptiveSparkPlanExec physical operator is requested to getFinalPhysicalPlan (and materialization of new stages fails).

Re-Optimizing Logical Query Plan

reOptimize(
  logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan)

reOptimize gives optimized physical and logical query plans for the given logical query plan.

Internally, reOptimize requests the given logical query plan to invalidateStatsCache and requests the adaptive logical optimizer to generate an optimized logical query plan.

reOptimize requests the query planner (bound to the AdaptiveExecutionContext) to plan the optimized logical query plan (and generate a physical query plan).

reOptimize creates an optimized physical query plan using preprocessing and preparation rules.
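The order of the reOptimize phases (logical optimization, planning, then physical rules) can be shown as a function pipeline. This is a minimal sketch, assuming each phase is a plain String => String function; ReOptimizer and its fields are hypothetical stand-ins for the adaptive logical optimizer, the query planner and the rules, and invalidating the statistics cache is left out.

```scala
// Hypothetical, simplified model of the reOptimize pipeline (not Spark's code).
// Logical and physical plans are Strings; each phase is a plain function.
final case class ReOptimizer(
    optimizer: String => String,    // stands in for the AQEOptimizer
    planner: String => String,      // stands in for the query planner
    rules: Seq[String => String]) { // preprocessing and preparation rules, in order

  // Returns (optimized physical plan, optimized logical plan), like reOptimize.
  def reOptimize(logicalPlan: String): (String, String) = {
    val optimized = optimizer(logicalPlan)
    val sparkPlan = planner(optimized)
    val newPlan = rules.foldLeft(sparkPlan)((plan, rule) => rule(plan))
    (newPlan, optimized)
  }
}

val reOptimizer = ReOptimizer(
  optimizer = logical => s"opt($logical)",
  planner = logical => s"phys($logical)",
  rules = Seq((plan: String) => s"pre($plan)", (plan: String) => s"prep($plan)"))
println(reOptimizer.reOptimize("join"))
// (prep(pre(phys(opt(join)))),opt(join))
```

Returning both plans matters because getFinalPhysicalPlan keeps the new logical plan around as the currentLogicalPlan if the new physical plan is adopted.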

reOptimize is used when AdaptiveSparkPlanExec physical operator is requested to getFinalPhysicalPlan (to re-optimize the logical plan once query stages have been replaced in it).

Adaptive Logical Optimizer

optimizer: AQEOptimizer

AdaptiveSparkPlanExec creates an AQEOptimizer (while created) for re-optimizing a logical query plan.

QueryStageCreator Thread Pool

executionContext: ExecutionContext

executionContext is an ExecutionContext that is used when:

finalPlanUpdate Lazy Value

lazy val finalPlanUpdate: Unit

finalPlanUpdate is a Scala lazy value which is computed once when accessed and cached afterwards.
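The "computed once, cached afterwards" behaviour is plain Scala lazy val semantics. The following minimal sketch uses a hypothetical LazyHolder class with an evaluation counter to make it visible.

```scala
// A hypothetical class, used only to show lazy val semantics.
final class LazyHolder {
  var evaluations = 0

  // The right-hand side runs on first access only; later accesses reuse the cached result.
  lazy val finalPlanUpdate: Unit = {
    evaluations += 1
  }
}

val holder = new LazyHolder
println(holder.evaluations) // 0: nothing has accessed finalPlanUpdate yet
holder.finalPlanUpdate
holder.finalPlanUpdate
println(holder.evaluations) // 1: the body ran exactly once
```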

finalPlanUpdate...FIXME

In the end, finalPlanUpdate prints out the following message to the logs:

Final plan: [currentPhysicalPlan]

finalPlanUpdate is used when AdaptiveSparkPlanExec physical operator is requested to executeCollect, executeTake, executeTail and doExecute.

isFinalPlan Flag

isFinalPlan: Boolean

isFinalPlan is an internal flag to avoid the expensive getFinalPhysicalPlan computation (and return the current optimized physical query plan immediately).

isFinalPlan is off (false) by default. It is turned on at the end of getFinalPhysicalPlan.

isFinalPlan is also used when:

Initial Plan

initialPlan: SparkPlan

AdaptiveSparkPlanExec initializes initialPlan value when created.

initialPlan is a SparkPlan after applying the queryStagePreparationRules to the inputPlan (with the planChangeLogger and AQE Preparations batch name).

initialPlan is the currentPhysicalPlan when AdaptiveSparkPlanExec is created.

initialPlan is used when:

replaceWithQueryStagesInLogicalPlan

replaceWithQueryStagesInLogicalPlan(
  plan: LogicalPlan,
  stagesToReplace: Seq[QueryStageExec]): LogicalPlan

replaceWithQueryStagesInLogicalPlan...FIXME

replaceWithQueryStagesInLogicalPlan is used when AdaptiveSparkPlanExec physical operator is requested for a final physical plan.

Executing Physical Rules

applyPhysicalRules(
  plan: SparkPlan,
  rules: Seq[Rule[SparkPlan]],
  loggerAndBatchName: Option[(PlanChangeLogger[SparkPlan], String)] = None): SparkPlan

By default (with no loggerAndBatchName given) applyPhysicalRules applies (executes) the given rules to the given physical query plan.

With loggerAndBatchName specified, applyPhysicalRules executes the rules and, for every rule, requests the PlanChangeLogger to logRule. In the end, applyPhysicalRules requests the PlanChangeLogger to logBatch.
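Both modes of applyPhysicalRules amount to folding the rules over the plan, optionally reporting each rule and then the whole batch. This is a minimal sketch, assuming plans are strings; PhysicalRule and RecordingLogger are hypothetical stand-ins (RecordingLogger only mimics the logRule/logBatch roles of PlanChangeLogger), and recording a rule only when it changed the plan is an assumption of this model.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of applyPhysicalRules (not Spark's code).
// Plans are Strings; a rule is a named String => String transformation.
final case class PhysicalRule(name: String, transform: String => String)

// A stand-in for PlanChangeLogger that records messages instead of logging them.
final class RecordingLogger {
  val messages: ArrayBuffer[String] = ArrayBuffer.empty
  // This model only records rules that actually changed the plan.
  def logRule(rule: String, oldPlan: String, newPlan: String): Unit =
    if (oldPlan != newPlan) messages += s"rule $rule: $oldPlan -> $newPlan"
  def logBatch(batch: String, oldPlan: String, newPlan: String): Unit =
    messages += s"batch $batch: $oldPlan -> $newPlan"
}

def applyPhysicalRules(
    plan: String,
    rules: Seq[PhysicalRule],
    loggerAndBatchName: Option[(RecordingLogger, String)] = None): String =
  loggerAndBatchName match {
    case None =>
      // No logger: simply apply the rules one after another.
      rules.foldLeft(plan)((current, rule) => rule.transform(current))
    case Some((logger, batchName)) =>
      // With a logger: report every rule, then the whole batch.
      val result = rules.foldLeft(plan) { (current, rule) =>
        val next = rule.transform(current)
        logger.logRule(rule.name, current, next)
        next
      }
      logger.logBatch(batchName, plan, result)
      result
  }

val rules = Seq(PhysicalRule("Uppercase", _.toUpperCase), PhysicalRule("NoOp", identity))
val logger = new RecordingLogger
println(applyPhysicalRules("scan", rules)) // SCAN
println(applyPhysicalRules("scan", rules, Some((logger, "AQE Query Stage Optimization")))) // SCAN
logger.messages.foreach(println)
// rule Uppercase: scan -> SCAN
// batch AQE Query Stage Optimization: scan -> SCAN
```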

applyPhysicalRules is used when:

withFinalPlanUpdate

withFinalPlanUpdate[T](
  fun: SparkPlan => T): T

withFinalPlanUpdate requests getFinalPhysicalPlan for the final physical plan and executes the given fun with it. In the end, withFinalPlanUpdate triggers finalPlanUpdate and returns the result (of type T).
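The order of operations in withFinalPlanUpdate, and why execution methods such as doExecute and executeCollect trigger the final plan update only once, can be traced with a toy model. This is a sketch under stated assumptions: FinalPlanModel, events and the string "FinalPlan" are hypothetical, and the two execute methods are stand-ins that only return strings or numbers.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of withFinalPlanUpdate (not Spark's code).
final class FinalPlanModel {
  val events: ArrayBuffer[String] = ArrayBuffer.empty

  private def getFinalPhysicalPlan(): String = {
    events += "getFinalPhysicalPlan"
    "FinalPlan"
  }

  // Runs once, on first access.
  private lazy val finalPlanUpdate: Unit = {
    events += "finalPlanUpdate"
  }

  def withFinalPlanUpdate[T](fun: String => T): T = {
    val plan = getFinalPhysicalPlan()
    val result = fun(plan)
    finalPlanUpdate // forces the lazy val (a no-op after the first time)
    result
  }

  // Stand-ins for doExecute and executeCollect: both go through withFinalPlanUpdate.
  def doExecute(): String = withFinalPlanUpdate(plan => s"rdd of $plan")
  def executeCollect(): Int = withFinalPlanUpdate(_.length)
}

val adaptive = new FinalPlanModel
println(adaptive.doExecute())      // rdd of FinalPlan
println(adaptive.executeCollect()) // 9
println(adaptive.events.mkString(", "))
// getFinalPhysicalPlan, finalPlanUpdate, getFinalPhysicalPlan
```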

withFinalPlanUpdate is used when:

Logging

Enable ALL logging level for org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec=ALL

Refer to Logging.

PlanChangeLogger

AdaptiveSparkPlanExec uses a PlanChangeLogger for the following:

logOnLevel

logOnLevel: (=> String) => Unit

logOnLevel uses the internal spark.sql.adaptive.logLevel configuration property for the logging level and prints out the given message to the logs (at the log level).
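A function of type (=> String) => Unit can be chosen once, from the configured level name, so that callers just write logOnLevel(message). This is a minimal sketch, assuming messages are collected into a buffer instead of a real logger; LevelLogger and logAt are hypothetical names, and falling back to DEBUG for unknown level names is an assumption of this sketch.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch (not Spark's code): pick a logging function once, from a level name.
final class LevelLogger(level: String) {
  val lines: ArrayBuffer[String] = ArrayBuffer.empty

  // The by-name `msg` is only evaluated here, when the line is actually recorded.
  private def logAt(tag: String)(msg: => String): Unit = lines += s"[$tag] $msg"

  val logOnLevel: (=> String) => Unit = level.toUpperCase match {
    case "TRACE" => logAt("TRACE")(_)
    case "DEBUG" => logAt("DEBUG")(_)
    case "INFO"  => logAt("INFO")(_)
    case "WARN"  => logAt("WARN")(_)
    case "ERROR" => logAt("ERROR")(_)
    case _       => logAt("DEBUG")(_) // assumed fallback for unknown level names
  }
}

val levelLogger = new LevelLogger("info")
levelLogger.logOnLevel("Plan changed from A to B")
println(levelLogger.lines.mkString) // [INFO] Plan changed from A to B
```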

logOnLevel is used when:
