InsertAdaptiveSparkPlan Physical Optimization¶
InsertAdaptiveSparkPlan is a physical query plan optimization in Adaptive Query Execution that inserts AdaptiveSparkPlanExec operators.
InsertAdaptiveSparkPlan is a Rule to transform a SparkPlan (Rule[SparkPlan]).
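A paraphrased skeleton of the rule follows (not the exact Apache Spark sources; the helper methods are covered in the sections below):

```scala
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveExecutionContext

// Paraphrased skeleton of the rule
case class InsertAdaptiveSparkPlan(
    adaptiveExecutionContext: AdaptiveExecutionContext)
  extends Rule[SparkPlan] {

  override def apply(plan: SparkPlan): SparkPlan =
    applyInternal(plan, isSubquery = false)

  // Stubbed here; described in the applyInternal section below
  private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan
}
```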
Creating Instance¶
InsertAdaptiveSparkPlan takes the following to be created:
- AdaptiveExecutionContext
InsertAdaptiveSparkPlan is created when:
- QueryExecution is requested for the physical preparation rules
AdaptiveExecutionContext¶
InsertAdaptiveSparkPlan is given an AdaptiveExecutionContext when created.
The AdaptiveExecutionContext is used to create an AdaptiveSparkPlanExec physical operator (for a given physical plan) when the rule is executed.
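For reference, a paraphrased sketch of the context's shape (the two caches are shared across the main query and its subqueries for plan reuse):

```scala
import scala.collection.concurrent.TrieMap

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.{BaseSubqueryExec, QueryExecution, SparkPlan}
import org.apache.spark.sql.execution.adaptive.QueryStageExec

// Paraphrased; the real type lives in org.apache.spark.sql.execution.adaptive
case class AdaptiveExecutionContext(session: SparkSession, qe: QueryExecution) {
  // subquery-reuse map shared across the entire query
  val subqueryCache = new TrieMap[SparkPlan, BaseSubqueryExec]()
  // exchange-reuse map shared across the entire query, including subqueries
  val stageCache = new TrieMap[SparkPlan, QueryStageExec]()
}
```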
Adaptive Requirements¶
shouldApplyAQE(
  plan: SparkPlan,
  isSubquery: Boolean): Boolean
shouldApplyAQE returns true when one of the following conditions holds:
- spark.sql.adaptive.forceApply configuration property is enabled
- The given isSubquery flag is true (a shortcut to proceed since the input plan belongs to a subquery and it was already decided to apply AQE for the main query)
- The given SparkPlan contains one of the following:
    - Exchange physical operators
    - Operators with a required child distribution other than UnspecifiedDistribution (so exchanges may need to be added to the query)
    - Operators with SubqueryExpressions
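The conditions above boil down to a short predicate. A condensed sketch (paraphrased; the forceApply parameter stands in for the spark.sql.adaptive.forceApply lookup):

```scala
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.Exchange

// Paraphrased sketch of the shouldApplyAQE predicate
def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean, forceApply: Boolean): Boolean =
  forceApply || isSubquery || plan.find {
    case _: Exchange => true  // the plan already contains an exchange
    // a required child distribution other than UnspecifiedDistribution
    // means exchanges may have to be added later
    case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true
    // any subquery expression among the operator's expressions
    case p => p.expressions.exists(_.find(_.isInstanceOf[SubqueryExpression]).isDefined)
  }.isDefined
```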
 
Executing Rule¶
apply(
  plan: SparkPlan): SparkPlan
apply is part of the Rule abstraction.
apply invokes applyInternal with the given SparkPlan and the isSubquery flag disabled (false).
applyInternal¶
applyInternal(
  plan: SparkPlan,
  isSubquery: Boolean): SparkPlan
applyInternal returns the given SparkPlan unmodified when one of the following holds:
- spark.sql.adaptive.enabled configuration property is disabled (false)
- The given SparkPlan is either an ExecutedCommandExec or a CommandResultExec
applyInternal skips processing the following parent physical operators and handles their children instead:
- DataWritingCommandExec
- V2CommandExec
For all other SparkPlans, applyInternal checks the shouldApplyAQE condition. If it holds, applyInternal checks whether the physical plan supports Adaptive Query Execution.
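The overall effect is easy to observe from a Spark shell. A quick demo (repartition adds an Exchange, so shouldApplyAQE holds; output abbreviated):

```scala
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.range(10).repartition(2).queryExecution.executedPlan
// AdaptiveSparkPlan isFinalPlan=false
// +- Exchange RoundRobinPartitioning(2), ...

spark.conf.set("spark.sql.adaptive.enabled", false)
spark.range(10).repartition(2).queryExecution.executedPlan
// Exchange RoundRobinPartitioning(2), ... (no AdaptiveSparkPlan root)
```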
Physical Plans Supporting Adaptive Query Execution¶
applyInternal creates a new PlanAdaptiveSubqueries optimization (with the subquery expressions collected with buildSubqueryMap) and executes it on the given SparkPlan.
applyInternal prints out the following DEBUG message to the logs:
Adaptive execution enabled for plan: [plan]
In the end, applyInternal creates an AdaptiveSparkPlanExec physical operator with the new pre-processed SparkPlan.
In case of a SubqueryAdaptiveNotSupportedException, applyInternal prints out the following WARN message to the logs and returns the given physical plan:
spark.sql.adaptive.enabled is enabled but is not supported for sub-query: [subquery].
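For reference, a paraphrased sketch of the whole supported-plan branch inside applyInternal (assuming the helper names from this page; applyPhysicalRules is the AdaptiveSparkPlanExec helper that runs the pre-processing rules, and logDebug/logWarning come from the rule's Logging trait):

```scala
// Inside applyInternal, once shouldApplyAQE and the support check hold (paraphrased)
try {
  // plan the subqueries first so they can be reused
  val subqueryMap = buildSubqueryMap(plan)
  val preprocessingRules = Seq(PlanAdaptiveSubqueries(subqueryMap))
  val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules)
  logDebug(s"Adaptive execution enabled for plan: $plan")
  AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery)
} catch {
  case SubqueryAdaptiveNotSupportedException(subquery) =>
    logWarning(s"spark.sql.adaptive.enabled is enabled " +
      s"but is not supported for sub-query: $subquery.")
    plan  // fall back to the given physical plan
}
```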
Unsupported Physical Plans¶
applyInternal prints out the following WARN message to the logs and returns the given physical plan:
spark.sql.adaptive.enabled is enabled but is not supported for query: [plan].
Usage¶
applyInternal is used by InsertAdaptiveSparkPlan when requested for the following:
- Execute (with the isSubquery flag disabled)
- Compile a subquery (with the isSubquery flag enabled)
Collecting Subquery Expressions¶
buildSubqueryMap(
  plan: SparkPlan): Map[Long, SubqueryExec]
buildSubqueryMap finds ScalarSubquery and ListQuery (in InSubquery) expressions in the given physical query plan. Expressions are tracked by expression ID so the execution plan of an already-compiled subquery can be reused.
For every ScalarSubquery and ListQuery expression, buildSubqueryMap compiles the subquery (compileSubquery), verifies the adaptive plan (verifyAdaptivePlan) and registers a new SubqueryExec operator.
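Both kinds of subquery expressions can be produced with plain SQL. An illustration (the temp view names are made up for the example):

```scala
spark.range(5).createOrReplaceTempView("t1")
spark.range(3).createOrReplaceTempView("t2")

// ScalarSubquery: a subquery that returns a single value
spark.sql("SELECT * FROM t1 WHERE id > (SELECT max(id) FROM t2)").explain()

// ListQuery (inside an InSubquery): an IN (subquery) predicate
spark.sql("SELECT * FROM t1 WHERE id IN (SELECT id FROM t2)").explain()
```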
compileSubquery¶
compileSubquery(
  plan: LogicalPlan): SparkPlan
compileSubquery requests the session-bound SparkPlanner (from the AdaptiveExecutionContext) to plan the given LogicalPlan (that produces a SparkPlan).
In the end, compileSubquery invokes applyInternal with the isSubquery flag turned on (true).
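A paraphrased sketch of the flow (the exact planner invocation differs across Spark versions; the session parameter stands in for the one from the AdaptiveExecutionContext):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.execution.SparkPlan

// Paraphrased sketch of compileSubquery
def compileSubquery(session: SparkSession, plan: LogicalPlan): SparkPlan = {
  // plan the subquery with the session-bound SparkPlanner...
  val sparkPlan = session.sessionState.planner.plan(ReturnAnswer(plan.clone())).next()
  // ...and re-apply this rule with the isSubquery flag turned on
  applyInternal(sparkPlan, isSubquery = true)
}
```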
Enforcing AdaptiveSparkPlanExec¶
verifyAdaptivePlan(
  plan: SparkPlan,
  logicalPlan: LogicalPlan): Unit
verifyAdaptivePlan throws a SubqueryAdaptiveNotSupportedException when the given SparkPlan is not an AdaptiveSparkPlanExec.
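A minimal sketch of the check (paraphrased; SubqueryAdaptiveNotSupportedException is private to Spark and carries the offending logical plan):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

// Paraphrased sketch of verifyAdaptivePlan
def verifyAdaptivePlan(plan: SparkPlan, logicalPlan: LogicalPlan): Unit =
  if (!plan.isInstanceOf[AdaptiveSparkPlanExec]) {
    throw SubqueryAdaptiveNotSupportedException(logicalPlan)
  }
```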
supportAdaptive Condition¶
supportAdaptive(
  plan: SparkPlan): Boolean
supportAdaptive returns true when the given SparkPlan and all its children have a logical operator linked (logicalLink) and none of the linked logical operators is streaming.
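A paraphrased sketch of the recursive check (logicalLink is the logical operator a physical operator was planned from):

```scala
import org.apache.spark.sql.execution.SparkPlan

// Every node needs a logical link, and no linked logical plan may be streaming
def supportAdaptive(plan: SparkPlan): Boolean =
  plan.logicalLink.isDefined &&
    !plan.logicalLink.exists(_.isStreaming) &&
    plan.children.forall(supportAdaptive)
```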
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan logger to see what happens inside.
Add the following line to conf/log4j2.properties:
logger.InsertAdaptiveSparkPlan.name = org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan
logger.InsertAdaptiveSparkPlan.level = all
Refer to Logging.