OptimizeShuffleWithLocalRead Adaptive Physical Optimization¶
OptimizeShuffleWithLocalRead is a physical optimization in Adaptive Query Execution.
OptimizeShuffleWithLocalRead can be turned on and off using spark.sql.adaptive.localShuffleReader.enabled configuration property.
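As a minimal sketch of how the property can be toggled (assuming Spark is on the classpath; the master URL and application name below are illustrative choices only), it can be set when the SparkSession is built or changed later through the runtime configuration:

```scala
import org.apache.spark.sql.SparkSession

object LocalShuffleReaderConfig extends App {
  // master and appName are illustrative assumptions, not requirements
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("local-shuffle-reader-demo")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.localShuffleReader.enabled", "false")
    .getOrCreate()

  // The property can also be changed per session at runtime
  spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
  println(spark.conf.get("spark.sql.adaptive.localShuffleReader.enabled"))

  spark.stop()
}
```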
isSupported¶
isSupported(
  shuffle: ShuffleExchangeLike): Boolean
isSupported is true when the following all hold:
- The outputPartitioning of the given ShuffleExchangeLike is not SinglePartition
- The shuffleOrigin of the given ShuffleExchangeLike is supported
isSupported is part of the AQEShuffleReadRule abstraction.
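The two conditions above can be sketched with simplified stand-in types. Everything below is a hypothetical model for illustration, not Spark's own classes: `Shuffle` carries only the two properties the check reads, and the supported origins are passed in explicitly.

```scala
object IsSupportedSketch {
  // Hypothetical stand-ins for Spark's Partitioning and ShuffleOrigin types
  sealed trait Partitioning
  case object SinglePartition extends Partitioning
  final case class HashPartitioning(numPartitions: Int) extends Partitioning

  sealed trait ShuffleOrigin
  case object ENSURE_REQUIREMENTS extends ShuffleOrigin
  case object REPARTITION_BY_COL extends ShuffleOrigin

  // Simplified model of a ShuffleExchangeLike
  final case class Shuffle(outputPartitioning: Partitioning, shuffleOrigin: ShuffleOrigin)

  // Assumption: the supported origins are given as a parameter in this sketch
  def isSupported(shuffle: Shuffle, supportedShuffleOrigins: Seq[ShuffleOrigin]): Boolean =
    shuffle.outputPartitioning != SinglePartition &&
      supportedShuffleOrigins.contains(shuffle.shuffleOrigin)
}
```

Both conditions have to hold, so a single-partition shuffle is rejected even when its origin is supported.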
Executing Rule¶
apply(
  plan: SparkPlan): SparkPlan
apply is a noop (and simply returns the given SparkPlan) when spark.sql.adaptive.localShuffleReader.enabled is disabled.
When canUseLocalShuffleRead holds for the given SparkPlan, apply uses createLocalRead. Otherwise, apply uses createProbeSideLocalRead.
apply is part of the Rule abstraction.
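The control flow of apply can be sketched as a three-way decision. This is a simplified model, not Spark's code: `Plan` is a hypothetical stand-in for SparkPlan, `applyRule` is an illustrative name, and the three helpers are passed in as functions so the example stays self-contained.

```scala
object ApplySketch {
  // Hypothetical stand-in for SparkPlan; the description records which path was taken
  final case class Plan(description: String)

  def applyRule(
      plan: Plan,
      localShuffleReaderEnabled: Boolean,
      canUseLocalShuffleRead: Plan => Boolean,
      createLocalRead: Plan => Plan,
      createProbeSideLocalRead: Plan => Plan): Plan =
    if (!localShuffleReaderEnabled) plan // noop: the rule is turned off
    else if (canUseLocalShuffleRead(plan)) createLocalRead(plan)
    else createProbeSideLocalRead(plan)
}
```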
canUseLocalShuffleRead¶
canUseLocalShuffleRead(
  plan: SparkPlan): Boolean
canUseLocalShuffleRead is true when one of the following holds:
- The given SparkPlan is a ShuffleQueryStageExec with the MapOutputStatistics available and the ShuffleExchangeLike is supported
- The given SparkPlan is an AQEShuffleReadExec with a ShuffleQueryStageExec with the above requirements met (the MapOutputStatistics is available and the ShuffleExchangeLike is supported) and the shuffleOrigin of the ShuffleExchangeLike is ENSURE_REQUIREMENTS
canUseLocalShuffleRead is false otherwise.
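The cases above map naturally onto a pattern match. The sketch below uses hypothetical, heavily simplified stand-ins for the physical operators; in particular, the `supported` flag stands in for the result of isSupported and `mapStatsAvailable` for the presence of the MapOutputStatistics.

```scala
object CanUseLocalShuffleReadSketch {
  sealed trait ShuffleOrigin
  case object ENSURE_REQUIREMENTS extends ShuffleOrigin
  case object REPARTITION_BY_COL extends ShuffleOrigin

  // Hypothetical stand-ins; not Spark's classes
  final case class Shuffle(shuffleOrigin: ShuffleOrigin, supported: Boolean)

  sealed trait Plan
  final case class ShuffleQueryStage(shuffle: Shuffle, mapStatsAvailable: Boolean) extends Plan
  final case class AQEShuffleRead(child: Plan) extends Plan
  case object OtherOperator extends Plan

  def canUseLocalShuffleRead(plan: Plan): Boolean = plan match {
    // A bare query stage: statistics available and shuffle supported
    case s: ShuffleQueryStage =>
      s.mapStatsAvailable && s.shuffle.supported
    // A shuffle read over a query stage: same requirements plus the origin check
    case AQEShuffleRead(s: ShuffleQueryStage) =>
      s.mapStatsAvailable && s.shuffle.supported &&
        s.shuffle.shuffleOrigin == ENSURE_REQUIREMENTS
    case _ => false
  }
}
```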
createLocalRead¶
createLocalRead(
  plan: SparkPlan): AQEShuffleReadExec
createLocalRead branches off based on the type of the given physical operator and creates a new AQEShuffleReadExec (with or without advisory parallelism specified to determine ShufflePartitionSpecs):
- For AQEShuffleReadExecs with a ShuffleQueryStageExec leaf physical operator, the advisory parallelism is the number of ShufflePartitionSpecs of the AQEShuffleReadExec
- For ShuffleQueryStageExecs, the advisory parallelism is undefined
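How the advisory parallelism is chosen per operator type can be sketched as follows. This is a hypothetical model, not Spark's code: partition specs are opaque strings, `advisoryParallelism` is an illustrative helper name, and the sketch assumes the advisory parallelism of an existing read is the number of its partition specs.

```scala
object CreateLocalReadSketch {
  // Hypothetical stand-ins for the two operator types that createLocalRead handles
  sealed trait Plan
  final case class ShuffleQueryStage(id: Int) extends Plan
  final case class AQEShuffleRead(child: ShuffleQueryStage, partitionSpecs: Seq[String]) extends Plan

  // The advisory parallelism that would be used to determine the new partition specs
  def advisoryParallelism(plan: Plan): Option[Int] = plan match {
    case AQEShuffleRead(_, specs) => Some(specs.length) // reuse the existing parallelism
    case _: ShuffleQueryStage     => None               // undefined: no hint available
  }
}
```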
createProbeSideLocalRead¶
createProbeSideLocalRead(
  plan: SparkPlan): SparkPlan
createProbeSideLocalRead...FIXME
getPartitionSpecs¶
getPartitionSpecs(
  shuffleStage: ShuffleQueryStageExec,
  advisoryParallelism: Option[Int]): Seq[ShufflePartitionSpec]
getPartitionSpecs...FIXME
Supported ShuffleOrigins¶
AQEShuffleReadRule
supportedShuffleOrigins: Seq[ShuffleOrigin]
supportedShuffleOrigins is part of the AQEShuffleReadRule abstraction.
supportedShuffleOrigins is a collection of the following ShuffleOrigins: