OptimizeShuffleWithLocalRead Adaptive Physical Optimization
OptimizeShuffleWithLocalRead is a physical optimization in Adaptive Query Execution.
OptimizeShuffleWithLocalRead can be turned on and off using the spark.sql.adaptive.localShuffleReader.enabled configuration property.
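For example, the property can be toggled at runtime in spark-shell, where spark is the predefined SparkSession. This is a minimal sketch: it also enables spark.sql.adaptive.enabled because the rule runs as part of Adaptive Query Execution.

```scala
// spark-shell session: `spark` is the predefined SparkSession.
// The rule runs as part of Adaptive Query Execution, so enable AQE itself.
spark.conf.set("spark.sql.adaptive.enabled", true)

// Turn OptimizeShuffleWithLocalRead off...
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", false)

// ...and back on.
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", true)

// Check the current value (RuntimeConfig.get returns it as a String).
spark.conf.get("spark.sql.adaptive.localShuffleReader.enabled")
```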
isSupported
isSupported(
  shuffle: ShuffleExchangeLike): Boolean
isSupported is true when all of the following hold:
- The outputPartitioning of the given ShuffleExchangeLike is not SinglePartition
- The shuffleOrigin of the given ShuffleExchangeLike is one of the supported ShuffleOrigins
isSupported is part of the AQEShuffleReadRule abstraction.
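The two conditions can be sketched as a standalone Scala function. This is not Spark's code: Partitioning, ShuffleOrigin and ShuffleExchange below are hypothetical, simplified stand-ins for Spark's types, and the set of supported origins is passed in as a parameter (an assumption made for illustration) rather than taken from supportedShuffleOrigins.

```scala
object IsSupportedSketch {
  // Hypothetical, simplified stand-ins for Spark's physical-plan types.
  sealed trait Partitioning
  case object SinglePartition extends Partitioning
  final case class HashPartitioning(numPartitions: Int) extends Partitioning

  sealed trait ShuffleOrigin
  case object ENSURE_REQUIREMENTS extends ShuffleOrigin
  case object REPARTITION_BY_NUM extends ShuffleOrigin

  final case class ShuffleExchange(
      outputPartitioning: Partitioning,
      shuffleOrigin: ShuffleOrigin)

  // True only when both conditions hold:
  // 1. the shuffle does not produce a single partition, and
  // 2. its origin is among the supported origins (hypothetical parameter).
  def isSupported(
      shuffle: ShuffleExchange,
      supportedOrigins: Set[ShuffleOrigin]): Boolean =
    shuffle.outputPartitioning != SinglePartition &&
      supportedOrigins.contains(shuffle.shuffleOrigin)
}
```

With only ENSURE_REQUIREMENTS in the set, a hash-partitioned ENSURE_REQUIREMENTS shuffle passes, while a SinglePartition shuffle or a shuffle with any other origin does not.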
Executing Rule
apply(
  plan: SparkPlan): SparkPlan
apply is a no-op (it simply returns the given SparkPlan) when spark.sql.adaptive.localShuffleReader.enabled is disabled.
Otherwise, when canUseLocalShuffleRead holds for the given SparkPlan, apply uses createLocalRead to create a new AQEShuffleReadExec. In all other cases, apply uses createProbeSideLocalRead.
apply is part of the Rule abstraction.
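The decision apply makes can be modeled with plain functions. This is a minimal sketch, assuming a hypothetical Plan case class in place of SparkPlan, with the configuration flag and the three helpers passed in as parameters; applyRule is an illustrative name, not Spark's API.

```scala
object ApplySketch {
  // Hypothetical stand-in for a SparkPlan; only a label is kept.
  final case class Plan(label: String)

  // Models the rule's decision:
  // - no-op when the local shuffle reader is disabled,
  // - createLocalRead when canUseLocalShuffleRead holds,
  // - createProbeSideLocalRead otherwise.
  def applyRule(
      plan: Plan,
      localShuffleReaderEnabled: Boolean,
      canUseLocalShuffleRead: Plan => Boolean,
      createLocalRead: Plan => Plan,
      createProbeSideLocalRead: Plan => Plan): Plan =
    if (!localShuffleReaderEnabled) plan
    else if (canUseLocalShuffleRead(plan)) createLocalRead(plan)
    else createProbeSideLocalRead(plan)
}
```

Passing the helpers as functions keeps the sketch focused on the order of the checks: the configuration flag is consulted first, so neither helper runs when the feature is off.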
canUseLocalShuffleRead
canUseLocalShuffleRead(
  plan: SparkPlan): Boolean
canUseLocalShuffleRead is true when one of the following holds:
- The given SparkPlan is a ShuffleQueryStageExec with the MapOutputStatistics available and its ShuffleExchangeLike supported (isSupported)
- The given SparkPlan is an AQEShuffleReadExec over a ShuffleQueryStageExec that meets the above requirements (the MapOutputStatistics is available and the ShuffleExchangeLike is supported), and the shuffleOrigin of the ShuffleExchangeLike is ENSURE_REQUIREMENTS
canUseLocalShuffleRead is false otherwise.
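The two matching cases can be mirrored with pattern matching on a simplified plan model. In this sketch, ShuffleQueryStage, AQEShuffleRead and OtherOperator are hypothetical stand-ins for the Spark operators, mapStatsAvailable models the availability of MapOutputStatistics, and the supported flag stands in for the isSupported check.

```scala
object CanUseLocalShuffleReadSketch {
  sealed trait ShuffleOrigin
  case object ENSURE_REQUIREMENTS extends ShuffleOrigin
  case object REPARTITION_BY_NUM extends ShuffleOrigin

  // Simplified shuffle: `supported` stands in for the isSupported check.
  final case class Shuffle(origin: ShuffleOrigin, supported: Boolean)

  // Hypothetical plan model.
  sealed trait Plan
  // `mapStatsAvailable` models "MapOutputStatistics is available".
  final case class ShuffleQueryStage(shuffle: Shuffle, mapStatsAvailable: Boolean) extends Plan
  final case class AQEShuffleRead(child: Plan) extends Plan
  final case class OtherOperator(name: String) extends Plan

  // Requirements shared by both cases.
  private def stageOk(s: ShuffleQueryStage): Boolean =
    s.mapStatsAvailable && s.shuffle.supported

  def canUseLocalShuffleRead(plan: Plan): Boolean = plan match {
    // A bare shuffle query stage.
    case s: ShuffleQueryStage => stageOk(s)
    // A shuffle read over a stage: additionally requires ENSURE_REQUIREMENTS.
    case AQEShuffleRead(s: ShuffleQueryStage) =>
      stageOk(s) && s.shuffle.origin == ENSURE_REQUIREMENTS
    // Anything else.
    case _ => false
  }
}
```

Note the asymmetry: a bare stage with any supported origin qualifies, but once it is wrapped in a shuffle read, only an ENSURE_REQUIREMENTS origin does.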
createLocalRead
createLocalRead(
  plan: SparkPlan): AQEShuffleReadExec
createLocalRead branches off based on the type of the given physical operator and creates a new AQEShuffleReadExec whose ShufflePartitionSpecs are determined with or without an advisory parallelism:
- For an AQEShuffleReadExec over a ShuffleQueryStageExec, the advisory parallelism is the number of ShufflePartitionSpecs of the AQEShuffleReadExec
- For a ShuffleQueryStageExec, the advisory parallelism is undefined
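How the advisory parallelism is chosen can be sketched as follows. This is a minimal model, assuming hypothetical simplified types (ShuffleQueryStage, AQEShuffleRead, PartitionSpec) and a getPartitionSpecs function passed in as a parameter; it is not Spark's implementation.

```scala
object CreateLocalReadSketch {
  // Hypothetical stand-ins for ShufflePartitionSpec, ShuffleQueryStageExec and AQEShuffleReadExec.
  final case class PartitionSpec(tag: Int)
  sealed trait Plan
  final case class ShuffleQueryStage(id: Int) extends Plan
  final case class AQEShuffleRead(
      child: ShuffleQueryStage,
      partitionSpecs: Seq[PartitionSpec]) extends Plan

  // Always builds a new AQEShuffleRead over the underlying shuffle stage.
  def createLocalRead(
      plan: Plan,
      getPartitionSpecs: (ShuffleQueryStage, Option[Int]) => Seq[PartitionSpec]): AQEShuffleRead =
    plan match {
      // Existing read: the advisory parallelism is its number of partition specs.
      case AQEShuffleRead(stage, specs) =>
        AQEShuffleRead(stage, getPartitionSpecs(stage, Some(specs.length)))
      // Bare shuffle stage: no advisory parallelism.
      case stage: ShuffleQueryStage =>
        AQEShuffleRead(stage, getPartitionSpecs(stage, None))
    }
}
```

In both branches the existing partition specs are discarded; only their count survives, as the hint passed to getPartitionSpecs.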
createProbeSideLocalRead
createProbeSideLocalRead(
  plan: SparkPlan): SparkPlan
createProbeSideLocalRead...FIXME
getPartitionSpecs
getPartitionSpecs(
  shuffleStage: ShuffleQueryStageExec,
  advisoryParallelism: Option[Int]): Seq[ShufflePartitionSpec]
getPartitionSpecs...FIXME
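The details of getPartitionSpecs are not documented here yet, so the following is only a hypothetical sketch of how local partition specs could be derived, not a description of Spark's implementation. It assumes that each spec reads a contiguous range of reducer partitions from a single mapper (loosely modeled on Spark's PartialMapperPartitionSpec) and that a missing advisory parallelism defaults to the number of reducers. MapperRange, equallyDivide and the numMappers/numReducers parameters are illustrative names.

```scala
object LocalPartitionSpecsSketch {
  // Hypothetical spec: reducer partitions [startReducer, endReducer) of one mapper's output.
  final case class MapperRange(mapIndex: Int, startReducer: Int, endReducer: Int)

  // Start indices of `numBuckets` contiguous buckets over `numElements` elements;
  // the first `numElements % numBuckets` buckets get one extra element.
  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Int] = {
    val perBucket = numElements / numBuckets
    val remaining = numElements % numBuckets
    val splitPoint = (perBucket + 1) * remaining
    (0 until remaining).map(_ * (perBucket + 1)) ++
      (remaining until numBuckets).map(i => splitPoint + (i - remaining) * perBucket)
  }

  def getPartitionSpecs(
      numMappers: Int,
      numReducers: Int,
      advisoryParallelism: Option[Int]): Seq[MapperRange] = {
    require(numMappers > 0 && numReducers > 0, "needs at least one mapper and one reducer")
    // Assumption: without advice, aim for one spec per reducer partition in total.
    val expected = advisoryParallelism.getOrElse(numReducers)
    // Split every mapper's output into the same number of reducer ranges
    // (at least 1, at most numReducers) so roughly `expected` specs result.
    val rangesPerMapper = math.min(numReducers, math.max(1, expected / numMappers))
    val bounds = equallyDivide(numReducers, rangesPerMapper) :+ numReducers
    val ranges = bounds.zip(bounds.tail)
    for {
      mapIndex <- 0 until numMappers
      (start, end) <- ranges
    } yield MapperRange(mapIndex, start, end)
  }
}
```

Reading per mapper is what makes such a read "local": each spec touches the output of exactly one map task, so it could be scheduled where that output lives.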
Supported ShuffleOrigins
supportedShuffleOrigins: Seq[ShuffleOrigin]
supportedShuffleOrigins is part of the AQEShuffleReadRule abstraction.
supportedShuffleOrigins is a collection of the following ShuffleOrigins: