OptimizeSkewedJoin Adaptive Physical Optimization¶
OptimizeSkewedJoin is a physical optimization (Rule[SparkPlan]) to make data distribution more even in Adaptive Query Execution.
OptimizeSkewedJoin is also called skew join optimization.
Skew Threshold¶
getSkewThreshold(
medianSize: Long): Long
getSkewThreshold is the maximum of the following:
- spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
- spark.sql.adaptive.skewJoin.skewedPartitionFactor multiplied by the given medianSize
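The rule above can be sketched in plain Scala as follows. This is only an illustration, not the Spark source: the object name SkewThresholdSketch is hypothetical, and the two constants are illustrative stand-ins for the configuration properties, which the real rule reads from the session configuration.

```scala
object SkewThresholdSketch {
  // Assumed, illustrative values standing in for the two configuration properties
  val skewedPartitionThresholdInBytes: Long = 256L * 1024 * 1024 // 256 MB
  val skewedPartitionFactor: Long = 5L

  // The threshold is the larger of the fixed byte threshold
  // and the factor applied to the median partition size
  def getSkewThreshold(medianSize: Long): Long =
    math.max(skewedPartitionThresholdInBytes, skewedPartitionFactor * medianSize)
}
```

With these assumed values, a median of 10 MB leaves the fixed 256 MB threshold in effect, while a median of 100 MB raises the threshold to 500 MB.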
getSkewThreshold is used when:
- OptimizeSkewedJoin is requested to tryOptimizeJoinChildren
Supported Join Types¶
OptimizeSkewedJoin supports the following join types:
- Inner
- Cross
- LeftSemi
- LeftAnti
- LeftOuter
- RightOuter
Configuration Properties¶
OptimizeSkewedJoin uses the following configuration properties:
- spark.sql.adaptive.skewJoin.enabled
- spark.sql.adaptive.skewJoin.skewedPartitionFactor
- spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
- spark.sql.adaptive.advisoryPartitionSizeInBytes
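As a rough illustration, the properties listed above could be set when building a SparkSession. The values shown are examples only (assumptions, not recommendations), and spark.sql.adaptive.enabled is included on the assumption that skew join optimization only applies when Adaptive Query Execution itself is enabled.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative values only; tune them for the actual workload
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("skew-join-config-sketch")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.skewJoin.enabled", "true")
  .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
  .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
  .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
  .getOrCreate()
```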
Creating Instance¶
OptimizeSkewedJoin takes the following to be created:
OptimizeSkewedJoin is created when:
- AdaptiveSparkPlanExec physical operator is requested for the adaptive optimizations
Executing Rule¶
apply(
plan: SparkPlan): SparkPlan
apply uses spark.sql.adaptive.skewJoin.enabled configuration property to determine whether to apply any optimizations or not.
apply collects ShuffleQueryStageExec physical operators.
Note
apply returns the query plan unchanged when the number of ShuffleQueryStageExec physical operators in the query plan is other than 2.
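The two checks described so far (the configuration flag and the number of shuffle stages) can be modelled with a toy plan tree. Everything in this sketch is hypothetical: Plan, ShuffleStage, Join, Scan and shouldOptimize are simplified stand-ins, not Spark classes or methods.

```scala
object ApplyGuardSketch {
  // Toy stand-ins for physical operators (hypothetical, not Spark classes)
  sealed trait Plan { def children: Seq[Plan] }
  case class ShuffleStage(id: Int) extends Plan { val children: Seq[Plan] = Nil }
  case class Scan(name: String) extends Plan { val children: Seq[Plan] = Nil }
  case class Join(left: Plan, right: Plan) extends Plan {
    def children: Seq[Plan] = Seq(left, right)
  }

  // Walks the tree and collects every shuffle stage it finds
  def collectShuffleStages(plan: Plan): Seq[ShuffleStage] = plan match {
    case s: ShuffleStage => Seq(s)
    case other           => other.children.flatMap(collectShuffleStages)
  }

  // True only when the feature is enabled and exactly two shuffle stages exist;
  // in every other case the plan would be returned unchanged
  def shouldOptimize(plan: Plan, skewJoinEnabled: Boolean): Boolean =
    skewJoinEnabled && collectShuffleStages(plan).size == 2
}
```

In this model, a join of two shuffle stages passes the guard, while a join with only one shuffle stage (or with the flag disabled) does not.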
apply...FIXME
apply is part of the Rule abstraction.
Optimizing Skewed Joins¶
optimizeSkewJoin(
plan: SparkPlan): SparkPlan
optimizeSkewJoin transforms the following physical operators:
- SortMergeJoinExec (of left and right SortExecs over ShuffleStage with ShuffleQueryStageExec and isSkewJoin disabled)
- ShuffledHashJoinExec (with left and right ShuffleStages with ShuffleQueryStageExec and isSkewJoin disabled)
optimizeSkewJoin calls tryOptimizeJoinChildren and, if new left and right child operators are determined, replaces the children of the join operator with them (and enables the isSkewJoin flag).
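The replace-if-determined step can be sketched with a simplified model. Stage and JoinOp are hypothetical stand-ins for the shuffle stages and the join operators, and tryOptimizeJoinChildren is passed in as a function because its logic is not described here.

```scala
object OptimizeSkewJoinSketch {
  // Hypothetical stand-ins for a shuffle stage child and a join operator
  case class Stage(name: String)
  case class JoinOp(left: Stage, right: Stage, isSkewJoin: Boolean = false)

  def optimizeSkewJoin(
      join: JoinOp,
      tryOptimizeJoinChildren: (Stage, Stage) => Option[(Stage, Stage)]): JoinOp =
    if (join.isSkewJoin) {
      join // already handled, only joins with isSkewJoin disabled are considered
    } else {
      tryOptimizeJoinChildren(join.left, join.right) match {
        // New children were determined: swap them in and enable the flag
        case Some((newLeft, newRight)) =>
          join.copy(left = newLeft, right = newRight, isSkewJoin = true)
        // Nothing to optimize: keep the operator as it was
        case None => join
      }
    }
}
```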
tryOptimizeJoinChildren¶
tryOptimizeJoinChildren(
left: ShuffleQueryStageExec,
right: ShuffleQueryStageExec,
joinType: JoinType): Option[(SparkPlan, SparkPlan)]
tryOptimizeJoinChildren...FIXME
Target Partition Size¶
targetSize(
sizes: Seq[Long],
medianSize: Long): Long
targetSize determines the target partition size (to optimize skewed join) and is the greatest value among the following:
- spark.sql.adaptive.advisoryPartitionSizeInBytes configuration property
- Average size of non-skewed partitions (based on the given medianSize)
targetSize throws an AssertionError when all partitions are skewed (no non-skewed partitions).
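A minimal sketch of this computation follows. It assumes that a partition counts as skewed when its size exceeds the skew threshold derived from medianSize; the isSkewed helper, the object name and the constant values are illustrative assumptions rather than the Spark source.

```scala
object TargetSizeSketch {
  // Assumed, illustrative values standing in for configuration properties
  val advisoryPartitionSizeInBytes: Long = 64L * 1024 * 1024     // 64 MB
  val skewedPartitionThresholdInBytes: Long = 256L * 1024 * 1024 // 256 MB
  val skewedPartitionFactor: Long = 5L

  // Hypothetical helper: a partition is skewed when it exceeds the skew threshold
  def isSkewed(size: Long, medianSize: Long): Boolean =
    size > math.max(skewedPartitionThresholdInBytes, skewedPartitionFactor * medianSize)

  def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
    val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
    // Fails with an AssertionError when every partition is skewed
    assert(nonSkewSizes.nonEmpty, "all partitions are skewed")
    math.max(advisoryPartitionSizeInBytes, nonSkewSizes.sum / nonSkewSizes.length)
  }
}
```

With these assumed values, three 100 MB partitions and one 2000 MB partition give a target of 100 MB (the non-skewed average), while small 10 MB partitions fall back to the 64 MB advisory size.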
Median Partition Size¶
medianSize(
sizes: Seq[Long]): Long
medianSize...FIXME
medianSize is used when:
- OptimizeSkewedJoin is requested to tryOptimizeJoinChildren
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.OptimizeSkewedJoin.name = org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin
logger.OptimizeSkewedJoin.level = all
Refer to Logging.