AdaptiveSparkPlanExec Leaf Physical Operator¶
AdaptiveSparkPlanExec is a leaf physical operator for Adaptive Query Execution.
Creating Instance¶
AdaptiveSparkPlanExec takes the following to be created:
- Input Physical Plan
- AdaptiveExecutionContext
- Preprocessing Physical Optimizations
- isSubquery flag
- supportsColumnar flag (default: false)
AdaptiveSparkPlanExec is created when:
- InsertAdaptiveSparkPlan physical optimization is executed
Input Physical Plan¶
AdaptiveSparkPlanExec is given a SparkPlan when created.
The SparkPlan is determined when PlanAdaptiveDynamicPruningFilters adaptive physical optimization is executed and can be one of the following:
- BroadcastExchangeExec physical operator (with spark.sql.exchange.reuse configuration property enabled)
- Planned Aggregate logical operator (otherwise)
The SparkPlan is used for the following:
- requiredDistribution, initialPlan, output, doCanonicalize, getFinalPhysicalPlan, hashCode and equals
AdaptiveExecutionContext¶
AdaptiveSparkPlanExec is given an AdaptiveExecutionContext when created.
Adaptive Logical Optimizer¶
optimizer: AQEOptimizer
AdaptiveSparkPlanExec creates an AQEOptimizer (when created) that is used when requested to re-optimize a logical query plan.
AQE Cost Evaluator¶
costEvaluator: CostEvaluator
AdaptiveSparkPlanExec creates a CostEvaluator (when created) based on spark.sql.adaptive.customCostEvaluatorClass configuration property.
Unless configured, AdaptiveSparkPlanExec uses SimpleCostEvaluator (with spark.sql.adaptive.forceOptimizeSkewedJoin configuration property).
AdaptiveSparkPlanExec uses the CostEvaluator to evaluate cost (of a candidate for a new SparkPlan) when requested for the adaptively-optimized physical query plan.
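The cost-based selection can be sketched in plain Scala. This is a toy model, not Spark's API. The plan classes (`Plan`, `Scan`, `Shuffle`, `Join`), `ToyCost`, `CostModel`, `ShuffleCountModel` and `chooseEvaluator` are all hypothetical, and using the number of shuffles as the cost metric is an assumption made for illustration. Passing `Some(evaluator)` plays the role of the spark.sql.adaptive.customCostEvaluatorClass configuration property.

```scala
// Toy physical plan tree (hypothetical stand-ins, not Spark classes)
sealed trait Plan { def children: Seq[Plan] }
final case class Scan(table: String) extends Plan { def children: Seq[Plan] = Nil }
final case class Shuffle(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
final case class Join(left: Plan, right: Plan) extends Plan { def children: Seq[Plan] = Seq(left, right) }

// Costs only need to be comparable; lower is cheaper
final case class ToyCost(value: Long) extends Ordered[ToyCost] {
  def compare(that: ToyCost): Int = java.lang.Long.compare(value, that.value)
}

trait CostModel { def evaluateCost(plan: Plan): ToyCost }

// Assumed metric for this sketch: the number of shuffles in the plan
object ShuffleCountModel extends CostModel {
  private def countShuffles(p: Plan): Long = {
    val own = p match {
      case _: Shuffle => 1L
      case _          => 0L
    }
    own + p.children.map(countShuffles).sum
  }
  def evaluateCost(plan: Plan): ToyCost = ToyCost(countShuffles(plan))
}

// A configured (custom) evaluator wins; otherwise fall back to the default one
def chooseEvaluator(custom: Option[CostModel]): CostModel =
  custom.getOrElse(ShuffleCountModel)

val plan = Join(Shuffle(Scan("a")), Shuffle(Scan("b")))
println(chooseEvaluator(None).evaluateCost(plan)) // ToyCost(2)
```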
Preprocessing Physical Optimizations¶
preprocessingRules: Seq[Rule[SparkPlan]]
AdaptiveSparkPlanExec is given a collection of Rules to pre-process SparkPlans (before the QueryStage Physical Preparation Rules) when executing physical optimizations to reOptimize a logical query plan.
The collection consists of a single physical optimization:
- PlanAdaptiveSubqueries
Adaptive Query Stage Physical Preparation Rules¶
queryStagePreparationRules: Seq[Rule[SparkPlan]]
AdaptiveSparkPlanExec creates a collection of physical preparation rules (Rule[SparkPlan]s) when created (in the order):
- RemoveRedundantProjects
- EnsureRequirements (based on the requiredDistribution)
- AdjustShuffleExchangePosition
- ValidateSparkPlan
- ReplaceHashWithSortAgg
- RemoveRedundantSorts
- DisableUnnecessaryBucketedScan
- OptimizeSkewedJoin (with the EnsureRequirements)
- queryStagePrepRules
queryStagePreparationRules is used for the initial plan and reOptimize.
Distribution Requirement¶
requiredDistribution: Option[Distribution]
AdaptiveSparkPlanExec creates requiredDistribution value when created:
- UnspecifiedDistribution for a subquery (as a subquery output does not need a specific output partitioning)
- AQEUtils.getRequiredDistribution for the inputPlan otherwise
requiredDistribution is used for the following:
- queryStagePreparationRules (to create EnsureRequirements physical optimization)
- optimizeQueryStage
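The choice of requiredDistribution boils down to a single conditional. The sketch below is hypothetical: `Distribution`, `Unspecified`, `ClusteredBy`, `distributionOf` and `requiredDistribution` are simplified stand-ins, and modeling AQEUtils.getRequiredDistribution as "clustered by the keys of a root repartition, unspecified otherwise" is an assumption.

```scala
// Simplified stand-ins for Spark's Distribution hierarchy (hypothetical)
sealed trait Distribution
case object Unspecified extends Distribution
final case class ClusteredBy(keys: Seq[String]) extends Distribution

// Assumed stand-in for AQEUtils.getRequiredDistribution:
// a user repartition at the root must be preserved, anything else imposes no requirement
def distributionOf(rootRepartitionKeys: Option[Seq[String]]): Option[Distribution] =
  rootRepartitionKeys match {
    case Some(keys) => Some(ClusteredBy(keys))
    case None       => Some(Unspecified)
  }

def requiredDistribution(isSubquery: Boolean, rootRepartitionKeys: Option[Seq[String]]): Option[Distribution] =
  if (isSubquery) Some(Unspecified) // a subquery output needs no specific partitioning
  else distributionOf(rootRepartitionKeys)

println(requiredDistribution(isSubquery = false, Some(Seq("id")))) // Some(ClusteredBy(List(id)))
```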
Adaptive Query Stage Physical Optimizations¶
queryStageOptimizerRules: Seq[Rule[SparkPlan]]
AdaptiveSparkPlanExec creates a collection of physical optimization rules (Rule[SparkPlan]s) when created (in the order):
- PlanAdaptiveDynamicPruningFilters
- ReuseAdaptiveSubquery
- OptimizeSkewInRebalancePartitions
- CoalesceShufflePartitions
- OptimizeShuffleWithLocalRead
queryStageOptimizerRules is used to optimizeQueryStage.
Executing Physical Operator¶
doExecute(): RDD[InternalRow]
doExecute is part of the SparkPlan abstraction.
doExecute requests the final physical plan to execute and returns the RDD[InternalRow] it generates.
doExecute triggers finalPlanUpdate (unless done already).
doExecuteColumnar¶
doExecuteColumnar(): RDD[ColumnarBatch]
doExecuteColumnar is part of the SparkPlan abstraction.
doExecuteColumnar uses withFinalPlanUpdate to request the final physical plan to executeColumnar.
doExecuteBroadcast¶
doExecuteBroadcast[T](): broadcast.Broadcast[T]
doExecuteBroadcast is part of the SparkPlan abstraction.
doExecuteBroadcast uses withFinalPlanUpdate to request the final physical plan to doExecuteBroadcast.
doExecuteBroadcast asserts that the final physical plan is a BroadcastQueryStageExec.
Specialized Execution Paths¶
collect¶
executeCollect(): Array[InternalRow]
executeCollect is part of the SparkPlan abstraction.
executeCollect...FIXME
tail¶
executeTail(
n: Int): Array[InternalRow]
executeTail is part of the SparkPlan abstraction.
executeTail...FIXME
take¶
executeTake(
n: Int): Array[InternalRow]
executeTake is part of the SparkPlan abstraction.
executeTake...FIXME
Adaptively-Optimized Physical Query Plan¶
getFinalPhysicalPlan(): SparkPlan
Note
getFinalPhysicalPlan uses the isFinalPlan internal flag (and an optimized physical query plan) to short-circuit (skip) the whole expensive computation.
getFinalPhysicalPlan is used when:
- AdaptiveSparkPlanExec physical operator is requested to withFinalPlanUpdate
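The short-circuit can be pictured with a small class that runs the expensive optimization only once. This is a hypothetical sketch: `AdaptivePlanHolder`, its `optimize` function and the `optimizationRuns` counter are made up for illustration, plans are plain strings, and guarding the method with `synchronized` is an assumption of the sketch.

```scala
// Hypothetical model: the expensive adaptive optimization runs at most once
final class AdaptivePlanHolder(initialPlan: String, optimize: String => String) {
  private var isFinalPlan = false
  private var currentPhysicalPlan = initialPlan
  var optimizationRuns = 0 // for illustration only

  def getFinalPhysicalPlan(): String = synchronized {
    if (!isFinalPlan) {
      // The expensive part: skipped entirely once the final plan is known
      optimizationRuns += 1
      currentPhysicalPlan = optimize(currentPhysicalPlan)
      isFinalPlan = true
    }
    currentPhysicalPlan
  }
}

val holder = new AdaptivePlanHolder("Scan", p => s"Optimized($p)")
println(holder.getFinalPhysicalPlan()) // Optimized(Scan)
println(holder.getFinalPhysicalPlan()) // Optimized(Scan), without a second optimization
```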
Step 1. createQueryStages¶
getFinalPhysicalPlan calls createQueryStages with the currentPhysicalPlan.
Step 2. Until allChildStagesMaterialized¶
getFinalPhysicalPlan executes the following until allChildStagesMaterialized.
Step 2.1 New QueryStageExecs¶
getFinalPhysicalPlan does the following when there are new stages to be processed:
- FIXME
Step 2.2 StageMaterializationEvents¶
getFinalPhysicalPlan executes the following until allChildStagesMaterialized:
- FIXME
Step 2.3 Errors¶
In case of errors, getFinalPhysicalPlan calls cleanUpAndThrowException.
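Steps 2.1 to 2.3 follow a common pattern: start every new stage asynchronously, have each one report back through a blocking queue, then block for one event and drain whatever else has already arrived. The sketch below is a hypothetical model in plain Scala, not Spark code. Stages are functions returning a row count, `StageSuccess` and `StageFailure` are simplified event types, `materializeAll` is made up, and it uses the global ExecutionContext instead of the QueryStageCreator thread pool. It also leaves out re-planning (Steps 2.4 to 2.8) and simply fails fast on the first error.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success}

// Simplified stage materialization events (hypothetical)
sealed trait StageEvent
final case class StageSuccess(stageId: Int, rows: Long) extends StageEvent
final case class StageFailure(stageId: Int, error: Throwable) extends StageEvent

// Materializes all stages asynchronously and waits until each has reported back
def materializeAll(stages: Map[Int, () => Long]): Map[Int, Long] = {
  val events = new LinkedBlockingQueue[StageEvent]()
  stages.foreach { case (id, work) =>
    Future(work()).onComplete {
      case Success(rows) => events.offer(StageSuccess(id, rows))
      case Failure(e)    => events.offer(StageFailure(id, e))
    }
  }
  var done = Map.empty[Int, Long]
  val errors = ArrayBuffer.empty[Throwable]
  while (done.size < stages.size) {
    // Block for the next event, then drain any others that arrived in the meantime
    val next = events.take()
    val rest = new java.util.ArrayList[StageEvent]()
    events.drainTo(rest)
    (next +: rest.asScala.toSeq).foreach {
      case StageSuccess(id, rows) => done = done + (id -> rows)
      case StageFailure(_, e)     => errors += e
    }
    // Errors: stop waiting and surface the first failure
    if (errors.nonEmpty) throw new IllegalStateException("Stage materialization failed", errors.head)
  }
  done
}

val rows = materializeAll(Map(1 -> (() => 10L), 2 -> (() => 20L)))
println(rows.values.sum) // 30
```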
Step 2.4 replaceWithQueryStagesInLogicalPlan¶
getFinalPhysicalPlan calls replaceWithQueryStagesInLogicalPlan with the currentLogicalPlan and the stagesToReplace.
Step 2.5 reOptimize¶
getFinalPhysicalPlan calls reOptimize with the new logical plan.
Step 2.6 Evaluating Cost¶
getFinalPhysicalPlan requests the CostEvaluator (SimpleCostEvaluator unless a custom one is configured) to evaluateCost of the currentPhysicalPlan and the newPhysicalPlan.
Step 2.7 Adopting New Physical Plan¶
getFinalPhysicalPlan adopts the new plan if its cost is lower than that of the currentPhysicalPlan, or if the costs are equal but the physical plans differ (the new one is likely better).
getFinalPhysicalPlan prints out the following message to the logs (using the logOnLevel):
Plan changed from [currentPhysicalPlan] to [newPhysicalPlan]
getFinalPhysicalPlan calls cleanUpTempTags with the newPhysicalPlan.
getFinalPhysicalPlan saves the newPhysicalPlan as the currentPhysicalPlan (alongside the currentLogicalPlan with the newLogicalPlan).
getFinalPhysicalPlan resets the stagesToReplace.
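The adoption rule of Step 2.7 fits in one boolean expression. In this hypothetical sketch plans are plain strings and costs plain `Long`s (the numbers in the usage are made up), and `shouldAdopt` is not a Spark method.

```scala
// Decides whether a re-optimized candidate replaces the current plan (Step 2.7)
def shouldAdopt(currentPlan: String, currentCost: Long, newPlan: String, newCost: Long): Boolean =
  newCost < currentCost || (newCost == currentCost && currentPlan != newPlan)

var currentPhysicalPlan = "SortMergeJoin(Shuffle(a), Shuffle(b))"
val newPhysicalPlan = "BroadcastHashJoin(a, Broadcast(b))"

// Made-up costs: 2 shuffles for the current plan, none for the candidate
if (shouldAdopt(currentPhysicalPlan, 2L, newPhysicalPlan, 0L)) {
  println(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan")
  currentPhysicalPlan = newPhysicalPlan
}
```

Because ties are broken in favor of change, a different plan of equal cost also replaces the current one, while an identical plan never does.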
Step 2.8 createQueryStages¶
getFinalPhysicalPlan calls createQueryStages with the currentPhysicalPlan again (as it may have changed).
Step 3. applyPhysicalRules¶
getFinalPhysicalPlan executes applyPhysicalRules on the final plan (with the finalStageOptimizerRules, the planChangeLogger and AQE Final Query Stage Optimization batch name).
getFinalPhysicalPlan turns the isFinalPlan internal flag on.
finalStageOptimizerRules¶
finalStageOptimizerRules: Seq[Rule[SparkPlan]]
finalStageOptimizerRules...FIXME
createQueryStages¶
createQueryStages(
plan: SparkPlan): CreateStageResult
createQueryStages checks if the given SparkPlan is one of the following:
- Exchange unary physical operator
- QueryStageExec leaf physical operator
- Others
The most interesting case is when the given SparkPlan is an Exchange unary physical operator.
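A recursive sketch shows how the three cases combine into a CreateStageResult. Everything here is a simplified, hypothetical model: plain case classes instead of SparkPlans and a global counter for stage IDs. It also leaves out stage reuse, and the rule that an Exchange becomes a new stage only once all its child stages are materialized is an assumption of the sketch.

```scala
// Simplified plan nodes (hypothetical stand-ins for SparkPlan operators)
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class Join(left: Plan, right: Plan) extends Plan
final case class Exchange(child: Plan) extends Plan
final case class QueryStage(id: Int, exchange: Exchange, materialized: Boolean) extends Plan

final case class CreateStageResult(newPlan: Plan, allChildStagesMaterialized: Boolean, newStages: Seq[QueryStage])

var currentStageId = 0

def createQueryStages(plan: Plan): CreateStageResult = plan match {
  case Exchange(child) =>
    val childResult = createQueryStages(child)
    val newExchange = Exchange(childResult.newPlan)
    if (childResult.allChildStagesMaterialized) {
      // Inputs are ready: wrap the exchange in a new, not yet materialized stage
      val stage = QueryStage(currentStageId, newExchange, materialized = false)
      currentStageId += 1
      CreateStageResult(newPlan = stage, allChildStagesMaterialized = false, newStages = Seq(stage))
    } else {
      CreateStageResult(newPlan = newExchange, allChildStagesMaterialized = false,
        newStages = childResult.newStages)
    }
  case stage: QueryStage =>
    CreateStageResult(newPlan = stage, allChildStagesMaterialized = stage.materialized, newStages = Nil)
  case leaf: Scan =>
    CreateStageResult(newPlan = leaf, allChildStagesMaterialized = true, newStages = Nil)
  case Join(left, right) =>
    // Other operators: recurse into the children and combine their results
    val l = createQueryStages(left)
    val r = createQueryStages(right)
    CreateStageResult(newPlan = Join(l.newPlan, r.newPlan),
      allChildStagesMaterialized = l.allChildStagesMaterialized && r.allChildStagesMaterialized,
      newStages = l.newStages ++ r.newStages)
}

val result = createQueryStages(Join(Exchange(Scan("a")), Exchange(Exchange(Scan("b")))))
println(result.newStages.map(_.id)) // List(0, 1)
```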
reuseQueryStage¶
reuseQueryStage(
existing: QueryStageExec,
exchange: Exchange): QueryStageExec
reuseQueryStage requests the given QueryStageExec to newReuseInstance (with the currentStageId).
reuseQueryStage increments the currentStageId.
reuseQueryStage calls setLogicalLinkForNewQueryStage.
Creating QueryStageExec for Exchange¶
newQueryStage(
e: Exchange): QueryStageExec
newQueryStage creates a new QueryStageExec physical operator based on the type of the given Exchange physical operator.
| Exchange | QueryStageExec |
|---|---|
| ShuffleExchangeLike | ShuffleQueryStageExec |
| BroadcastExchangeLike | BroadcastQueryStageExec |
newQueryStage creates an optimized physical query plan for the child physical plan of the given Exchange. newQueryStage uses the adaptive optimizations, the PlanChangeLogger and AQE Query Stage Optimization batch name.
newQueryStage creates a new QueryStageExec physical operator for the given Exchange operator (using the currentStageId for the ID).
After applyPhysicalRules for the child operator, newQueryStage creates an optimized physical query plan for the Exchange itself (with the new optimized physical query plan for the child). newQueryStage uses the post-stage-creation optimizations, the PlanChangeLogger and AQE Post Stage Creation batch name.
newQueryStage increments the currentStageId counter.
newQueryStage associates the new query stage operator with the Exchange physical operator.
In the end, newQueryStage returns the QueryStageExec physical operator.
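The mapping in the table above and the stage-ID bookkeeping can be modeled as follows. This is a hypothetical sketch: the exchange and stage classes are simplified stand-ins, `optimizeChild` stands in for the AQE Query Stage Optimization rules, and the post-stage-creation optimizations and logical links are left out.

```scala
// Simplified exchanges and query stages (hypothetical stand-ins)
sealed trait Exchange { def child: String }
final case class ShuffleExchange(child: String) extends Exchange
final case class BroadcastExchange(child: String) extends Exchange

sealed trait QueryStage { def id: Int }
final case class ShuffleQueryStage(id: Int, plan: ShuffleExchange) extends QueryStage
final case class BroadcastQueryStage(id: Int, plan: BroadcastExchange) extends QueryStage

var currentStageId = 0

// Stand-in for optimizing the child plan of the exchange
def optimizeChild(child: String): String = s"optimized($child)"

def newQueryStage(e: Exchange): QueryStage = {
  val optimizedChild = optimizeChild(e.child)
  // The type of the exchange decides the type of the query stage
  val stage = e match {
    case s: ShuffleExchange   => ShuffleQueryStage(currentStageId, s.copy(child = optimizedChild))
    case b: BroadcastExchange => BroadcastQueryStage(currentStageId, b.copy(child = optimizedChild))
  }
  currentStageId += 1
  stage
}

val first = newQueryStage(ShuffleExchange("scan"))
val second = newQueryStage(BroadcastExchange("scan"))
println(first)  // ShuffleQueryStage(0,ShuffleExchange(optimized(scan)))
println(second) // BroadcastQueryStage(1,BroadcastExchange(optimized(scan)))
```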
Adaptively-Optimized Physical Query Plan¶
currentPhysicalPlan: SparkPlan
AdaptiveSparkPlanExec defines currentPhysicalPlan variable for an adaptively-optimized SparkPlan.
currentPhysicalPlan is the initialPlan when AdaptiveSparkPlanExec is created.
currentPhysicalPlan can only change in getFinalPhysicalPlan and only until the isFinalPlan internal flag is enabled.
While in getFinalPhysicalPlan, AdaptiveSparkPlanExec uses currentPhysicalPlan as the input argument for createQueryStages. A candidate for a new currentPhysicalPlan comes from reOptimize, and AdaptiveSparkPlanExec replaces currentPhysicalPlan with it when the costEvaluator determines one of the following:
- the candidate plan has a lower cost
- both plans have the same cost, but they are different
If a SparkPlan switch happens, AdaptiveSparkPlanExec prints out the following message to the logs:
Plan changed from [currentPhysicalPlan] to [newPhysicalPlan]
AdaptiveSparkPlanExec uses applyPhysicalRules to optimize the new SparkPlan.
currentPhysicalPlan is available using executedPlan.
executedPlan¶
executedPlan: SparkPlan
executedPlan returns the current physical query plan.
executedPlan is used when:
- ExplainUtils is requested to generateOperatorIDs, collectOperatorsWithID, generateWholeStageCodegenIds, getSubqueries, removeTags
- SparkPlanInfo is requested to fromSparkPlan (for SparkListenerSQLExecutionStart and SparkListenerSQLAdaptiveExecutionUpdate events)
- AdaptiveSparkPlanExec is requested to reset metrics
- AdaptiveSparkPlanHelper is requested to allChildren and stripAQEPlan
- PlanAdaptiveDynamicPruningFilters is executed
- debug package object is requested to codegenStringSeq
Resetting Metrics¶
resetMetrics(): Unit
resetMetrics is part of the SparkPlan abstraction.
resetMetrics requests all the metrics to reset.
In the end, resetMetrics requests the executed query plan to resetMetrics.
optimizeQueryStage¶
optimizeQueryStage(
plan: SparkPlan,
isFinalStage: Boolean): SparkPlan
isFinalStage
The given isFinalStage can be as follows:
- true when requested for an adaptively-optimized physical query plan
- false when requested to create a new QueryStageExec for an Exchange
optimizeQueryStage executes (applies) the queryStageOptimizerRules to the given SparkPlan. While applying optimizations (executing rules), optimizeQueryStage requests the PlanChangeLogger to log plan changes (by a rule) with the name of the rule that has just been executed.
AQEShuffleReadRule
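optimizeQueryStage is sensitive to AQEShuffleReadRule physical optimizations: whenever such a rule changes the plan, optimizeQueryStage validates the result and keeps the plan from before the rule if the change would break the distribution requirement of the query plan.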
optimizeQueryStage requests the PlanChangeLogger to log plan changes by the entire rule batch with the following batch name:
AQE Query Stage Optimization
optimizeQueryStage is used when:
- AdaptiveSparkPlanExec is requested for an adaptively-optimized physical query plan and to create a new QueryStageExec for an Exchange
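The validation around AQEShuffleReadRule can be pictured as a fold over the rules that drops a rule's result when it breaks the required distribution. This is a hypothetical sketch: plans are reduced to a partition count and a hash-partitioning flag, `Rule` and `ShuffleReadRule` are made-up traits (the latter marks AQEShuffleReadRule-like rules), and treating only the final stage as carrying a distribution requirement is a simplifying assumption.

```scala
// Toy plan: just enough state to model a distribution requirement (hypothetical)
final case class Plan(partitions: Int, hashPartitioned: Boolean)

trait Rule {
  def name: String
  def apply(plan: Plan): Plan
}
// Marker for rules that, like AQEShuffleReadRule, change how shuffle output is read
trait ShuffleReadRule extends Rule

def optimizeQueryStage(plan: Plan, rules: Seq[Rule], isFinalStage: Boolean,
                       requiresHashPartitioning: Boolean, log: String => Unit): Plan = {
  val optimized = rules.foldLeft(plan) { (latest, rule) =>
    val applied = rule(latest)
    val result = rule match {
      case _: ShuffleReadRule if applied != latest =>
        // Only the final stage carries the query's distribution requirement here
        val mustStayHashed = isFinalStage && requiresHashPartitioning
        if (!mustStayHashed || applied.hashPartitioned) applied
        else {
          log(s"Rule ${rule.name} is not applied as it breaks the distribution requirement")
          latest
        }
      case _ => applied
    }
    log(s"${rule.name}: $latest -> $result") // per-rule log entry
    result
  }
  log(s"AQE Query Stage Optimization: $plan -> $optimized") // per-batch log entry
  optimized
}

// A toy shuffle-read rule that gives up hash partitioning
val localRead = new ShuffleReadRule {
  val name = "LocalRead"
  def apply(plan: Plan): Plan = plan.copy(hashPartitioned = false)
}

val start = Plan(partitions = 8, hashPartitioned = true)
// Final stage with a hash-partitioning requirement: the LocalRead change is rolled back
val kept = optimizeQueryStage(start, Seq(localRead), isFinalStage = true,
  requiresHashPartitioning = true, log = msg => println(msg))
println(kept) // Plan(8,true)
```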
Post-Stage-Creation Adaptive Optimizations¶
postStageCreationRules: Seq[Rule[SparkPlan]]
postStageCreationRules are the following adaptive optimizations (physical optimization rules):
- ApplyColumnarRulesAndInsertTransitions
- CollapseCodegenStages
postStageCreationRules is used when:
- AdaptiveSparkPlanExec is requested for finalStageOptimizerRules and to newQueryStage
Generating Text Representation¶
generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean): Unit
generateTreeString is part of the TreeNode abstraction.
generateTreeString...FIXME
cleanUpAndThrowException¶
cleanUpAndThrowException(
errors: Seq[Throwable],
earlyFailedStage: Option[Int]): Unit
cleanUpAndThrowException...FIXME
cleanUpAndThrowException is used when AdaptiveSparkPlanExec physical operator is requested to getFinalPhysicalPlan (and materialization of new stages fails).
Replanning Logical Query Plan¶
reOptimize(
logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan)
reOptimize returns a newly-optimized physical query plan with a newly-optimized logical query plan for the given logical query plan.
reOptimize requests the given LogicalPlan to invalidate statistics cache.
reOptimize requests the Adaptive Logical Optimizer to execute (and generate an optimized logical query plan).
reOptimize requests the Spark Query Planner (bound to the AdaptiveExecutionContext) to plan the optimized logical query plan (and generate a physical query plan).
reOptimize executes physical optimizations using preprocessing and preparation rules (and generates an optimized physical query plan).
reOptimize is used when:
- AdaptiveSparkPlanExec physical operator is requested for an adaptively-optimized physical query plan
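The four steps of reOptimize chain into a simple pipeline. In this hypothetical sketch `LogicalPlan` and `PhysicalPlan` are string wrappers, and the function parameters are made-up hooks standing in for the statistics cache, the Adaptive Logical Optimizer, the Spark Query Planner, and the preprocessing and preparation rules.

```scala
// Hypothetical stand-ins: plans are wrapped strings in this sketch
final case class LogicalPlan(desc: String)
final case class PhysicalPlan(desc: String)

def reOptimize(
    logicalPlan: LogicalPlan,
    invalidateStatsCache: LogicalPlan => Unit,
    optimizer: LogicalPlan => LogicalPlan,
    planner: LogicalPlan => PhysicalPlan,
    physicalRules: Seq[PhysicalPlan => PhysicalPlan]): (PhysicalPlan, LogicalPlan) = {
  invalidateStatsCache(logicalPlan)      // 1. statistics may have changed after stages ran
  val optimized = optimizer(logicalPlan) // 2. adaptive logical optimization
  val planned = planner(optimized)       // 3. logical-to-physical planning
  val physical = physicalRules.foldLeft(planned)((p, rule) => rule(p)) // 4. physical rules in order
  (physical, optimized)
}

val (newPhysicalPlan, newLogicalPlan) = reOptimize(
  LogicalPlan("Join(a, b)"),
  _ => (),
  lp => LogicalPlan(s"Optimized(${lp.desc})"),
  lp => PhysicalPlan(s"Exec(${lp.desc})"),
  Seq((p: PhysicalPlan) => PhysicalPlan(s"Prepared(${p.desc})")))
println(newPhysicalPlan) // PhysicalPlan(Prepared(Exec(Optimized(Join(a, b)))))
```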
QueryStageCreator Thread Pool¶
executionContext: ExecutionContext
executionContext is an ExecutionContext that is used when:
- AdaptiveSparkPlanExec operator is requested to getFinalPhysicalPlan (to materialize QueryStageExec operators asynchronously)
- BroadcastQueryStageExec operator is requested for materializeWithTimeout
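A comparable thread pool can be built from java.util.concurrent alone and wrapped as a Scala ExecutionContext. This is a sketch, not Spark's code: the pool shape (16 threads, a 60-second keep-alive, an unbounded task queue and daemon threads named QueryStageCreator-N) is an assumption chosen for illustration.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Daemon threads named QueryStageCreator-N, so the pool never keeps the JVM alive
val threadFactory: ThreadFactory = new ThreadFactory {
  private val counter = new AtomicInteger(0)
  def newThread(r: Runnable): Thread = {
    val t = new Thread(r, s"QueryStageCreator-${counter.getAndIncrement()}")
    t.setDaemon(true)
    t
  }
}

// Up to maxThreads threads; extra tasks wait in the queue; idle threads time out after 60s
val maxThreads = 16 // assumed pool size for this sketch
val executor = new ThreadPoolExecutor(
  maxThreads, maxThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable](), threadFactory)
executor.allowCoreThreadTimeOut(true)

val executionContext: ExecutionContext = ExecutionContext.fromExecutorService(executor)

// Work submitted through the ExecutionContext runs on a QueryStageCreator thread
val threadName = Future(Thread.currentThread().getName)(executionContext)
println(Await.result(threadName, 5.seconds)) // QueryStageCreator-0
```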
finalPlanUpdate Lazy Value¶
finalPlanUpdate: Unit
lazy value
finalPlanUpdate is a Scala lazy value which is computed once when accessed and cached afterwards.
finalPlanUpdate...FIXME
In the end, finalPlanUpdate prints out the following message to the logs:
Final plan: [currentPhysicalPlan]
finalPlanUpdate is used when AdaptiveSparkPlanExec physical operator is requested to executeCollect, executeTake, executeTail and doExecute.
isFinalPlan (Available Already)¶
isFinalPlan: Boolean
isFinalPlan is an internal flag that is used to skip (short-circuit) the expensive process of producing an adaptively-optimized physical query plan (and immediately return the one that has already been prepared).
isFinalPlan is disabled (false) when AdaptiveSparkPlanExec is created. It is enabled right after an adaptively-optimized physical query plan has once been prepared.
isFinalPlan is also used for reporting when AdaptiveSparkPlanExec is requested for its String Arguments.
String Arguments¶
stringArgs: Iterator[Any]
stringArgs is part of the TreeNode abstraction.
stringArgs is the following (with the isFinalPlan flag):
isFinalPlan=[isFinalPlan]
Initial Plan¶
initialPlan: SparkPlan
AdaptiveSparkPlanExec initializes initialPlan value when created.
initialPlan is a SparkPlan after applying the queryStagePreparationRules to the inputPlan (with the planChangeLogger and AQE Preparations batch name).
initialPlan is the currentPhysicalPlan when AdaptiveSparkPlanExec is created.
isFinalPlan is used when:
- AdaptiveSparkPlanExec is requested for a text representation
- ExplainUtils utility is used to process a query plan
replaceWithQueryStagesInLogicalPlan¶
replaceWithQueryStagesInLogicalPlan(
plan: LogicalPlan,
stagesToReplace: Seq[QueryStageExec]): LogicalPlan
replaceWithQueryStagesInLogicalPlan...FIXME
replaceWithQueryStagesInLogicalPlan is used when AdaptiveSparkPlanExec physical operator is requested for a final physical plan.
Executing AQE Query Post Planner Strategy Rules¶
applyQueryPostPlannerStrategyRules(
plan: SparkPlan): SparkPlan
applyQueryPostPlannerStrategyRules runs the queryPostPlannerStrategyRules on the given SparkPlan.
applyQueryPostPlannerStrategyRules uses the planChangeLogger and the batch name as follows:
AQE Query Post Planner Strategy Rules
applyQueryPostPlannerStrategyRules is used when:
- AdaptiveSparkPlanExec physical operator is requested for the initialPlan and to reOptimize
Executing Physical Optimizations¶
applyPhysicalRules(
plan: SparkPlan,
rules: Seq[Rule[SparkPlan]],
loggerAndBatchName: Option[(PlanChangeLogger[SparkPlan], String)] = None): SparkPlan
By default (with no loggerAndBatchName given) applyPhysicalRules applies (executes) the given rules to the given physical query plan.
With loggerAndBatchName specified, applyPhysicalRules executes the rules and, for every rule, requests the PlanChangeLogger to logRule. In the end, applyPhysicalRules requests the PlanChangeLogger to logBatch.
applyPhysicalRules is used when:
- AdaptiveSparkPlanExec physical operator is created (and initializes the initialPlan), is requested to getFinalPhysicalPlan, newQueryStage, reOptimize
- InsertAdaptiveSparkPlan physical optimization is executed
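Both modes of applyPhysicalRules can be sketched as a left fold over the rules. This is a hypothetical, generic model: `ChangeLogger` only mimics the logRule and logBatch calls mentioned above (it is not Spark's PlanChangeLogger), `NamedRule` is made up, and plans are plain strings in the usage.

```scala
// Hypothetical minimal plan-change logger with the two calls the section mentions
trait ChangeLogger[P] {
  def logRule(ruleName: String, oldPlan: P, newPlan: P): Unit
  def logBatch(batchName: String, oldPlan: P, newPlan: P): Unit
}

final case class NamedRule[P](name: String, transform: P => P)

def applyPhysicalRules[P](
    plan: P,
    rules: Seq[NamedRule[P]],
    loggerAndBatchName: Option[(ChangeLogger[P], String)] = None): P =
  loggerAndBatchName match {
    case None =>
      // Plain case: run the rules in order, each on the output of the previous one
      rules.foldLeft(plan)((current, rule) => rule.transform(current))
    case Some((logger, batchName)) =>
      val result = rules.foldLeft(plan) { (current, rule) =>
        val next = rule.transform(current)
        logger.logRule(rule.name, current, next) // one entry per rule
        next
      }
      logger.logBatch(batchName, plan, result) // one entry for the whole batch
      result
  }

val printingLogger = new ChangeLogger[String] {
  def logRule(ruleName: String, oldPlan: String, newPlan: String): Unit =
    println(s"$ruleName: $oldPlan -> $newPlan")
  def logBatch(batchName: String, oldPlan: String, newPlan: String): Unit =
    println(s"$batchName: $oldPlan -> $newPlan")
}

val rules = Seq(NamedRule[String]("Upper", s => s.toUpperCase), NamedRule[String]("Wrap", s => s"[$s]"))
// Prints one line per rule and one for the batch, then the result: [SCAN]
println(applyPhysicalRules("scan", rules, Some((printingLogger, "AQE Preparations"))))
```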
withFinalPlanUpdate¶
withFinalPlanUpdate[T](
fun: SparkPlan => T): T
withFinalPlanUpdate executes the given fun with the adaptively-optimized physical query plan and returns the result (of type T).
After executing fun, withFinalPlanUpdate triggers finalPlanUpdate (which is computed only once).
withFinalPlanUpdate is a helper method for AdaptiveSparkPlanExec when requested for the following:
- doExecute
- doExecuteColumnar
- doExecuteBroadcast
- executeCollect
- executeTake
- executeTail
- finalPhysicalPlan
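The interplay of withFinalPlanUpdate, the finalPlanUpdate lazy value and finalPhysicalPlan can be modeled in a few lines. This is a hypothetical sketch: `AdaptiveExec`, its `optimize` function and the `log` buffer are made up, plans are plain strings, and the "Final plan" message is recorded in a buffer instead of being logged.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of withFinalPlanUpdate (plans are plain strings)
final class AdaptiveExec(initialPlan: String, optimize: String => String) {
  val log = ArrayBuffer.empty[String]
  private var finalPlan: Option[String] = None

  // Stand-in for getFinalPhysicalPlan: optimizes once, then reuses the result
  def getFinalPhysicalPlan(): String = synchronized {
    if (finalPlan.isEmpty) finalPlan = Some(optimize(initialPlan))
    finalPlan.get
  }

  // A lazy val: evaluated on first access only, so the message is recorded once
  private lazy val finalPlanUpdate: Unit = {
    log += s"Final plan: ${getFinalPhysicalPlan()}"
    ()
  }

  def withFinalPlanUpdate[T](fun: String => T): T = {
    val plan = getFinalPhysicalPlan()
    val result = fun(plan)
    finalPlanUpdate // forces the lazy val; a no-op on later calls
    result
  }

  // The identity function boils down to the final plan alone
  def finalPhysicalPlan: String = withFinalPlanUpdate(plan => plan)
}

val exec = new AdaptiveExec("Scan", p => s"Optimized($p)")
println(exec.withFinalPlanUpdate(_.length)) // 15
println(exec.finalPhysicalPlan)             // Optimized(Scan)
println(exec.log)                           // ArrayBuffer(Final plan: Optimized(Scan))
```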
finalPhysicalPlan¶
finalPhysicalPlan: SparkPlan
finalPhysicalPlan is withFinalPlanUpdate with identity function (which boils down to getFinalPhysicalPlan alone).
finalPhysicalPlan is used when:
- FileFormatWriter is requested to write out
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.AdaptiveSparkPlanExec.name = org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
logger.AdaptiveSparkPlanExec.level = all
Refer to Logging.
PlanChangeLogger¶
AdaptiveSparkPlanExec uses a PlanChangeLogger for the following:
| Operation | Batch Name |
|---|---|
| Executing AQE Query Post Planner Strategy Rules | AQE Query Post Planner Strategy Rules |
| Initial Plan | AQE Preparations |
| Adaptively-Optimized Physical Query Plan | AQE Post Stage Creation |
| Creating QueryStageExec for Exchange | AQE Post Stage Creation |
| Optimizing QueryStage | AQE Query Stage Optimization |
| Replanning Logical Query Plan | AQE Replanning |
logOnLevel¶
logOnLevel: (=> String) => Unit
logOnLevel uses the internal spark.sql.adaptive.logLevel configuration property for the logging level and prints out the given message to the logs (at the log level).
logOnLevel is used when:
- AdaptiveSparkPlanExec physical operator is requested to getFinalPhysicalPlan and finalPlanUpdate
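Selecting a logging method once from a level name, and keeping the message lazy through a by-name parameter, can be sketched as below. This is hypothetical: `RecordingLogger` stands in for Spark's Logging trait and records lines instead of printing them, and falling back to DEBUG for unknown level names is an assumption of the sketch.

```scala
import java.util.Locale
import scala.collection.mutable.ArrayBuffer

// Hypothetical logger with one method per level (a stand-in for Spark's Logging trait)
final class RecordingLogger {
  val lines = ArrayBuffer.empty[String]
  def logTrace(msg: => String): Unit = lines += s"TRACE $msg"
  def logDebug(msg: => String): Unit = lines += s"DEBUG $msg"
  def logInfo(msg: => String): Unit = lines += s"INFO $msg"
  def logWarning(msg: => String): Unit = lines += s"WARN $msg"
  def logError(msg: => String): Unit = lines += s"ERROR $msg"
}

// Picks the logging method once, from the configured level name;
// the by-name parameter keeps the message unevaluated until the logger uses it
def logOnLevel(levelName: String, logger: RecordingLogger): (=> String) => Unit =
  levelName.toUpperCase(Locale.ROOT) match {
    case "TRACE" => logger.logTrace(_)
    case "DEBUG" => logger.logDebug(_)
    case "INFO"  => logger.logInfo(_)
    case "WARN"  => logger.logWarning(_)
    case "ERROR" => logger.logError(_)
    case _       => logger.logDebug(_) // fallback assumed for this sketch
  }

val logger = new RecordingLogger
val log = logOnLevel("info", logger)
log("Final plan: ...")
println(logger.lines) // ArrayBuffer(INFO Final plan: ...)
```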