OptimizeSkewedJoin Adaptive Physical Optimization
OptimizeSkewedJoin is a physical optimization (Rule[SparkPlan]) in Adaptive Query Execution. It evens out the data distribution of a join by splitting skewed shuffle partitions into smaller ones.
OptimizeSkewedJoin is also called skew join optimization.
Skew Threshold
getSkewThreshold(
  medianSize: Long): Long
getSkewThreshold gives the size above which a partition is considered skewed. It is the maximum of the following:
- spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
- spark.sql.adaptive.skewJoin.skewedPartitionFactor multiplied by the given medianSize
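The threshold formula can be sketched in plain Scala as follows. This is a minimal sketch, not Spark's code. The hypothetical SkewConf case class stands in for the two configuration properties, which Spark reads from SQLConf. The factor is modelled as a Double. The isSkewed helper assumes that a partition counts as skewed when it is strictly larger than the threshold.

```scala
// Minimal sketch of the skew threshold formula (not Spark's actual code).
// SkewConf is a hypothetical stand-in for values Spark reads from SQLConf.
object SkewThresholdSketch {

  final case class SkewConf(
      skewedPartitionThresholdInBytes: Long, // spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
      skewedPartitionFactor: Double)         // spark.sql.adaptive.skewJoin.skewedPartitionFactor

  // The larger of the absolute threshold and factor * medianSize.
  def getSkewThreshold(medianSize: Long, conf: SkewConf): Long =
    math.max(
      conf.skewedPartitionThresholdInBytes,
      (medianSize * conf.skewedPartitionFactor).toLong)

  // Assumption: a partition is skewed when it is larger than the threshold.
  def isSkewed(size: Long, medianSize: Long, conf: SkewConf): Boolean =
    size > getSkewThreshold(medianSize, conf)
}
```

Take a 256 MB threshold and a factor of 5, both chosen only for illustration. A 10 MB median then yields a 256 MB threshold, because the absolute threshold dominates. A 100 MB median raises it to 500 MB. The absolute threshold keeps small partitions from being flagged merely because the median is tiny.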
getSkewThreshold is used when:
- OptimizeSkewedJoin is requested to tryOptimizeJoinChildren
Supported Join Types
OptimizeSkewedJoin supports the following join types:
- Inner
- Cross
- LeftSemi
- LeftAnti
- LeftOuter
- RightOuter
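One way to see why exactly these join types qualify is to ask which side of the join can have its skewed partitions split without changing the result. The sketch below is an illustration, not Spark's code. JoinType is a toy ADT standing in for Spark's JoinType. The split-side rule is an assumption based on how outer, semi and anti joins treat unmatched rows. The helper names canSplitLeft and canSplitRight are hypothetical.

```scala
// Minimal sketch (not Spark's code): a toy JoinType ADT and a split-side rule.
object JoinTypeSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object Cross extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  // Splitting the left side is safe when each left row's output depends only on
  // the complete matching right partition, which every left chunk still sees.
  def canSplitLeft(joinType: JoinType): Boolean = joinType match {
    case Inner | Cross | LeftSemi | LeftAnti | LeftOuter => true
    case _                                               => false
  }

  // Symmetric rule for the right side.
  def canSplitRight(joinType: JoinType): Boolean = joinType match {
    case Inner | Cross | RightOuter => true
    case _                          => false
  }

  // A join type is supported when at least one side can be split.
  def isSupported(joinType: JoinType): Boolean =
    canSplitLeft(joinType) || canSplitRight(joinType)
}
```

Consider splitting the right side of a LeftOuter join. A left row with no match in one right chunk would emit a null-padded row, even though another chunk does match it. FullOuter has that problem on both sides, so neither side can be split.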
Configuration Properties
OptimizeSkewedJoin uses the following configuration properties:
- spark.sql.adaptive.skewJoin.enabled
- spark.sql.adaptive.skewJoin.skewedPartitionFactor
- spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
- spark.sql.adaptive.advisoryPartitionSizeInBytes
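For reference, the properties can be set at runtime from spark-shell, where spark is the predefined SparkSession. The values below are illustrative, not recommendations. spark.sql.adaptive.enabled is included because the rule only runs as part of Adaptive Query Execution.

```scala
// In spark-shell: `spark` is the predefined SparkSession.
// Adaptive Query Execution must be on for OptimizeSkewedJoin to run at all.
spark.conf.set("spark.sql.adaptive.enabled", "true")
// Turn the skew join optimization on ("false" disables it).
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
// A partition is skewed when larger than max(factor * median, threshold).
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
// Lower bound for the target size that skewed partitions are split into.
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
```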
Creating Instance
OptimizeSkewedJoin takes the following to be created:
OptimizeSkewedJoin is created when:
- AdaptiveSparkPlanExec physical operator is requested for the adaptive optimizations
Executing Rule
apply(
  plan: SparkPlan): SparkPlan
apply uses the spark.sql.adaptive.skewJoin.enabled configuration property to decide whether to optimize at all. When the property is disabled, the query plan is returned unchanged.
apply collects the ShuffleQueryStageExec physical operators in the query plan.
Note
apply does nothing and returns the query plan untouched when the plan contains a number of ShuffleQueryStageExec physical operators other than 2.
apply ...FIXME
apply is part of the Rule abstraction.
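The guard logic of apply can be sketched over a toy plan tree. This is a minimal sketch, not Spark's code. Plan, ShuffleStage (a stand-in for ShuffleQueryStageExec) and Node are hypothetical types. The skewJoinEnabled flag stands in for spark.sql.adaptive.skewJoin.enabled. Whatever apply does once both guards pass is left to a caller-supplied optimizeSkewJoin function.

```scala
// Minimal sketch of apply's guard logic over a toy plan tree (not Spark's classes).
object ApplySketch {
  sealed trait Plan { def children: Seq[Plan] }
  // Stand-in for ShuffleQueryStageExec (a leaf in this toy model).
  final case class ShuffleStage(id: Int) extends Plan { def children: Seq[Plan] = Nil }
  // Stand-in for any other physical operator (joins, sorts, ...).
  final case class Node(name: String, children: Seq[Plan]) extends Plan

  // Collect every shuffle stage in the tree, depth-first, left to right.
  def collectShuffleStages(plan: Plan): Seq[ShuffleStage] = plan match {
    case s: ShuffleStage => Seq(s)
    case other           => other.children.flatMap(collectShuffleStages)
  }

  // Return the plan untouched unless the optimization is enabled and the plan
  // has exactly two shuffle stages; otherwise delegate to optimizeSkewJoin.
  def apply(plan: Plan, skewJoinEnabled: Boolean)(optimizeSkewJoin: Plan => Plan): Plan =
    if (!skewJoinEnabled) plan
    else if (collectShuffleStages(plan).size != 2) plan
    else optimizeSkewJoin(plan)
}
```

A join over two shuffle stages reaches optimizeSkewJoin. The same join under a union with a third stage is returned as-is.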
Optimizing Skewed Joins
optimizeSkewJoin(
  plan: SparkPlan): SparkPlan
optimizeSkewJoin transforms the following physical operators:
- SortMergeJoinExec (with left and right SortExecs over ShuffleStages with ShuffleQueryStageExecs, and isSkewJoin disabled)
- ShuffledHashJoinExec (with left and right ShuffleStages with ShuffleQueryStageExecs, and isSkewJoin disabled)
For every such join, optimizeSkewJoin calls tryOptimizeJoinChildren. If new left and right child operators are determined, the join's children are replaced with them and the isSkewJoin flag is enabled.
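The rewrite can be sketched over toy join nodes. This is a minimal sketch, not Spark's code. The hypothetical ShuffledJoin node collapses SortMergeJoinExec (with its SortExec children) and ShuffledHashJoinExec into a single operator. ShuffleStage stands in for ShuffleQueryStageExec. The tryOptimize parameter plays the role of tryOptimizeJoinChildren, whose details the sketch does not model.

```scala
// Minimal sketch of optimizeSkewJoin's rewrite over toy join nodes (not Spark's classes).
object OptimizeSkewJoinSketch {
  sealed trait Plan
  // Stand-in for ShuffleQueryStageExec.
  final case class ShuffleStage(id: Int) extends Plan
  // Stand-in for SortMergeJoinExec / ShuffledHashJoinExec with the isSkewJoin flag.
  final case class ShuffledJoin(left: Plan, right: Plan, isSkewJoin: Boolean) extends Plan

  def optimizeSkewJoin(plan: Plan)(
      tryOptimize: (ShuffleStage, ShuffleStage) => Option[(Plan, Plan)]): Plan =
    plan match {
      // Candidate: a join directly over two shuffle stages, not yet a skew join.
      case j @ ShuffledJoin(l: ShuffleStage, r: ShuffleStage, false) =>
        tryOptimize(l, r) match {
          // New children found: swap them in and mark the join as a skew join.
          case Some((newLeft, newRight)) =>
            j.copy(left = newLeft, right = newRight, isSkewJoin = true)
          // Nothing to optimize: keep the join as it is.
          case None => j
        }
      // Any other join: look for candidates further down the tree.
      case ShuffledJoin(l, r, skew) =>
        ShuffledJoin(optimizeSkewJoin(l)(tryOptimize), optimizeSkewJoin(r)(tryOptimize), skew)
      case other => other
    }
}
```

The isSkewJoin guard in the first pattern is what keeps an already-optimized join from being rewritten a second time.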
tryOptimizeJoinChildren
tryOptimizeJoinChildren(
  left: ShuffleQueryStageExec,
  right: ShuffleQueryStageExec,
  joinType: JoinType): Option[(SparkPlan, SparkPlan)]
tryOptimizeJoinChildren ...FIXME
Target Partition Size
targetSize(
  sizes: Seq[Long],
  medianSize: Long): Long
targetSize determines the target size that skewed partitions are split into. It is the greater of the following:
- spark.sql.adaptive.advisoryPartitionSizeInBytes configuration property
- Average size of the non-skewed partitions, i.e. the partitions that are not above the skew threshold for the given medianSize
targetSize throws an AssertionError when all partitions are skewed (there are no non-skewed partitions).
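The computation can be sketched as follows. This is a minimal sketch, not Spark's code. The hypothetical Conf case class stands in for the three configuration properties involved. The skew threshold is recomputed with the same rule as getSkewThreshold. A partition is assumed non-skewed when it is not larger than that threshold.

```scala
// Minimal sketch of targetSize (not Spark's code); Conf is a hypothetical stand-in.
object TargetSizeSketch {
  final case class Conf(
      advisoryPartitionSizeInBytes: Long,    // spark.sql.adaptive.advisoryPartitionSizeInBytes
      skewedPartitionThresholdInBytes: Long, // spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
      skewedPartitionFactor: Double)         // spark.sql.adaptive.skewJoin.skewedPartitionFactor

  // Same rule as getSkewThreshold: the larger of the threshold and factor * median.
  private def skewThreshold(medianSize: Long, conf: Conf): Long =
    math.max(conf.skewedPartitionThresholdInBytes, (medianSize * conf.skewedPartitionFactor).toLong)

  def targetSize(sizes: Seq[Long], medianSize: Long, conf: Conf): Long = {
    val threshold = skewThreshold(medianSize, conf)
    // Keep only the non-skewed partitions.
    val nonSkewSizes = sizes.filter(_ <= threshold)
    // Throws java.lang.AssertionError when every partition is skewed.
    assert(nonSkewSizes.nonEmpty, "all partitions are skewed")
    // The greater of the advisory size and the average non-skewed size.
    math.max(conf.advisoryPartitionSizeInBytes, nonSkewSizes.sum / nonSkewSizes.length)
  }
}
```

Take partitions of 100, 200, 300 and 5000 MB, a 250 MB median, a 64 MB advisory size, a 256 MB threshold and a factor of 5. The skew threshold is then 1250 MB, so only the 5000 MB partition is skewed. The target size is the 200 MB average of the other three.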
Median Partition Size
medianSize(
  sizes: Seq[Long]): Long
medianSize ...FIXME
medianSize is used when:
- OptimizeSkewedJoin is requested to tryOptimizeJoinChildren
Logging
Enable ALL logging level for org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.OptimizeSkewedJoin.name = org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin
logger.OptimizeSkewedJoin.level = all
Refer to Logging.