OptimizeShuffleWithLocalRead Adaptive Physical Optimization¶
OptimizeShuffleWithLocalRead is a physical optimization in Adaptive Query Execution.
OptimizeShuffleWithLocalRead can be turned on and off using the spark.sql.adaptive.localShuffleReader.enabled configuration property.
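As a minimal sketch of toggling the property, the following assumes a local SparkSession created only for illustration (the master URL and the application name are hypothetical choices, and Spark SQL is assumed to be on the classpath). The property is set once when the session is built and then changed again at runtime.

```scala
import org.apache.spark.sql.SparkSession

object LocalShuffleReaderConfig {
  def main(args: Array[String]): Unit = {
    // Assumption: a throwaway local session, purely for illustration
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("local-shuffle-reader-demo") // hypothetical application name
      .config("spark.sql.adaptive.enabled", "true")
      // Turn the optimization off when the session is created
      .config("spark.sql.adaptive.localShuffleReader.enabled", "false")
      .getOrCreate()

    // Turn the optimization back on for the current session
    spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
    println(spark.conf.get("spark.sql.adaptive.localShuffleReader.enabled"))

    spark.stop()
  }
}
```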
isSupported¶
isSupported(
shuffle: ShuffleExchangeLike): Boolean
isSupported is true when all of the following hold:
- The outputPartitioning of the given ShuffleExchangeLike is not SinglePartition
- The shuffleOrigin of the given ShuffleExchangeLike is one of the supported ShuffleOrigins
isSupported is part of the AQEShuffleReadRule abstraction.
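The two conditions can be sketched with simplified stand-in types. Everything in the block below (Partitioning, HashPartitioning, Shuffle, OTHER_ORIGIN, and the explicit supportedShuffleOrigins parameter) is a hypothetical model for illustration, not Spark's actual classes.

```scala
object IsSupportedSketch {
  // Hypothetical, simplified stand-ins for Spark's internal types
  sealed trait Partitioning
  case object SinglePartition extends Partitioning
  final case class HashPartitioning(numPartitions: Int) extends Partitioning

  sealed trait ShuffleOrigin
  case object ENSURE_REQUIREMENTS extends ShuffleOrigin
  case object OTHER_ORIGIN extends ShuffleOrigin // hypothetical placeholder origin

  final case class Shuffle(
      outputPartitioning: Partitioning,
      shuffleOrigin: ShuffleOrigin)

  // The supported origins are passed in explicitly (an assumption of this sketch)
  def isSupported(
      shuffle: Shuffle,
      supportedShuffleOrigins: Seq[ShuffleOrigin]): Boolean =
    shuffle.outputPartitioning != SinglePartition &&
      supportedShuffleOrigins.contains(shuffle.shuffleOrigin)
}
```

In this model a shuffle is rejected as soon as either condition fails, so a SinglePartition shuffle is never supported regardless of its origin.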
Executing Rule¶
apply(
plan: SparkPlan): SparkPlan
apply is a noop (and simply returns the given SparkPlan) when spark.sql.adaptive.localShuffleReader.enabled is disabled.
If canUseLocalShuffleRead holds for the given SparkPlan, apply uses createLocalRead. Otherwise, apply uses createProbeSideLocalRead.
apply is part of the Rule abstraction.
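The control flow of apply can be sketched as a three-way dispatch. The Plan type and the applySketch function are hypothetical, and the three helper methods are passed in as plain functions so the block stays self-contained.

```scala
object ApplySketch {
  // Hypothetical stand-in for SparkPlan
  final case class Plan(name: String)

  def applySketch(
      localShuffleReaderEnabled: Boolean,
      canUseLocalShuffleRead: Plan => Boolean,
      createLocalRead: Plan => Plan,
      createProbeSideLocalRead: Plan => Plan,
      plan: Plan): Plan =
    if (!localShuffleReaderEnabled) plan // noop: the plan is returned unchanged
    else if (canUseLocalShuffleRead(plan)) createLocalRead(plan)
    else createProbeSideLocalRead(plan)
}
```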
canUseLocalShuffleRead¶
canUseLocalShuffleRead(
plan: SparkPlan): Boolean
canUseLocalShuffleRead is true when one of the following holds:
- The given SparkPlan is a ShuffleQueryStageExec with the MapOutputStatistics available and the ShuffleExchangeLike supported
- The given SparkPlan is an AQEShuffleReadExec with a ShuffleQueryStageExec that meets the above requirements (the MapOutputStatistics is available and the ShuffleExchangeLike is supported) and the shuffleOrigin of the ShuffleExchangeLike is ENSURE_REQUIREMENTS
canUseLocalShuffleRead is false otherwise.
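The cases above map naturally onto a pattern match. The sketch below uses hypothetical stand-in types: mapStatsAvailable models "the MapOutputStatistics is available", shuffleSupported models the result of isSupported for the stage's shuffle, and OTHER_ORIGIN is a placeholder for any origin other than ENSURE_REQUIREMENTS.

```scala
object CanUseLocalShuffleReadSketch {
  sealed trait ShuffleOrigin
  case object ENSURE_REQUIREMENTS extends ShuffleOrigin
  case object OTHER_ORIGIN extends ShuffleOrigin // hypothetical placeholder origin

  // Hypothetical, simplified stand-ins for the physical operators
  sealed trait Plan
  final case class ShuffleQueryStage(
      mapStatsAvailable: Boolean,
      shuffleSupported: Boolean,
      shuffleOrigin: ShuffleOrigin) extends Plan
  final case class AQEShuffleRead(child: Plan) extends Plan
  final case class OtherPlan(name: String) extends Plan

  def canUseLocalShuffleRead(plan: Plan): Boolean = plan match {
    // A shuffle stage on its own: statistics available and shuffle supported
    case s: ShuffleQueryStage =>
      s.mapStatsAvailable && s.shuffleSupported
    // A shuffle read over a shuffle stage: same checks plus the origin check
    case AQEShuffleRead(s: ShuffleQueryStage) =>
      s.mapStatsAvailable && s.shuffleSupported &&
        s.shuffleOrigin == ENSURE_REQUIREMENTS
    case _ => false
  }
}
```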
createLocalRead¶
createLocalRead(
plan: SparkPlan): AQEShuffleReadExec
createLocalRead branches off based on the type of the given physical operator and creates a new AQEShuffleReadExec (with or without an advisory parallelism to determine the ShufflePartitionSpecs):
- For an AQEShuffleReadExec with a ShuffleQueryStageExec leaf physical operator, the advisory parallelism is the number of ShufflePartitionSpecs of the AQEShuffleReadExec
- For a ShuffleQueryStageExec, the advisory parallelism is undefined
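The two branches can be sketched as follows. All types are hypothetical stand-ins, and getPartitionSpecs is injected as a function because its logic is not described here. The fallback case for other plans is an addition of this sketch, not something the text specifies.

```scala
object CreateLocalReadSketch {
  // Hypothetical, simplified stand-ins for Spark's internal types
  final case class PartitionSpec(id: Int)
  sealed trait Plan
  final case class ShuffleQueryStage(id: Int) extends Plan
  final case class AQEShuffleRead(
      child: Plan,
      partitionSpecs: Seq[PartitionSpec]) extends Plan

  def createLocalRead(
      plan: Plan,
      getPartitionSpecs: (ShuffleQueryStage, Option[Int]) => Seq[PartitionSpec]
  ): AQEShuffleRead = plan match {
    // Existing shuffle read: advisory parallelism is the number of its partition specs
    case AQEShuffleRead(s: ShuffleQueryStage, specs) =>
      AQEShuffleRead(s, getPartitionSpecs(s, Some(specs.length)))
    // Bare shuffle stage: advisory parallelism is undefined
    case s: ShuffleQueryStage =>
      AQEShuffleRead(s, getPartitionSpecs(s, None))
    // Assumption of this sketch: any other plan is rejected
    case other =>
      throw new IllegalArgumentException(s"Unsupported plan: $other")
  }
}
```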
createProbeSideLocalRead¶
createProbeSideLocalRead(
plan: SparkPlan): SparkPlan
createProbeSideLocalRead
...FIXME
getPartitionSpecs¶
getPartitionSpecs(
shuffleStage: ShuffleQueryStageExec,
advisoryParallelism: Option[Int]): Seq[ShufflePartitionSpec]
getPartitionSpecs
...FIXME
Supported ShuffleOrigins¶
AQEShuffleReadRule
supportedShuffleOrigins: Seq[ShuffleOrigin]
supportedShuffleOrigins is part of the AQEShuffleReadRule abstraction.
supportedShuffleOrigins is a collection of the following ShuffleOrigins: