DynamicJoinSelection Adaptive Logical Optimization

DynamicJoinSelection is a logical optimization in Adaptive Query Execution that transforms Join logical operators by adding JoinHints (for the sides with no join strategy hint defined yet).

DynamicJoinSelection is a Catalyst rule for transforming logical plans (Rule[LogicalPlan]).

Creating Instance

DynamicJoinSelection takes no arguments to be created.

DynamicJoinSelection is created when:

  • AQEOptimizer is requested for the default batches (of adaptive optimizations)

Executing Rule

  apply(
    plan: LogicalPlan): LogicalPlan

apply traverses the given LogicalPlan down (the tree) and rewrites Join logical operators as follows:

  1. If there is no JoinStrategyHint defined for the left side, apply selects the JoinStrategy for the left operator.

  2. If there is no JoinStrategyHint defined for the right side, apply selects the JoinStrategy for the right operator.

  3. apply associates the new JoinHint with the Join logical operator.

apply is part of the Rule abstraction.
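The per-side rewrite above can be sketched without Spark as follows. This is a minimal, illustrative model, not Spark's API: ToyJoin, ToyHint, rewrite and the stand-in selectJoinStrategy are all hypothetical names; the point is that a hint is computed only for a side that has none yet (orElse).

```scala
object ApplySketch {
  // toy stand-ins for Join and JoinHint
  case class ToyHint(leftHint: Option[String], rightHint: Option[String])
  case class ToyJoin(left: String, right: String, hint: ToyHint)

  // stand-in for selectJoinStrategy; always suggests NO_BROADCAST_HASH here
  def selectJoinStrategy(side: String): Option[String] = Some("NO_BROADCAST_HASH")

  // compute a hint only for sides with no JoinStrategyHint defined
  def rewrite(j: ToyJoin): ToyJoin = {
    val newLeft = j.hint.leftHint.orElse(selectJoinStrategy(j.left))
    val newRight = j.hint.rightHint.orElse(selectJoinStrategy(j.right))
    j.copy(hint = ToyHint(newLeft, newRight))
  }

  def main(args: Array[String]): Unit = {
    // left already hinted, right not: only the right hint is computed
    val j = ToyJoin("t1", "t2", ToyHint(Some("SHUFFLE_HASH"), None))
    println(rewrite(j).hint) // ToyHint(Some(SHUFFLE_HASH),Some(NO_BROADCAST_HASH))
  }
}
```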


selectJoinStrategy

  selectJoinStrategy(
    plan: LogicalPlan): Option[JoinStrategyHint]

selectJoinStrategy works only with LogicalQueryStages of ShuffleQueryStageExecs that are materialized and have mapStats defined (and returns None otherwise).

selectJoinStrategy selects a JoinStrategyHint based on shouldDemoteBroadcastHashJoin and preferShuffledHashJoin with the mapStats.

  demoteBroadcastHash   preferShuffleHash   JoinStrategyHint
  -------------------   -----------------   -----------------
  true                  true                SHUFFLE_HASH
  true                  false               NO_BROADCAST_HASH
  false                 true                SHUFFLE_HASH
  false                 false               None (undefined)
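The precedence between the two flags can be sketched as follows: preferShuffleHash wins over demoteBroadcastHash, and with both false no hint is produced. The object and method names here are illustrative, and plain strings stand in for the JoinStrategyHint objects.

```scala
object SelectSketch {
  // stand-in for the hint selection in selectJoinStrategy
  def selectHint(demoteBroadcastHash: Boolean, preferShuffleHash: Boolean): Option[String] =
    if (preferShuffleHash) Some("SHUFFLE_HASH")            // prefer shuffled hash join first
    else if (demoteBroadcastHash) Some("NO_BROADCAST_HASH") // otherwise demote broadcast hash join
    else None                                               // no hint

  def main(args: Array[String]): Unit = {
    println(selectHint(demoteBroadcastHash = true, preferShuffleHash = true))   // Some(SHUFFLE_HASH)
    println(selectHint(demoteBroadcastHash = true, preferShuffleHash = false))  // Some(NO_BROADCAST_HASH)
    println(selectHint(demoteBroadcastHash = false, preferShuffleHash = false)) // None
  }
}
```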


preferShuffledHashJoin

  preferShuffledHashJoin(
    mapStats: MapOutputStatistics): Boolean

preferShuffledHashJoin takes a MapOutputStatistics (Apache Spark) and holds (true) when all of the following hold:

  1. spark.sql.adaptive.advisoryPartitionSizeInBytes is at most spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold
  2. Approximate number of output bytes (bytesByPartitionId) of every map output partition of the given MapOutputStatistics is at most spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold
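A Spark-free sketch of the two conditions, with the parameters standing in for the two configuration properties (the names below are illustrative, not Spark's API):

```scala
object PreferSketch {
  // advisoryPartitionSizeInBytes stands in for spark.sql.adaptive.advisoryPartitionSizeInBytes,
  // maxShuffledHashJoinLocalMapThreshold for spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold
  def preferShuffledHashJoin(
      advisoryPartitionSizeInBytes: Long,
      maxShuffledHashJoinLocalMapThreshold: Long,
      bytesByPartitionId: Array[Long]): Boolean =
    advisoryPartitionSizeInBytes <= maxShuffledHashJoinLocalMapThreshold &&
      bytesByPartitionId.forall(_ <= maxShuffledHashJoinLocalMapThreshold) // every map output small enough

  def main(args: Array[String]): Unit = {
    println(preferShuffledHashJoin(64, 128, Array(100L, 120L))) // true: all partitions under threshold
    println(preferShuffledHashJoin(64, 128, Array(100L, 200L))) // false: one map output too large
  }
}
```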


shouldDemoteBroadcastHashJoin

  shouldDemoteBroadcastHashJoin(
    mapStats: MapOutputStatistics): Boolean

shouldDemoteBroadcastHashJoin takes a MapOutputStatistics (Apache Spark) and holds (true) when all of the following hold:

  1. There is at least 1 partition with data (based on the bytesByPartitionId collection of the given MapOutputStatistics)
  2. The ratio of the non-empty partitions to all partitions is below spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin configuration property
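The two conditions can be sketched as a plain function on the bytesByPartitionId array; the ratio parameter stands in for spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin (whose default is 0.2), and the names are illustrative:

```scala
object DemoteSketch {
  // ratio stands in for spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin
  def shouldDemoteBroadcastHashJoin(
      bytesByPartitionId: Array[Long],
      ratio: Double = 0.2): Boolean = {
    val partitionCnt = bytesByPartitionId.length
    val nonZeroCnt = bytesByPartitionId.count(_ > 0)
    // at least one non-empty partition, and few enough of them relative to the total
    partitionCnt > 0 && nonZeroCnt > 0 && (nonZeroCnt * 1.0 / partitionCnt) < ratio
  }

  def main(args: Array[String]): Unit = {
    // 1 of 6 partitions non-empty: 0.167 < 0.2, so broadcast is demoted
    println(shouldDemoteBroadcastHashJoin(Array(1L, 0, 0, 0, 0, 0))) // true
    // 3 of 6 partitions non-empty: 0.5 >= 0.2
    println(shouldDemoteBroadcastHashJoin(Array(1L, 1, 1, 0, 0, 0))) // false
  }
}
```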


Demo

// :paste -raw
package org.apache.spark.japila

import org.apache.spark.MapOutputStatistics

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec

class MyShuffleQueryStageExec(
    override val id: Int,
    override val plan: SparkPlan,
    override val _canonicalized: SparkPlan) extends ShuffleQueryStageExec(id, plan, _canonicalized) {

  override def isMaterialized: Boolean = true

  override def mapStats: Option[MapOutputStatistics] = {
    val shuffleId = 0
    // must be smaller than conf.nonEmptyPartitionRatioForBroadcastJoin
    val bytesByPartitionId = Array[Long](1, 0, 0, 0, 0, 0)
    Some(new MapOutputStatistics(shuffleId, bytesByPartitionId))
  }
}

import org.apache.spark.sql.catalyst.dsl.plans._
val logicalPlan = table("t1")

import org.apache.spark.sql.catalyst.plans.physical.RoundRobinPartitioning
import org.apache.spark.sql.execution.PlanLater

val child = PlanLater(table("t2"))

import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec}
val shuffleExec = ShuffleExchangeExec(RoundRobinPartitioning(10), child, ENSURE_REQUIREMENTS)

import org.apache.spark.japila.MyShuffleQueryStageExec
val stage = new MyShuffleQueryStageExec(id = 0, plan = shuffleExec, _canonicalized = shuffleExec)

assert(stage.isMaterialized,
  "DynamicJoinSelection expects materialized ShuffleQueryStageExecs")
assert(stage.mapStats.isDefined,
  "DynamicJoinSelection expects ShuffleQueryStageExecs with MapOutputStatistics")

import org.apache.spark.sql.catalyst.plans.logical.JoinHint
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.Join
import org.apache.spark.sql.execution.adaptive.LogicalQueryStage

val left = LogicalQueryStage(logicalPlan, physicalPlan = stage)
val right = LogicalQueryStage(logicalPlan, physicalPlan = stage)

val plan = Join(left, right, joinType = Inner, condition = None, hint = JoinHint.NONE)

import org.apache.spark.sql.execution.adaptive.DynamicJoinSelection
val newPlan = DynamicJoinSelection(plan)
scala> println(newPlan.numberedTreeString)
00 Join Inner, leftHint=(strategy=no_broadcast_hash), rightHint=(strategy=no_broadcast_hash)
01 :- LogicalQueryStage 'UnresolvedRelation [t1], [], false, MyShuffleQueryStage 0
02 +- LogicalQueryStage 'UnresolvedRelation [t1], [], false, MyShuffleQueryStage 0
// cf. DynamicJoinSelection.shouldDemoteBroadcastHashJoin
val mapStats = stage.mapStats.get
val conf = spark.sessionState.conf
val partitionCnt = mapStats.bytesByPartitionId.length
val nonZeroCnt = mapStats.bytesByPartitionId.count(_ > 0)
assert(partitionCnt > 0 && nonZeroCnt > 0 &&
  nonZeroCnt * 1.0 / partitionCnt < conf.nonEmptyPartitionRatioForBroadcastJoin)