ShuffleQueryStageExec Adaptive Leaf Physical Operator¶
ShuffleQueryStageExec is a QueryStageExec with either ShuffleExchangeExec or ReusedExchangeExec child operators.
Creating Instance¶
ShuffleQueryStageExec takes the following to be created:
ShuffleQueryStageExec is created when:
- AdaptiveSparkPlanExec physical operator is requested to newQueryStage (for a ShuffleExchangeExec)
- ShuffleQueryStageExec physical operator is requested to newReuseInstance
ShuffleExchangeLike¶
shuffle: ShuffleExchangeLike
ShuffleQueryStageExec initializes the shuffle internal registry when created.
ShuffleQueryStageExec assumes that the given physical operator is either a ShuffleExchangeLike or a ReusedExchangeExec and extracts the ShuffleExchangeLike.
If the operator is neither, ShuffleQueryStageExec throws an IllegalStateException:
wrong plan for shuffle stage:
[tree]
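The extraction described above can be pictured as a pattern match over the stage's plan. The following is a minimal sketch, not Spark's source: `Plan`, `Shuffle`, `Reused` and `Other` are hypothetical stand-ins for the real physical operators so that the example runs without Spark on the classpath. Only the error message prefix is taken from the text above.

```scala
// Hypothetical stand-ins for Spark's physical operators (not Spark's classes).
sealed trait Plan
final case class Shuffle(name: String) extends Plan // plays the role of ShuffleExchangeLike
final case class Reused(child: Plan) extends Plan   // plays the role of ReusedExchangeExec
final case class Other(name: String) extends Plan   // any other operator

object ShuffleExtraction {
  // Returns the shuffle behind the plan, or fails with the message quoted above.
  def extractShuffle(plan: Plan): Shuffle = plan match {
    case s: Shuffle         => s
    case Reused(s: Shuffle) => s
    case other =>
      throw new IllegalStateException(s"wrong plan for shuffle stage:\n $other")
  }
}
```

Both a direct shuffle and a reused shuffle resolve to the same underlying operator, which is why the rest of the page can refer to a single shuffle regardless of how the stage was created.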
shuffle is used when:
- AQEShuffleReadExec unary physical operator is requested for the shuffleRDD
- CoalesceShufflePartitions physical optimization is executed
- OptimizeShuffleWithLocalRead physical optimization is executed
- OptimizeSkewedJoin physical optimization is executed
- OptimizeSkewInRebalancePartitions physical optimization is executed
- ShuffleQueryStageExec leaf physical operator is requested for the shuffle MapOutputStatistics, newReuseInstance and getRuntimeStatistics
Shuffle MapOutputStatistics Future¶
shuffleFuture: Future[MapOutputStatistics]
shuffleFuture requests the ShuffleExchangeLike to submit a shuffle job, which eventually produces a MapOutputStatistics (Apache Spark).
Lazy Value
shuffleFuture is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.
Learn more in the Scala Language Specification.
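The once-only guarantee of a lazy value can be seen in a small, Spark-free sketch. `submitShuffleJob` and the `submissions` counter are hypothetical names introduced for illustration, and the `Long` result merely stands in for a MapOutputStatistics.

```scala
import scala.concurrent.Future

object LazyFutureSketch {
  // Counts how many times the (hypothetical) shuffle job is submitted.
  var submissions = 0

  // Hypothetical stand-in for asking the shuffle to submit its job.
  private def submitShuffleJob(): Future[Long] = {
    submissions += 1
    Future.successful(1024L)
  }

  // Initialized on first access only; later accesses return the same Future.
  lazy val shuffleFuture: Future[Long] = submitShuffleJob()
}
```

Accessing `LazyFutureSketch.shuffleFuture` any number of times leaves `submissions` at 1, so the operations that read the value share one job submission.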
shuffleFuture is used when:
- ShuffleQueryStageExec is requested to materialize and cancel
Materializing¶
doMaterialize(): Future[Any]
doMaterialize is part of the QueryStageExec abstraction.
doMaterialize returns the Shuffle MapOutputStatistics Future.
Cancelling¶
cancel(): Unit
cancel is part of the QueryStageExec abstraction.
cancel cancels the Shuffle MapOutputStatistics Future (unless already completed).
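How materializing and cancelling share one future can be sketched as follows. Scala's standard `Future` has no cancel method, so the sketch assumes a hypothetical `JobHandle` wrapper; `Stage` is likewise a simplified stand-in and not Spark's ShuffleQueryStageExec.

```scala
import scala.concurrent.{Future, Promise}

// Hypothetical cancellable job handle (an assumption of this sketch).
final class JobHandle[T] {
  private val promise = Promise[T]()
  @volatile var cancelled = false

  def future: Future[T] = promise.future
  def isCompleted: Boolean = promise.isCompleted
  def complete(value: T): Unit = { promise.trySuccess(value); () }
  def cancel(): Unit = {
    cancelled = true
    promise.tryFailure(new InterruptedException("job cancelled"))
    ()
  }
}

object MaterializeCancelSketch {
  final class Stage[T](submit: () => JobHandle[T]) {
    // Submitted at most once, on first access.
    lazy val handle: JobHandle[T] = submit()

    // Materializing simply hands out the future of the submitted job.
    def doMaterialize(): Future[Any] = handle.future

    // Cancelling is a no-op once the job has completed.
    def cancel(): Unit = if (!handle.isCompleted) handle.cancel()
  }
}
```

Guarding on `isCompleted` keeps `cancel` safe to call at any time: a finished stage keeps its result, and only a stage that is still running is interrupted.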
New ShuffleQueryStageExec Instance for Reuse¶
newReuseInstance(
newStageId: Int,
newOutput: Seq[Attribute]): QueryStageExec
newReuseInstance is part of the QueryStageExec abstraction.
newReuseInstance creates a new ShuffleQueryStageExec with the following:
| Attribute | Value |
|---|---|
| Query Stage ID | The given newStageId |
| SparkPlan | A new ReusedExchangeExec with the given newOutput and the ShuffleExchangeLike |
newReuseInstance requests the new ShuffleQueryStageExec to use the _resultOption of this ShuffleQueryStageExec.
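The effect of handing over the result holder can be sketched without Spark. Everything below is a simplified model under stated assumptions: `Stage`, `Shuffle` and `Reused` are hypothetical stand-ins, output attributes are reduced to plain strings, and the holder is modelled as an `AtomicReference`.

```scala
import java.util.concurrent.atomic.AtomicReference

object ReuseSketch {
  sealed trait Plan
  final case class Shuffle(name: String) extends Plan
  final case class Reused(output: Seq[String], child: Shuffle) extends Plan

  final class Stage(val id: Int, val plan: Plan) {
    // Holds the materialization result (empty until the stage is ready).
    var resultOption = new AtomicReference[Option[Any]](None)

    // The shuffle behind this stage, whether direct or reused.
    def shuffle: Shuffle = plan match {
      case s: Shuffle   => s
      case Reused(_, s) => s
    }

    def newReuseInstance(newStageId: Int, newOutput: Seq[String]): Stage = {
      val reuse = new Stage(newStageId, Reused(newOutput, shuffle))
      reuse.resultOption = resultOption // same holder, so both stages see one result
      reuse
    }
  }
}
```

Because the two stages point at the same holder, a result recorded through either of them is visible through the other, and the reused stage does not need to run the shuffle again.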
MapOutputStatistics¶
mapStats: Option[MapOutputStatistics]
mapStats asserts that the MapOutputStatistics is already available and throws an AssertionError otherwise:
assertion failed: ShuffleQueryStageExec should already be ready
mapStats returns the MapOutputStatistics.
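The assert-then-return behaviour can be sketched as follows. `Stats` and `Stage` are hypothetical stand-ins, and wrapping the value in `Option(...)` to absorb a possible `null` is an assumption of this sketch rather than something stated above.

```scala
object MapStatsSketch {
  // Hypothetical stand-in for MapOutputStatistics.
  final case class Stats(bytesByPartitionId: Array[Long])

  final class Stage {
    // Empty until the stage has produced a result.
    @volatile var result: Option[Any] = None

    def mapStats: Option[Stats] = {
      // Scala's assert prefixes the message with "assertion failed: ".
      assert(result.isDefined, "ShuffleQueryStageExec should already be ready")
      // Assumption: Option(...) turns a null result into None.
      Option(result.get.asInstanceOf[Stats])
    }
  }
}
```

Callers are therefore expected to ask for the statistics only after the stage has finished; asking earlier is treated as a programming error and not as a recoverable condition.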
mapStats is used when:
- AQEShuffleReadExec unary physical operator is requested for the partitionDataSizes
- DynamicJoinSelection adaptive optimization is executed (and selectJoinStrategy)
- OptimizeShuffleWithLocalRead adaptive physical optimization is executed (and canUseLocalShuffleRead)
- CoalesceShufflePartitions, OptimizeSkewedJoin and OptimizeSkewInRebalancePartitions adaptive physical optimizations are executed
Runtime Statistics¶
getRuntimeStatistics: Statistics
getRuntimeStatistics is part of the QueryStageExec abstraction.
getRuntimeStatistics requests the ShuffleExchangeLike for the runtime statistics.
Logging¶
Enable the ALL logging level for the org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec logger to see what happens inside.
Add the following line to conf/log4j2.properties:
logger.ShuffleQueryStageExec.name = org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
logger.ShuffleQueryStageExec.level = all
Refer to Logging.