
InsertAdaptiveSparkPlan Physical Optimization

InsertAdaptiveSparkPlan is a physical query plan optimization in Adaptive Query Execution that inserts AdaptiveSparkPlanExec operators.

InsertAdaptiveSparkPlan is a Rule to transform a SparkPlan (Rule[SparkPlan]).
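The following Spark-free sketch makes the Rule[SparkPlan] shape concrete. Plan, Leaf, Node, Adaptive, Rule and WrapRootRule are hypothetical stand-ins, not Spark classes. The rule ignores every condition described later. It only shows that a rule is a plan-to-plan function that may replace the root with a wrapper operator, roughly the way AdaptiveSparkPlanExec wraps a physical plan.

```scala
// Hypothetical, Spark-free model of a Rule over physical plans.
object RuleSketch {
  sealed trait Plan
  final case class Leaf(name: String) extends Plan
  final case class Node(name: String, children: Seq[Plan]) extends Plan
  // Stand-in for AdaptiveSparkPlanExec: a root that wraps the original plan
  final case class Adaptive(inputPlan: Plan) extends Plan

  // Modelled after Catalyst's Rule[TreeType]: a plan-to-plan transformation
  abstract class Rule[T] {
    def apply(plan: T): T
  }

  // Wraps the whole plan once; ignores the conditions InsertAdaptiveSparkPlan checks
  object WrapRootRule extends Rule[Plan] {
    def apply(plan: Plan): Plan = plan match {
      case a: Adaptive => a // already wrapped: leave it alone
      case other => Adaptive(other)
    }
  }
}
```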

Creating Instance

InsertAdaptiveSparkPlan takes the following to be created:

  1. AdaptiveExecutionContext

InsertAdaptiveSparkPlan is created when:

  1. QueryExecution is requested for the physical preparations rules

AdaptiveExecutionContext

InsertAdaptiveSparkPlan is given an AdaptiveExecutionContext when created.

The AdaptiveExecutionContext is used to create an AdaptiveSparkPlanExec physical operator (for the given plan) when InsertAdaptiveSparkPlan is executed.

Adaptive Requirements

shouldApplyAQE(
  plan: SparkPlan,
  isSubquery: Boolean): Boolean

shouldApplyAQE returns true when one of the following conditions holds:

  1. spark.sql.adaptive.forceApply configuration property is enabled
  2. The given isSubquery flag is true (a shortcut to continue since the input plan is from a sub-query and it was already decided to apply AQE for the main query)
  3. The given SparkPlan contains one of the following physical operators:
    1. Exchange
    2. Operators whose requiredChildDistribution is not entirely UnspecifiedDistribution (i.e., operators that require a specific child distribution, so the query may need exchanges added)
    3. Operators with a SubqueryExpression among their expressions
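The three conditions can be modelled as follows. This sketch relies on simplifying assumptions. PhysicalOp, its boolean flags and the two Distribution objects are hypothetical stand-ins for Spark's operators and distributions, and the forceApply parameter stands for the spark.sql.adaptive.forceApply property.

```scala
// Hypothetical, Spark-free model of the shouldApplyAQE checks.
object ShouldApplyAqeSketch {
  sealed trait Distribution
  case object UnspecifiedDistribution extends Distribution
  case object ClusteredDistribution extends Distribution

  final case class PhysicalOp(
      name: String,
      children: Seq[PhysicalOp] = Nil,
      isExchange: Boolean = false,
      requiredChildDistribution: Seq[Distribution] = Nil,
      hasSubqueryExpression: Boolean = false) {
    // Pre-order search over the operator tree (like TreeNode.exists)
    def exists(p: PhysicalOp => Boolean): Boolean =
      p(this) || children.exists(_.exists(p))
  }

  def shouldApplyAQE(plan: PhysicalOp, isSubquery: Boolean, forceApply: Boolean = false): Boolean =
    forceApply || // stands for spark.sql.adaptive.forceApply
      isSubquery || // the main query has already opted into AQE
      plan.exists { op =>
        op.isExchange ||
          // an operator that requires a specific child distribution may need exchanges
          !op.requiredChildDistribution.forall(_ == UnspecifiedDistribution) ||
          op.hasSubqueryExpression
      }
}
```

Note that an operator with no required child distributions at all (an empty Seq) does not trigger AQE, because forall is true on an empty collection.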

Executing Rule

apply(
  plan: SparkPlan): SparkPlan

apply is part of the Rule abstraction.

apply executes applyInternal with the given SparkPlan and the isSubquery flag disabled (false).

applyInternal

applyInternal(
  plan: SparkPlan,
  isSubquery: Boolean): SparkPlan

applyInternal returns the given SparkPlan unmodified when one of the following holds:

  1. spark.sql.adaptive.enabled configuration property is disabled (false)
  2. The given SparkPlan is either ExecutedCommandExec or CommandResultExec

applyInternal skips processing the following parent physical operators and applies the rule to their children instead:

  1. DataWritingCommandExec
  2. V2CommandExec

For all other SparkPlans, applyInternal checks the shouldApplyAQE condition. If it holds, applyInternal checks whether the physical plan supports Adaptive Query Execution (supportAdaptive). Otherwise, applyInternal returns the given SparkPlan unmodified.
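A simplified model of the applyInternal decision flow might look like the following sketch. The plan types are hypothetical stand-ins. needsExchange stands for any of the shouldApplyAQE triggers, supported stands for the supportAdaptive check, and adaptiveEnabled stands for spark.sql.adaptive.enabled. The parent command operators whose children are processed are left out to keep the sketch short.

```scala
// Hypothetical, Spark-free model of the applyInternal decision flow.
object ApplyInternalSketch {
  sealed trait Plan { def children: Seq[Plan] }
  // Stand-in for ExecutedCommandExec / CommandResultExec
  final case class ExecutedCommand(name: String) extends Plan { def children: Seq[Plan] = Nil }
  // needsExchange: any shouldApplyAQE trigger; supported: passes supportAdaptive
  final case class Op(
      name: String,
      children: Seq[Plan] = Nil,
      needsExchange: Boolean = false,
      supported: Boolean = true) extends Plan
  // Stand-in for AdaptiveSparkPlanExec
  final case class AdaptivePlan(inputPlan: Plan, isSubquery: Boolean) extends Plan {
    def children: Seq[Plan] = Nil
  }

  private def exists(plan: Plan)(p: Plan => Boolean): Boolean =
    p(plan) || plan.children.exists(c => exists(c)(p))

  def shouldApplyAQE(plan: Plan, isSubquery: Boolean): Boolean =
    isSubquery || exists(plan) { case op: Op => op.needsExchange; case _ => false }

  def supportAdaptive(plan: Plan): Boolean =
    !exists(plan) { case op: Op => !op.supported; case _ => false }

  def applyInternal(plan: Plan, isSubquery: Boolean, adaptiveEnabled: Boolean): Plan = plan match {
    case _ if !adaptiveEnabled => plan // spark.sql.adaptive.enabled is off
    case _: ExecutedCommand => plan // commands are returned unmodified
    case _ if shouldApplyAQE(plan, isSubquery) =>
      if (supportAdaptive(plan)) AdaptivePlan(plan, isSubquery)
      else plan // unsupported: log a message and keep the plan
    case _ => plan // nothing that AQE could improve
  }
}
```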

Physical Plans Supporting Adaptive Query Execution

applyInternal collects the subquery expressions of the given SparkPlan (buildSubqueryMap), creates a new PlanAdaptiveSubqueries optimization with them and executes it on the plan.

applyInternal prints out the following DEBUG message to the logs:

Adaptive execution enabled for plan: [plan]

In the end, applyInternal creates an AdaptiveSparkPlanExec physical operator with the new pre-processed SparkPlan.

In case of a SubqueryAdaptiveNotSupportedException, applyInternal prints out the following WARN message and returns the given physical plan unmodified.

spark.sql.adaptive.enabled is enabled but is not supported for sub-query: [subquery].
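The supported path and its fallback can be sketched as shown below. The sketch assumes a hypothetical model where sub-queries are plain strings and any sub-query whose name starts with "stream" cannot be planned adaptively. The exception class, the log callback and the string rewriting stand in for SubqueryAdaptiveNotSupportedException, the logger and PlanAdaptiveSubqueries.

```scala
// Hypothetical, Spark-free model of the supported path of applyInternal.
object SupportedPathSketch {
  // Stand-in for SubqueryAdaptiveNotSupportedException
  final case class SubqueryAdaptiveNotSupported(subquery: String) extends Exception(subquery)

  sealed trait Plan
  final case class Op(name: String, subqueries: Seq[String] = Nil) extends Plan
  // Stand-in for AdaptiveSparkPlanExec
  final case class AdaptivePlan(inputPlan: Plan) extends Plan

  // Stand-in for buildSubqueryMap: assumes "stream*" sub-queries cannot be made adaptive
  def buildSubqueryMap(plan: Op): Map[String, String] =
    plan.subqueries.distinct.map { sq =>
      if (sq.startsWith("stream")) throw SubqueryAdaptiveNotSupported(sq)
      sq -> s"SubqueryExec($sq)"
    }.toMap

  def planSupported(plan: Op, log: String => Unit): Plan =
    try {
      val subqueryMap = buildSubqueryMap(plan)
      // Stand-in for executing PlanAdaptiveSubqueries(subqueryMap) on the plan
      val newPlan = plan.copy(subqueries = plan.subqueries.map(subqueryMap))
      log(s"DEBUG Adaptive execution enabled for plan: $plan")
      AdaptivePlan(newPlan)
    } catch {
      case SubqueryAdaptiveNotSupported(sq) =>
        log(s"WARN spark.sql.adaptive.enabled is enabled but is not supported for sub-query: $sq.")
        plan // fall back to the original, non-adaptive plan
    }
}
```

The fallback is all-or-nothing: a single sub-query that cannot be made adaptive causes the whole query to run without AQE.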

Unsupported Physical Plans

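For a physical plan that does not support Adaptive Query Execution (supportAdaptive is false), applyInternal simply prints out the following WARN message and returns the given physical plan unmodified.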

spark.sql.adaptive.enabled is enabled but is not supported for query: [plan].

Usage

applyInternal is used by InsertAdaptiveSparkPlan when requested for the following:

  1. Executing rule (apply)
  2. compileSubquery

Collecting Subquery Expressions

buildSubqueryMap(
  plan: SparkPlan): Map[Long, SubqueryExec]

buildSubqueryMap finds the ScalarSubquery and ListQuery (in InSubquery) expressions in the given physical query plan. The expressions are kept unique by expression ID so that a sub-query that appears more than once reuses the same execution plan.

For every ScalarSubquery and ListQuery expression, buildSubqueryMap compiles the sub-query (compileSubquery), verifies that the result is adaptive (verifyAdaptivePlan) and registers a new SubqueryExec operator for the expression ID.
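The deduplication by expression ID can be illustrated with the following sketch. The expression classes are hypothetical, simplified stand-ins for Spark's ScalarSubquery and ListQuery. The compile parameter stands for compileSubquery followed by verifyAdaptivePlan, and the "subquery#id" names are only illustrative.

```scala
// Hypothetical, Spark-free model of buildSubqueryMap.
object BuildSubqueryMapSketch {
  // Only the two sub-query kinds that buildSubqueryMap looks for, plus "anything else"
  sealed trait Expr
  final case class ScalarSubquery(plan: String, exprId: Long) extends Expr
  final case class ListQuery(plan: String, exprId: Long) extends Expr
  final case class Other(sql: String) extends Expr

  // Stand-in for the SubqueryExec physical operator
  final case class SubqueryExec(name: String, child: String)

  def buildSubqueryMap(exprs: Seq[Expr], compile: String => String): Map[Long, SubqueryExec] = {
    val subqueryMap = scala.collection.mutable.LinkedHashMap.empty[Long, SubqueryExec]
    exprs.foreach {
      case ScalarSubquery(p, id) if !subqueryMap.contains(id) =>
        subqueryMap.put(id, SubqueryExec(s"subquery#$id", compile(p)))
      case ListQuery(p, id) if !subqueryMap.contains(id) =>
        subqueryMap.put(id, SubqueryExec(s"subquery#$id", compile(p)))
      case _ => // not a sub-query, or an expression ID that was already compiled
    }
    subqueryMap.toMap
  }
}
```

Because the guard checks the map before compile is called, a sub-query that appears twice with the same expression ID is planned only once.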

compileSubquery

compileSubquery(
  plan: LogicalPlan): SparkPlan

compileSubquery requests the session-bound SparkPlanner (of the SparkSession from the AdaptiveExecutionContext) to plan the given LogicalPlan into a SparkPlan.

In the end, compileSubquery executes applyInternal on the SparkPlan with the isSubquery flag turned on (true).
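Because isSubquery is true, the shouldApplyAQE check short-circuits, so a sub-query is made adaptive even when it has no exchange of its own. The following sketch models that. All types are hypothetical, and so is the planner rule (only GROUP BY produces an exchange); both are assumptions made for illustration.

```scala
// Hypothetical, Spark-free model of compileSubquery.
object CompileSubquerySketch {
  final case class LogicalPlan(sql: String)
  sealed trait SparkPlan
  final case class Physical(logical: LogicalPlan, hasExchange: Boolean) extends SparkPlan
  // Stand-in for AdaptiveSparkPlanExec
  final case class Adaptive(input: SparkPlan, isSubquery: Boolean) extends SparkPlan

  // Stand-in for the session's SparkPlanner: assumes only GROUP BY needs an exchange
  def plan(logical: LogicalPlan): SparkPlan =
    Physical(logical, hasExchange = logical.sql.toUpperCase.contains("GROUP BY"))

  // Simplified applyInternal: isSubquery short-circuits the shouldApplyAQE check
  def applyInternal(p: SparkPlan, isSubquery: Boolean): SparkPlan = p match {
    case phys: Physical if isSubquery || phys.hasExchange => Adaptive(phys, isSubquery)
    case other => other
  }

  def compileSubquery(logical: LogicalPlan): SparkPlan =
    applyInternal(plan(logical), isSubquery = true)
}
```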

Enforcing AdaptiveSparkPlanExec

verifyAdaptivePlan(
  plan: SparkPlan,
  logicalPlan: LogicalPlan): Unit

verifyAdaptivePlan throws a SubqueryAdaptiveNotSupportedException when the given SparkPlan is not an AdaptiveSparkPlanExec.
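Here is a minimal sketch of the check. It uses hypothetical stand-in types for the plans and the exception, and the exception message is made up for the example.

```scala
// Hypothetical, Spark-free model of verifyAdaptivePlan.
object VerifyAdaptivePlanSketch {
  final case class LogicalPlan(sql: String)
  sealed trait SparkPlan
  // Stand-in for AdaptiveSparkPlanExec
  final case class AdaptiveSparkPlan(input: String) extends SparkPlan
  final case class PlainPlan(input: String) extends SparkPlan

  // Stand-in for SubqueryAdaptiveNotSupportedException (carries the sub-query's logical plan)
  final case class SubqueryAdaptiveNotSupportedException(plan: LogicalPlan)
    extends RuntimeException(s"AQE not supported for sub-query: ${plan.sql}")

  def verifyAdaptivePlan(plan: SparkPlan, logicalPlan: LogicalPlan): Unit = plan match {
    case _: AdaptiveSparkPlan => () // compiled into an adaptive plan: nothing to do
    case _ => throw SubqueryAdaptiveNotSupportedException(logicalPlan)
  }
}
```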

supportAdaptive Condition

supportAdaptive(
  plan: SparkPlan): Boolean

supportAdaptive returns true when the given SparkPlan and all its children (recursively) have a logical operator linked (logicalLink) that is not streaming.
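The recursive check can be modelled as shown below. SparkPlan and LogicalOp here are hypothetical case classes, and logicalLink stands for the optional link from a physical operator to the logical operator it was planned from.

```scala
// Hypothetical, Spark-free model of supportAdaptive.
object SupportAdaptiveSketch {
  final case class LogicalOp(name: String, isStreaming: Boolean = false)
  final case class SparkPlan(
      name: String,
      logicalLink: Option[LogicalOp],
      children: Seq[SparkPlan] = Nil)

  def supportAdaptive(plan: SparkPlan): Boolean =
    plan.logicalLink.isDefined && // a logical operator must be linked
      !plan.logicalLink.exists(_.isStreaming) && // ...and it must not be streaming
      plan.children.forall(supportAdaptive) // the same holds for every child, recursively
}
```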

Logging

Enable ALL logging level for org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan logger to see what happens inside.

Add the following line to conf/log4j2.properties:

logger.InsertAdaptiveSparkPlan.name = org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan
logger.InsertAdaptiveSparkPlan.level = all

Refer to Logging.