# DynamicJoinSelection Adaptive Logical Optimization

`DynamicJoinSelection` is a logical optimization in Adaptive Query Execution to transform `Join` logical operators with `JoinHint`s.

`DynamicJoinSelection` is a Catalyst rule for transforming logical plans (`Rule[LogicalPlan]`).
## Creating Instance

`DynamicJoinSelection` takes no arguments to be created.

`DynamicJoinSelection` is created when:

* `AQEOptimizer` is requested for the default batches (of adaptive optimizations)
## Executing Rule

```scala
apply(
  plan: LogicalPlan): LogicalPlan
```

`apply` traverses the given `LogicalPlan` down (the tree) and rewrites `Join` logical operators as follows:

* If there is no `JoinStrategyHint` defined for the left side, `apply` selects the `JoinStrategy` for the left operator
* If there is no `JoinStrategyHint` defined for the right side, `apply` selects the `JoinStrategy` for the right operator
* `apply` associates the new `JoinHint` with the `Join` logical operator

`apply` is part of the `Rule` abstraction.
## selectJoinStrategy

```scala
selectJoinStrategy(
  plan: LogicalPlan): Option[JoinStrategyHint]
```

`selectJoinStrategy` works only with `LogicalQueryStage`s of `ShuffleQueryStageExec`s that are materialized and have `mapStats` defined (and returns `None` otherwise).

`selectJoinStrategy` selects a `JoinStrategyHint` based on `shouldDemoteBroadcastHashJoin` and `preferShuffledHashJoin` with the `mapStats`.

| demoteBroadcastHash | preferShuffleHash | JoinStrategyHint |
|---------------------|-------------------|------------------|
| `true`  | `true`  | `SHUFFLE_HASH` |
| `true`  | `false` | `NO_BROADCAST_HASH` |
| `false` | `true`  | `PREFER_SHUFFLE_HASH` |
| `false` | `false` | `None` (undefined) |
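The decision table can be sketched as a plain Scala function. This is a minimal sketch, not Spark's actual code: the `String` strategy names stand in for Spark's `JoinStrategyHint` objects.

```scala
// Sketch only: maps the two boolean checks to a join strategy hint name,
// mirroring the decision table above.
def selectHint(
    demoteBroadcastHash: Boolean,
    preferShuffleHash: Boolean): Option[String] =
  (demoteBroadcastHash, preferShuffleHash) match {
    case (true, true)   => Some("SHUFFLE_HASH")
    case (true, false)  => Some("NO_BROADCAST_HASH")
    case (false, true)  => Some("PREFER_SHUFFLE_HASH")
    case (false, false) => None
  }
```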
## preferShuffledHashJoin

```scala
preferShuffledHashJoin(
  mapStats: MapOutputStatistics): Boolean
```

`preferShuffledHashJoin` takes a `MapOutputStatistics` (Apache Spark) and holds (`true`) when all of the following hold:

* `spark.sql.adaptive.advisoryPartitionSizeInBytes` is at most `spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold`
* The approximate number of output bytes (`bytesByPartitionId`) of every map output partition of the given `MapOutputStatistics` is at most `spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold`
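The two conditions can be sketched without any Spark dependency. This is a sketch under the assumption that the two configuration properties are passed in as plain `Long` byte values; it is not Spark's actual code.

```scala
// Sketch only: both the advisory partition size and every map output
// partition must fit under the local-map threshold.
def preferShuffledHashJoin(
    bytesByPartitionId: Array[Long],
    advisoryPartitionSizeInBytes: Long,
    maxShuffledHashJoinLocalMapThreshold: Long): Boolean =
  advisoryPartitionSizeInBytes <= maxShuffledHashJoinLocalMapThreshold &&
    bytesByPartitionId.forall(_ <= maxShuffledHashJoinLocalMapThreshold)
```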
## shouldDemoteBroadcastHashJoin

```scala
shouldDemoteBroadcastHashJoin(
  mapStats: MapOutputStatistics): Boolean
```

`shouldDemoteBroadcastHashJoin` takes a `MapOutputStatistics` (Apache Spark) and holds (`true`) when all of the following hold:

* There is at least 1 partition with data (based on the `bytesByPartitionId` collection of the given `MapOutputStatistics`)
* The ratio of the non-empty partitions to all partitions is below the `spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin` configuration property
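The two conditions above can be sketched as a self-contained function. This is a sketch, not Spark's actual code: it takes the partition sizes and the ratio threshold directly instead of reading `MapOutputStatistics` and `SQLConf`.

```scala
// Sketch only: demote broadcast hash join when there is data, but the
// fraction of non-empty partitions is below the configured ratio.
def shouldDemoteBroadcastHashJoin(
    bytesByPartitionId: Array[Long],
    nonEmptyPartitionRatioForBroadcastJoin: Double): Boolean = {
  val partitionCnt = bytesByPartitionId.length
  val nonZeroCnt = bytesByPartitionId.count(_ > 0)
  partitionCnt > 0 && nonZeroCnt > 0 &&
    (nonZeroCnt * 1.0 / partitionCnt) < nonEmptyPartitionRatioForBroadcastJoin
}
```

With 1 non-empty partition out of 6 and the default ratio of `0.2`, the check holds (`1.0 / 6 ≈ 0.17 < 0.2`).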
## Demo

```scala
// :paste -raw
package org.apache.spark.japila

import org.apache.spark.MapOutputStatistics
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec

class MyShuffleQueryStageExec(
    override val id: Int,
    override val plan: SparkPlan,
    override val _canonicalized: SparkPlan)
  extends ShuffleQueryStageExec(id, plan, _canonicalized) {

  override def isMaterialized: Boolean = true

  override def mapStats: Option[MapOutputStatistics] = {
    val shuffleId = 0
    // 1 of 6 partitions is non-empty, so the non-empty partition ratio (~0.17)
    // stays below conf.nonEmptyPartitionRatioForBroadcastJoin (0.2 by default)
    val bytesByPartitionId = Array[Long](1, 0, 0, 0, 0, 0)
    Some(new MapOutputStatistics(shuffleId, bytesByPartitionId))
  }
}
```
```scala
import org.apache.spark.sql.catalyst.dsl.plans._
val logicalPlan = table("t1")

import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.catalyst.plans.physical.RoundRobinPartitioning
import org.apache.spark.sql.execution.exchange.ENSURE_REQUIREMENTS
import org.apache.spark.sql.execution.PlanLater
val child = PlanLater(table("t2"))
val shuffleExec = ShuffleExchangeExec(RoundRobinPartitioning(10), child, ENSURE_REQUIREMENTS)

import org.apache.spark.japila.MyShuffleQueryStageExec
val stage = new MyShuffleQueryStageExec(id = 0, plan = shuffleExec, _canonicalized = shuffleExec)

assert(stage.isMaterialized,
  "DynamicJoinSelection expects materialized ShuffleQueryStageExecs")
assert(stage.mapStats.isDefined,
  "DynamicJoinSelection expects ShuffleQueryStageExecs with MapOutputStatistics")

import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint}
import org.apache.spark.sql.execution.adaptive.LogicalQueryStage
val left = LogicalQueryStage(logicalPlan, physicalPlan = stage)
val right = LogicalQueryStage(logicalPlan, physicalPlan = stage)
val plan = Join(left, right, joinType = Inner, condition = None, hint = JoinHint.NONE)

import org.apache.spark.sql.execution.adaptive.DynamicJoinSelection
val newPlan = DynamicJoinSelection(plan)
```

```text
scala> println(newPlan.numberedTreeString)
00 Join Inner, leftHint=(strategy=no_broadcast_hash), rightHint=(strategy=no_broadcast_hash)
01 :- LogicalQueryStage 'UnresolvedRelation [t1], [], false, MyShuffleQueryStage 0
02 +- LogicalQueryStage 'UnresolvedRelation [t1], [], false, MyShuffleQueryStage 0
```

```scala
// cf. DynamicJoinSelection.shouldDemoteBroadcastHashJoin
val mapStats = stage.mapStats.get
val conf = spark.sessionState.conf
val partitionCnt = mapStats.bytesByPartitionId.length
val nonZeroCnt = mapStats.bytesByPartitionId.count(_ > 0)
assert(partitionCnt > 0 && nonZeroCnt > 0 &&
  (nonZeroCnt * 1.0 / partitionCnt) < conf.nonEmptyPartitionRatioForBroadcastJoin)
```