InsertAdaptiveSparkPlan Physical Optimization¶
InsertAdaptiveSparkPlan is a physical query plan optimization in Adaptive Query Execution that inserts AdaptiveSparkPlanExec operators.

InsertAdaptiveSparkPlan is a Rule to transform a SparkPlan (`Rule[SparkPlan]`).
Creating Instance¶
InsertAdaptiveSparkPlan takes the following to be created:

- AdaptiveExecutionContext

InsertAdaptiveSparkPlan is created when:

- QueryExecution is requested for physical preparations rules
AdaptiveExecutionContext¶
InsertAdaptiveSparkPlan is given an AdaptiveExecutionContext when created.

The AdaptiveExecutionContext is used to create an AdaptiveSparkPlanExec physical operator (for a given physical plan) when InsertAdaptiveSparkPlan is executed.
Adaptive Requirements¶
```scala
shouldApplyAQE(
  plan: SparkPlan,
  isSubquery: Boolean): Boolean
```

shouldApplyAQE returns true when one of the following conditions holds:

- spark.sql.adaptive.forceApply configuration property is enabled
- The given isSubquery flag is true (a shortcut to continue since the input plan is from a subquery and it was already decided to apply AQE to the main query)
- The given SparkPlan contains one of the following physical operators:
    - Exchange
    - Operators that require a child distribution other than UnspecifiedDistribution (so the query may need extra exchanges)
    - Operators with a SubqueryExpression
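
To make the three conditions concrete, here is a minimal Scala sketch paraphrasing the Spark source (the exact cases and helpers differ across Spark versions; `conf`, provided by the Rule abstraction, is assumed):

```scala
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.internal.SQLConf

def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean =
  conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
    plan.find {
      case _: Exchange => true // an exchange is already in the plan
      case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) =>
        true // a specific distribution is required, so exchanges may be added
      case p => // an operator references a subquery expression
        p.expressions.exists(_.find(_.isInstanceOf[SubqueryExpression]).isDefined)
    }.isDefined
  }
```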
Executing Rule¶
```scala
apply(
  plan: SparkPlan): SparkPlan
```

apply is part of the Rule abstraction.

apply calls applyInternal with the given SparkPlan and the isSubquery flag disabled (false).
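
In other words, apply is a thin wrapper (a one-line sketch of the delegation described above):

```scala
override def apply(plan: SparkPlan): SparkPlan =
  applyInternal(plan, isSubquery = false)
```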
applyInternal¶
```scala
applyInternal(
  plan: SparkPlan,
  isSubquery: Boolean): SparkPlan
```
applyInternal returns the given SparkPlan unmodified when one of the following holds:

- spark.sql.adaptive.enabled configuration property is disabled (false)
- The given SparkPlan is either ExecutedCommandExec or CommandResultExec
applyInternal skips processing the following parent physical operators and handles their children instead:

- DataWritingCommandExec
- V2CommandExec
For all other physical plans, applyInternal checks the shouldApplyAQE condition. If it holds, applyInternal checks whether the physical plan supports Adaptive Query Execution or not.
Physical Plans Supporting Adaptive Query Execution¶
applyInternal creates a new PlanAdaptiveSubqueries optimization (with the subquery expressions) and executes it on the given SparkPlan.
applyInternal prints out the following DEBUG message to the logs:

```text
Adaptive execution enabled for plan: [plan]
```
In the end, applyInternal creates an AdaptiveSparkPlanExec physical operator with the new pre-processed SparkPlan.
In case of a SubqueryAdaptiveNotSupportedException, applyInternal prints out the following WARN message to the logs and returns the given physical plan:

```text
spark.sql.adaptive.enabled is enabled but is not supported for sub-query: [subquery].
```
Unsupported Physical Plans¶
For physical plans that do not support Adaptive Query Execution, applyInternal simply prints out the following WARN message to the logs and returns the given physical plan:

```text
spark.sql.adaptive.enabled is enabled but is not supported for query: [plan].
```
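
Putting the pieces of this section together, the overall applyInternal flow can be sketched as follows (paraphrased from the Spark source; the skipped parent operators and helper signatures vary across Spark versions, and `conf`, `logDebug` and `logWarning` are assumed to come from the Rule and Logging abstractions):

```scala
import org.apache.spark.sql.execution.{CommandResultExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, PlanAdaptiveSubqueries}
import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
import org.apache.spark.sql.internal.SQLConf

private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match {
  case _ if !conf.adaptiveExecutionEnabled => plan // AQE disabled: leave the plan as-is
  case _: ExecutedCommandExec => plan              // commands are returned unmodified
  case _: CommandResultExec => plan
  case c: DataWritingCommandExec =>                // skip the parent, adapt the child
    c.copy(child = apply(c.child))
  case c: V2CommandExec =>
    c.withNewChildren(c.children.map(apply))
  case _ if shouldApplyAQE(plan, isSubquery) =>
    if (supportAdaptive(plan)) {
      try {
        // Plan subqueries first so their plans can be reused across the query
        val subqueryMap = buildSubqueryMap(plan)
        val preprocessingRules = Seq(PlanAdaptiveSubqueries(subqueryMap))
        val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules)
        logDebug(s"Adaptive execution enabled for plan: $plan")
        AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery)
      } catch {
        case SubqueryAdaptiveNotSupportedException(subquery) =>
          logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
            s"but is not supported for sub-query: $subquery.")
          plan
      }
    } else {
      logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
        s"but is not supported for query: $plan.")
      plan
    }
  case _ => plan
}
```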
Usage¶
applyInternal is used by InsertAdaptiveSparkPlan when requested for the following:

- Execute (with the isSubquery flag disabled)
- Compile a subquery (with the isSubquery flag enabled)
Collecting Subquery Expressions¶
```scala
buildSubqueryMap(
  plan: SparkPlan): Map[Long, SubqueryExec]
```
buildSubqueryMap finds ScalarSubquery and ListQuery (in InSubquery) expressions (unique by expression ID, so the execution plan can be reused from another subquery) in the given physical query plan.

For every ScalarSubquery and ListQuery expression, buildSubqueryMap compiles the subquery (compileSubquery), verifies the adaptive plan (verifyAdaptivePlan) and registers a new SubqueryExec operator.
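
A condensed sketch of that loop follows (paraphrasing the Spark source; the pattern shapes and SubqueryExec construction are simplified and differ across Spark versions, and compileSubquery and verifyAdaptivePlan are the methods described below):

```scala
import scala.collection.mutable
import org.apache.spark.sql.catalyst.expressions.{InSubquery, ListQuery, ScalarSubquery}
import org.apache.spark.sql.execution.{SparkPlan, SubqueryExec}

private def buildSubqueryMap(plan: SparkPlan): Map[Long, SubqueryExec] = {
  val subqueryMap = mutable.HashMap.empty[Long, SubqueryExec]
  // Walk every expression of every physical operator in the plan
  plan.foreach(_.expressions.foreach(_.foreach {
    case s: ScalarSubquery if !subqueryMap.contains(s.exprId.id) =>
      val executedPlan = compileSubquery(s.plan)
      verifyAdaptivePlan(executedPlan, s.plan) // must be AdaptiveSparkPlanExec
      subqueryMap.put(s.exprId.id, SubqueryExec(s"subquery#${s.exprId.id}", executedPlan))
    case InSubquery(_, l: ListQuery) if !subqueryMap.contains(l.exprId.id) =>
      val executedPlan = compileSubquery(l.plan)
      verifyAdaptivePlan(executedPlan, l.plan)
      subqueryMap.put(l.exprId.id, SubqueryExec(s"subquery#${l.exprId.id}", executedPlan))
    case _ => // not a subquery expression
  }))
  subqueryMap.toMap
}
```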
compileSubquery¶
```scala
compileSubquery(
  plan: LogicalPlan): SparkPlan
```

compileSubquery requests the session-bound SparkPlanner (from the AdaptiveExecutionContext) to plan the given LogicalPlan (which produces a SparkPlan).

In the end, compileSubquery calls applyInternal with the isSubquery flag turned on (true).
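
A sketch of the two steps (paraphrasing the Spark 3.x source, which plans a clone of the logical plan with the QueryExecution.createSparkPlan helper; the helper's signature may differ in other versions):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}

private def compileSubquery(plan: LogicalPlan): SparkPlan =
  // Re-apply this rule with isSubquery enabled so AQE is forced for the subquery
  applyInternal(
    QueryExecution.createSparkPlan(
      adaptiveExecutionContext.session,
      adaptiveExecutionContext.session.sessionState.planner,
      plan.clone()),
    isSubquery = true)
```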
Enforcing AdaptiveSparkPlanExec¶
```scala
verifyAdaptivePlan(
  plan: SparkPlan,
  logicalPlan: LogicalPlan): Unit
```

verifyAdaptivePlan throws a SubqueryAdaptiveNotSupportedException when the given SparkPlan is not an AdaptiveSparkPlanExec.
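
The check amounts to a simple guard (a minimal sketch of the description above; SubqueryAdaptiveNotSupportedException is defined alongside this rule):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

private def verifyAdaptivePlan(plan: SparkPlan, logicalPlan: LogicalPlan): Unit =
  if (!plan.isInstanceOf[AdaptiveSparkPlanExec]) {
    // Signals applyInternal to fall back to a non-adaptive plan
    throw SubqueryAdaptiveNotSupportedException(logicalPlan)
  }
```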
supportAdaptive Condition¶
```scala
supportAdaptive(
  plan: SparkPlan): Boolean
```

supportAdaptive returns true when the given SparkPlan and all its children have a logical operator linked (logicalLink) that is not streaming.
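
A recursive sketch of the condition (assuming the logicalLink API of SparkPlan; version-specific extras, e.g. around dynamic partition pruning, are omitted):

```scala
import org.apache.spark.sql.execution.SparkPlan

private def supportAdaptive(plan: SparkPlan): Boolean =
  plan.logicalLink.isDefined &&                 // a logical operator is linked...
    !plan.logicalLink.exists(_.isStreaming) &&  // ...and it is not streaming
    plan.children.forall(supportAdaptive)       // the same holds for all children
```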
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

```text
logger.InsertAdaptiveSparkPlan.name = org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan
logger.InsertAdaptiveSparkPlan.level = all
```

Refer to Logging.