Skip to content

OptimizeSkewedJoin Adaptive Physical Optimization

OptimizeSkewedJoin is a physical optimization to make data distribution more even in Adaptive Query Execution.

OptimizeSkewedJoin is also called skew join optimization.

Supported Join Types

OptimizeSkewedJoin supports the following join types:

  • Inner
  • Cross
  • LeftSemi
  • LeftAnti
  • LeftOuter
  • RightOuter

Configuration Properties

OptimizeSkewedJoin uses the following configuration properties:

Creating Instance

OptimizeSkewedJoin takes the following to be created:

OptimizeSkewedJoin is created when:

Executing Rule

  plan: SparkPlan): SparkPlan

apply uses spark.sql.adaptive.skewJoin.enabled configuration property to determine whether to apply any optimizations or not.

apply collects ShuffleQueryStageExec physical operators.


apply does nothing and simply gives the query plan "untouched" when applied to a query plan with the number of ShuffleQueryStageExec physical operators different than 2.


apply is part of the Rule abstraction.

Optimizing Skewed Joins

  plan: SparkPlan): SparkPlan

optimizeSkewJoin transforms SortMergeJoinExec physical operators (with the supportedJoinTypes) of two SortExec operators with ShuffleQueryStageExec children.

optimizeSkewJoin handles SortMergeJoinExec operators with the left and right operators of the same number of partitions.

optimizeSkewJoin computes median partition size for the left and right operators.

optimizeSkewJoin prints out the following DEBUG message to the logs:

Optimizing skewed join.
Left side partitions size info:
Right side partitions size info:


optimizeSkewJoin prints out the following DEBUG message to the logs:

number of skewed partitions: left [numPartitions], right [numPartitions]

In the end, optimizeSkewJoin creates CustomShuffleReaderExec physical operators for the left and right children of the SortMergeJoinExec operator if and only if the number of skewed partitions for either side is greater than 0. optimizeSkewJoin turns on the isSkewJoin flag (of the SortMergeJoinExec operator). Otherwise, optimizeSkewJoin leaves the SortMergeJoinExec operator "untouched".


  left: ShuffleQueryStageExec,
  right: ShuffleQueryStageExec,
  joinType: JoinType): Option[(SparkPlan, SparkPlan)]


isSkewed Predicate

  size: Long,
  medianSize: Long): Boolean

isSkewed is on (true) when the given size is greater than all of the following:

Target Partition Size

  sizes: Seq[Long],
  medianSize: Long): Long

targetSize determines the target partition size (to optimize skewed join) and is the greatest value among the following:

targetSize throws an AssertionError when all partitions are skewed (no non-skewed partitions).

Median Partition Size

  sizes: Seq[Long]): Long



Enable ALL logging level for org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin logger to see what happens inside.

Add the following line to conf/

Refer to Logging.

Back to top