AdaptiveSparkPlanExec Leaf Physical Operator¶
AdaptiveSparkPlanExec is a leaf physical operator for Adaptive Query Execution.
Creating Instance¶
AdaptiveSparkPlanExec takes the following to be created:
- Input Physical Plan
- AdaptiveExecutionContext
- Preprocessing Physical Optimizations
- isSubquery flag
- supportsColumnar flag (default: false)
AdaptiveSparkPlanExec is created when:
- InsertAdaptiveSparkPlan physical optimization is executed
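Here is a minimal sketch of the constructor arguments. It uses simplified stand-in types: PlanNode, ExecContext and AdaptivePlanModel are hypothetical and not part of Spark. Only the argument order and the supportsColumnar default follow the list above.

```scala
// Hypothetical stand-ins for SparkPlan and AdaptiveExecutionContext (not Spark's API).
final case class PlanNode(name: String, children: List[PlanNode] = Nil)
final case class ExecContext(sessionName: String)

// Simplified model of the five arguments AdaptiveSparkPlanExec takes to be created.
final case class AdaptivePlanModel(
    inputPlan: PlanNode,                           // Input Physical Plan
    context: ExecContext,                          // AdaptiveExecutionContext
    preprocessingRules: Seq[PlanNode => PlanNode], // Preprocessing Physical Optimizations
    isSubquery: Boolean,                           // isSubquery flag
    supportsColumnar: Boolean = false)             // supportsColumnar flag (default: false)

// Usage: supportsColumnar is left out, so the default (false) applies.
val model = AdaptivePlanModel(
  inputPlan = PlanNode("HashAggregate", List(PlanNode("FileSourceScan"))),
  context = ExecContext("demo"),
  preprocessingRules = Seq((plan: PlanNode) => plan),
  isSubquery = false)
```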
Input Physical Plan¶
AdaptiveSparkPlanExec is given a SparkPlan when created.
The SparkPlan is determined when the PlanAdaptiveDynamicPruningFilters adaptive physical optimization is executed, and can be one of the following:
- BroadcastExchangeExec physical operator (with the spark.sql.exchange.reuse configuration property enabled)
- Physical plan of a planned Aggregate logical operator (otherwise)
The SparkPlan is used for the following:
- requiredDistribution, initialPlan, output, doCanonicalize, getFinalPhysicalPlan, hashCode and equals
AdaptiveExecutionContext¶
AdaptiveSparkPlanExec is given an AdaptiveExecutionContext when created.
Adaptive Logical Optimizer¶
optimizer: AQEOptimizer
AdaptiveSparkPlanExec creates an AQEOptimizer (when created) that is used when requested to re-optimize a logical query plan.
AQE Cost Evaluator¶
costEvaluator: CostEvaluator
AdaptiveSparkPlanExec creates a CostEvaluator (when created) based on the spark.sql.adaptive.customCostEvaluatorClass configuration property.
Unless configured, AdaptiveSparkPlanExec uses a SimpleCostEvaluator (with the value of the spark.sql.adaptive.forceOptimizeSkewedJoin configuration property).
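The cost evaluator selection can be sketched as follows. This is a simplified model, not Spark's code. CostEvaluatorModel, SimpleCostModel, OperatorCountCostModel and knownEvaluators are hypothetical names, and the configuration is a plain Map. The cost of a plan is assumed to be the number of shuffle exchanges in it, modelled after SimpleCostEvaluator.

```scala
// Hypothetical, simplified model of how the AQE cost evaluator is chosen (not Spark's API).
// A "plan" is just a list of operator names here.
trait CostEvaluatorModel {
  def evaluateCost(plan: List[String]): Long
}

// Assumption: modelled after SimpleCostEvaluator, the cost is the number of shuffle
// exchanges (the effect of forceOptimizeSkewedJoin on the cost is not modelled).
final case class SimpleCostModel(forceOptimizeSkewedJoin: Boolean) extends CostEvaluatorModel {
  def evaluateCost(plan: List[String]): Long = plan.count(_ == "ShuffleExchange").toLong
}

// A made-up custom evaluator that counts all operators.
final class OperatorCountCostModel extends CostEvaluatorModel {
  def evaluateCost(plan: List[String]): Long = plan.size.toLong
}

// Hypothetical registry standing in for instantiating the configured class by name.
val knownEvaluators: Map[String, () => CostEvaluatorModel] =
  Map("OperatorCountCostModel" -> (() => new OperatorCountCostModel))

def chooseCostEvaluator(conf: Map[String, String]): CostEvaluatorModel =
  conf.get("spark.sql.adaptive.customCostEvaluatorClass") match {
    case Some(className) => knownEvaluators(className)()
    case None =>
      val force = conf.getOrElse("spark.sql.adaptive.forceOptimizeSkewedJoin", "false").toBoolean
      SimpleCostModel(force)
  }
```

Keeping the evaluator behind a small trait is what makes the custom evaluator pluggable. Everything else in the loop only asks for a cost.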
AdaptiveSparkPlanExec uses the CostEvaluator to evaluate the cost of a candidate for a new SparkPlan when requested for the adaptively-optimized physical query plan.
Preprocessing Physical Optimizations¶
preprocessingRules: Seq[Rule[SparkPlan]]
AdaptiveSparkPlanExec is given a collection of Rules to pre-process SparkPlans (before the QueryStage Physical Preparation Rules) when executing physical optimizations to reOptimize a logical query plan.
The rules are just a single physical optimization:
- PlanAdaptiveSubqueries
Adaptive Query Stage Physical Preparation Rules¶
queryStagePreparationRules: Seq[Rule[SparkPlan]]
AdaptiveSparkPlanExec creates a collection of physical preparation rules (Rule[SparkPlan]s) when created (in the following order):
- RemoveRedundantProjects
- EnsureRequirements (based on the requiredDistribution)
- AdjustShuffleExchangePosition
- ValidateSparkPlan
- ReplaceHashWithSortAgg
- RemoveRedundantSorts
- DisableUnnecessaryBucketedScan
- OptimizeSkewedJoin (with the EnsureRequirements)
- queryStagePrepRules
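The sketch below shows how an ordered collection of rules is applied, with each rule receiving the output of the previous one. TracedPlan and NamedRule are hypothetical stand-ins for SparkPlan and Rule[SparkPlan]. The rule bodies are toys (all but RemoveRedundantProjects do nothing), and the extension rules (queryStagePrepRules) are left out.

```scala
// Hypothetical model (not Spark's API): a plan is a list of operator names, and each rule
// records its name so the order in which rules run is visible.
final case class TracedPlan(ops: List[String], appliedRules: Vector[String] = Vector.empty)

final case class NamedRule(name: String, transform: List[String] => List[String]) {
  def apply(plan: TracedPlan): TracedPlan = TracedPlan(transform(plan.ops), plan.appliedRules :+ name)
}

// Toy rule bodies: only RemoveRedundantProjects changes the plan in this sketch.
val noOp: List[String] => List[String] = ops => ops
val queryStagePreparationRules: Seq[NamedRule] = Seq(
  NamedRule("RemoveRedundantProjects", _.filterNot(_ == "Project")),
  NamedRule("EnsureRequirements", noOp),
  NamedRule("AdjustShuffleExchangePosition", noOp),
  NamedRule("ValidateSparkPlan", noOp),
  NamedRule("ReplaceHashWithSortAgg", noOp),
  NamedRule("RemoveRedundantSorts", noOp),
  NamedRule("DisableUnnecessaryBucketedScan", noOp),
  NamedRule("OptimizeSkewedJoin", noOp))

// Rules run one after another, each on the output of the previous one.
def prepare(plan: TracedPlan): TracedPlan =
  queryStagePreparationRules.foldLeft(plan)((current, rule) => rule(current))
```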
queryStagePreparationRules is used for the initial plan and to reOptimize.
Distribution Requirement¶
requiredDistribution: Option[Distribution]
AdaptiveSparkPlanExec creates the requiredDistribution value when created:
- UnspecifiedDistribution for a subquery (as a subquery output does not need a specific output partitioning)
- AQEUtils.getRequiredDistribution for the inputPlan otherwise
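The choice of requiredDistribution boils down to a single conditional. Here is a minimal sketch. It uses hypothetical DistributionModel types and a stand-in for AQEUtils.getRequiredDistribution that only looks at optional repartitioning keys (the real utility inspects the input physical plan).

```scala
// Hypothetical, simplified model of how requiredDistribution is chosen (not Spark's API).
sealed trait DistributionModel
case object UnspecifiedModel extends DistributionModel
final case class ClusteredModel(keys: Seq[String]) extends DistributionModel

// Stand-in for AQEUtils.getRequiredDistribution: here it simply turns the (optional)
// user-requested repartitioning keys of the input plan into a distribution.
def requiredDistributionOf(repartitionKeys: Option[Seq[String]]): Option[DistributionModel] =
  repartitionKeys.map(keys => ClusteredModel(keys))

def requiredDistribution(isSubquery: Boolean, repartitionKeys: Option[Seq[String]]): Option[DistributionModel] =
  if (isSubquery) Some(UnspecifiedModel) // a subquery output needs no specific partitioning
  else requiredDistributionOf(repartitionKeys)
```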
requiredDistribution is used for the following:
- queryStagePreparationRules (to create EnsureRequirements physical optimization)
- optimizeQueryStage
Adaptive Query Stage Physical Optimizations¶
queryStageOptimizerRules: Seq[Rule[SparkPlan]]
AdaptiveSparkPlanExec creates a collection of physical optimization rules (Rule[SparkPlan]s) when created (in the following order):
- PlanAdaptiveDynamicPruningFilters
- ReuseAdaptiveSubquery
- OptimizeSkewInRebalancePartitions
- CoalesceShufflePartitions
- OptimizeShuffleWithLocalRead
queryStageOptimizerRules is used to optimizeQueryStage.
Executing Physical Operator¶
doExecute(): RDD[InternalRow]
doExecute is part of the SparkPlan abstraction.
doExecute takes the final physical plan and executes it (which generates an RDD[InternalRow] that becomes the return value).
doExecute triggers finalPlanUpdate (unless done already).
doExecuteColumnar¶
doExecuteColumnar(): RDD[ColumnarBatch]
doExecuteColumnar is part of the SparkPlan abstraction.
doExecuteColumnar uses withFinalPlanUpdate to request the final physical plan to executeColumnar.
doExecuteBroadcast¶
doExecuteBroadcast[T](): broadcast.Broadcast[T]
doExecuteBroadcast is part of the SparkPlan abstraction.
doExecuteBroadcast uses withFinalPlanUpdate to request the final physical plan to doExecuteBroadcast.
doExecuteBroadcast asserts that the final physical plan is a BroadcastQueryStageExec.
Specialized Execution Paths¶
collect¶
executeCollect(): Array[InternalRow]
executeCollect is part of the SparkPlan abstraction.
executeCollect uses withFinalPlanUpdate to request the final physical plan to executeCollect.
tail¶
executeTail(
n: Int): Array[InternalRow]
executeTail is part of the SparkPlan abstraction.
executeTail uses withFinalPlanUpdate to request the final physical plan to executeTail (with the given n).
take¶
executeTake(
n: Int): Array[InternalRow]
executeTake is part of the SparkPlan abstraction.
executeTake uses withFinalPlanUpdate to request the final physical plan to executeTake (with the given n).
Adaptively-Optimized Physical Query Plan¶
getFinalPhysicalPlan(): SparkPlan
Note: getFinalPhysicalPlan uses the isFinalPlan internal flag (and the already-optimized physical query plan) to short-circuit (skip) the whole expensive computation.
getFinalPhysicalPlan is used when:
- AdaptiveSparkPlanExec physical operator is requested to withFinalPlanUpdate
Step 1. createQueryStages¶
getFinalPhysicalPlan executes createQueryStages with the currentPhysicalPlan.
Step 2. Until allChildStagesMaterialized¶
getFinalPhysicalPlan executes the following steps until all child stages are materialized (allChildStagesMaterialized).
Step 2.1 New QueryStageExecs¶
getFinalPhysicalPlan does the following when there are new stages to be processed:
- FIXME
Step 2.2 StageMaterializationEvents¶
getFinalPhysicalPlan executes the following until allChildStagesMaterialized:
- FIXME
Step 2.3 Errors¶
In case of errors, getFinalPhysicalPlan executes cleanUpAndThrowException.
Step 2.4 replaceWithQueryStagesInLogicalPlan¶
getFinalPhysicalPlan executes replaceWithQueryStagesInLogicalPlan with the currentLogicalPlan and the stagesToReplace.
Step 2.5 reOptimize¶
getFinalPhysicalPlan executes reOptimize with the new logical plan.
Step 2.6 Evaluating Cost¶
getFinalPhysicalPlan requests the CostEvaluator (a SimpleCostEvaluator unless configured otherwise) to evaluateCost of the currentPhysicalPlan and the newPhysicalPlan.
Step 2.7 Adopting New Physical Plan¶
getFinalPhysicalPlan adopts the new plan if its cost is lower than that of the currentPhysicalPlan, or if the costs are equal but the physical plans differ (the new plan is likely better).
getFinalPhysicalPlan prints out the following message to the logs (using the logOnLevel):
Plan changed from [currentPhysicalPlan] to [newPhysicalPlan]
getFinalPhysicalPlan executes cleanUpTempTags with the newPhysicalPlan.
getFinalPhysicalPlan saves the newPhysicalPlan as the currentPhysicalPlan (and the newLogicalPlan as the currentLogicalPlan).
getFinalPhysicalPlan resets the stagesToReplace.
Step 2.8 createQueryStages¶
getFinalPhysicalPlan executes createQueryStages for the currentPhysicalPlan (which may have changed).
Step 3. applyPhysicalRules¶
getFinalPhysicalPlan executes applyPhysicalRules on the final plan (with the finalStageOptimizerRules, the planChangeLogger and the AQE Final Query Stage Optimization batch name).
getFinalPhysicalPlan turns the isFinalPlan internal flag on.
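The overall control flow of getFinalPhysicalPlan can be sketched with a heavily simplified, hypothetical model. It covers the cost-based adoption in Step 2.7 and the isFinalPlan short-circuit. FinalPlanModel is not Spark's API: plans are strings, query stage creation and materialization are left out, and every candidate in the list stands for one reOptimize round.

```scala
// Hypothetical, heavily simplified model of getFinalPhysicalPlan (not Spark's API): plans are
// strings, and each element of reOptimizedCandidates stands for the plan that reOptimize would
// return after one more query stage has been materialized.
final class FinalPlanModel(
    initialPlan: String,
    reOptimizedCandidates: List[String],
    cost: String => Long) {

  private var currentPhysicalPlan = initialPlan
  private var isFinalPlan = false
  var expensiveRuns = 0 // how many times the full adaptive loop ran

  def getFinalPhysicalPlan(): String = synchronized {
    if (!isFinalPlan) {
      expensiveRuns += 1
      reOptimizedCandidates.foreach { newPhysicalPlan =>
        val origCost = cost(currentPhysicalPlan)
        val newCost = cost(newPhysicalPlan)
        // Adopt a cheaper plan, or an equally cheap but different one.
        if (newCost < origCost || (newCost == origCost && newPhysicalPlan != currentPhysicalPlan)) {
          currentPhysicalPlan = newPhysicalPlan
        }
      }
      isFinalPlan = true // later calls short-circuit
    }
    currentPhysicalPlan
  }
}
```

With a cost function that favors BroadcastHashJoin, the first call adopts it and keeps it against the costlier second candidate. Any later call returns the cached plan without rerunning the loop.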
finalStageOptimizerRules¶
finalStageOptimizerRules: Seq[Rule[SparkPlan]]
finalStageOptimizerRules ...FIXME
createQueryStages¶
createQueryStages(
plan: SparkPlan): CreateStageResult
createQueryStages checks if the given SparkPlan is one of the following:
- Exchange unary physical operator
- QueryStageExec leaf physical operator
- Others
The most interesting case is when the given SparkPlan is an Exchange unary physical operator.
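The three cases can be sketched with a hypothetical plan tree. Node, ExchangeNode, StageNode and OtherNode are stand-ins, not Spark classes. The CreateStageResult fields are modelled after newPlan, allChildStagesMaterialized and newStages. newQueryStage and the reuse of existing stages are not modelled.

```scala
// Hypothetical, simplified model of createQueryStages (not Spark's classes).
sealed trait Node
final case class ExchangeNode(child: Node) extends Node
final case class StageNode(id: Int, materialized: Boolean) extends Node
final case class OtherNode(name: String, children: List[Node]) extends Node

final case class CreateStageResult(
    newPlan: Node,
    allChildStagesMaterialized: Boolean,
    newStages: List[StageNode])

final class StageCreator {
  private var currentStageId = 0

  def createQueryStages(plan: Node): CreateStageResult = plan match {
    case ExchangeNode(child) =>
      val childResult = createQueryStages(child)
      if (childResult.allChildStagesMaterialized) {
        // All inputs of the exchange are ready: wrap it into a new, not-yet-materialized stage.
        val stage = StageNode(currentStageId, materialized = false)
        currentStageId += 1
        CreateStageResult(newPlan = stage, allChildStagesMaterialized = false, newStages = List(stage))
      } else {
        // Some input is still running: keep the exchange and report the child's new stages.
        CreateStageResult(
          newPlan = ExchangeNode(childResult.newPlan),
          allChildStagesMaterialized = false,
          newStages = childResult.newStages)
      }
    case stage: StageNode =>
      // An existing query stage: nothing new, just report whether it is materialized.
      CreateStageResult(newPlan = stage, allChildStagesMaterialized = stage.materialized, newStages = Nil)
    case OtherNode(name, children) =>
      val results = children.map(createQueryStages)
      CreateStageResult(
        newPlan = OtherNode(name, results.map(_.newPlan)),
        allChildStagesMaterialized = results.forall(_.allChildStagesMaterialized),
        newStages = results.flatMap(_.newStages))
  }
}
```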
reuseQueryStage¶
reuseQueryStage(
existing: QueryStageExec,
exchange: Exchange): QueryStageExec
reuseQueryStage requests the given QueryStageExec to newReuseInstance (with the currentStageId).
reuseQueryStage increments the currentStageId.
reuseQueryStage executes setLogicalLinkForNewQueryStage.
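Here is a minimal sketch of these steps. It uses a hypothetical QueryStageModel whose newReuseInstance keeps the plan and takes a new ID. setLogicalLinkForNewQueryStage is left out.

```scala
// Hypothetical model of reuseQueryStage (not Spark's API).
final case class QueryStageModel(id: Int, plan: String, reusedFrom: Option[Int] = None) {
  // Stand-in for newReuseInstance: the same underlying plan under a new stage ID.
  def newReuseInstance(newStageId: Int): QueryStageModel = QueryStageModel(newStageId, plan, Some(id))
}

final class StageIdTracker(var currentStageId: Int) {
  def reuseQueryStage(existing: QueryStageModel): QueryStageModel = {
    val queryStage = existing.newReuseInstance(currentStageId)
    currentStageId += 1
    // setLogicalLinkForNewQueryStage is not modelled here.
    queryStage
  }
}
```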
Creating QueryStageExec for Exchange¶
newQueryStage(
e: Exchange): QueryStageExec
newQueryStage creates a new QueryStageExec physical operator based on the type of the given Exchange physical operator.
Exchange | QueryStageExec |
---|---|
ShuffleExchangeLike | ShuffleQueryStageExec |
BroadcastExchangeLike | BroadcastQueryStageExec |
newQueryStage creates an optimized physical query plan for the child physical plan of the given Exchange. newQueryStage uses the adaptive optimizations, the PlanChangeLogger and the AQE Query Stage Optimization batch name.
newQueryStage creates a new QueryStageExec physical operator for the given Exchange operator (using the currentStageId for the ID).
After applyPhysicalRules for the child operator, newQueryStage creates an optimized physical query plan for the Exchange itself (with the new optimized physical query plan for the child). newQueryStage uses the post-stage-creation optimizations, the PlanChangeLogger and the AQE Post Stage Creation batch name.
newQueryStage increments the currentStageId counter.
newQueryStage associates the new query stage operator with the Exchange physical operator.
In the end, newQueryStage returns the QueryStageExec physical operator.
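Here is a sketch of newQueryStage under simplifying assumptions. ExchangeModel, StageModel and NewStageFactory are hypothetical, and plans are strings. The two optimization batches (AQE Query Stage Optimization for the child, AQE Post Stage Creation for the exchange) are caller-supplied functions. The sketch shows the type-based mapping from the table above and the stage ID counter.

```scala
// Hypothetical model of newQueryStage (not Spark's API): plans are strings.
sealed trait ExchangeModel { def child: String }
final case class ShuffleExchangeModel(child: String) extends ExchangeModel
final case class BroadcastExchangeModel(child: String) extends ExchangeModel

sealed trait StageModel { def id: Int; def plan: String }
final case class ShuffleStage(id: Int, plan: String) extends StageModel
final case class BroadcastStage(id: Int, plan: String) extends StageModel

final class NewStageFactory(
    optimizeChild: String => String,       // stands in for the AQE Query Stage Optimization batch
    postStageCreation: String => String) { // stands in for the AQE Post Stage Creation batch

  var currentStageId = 0

  def newQueryStage(e: ExchangeModel): StageModel = {
    val optimizedChild = optimizeChild(e.child)
    val newPlan = postStageCreation(s"Exchange($optimizedChild)")
    // The kind of exchange decides the kind of query stage.
    val queryStage: StageModel = e match {
      case _: ShuffleExchangeModel => ShuffleStage(currentStageId, newPlan)
      case _: BroadcastExchangeModel => BroadcastStage(currentStageId, newPlan)
    }
    currentStageId += 1
    queryStage
  }
}
```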
Adaptively-Optimized Physical Query Plan¶
currentPhysicalPlan: SparkPlan
AdaptiveSparkPlanExec defines the currentPhysicalPlan variable for an adaptively-optimized SparkPlan.
currentPhysicalPlan is the initialPlan when AdaptiveSparkPlanExec is created.
currentPhysicalPlan can only change in getFinalPhysicalPlan, and only until the isFinalPlan internal flag is enabled.
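While executing getFinalPhysicalPlan, AdaptiveSparkPlanExec uses currentPhysicalPlan as the input argument for createQueryStages, and compares it against a candidate for a new currentPhysicalPlan (from reOptimize). AdaptiveSparkPlanExec replaces currentPhysicalPlan when the costs (evaluated by the costEvaluator) show one of the following:
- the cost of the candidate is lower than the cost of currentPhysicalPlan
- the costs are equal but the physical plans differ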
If a SparkPlan switch happens, AdaptiveSparkPlanExec prints out the following message to the logs:
Plan changed from [currentPhysicalPlan] to [newPhysicalPlan]
AdaptiveSparkPlanExec executes applyPhysicalRules to optimize the new SparkPlan.
currentPhysicalPlan is available using executedPlan.
executedPlan¶
executedPlan: SparkPlan
executedPlan returns the current physical query plan.
executedPlan is used when:
- ExplainUtils is requested to generateOperatorIDs, collectOperatorsWithID, generateWholeStageCodegenIds, getSubqueries, removeTags
- SparkPlanInfo is requested to fromSparkPlan (for SparkListenerSQLExecutionStart and SparkListenerSQLAdaptiveExecutionUpdate events)
- AdaptiveSparkPlanExec is requested to reset metrics
- AdaptiveSparkPlanHelper is requested to allChildren and stripAQEPlan
- PlanAdaptiveDynamicPruningFilters is executed
- debug package object is requested to codegenStringSeq
Resetting Metrics¶
resetMetrics(): Unit
resetMetrics is part of the SparkPlan abstraction.
resetMetrics requests all the metrics to reset.
In the end, resetMetrics requests the executed query plan to resetMetrics.
optimizeQueryStage¶
optimizeQueryStage(
plan: SparkPlan,
isFinalStage: Boolean): SparkPlan
The given isFinalStage can be as follows:
- true when requested for an adaptively-optimized physical query plan
- false when requested to create a new QueryStageExec for an Exchange
optimizeQueryStage executes (applies) the queryStageOptimizerRules to the given SparkPlan. While applying optimizations (executing rules), optimizeQueryStage requests the PlanChangeLogger to log plan changes (by a rule) with the name of the rule that has just been executed.
optimizeQueryStage is sensitive to AQEShuffleReadRule physical optimizations. It validates their result so they do not break the distribution requirement of the query plan, and discards the result of a rule that would break it.
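The safeguard can be sketched as follows. This is a hypothetical model: StageRule and the distribution predicate are stand-ins, not Spark's API. The result of a shuffle-read rule is kept only if it still satisfies the distribution requirement. Otherwise the plan from before the rule is used.

```scala
// Hypothetical model (not Spark's API): plans are lists of operator names and
// isShuffleReadRule marks rules that play the role of AQEShuffleReadRule.
final case class StageRule(
    name: String,
    isShuffleReadRule: Boolean,
    transform: List[String] => List[String])

def optimizeQueryStage(
    plan: List[String],
    rules: Seq[StageRule],
    satisfiesDistribution: List[String] => Boolean): List[String] =
  rules.foldLeft(plan) { (latestPlan, rule) =>
    val newPlan = rule.transform(latestPlan)
    if (rule.isShuffleReadRule && newPlan != latestPlan && !satisfiesDistribution(newPlan)) {
      latestPlan // the rule would break the distribution requirement: discard its result
    } else {
      newPlan
    }
  }
```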
optimizeQueryStage requests the PlanChangeLogger to log plan changes by the entire rule batch with the following batch name:
AQE Query Stage Optimization
optimizeQueryStage is used when:
- AdaptiveSparkPlanExec is requested for an adaptively-optimized physical query plan and to create a new QueryStageExec for an Exchange
Post-Stage-Creation Adaptive Optimizations¶
postStageCreationRules: Seq[Rule[SparkPlan]]
postStageCreationRules is the following adaptive optimizations (physical optimization rules):
- ApplyColumnarRulesAndInsertTransitions
- CollapseCodegenStages
postStageCreationRules is used when:
- AdaptiveSparkPlanExec is requested to finalStageOptimizerRules and newQueryStage
Generating Text Representation¶
generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean): Unit
generateTreeString is part of the TreeNode abstraction.
generateTreeString ...FIXME
cleanUpAndThrowException¶
cleanUpAndThrowException(
errors: Seq[Throwable],
earlyFailedStage: Option[Int]): Unit
cleanUpAndThrowException ...FIXME
cleanUpAndThrowException is used when AdaptiveSparkPlanExec physical operator is requested to getFinalPhysicalPlan (and materialization of new stages fails).
Replanning Logical Query Plan¶
reOptimize(
logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan)
reOptimize returns a newly-optimized physical query plan with a newly-optimized logical query plan for the given logical query plan.
reOptimize requests the given LogicalPlan to invalidate its statistics cache.
reOptimize requests the Adaptive Logical Optimizer to execute (and generate an optimized logical query plan).
reOptimize requests the Spark Query Planner (bound to the AdaptiveExecutionContext) to plan the optimized logical query plan (and generate a physical query plan).
reOptimize executes physical optimizations using the preprocessing and preparation rules (and generates an optimized physical query plan).
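The reOptimize pipeline can be sketched as a chain of caller-supplied functions. ReOptimizer is hypothetical, and plans are strings. Each field stands in for one of the components above: statistics invalidation, the Adaptive Logical Optimizer, the Spark Query Planner, and the physical rules.

```scala
// Hypothetical model of reOptimize (not Spark's API): plans are strings and each
// field stands in for one component used by reOptimize.
final case class ReOptimizer(
    invalidateStatsCache: String => Unit,   // the logical plan invalidates its statistics cache
    optimizeLogical: String => String,      // Adaptive Logical Optimizer
    planPhysical: String => String,         // Spark Query Planner
    physicalRules: Seq[String => String]) { // preprocessing and preparation rules

  // Returns the new physical plan together with the new logical plan.
  def reOptimize(logicalPlan: String): (String, String) = {
    invalidateStatsCache(logicalPlan)
    val optimized = optimizeLogical(logicalPlan)
    val sparkPlan = planPhysical(optimized)
    val newPlan = physicalRules.foldLeft(sparkPlan)((plan, rule) => rule(plan))
    (newPlan, optimized)
  }
}
```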
reOptimize is used when:
- AdaptiveSparkPlanExec physical operator is requested for an adaptively-optimized physical query plan
QueryStageCreator Thread Pool¶
executionContext: ExecutionContext
executionContext is an ExecutionContext that is used when:
- AdaptiveSparkPlanExec operator is requested for getFinalPhysicalPlan (to materialize QueryStageExec operators asynchronously)
- BroadcastQueryStageExec operator is requested to materializeWithTimeout
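Here is a sketch of materializing query stages asynchronously on a dedicated thread pool. As an assumption, the pool is a plain JDK cached thread pool of daemon threads named QueryStageCreator, standing in for the one AdaptiveSparkPlanExec uses. materialize is a hypothetical toy function.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Assumption: a JDK cached thread pool of daemon threads stands in for the
// QueryStageCreator pool; Spark's own helper for creating it is not used here.
val daemonFactory: ThreadFactory = new ThreadFactory {
  def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "QueryStageCreator")
    t.setDaemon(true) // daemon threads do not keep the JVM alive
    t
  }
}
implicit val queryStageCreator: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(daemonFactory))

// Hypothetical toy "materialization" of a query stage, run asynchronously on the pool.
def materialize(stageId: Int): Future[String] =
  Future(s"stage $stageId materialized on ${Thread.currentThread().getName}")

// The sketch simply waits for both futures.
val results: Seq[String] =
  Await.result(Future.sequence(Seq(materialize(0), materialize(1))), 10.seconds)
```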
finalPlanUpdate Lazy Value¶
finalPlanUpdate: Unit
finalPlanUpdate is a Scala lazy value, which is computed once when first accessed and cached afterwards.
finalPlanUpdate ...FIXME
In the end, finalPlanUpdate prints out the following message to the logs:
Final plan: [currentPhysicalPlan]
finalPlanUpdate is used when AdaptiveSparkPlanExec physical operator is requested to withFinalPlanUpdate (e.g., to executeCollect, executeTake, executeTail and doExecute).
isFinalPlan (Available Already)¶
isFinalPlan: Boolean
isFinalPlan is an internal flag that is used to skip (short-circuit) the expensive process of producing an adaptively-optimized physical query plan (and immediately return the one that has already been prepared).
isFinalPlan is disabled (false) when AdaptiveSparkPlanExec is created. It is enabled right after an adaptively-optimized physical query plan has been prepared for the first time.
isFinalPlan is also used for reporting when AdaptiveSparkPlanExec is requested for the following:
- String Arguments
- Generating Text Representation
String Arguments¶
stringArgs: Iterator[Any]
stringArgs is part of the TreeNode abstraction.
stringArgs is the following (with the isFinalPlan flag):
isFinalPlan=[isFinalPlan]
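Here is a minimal sketch of stringArgs and how it could surface in a node's header line. AdaptivePlanArgs and header are hypothetical. Explain output of an adaptive query typically shows AdaptiveSparkPlan isFinalPlan=false before the query has been executed.

```scala
// Simplified model of stringArgs (not Spark's TreeNode): the header line of the node in a
// text representation is assumed to be the node name followed by its string arguments.
final class AdaptivePlanArgs(var isFinalPlan: Boolean = false) {
  def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")
  def header: String = s"AdaptiveSparkPlan ${stringArgs.mkString(", ")}"
}
```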
Initial Plan¶
initialPlan: SparkPlan
AdaptiveSparkPlanExec initializes the initialPlan value when created.
initialPlan is a SparkPlan after applying the queryStagePreparationRules to the inputPlan (with the planChangeLogger and the AQE Preparations batch name).
initialPlan is the currentPhysicalPlan when AdaptiveSparkPlanExec is created.
isFinalPlan is used when:
- AdaptiveSparkPlanExec is requested for a text representation
- ExplainUtils utility is used to process a query plan
replaceWithQueryStagesInLogicalPlan¶
replaceWithQueryStagesInLogicalPlan(
plan: LogicalPlan,
stagesToReplace: Seq[QueryStageExec]): LogicalPlan
replaceWithQueryStagesInLogicalPlan ...FIXME
replaceWithQueryStagesInLogicalPlan is used when AdaptiveSparkPlanExec physical operator is requested for a final physical plan.
Executing AQE Query Post Planner Strategy Rules¶
applyQueryPostPlannerStrategyRules(
plan: SparkPlan): SparkPlan
applyQueryPostPlannerStrategyRules runs the queryPostPlannerStrategyRules on the given SparkPlan.
applyQueryPostPlannerStrategyRules uses the planChangeLogger and the following batch name:
AQE Query Post Planner Strategy Rules
applyQueryPostPlannerStrategyRules is used when:
- AdaptiveSparkPlanExec physical operator is requested for the initialPlan and to reOptimize
Executing Physical Optimizations¶
applyPhysicalRules(
plan: SparkPlan,
rules: Seq[Rule[SparkPlan]],
loggerAndBatchName: Option[(PlanChangeLogger[SparkPlan], String)] = None): SparkPlan
By default (with no loggerAndBatchName given), applyPhysicalRules applies (executes) the given rules to the given physical query plan.
With loggerAndBatchName specified, applyPhysicalRules executes the rules and, for every rule, requests the PlanChangeLogger to logRule. In the end, applyPhysicalRules requests the PlanChangeLogger to logBatch.
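The two modes of applyPhysicalRules can be sketched as follows. NamedPlanRule and RecordingPlanChangeLogger are hypothetical stand-ins for Rule[SparkPlan] and PlanChangeLogger. That logRule records only rules that changed the plan is an assumption of this sketch.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins (not Spark's classes): a named rule and a logger that records events.
final case class NamedPlanRule(ruleName: String, transform: String => String)

final class RecordingPlanChangeLogger {
  val events: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def logRule(ruleName: String, oldPlan: String, newPlan: String): Unit =
    if (oldPlan != newPlan) events += s"rule $ruleName: $oldPlan -> $newPlan"
  def logBatch(batchName: String, oldPlan: String, newPlan: String): Unit =
    events += s"batch $batchName: $oldPlan -> $newPlan"
}

def applyPhysicalRules(
    plan: String,
    rules: Seq[NamedPlanRule],
    loggerAndBatchName: Option[(RecordingPlanChangeLogger, String)] = None): String =
  loggerAndBatchName match {
    case None =>
      // No logger: just run the rules in order.
      rules.foldLeft(plan)((p, rule) => rule.transform(p))
    case Some((logger, batchName)) =>
      // Log every rule, then the whole batch.
      val newPlan = rules.foldLeft(plan) { (p, rule) =>
        val result = rule.transform(p)
        logger.logRule(rule.ruleName, p, result)
        result
      }
      logger.logBatch(batchName, plan, newPlan)
      newPlan
  }
```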
applyPhysicalRules is used when:
- AdaptiveSparkPlanExec physical operator is created (and initializes the initialPlan), is requested to getFinalPhysicalPlan, newQueryStage, reOptimize
- InsertAdaptiveSparkPlan physical optimization is executed
withFinalPlanUpdate¶
withFinalPlanUpdate[T](
fun: SparkPlan => T): T
withFinalPlanUpdate executes the given fun with the adaptively-optimized physical query plan and returns the result (of type T).
After executing fun (and before returning the result), withFinalPlanUpdate triggers finalPlanUpdate.
withFinalPlanUpdate is a helper method for AdaptiveSparkPlanExec when requested for the following:
- doExecute
- doExecuteColumnar
- doExecuteBroadcast
- executeCollect
- executeTake
- executeTail
- finalPhysicalPlan
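withFinalPlanUpdate, finalPlanUpdate and finalPhysicalPlan can be sketched together. FinalPlanUpdateModel is hypothetical, not Spark's API. Plans are strings, the expensive getFinalPhysicalPlan is replaced by a cached function call, and log lines go to a buffer. The sketch shows that fun runs on the final plan, and that the lazy finalPlanUpdate logs only once no matter how many times withFinalPlanUpdate is used.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of withFinalPlanUpdate (not Spark's API): plans are strings and
// log lines are collected in a buffer.
final class FinalPlanUpdateModel(computeFinalPlan: () => String) {
  val log: ArrayBuffer[String] = ArrayBuffer.empty[String]
  private var finalPlan: Option[String] = None

  // Stand-in for getFinalPhysicalPlan: the expensive part runs only once.
  def getFinalPhysicalPlan(): String = synchronized {
    if (finalPlan.isEmpty) finalPlan = Some(computeFinalPlan())
    finalPlan.get
  }

  // A lazy value: its body runs on first access only and is cached afterwards.
  private lazy val finalPlanUpdate: Unit = {
    log += s"Final plan: ${getFinalPhysicalPlan()}"
  }

  def withFinalPlanUpdate[T](fun: String => T): T = {
    val plan = getFinalPhysicalPlan()
    val result = fun(plan)
    finalPlanUpdate // triggered after fun, but logs only once
    result
  }

  // finalPhysicalPlan: withFinalPlanUpdate with the identity function.
  def finalPhysicalPlan: String = withFinalPlanUpdate(plan => plan)
}
```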
finalPhysicalPlan¶
finalPhysicalPlan: SparkPlan
finalPhysicalPlan is withFinalPlanUpdate with the identity function (which boils down to getFinalPhysicalPlan, followed by finalPlanUpdate).
finalPhysicalPlan is used when:
- FileFormatWriter is requested to write out
Logging¶
Enable ALL logging level for the org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.AdaptiveSparkPlanExec.name = org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
logger.AdaptiveSparkPlanExec.level = all
Refer to Logging.
PlanChangeLogger¶
AdaptiveSparkPlanExec uses a PlanChangeLogger for the following:
Operation | Batch Name |
---|---|
Executing AQE Query Post Planner Strategy Rules | AQE Query Post Planner Strategy Rules |
Initial Plan | AQE Preparations |
Adaptively-Optimized Physical Query Plan | AQE Post Stage Creation |
Creating QueryStageExec for Exchange | AQE Post Stage Creation |
Optimizing QueryStage | AQE Query Stage Optimization |
Replanning Logical Query Plan | AQE Replanning |
logOnLevel¶
logOnLevel: (=> String) => Unit
logOnLevel uses the internal spark.sql.adaptive.logLevel configuration property for the logging level and prints out the given message to the logs (at that log level).
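logOnLevel can be sketched as a function picked once from the level name. The message is passed by name, so it is only built when it is actually logged. RecordingLogger is hypothetical (not Spark's Logging trait), and falling back to DEBUG for unknown level names is an assumption of this sketch.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical logger (not Spark's Logging trait) that records messages for enabled levels.
final class RecordingLogger(enabledLevels: Set[String]) {
  val lines: ArrayBuffer[String] = ArrayBuffer.empty[String]
  private def log(level: String, msg: => String): Unit =
    if (enabledLevels.contains(level)) lines += s"$level $msg" // msg is built only if logged
  def logTrace(msg: => String): Unit = log("TRACE", msg)
  def logDebug(msg: => String): Unit = log("DEBUG", msg)
  def logInfo(msg: => String): Unit = log("INFO", msg)
  def logWarning(msg: => String): Unit = log("WARN", msg)
  def logError(msg: => String): Unit = log("ERROR", msg)
}

// Model of logOnLevel: the level name (e.g. the value of spark.sql.adaptive.logLevel) picks
// the logging function once; the message stays a by-name argument.
// Assumption: unknown level names fall back to DEBUG.
def logOnLevel(logger: RecordingLogger, logLevel: String): (=> String) => Unit =
  logLevel.toUpperCase match {
    case "TRACE" => logger.logTrace(_)
    case "DEBUG" => logger.logDebug(_)
    case "INFO" => logger.logInfo(_)
    case "WARN" => logger.logWarning(_)
    case "ERROR" => logger.logError(_)
    case _ => logger.logDebug(_)
  }
```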
logOnLevel is used when:
- AdaptiveSparkPlanExec physical operator is requested to getFinalPhysicalPlan and finalPlanUpdate