Skip to content

OptimizeSkewedJoin Adaptive Physical Optimization

OptimizeSkewedJoin is a physical optimization (Rule[SparkPlan]) to make data distribution more even in Adaptive Query Execution.

OptimizeSkewedJoin is also called skew join optimization.

Skew Threshold

getSkewThreshold(
  medianSize: Long): Long

getSkewThreshold is the maximum of the following:

  1. spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
  2. spark.sql.adaptive.skewJoin.skewedPartitionFactor multiplied by the given medianSize

getSkewThreshold is used when:

Supported Join Types

OptimizeSkewedJoin supports the following join types:

  • Inner
  • Cross
  • LeftSemi
  • LeftAnti
  • LeftOuter
  • RightOuter

Configuration Properties

OptimizeSkewedJoin uses the following configuration properties:

Creating Instance

OptimizeSkewedJoin takes the following to be created:

OptimizeSkewedJoin is created when:

Executing Rule

apply(
  plan: SparkPlan): SparkPlan

apply uses spark.sql.adaptive.skewJoin.enabled configuration property to determine whether to apply any optimizations or not.

apply collects ShuffleQueryStageExec physical operators.

Note

apply does nothing and simply gives the query plan "untouched" when applied to a query plan with the number of ShuffleQueryStageExec physical operators different than 2.

apply...FIXME

apply is part of the Rule abstraction.

Optimizing Skewed Joins

optimizeSkewJoin(
  plan: SparkPlan): SparkPlan

optimizeSkewJoin transforms the following physical operators:

optimizeSkewJoin tryOptimizeJoinChildren and, if a new join left and right child operators are determined, replaces them in the physical operators (with the isSkewJoin flag enabled).

tryOptimizeJoinChildren

tryOptimizeJoinChildren(
  left: ShuffleQueryStageExec,
  right: ShuffleQueryStageExec,
  joinType: JoinType): Option[(SparkPlan, SparkPlan)]

tryOptimizeJoinChildren...FIXME

Target Partition Size

targetSize(
  sizes: Seq[Long],
  medianSize: Long): Long

targetSize determines the target partition size (to optimize skewed join) and is the greatest value among the following:

targetSize throws an AssertionError when all partitions are skewed (no non-skewed partitions).

Median Partition Size

medianSize(
  sizes: Seq[Long]): Long

medianSize...FIXME

medianSize is used when:

Logging

Enable ALL logging level for org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin logger to see what happens inside.

Add the following line to conf/log4j2.properties:

log4j.logger.org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin=ALL

Refer to Logging.