ShuffleQueryStageExec Adaptive Leaf Physical Operator
ShuffleQueryStageExec is a QueryStageExec with either a ShuffleExchangeExec (more generally, any ShuffleExchangeLike) or a ReusedExchangeExec child operator.
Creating Instance
ShuffleQueryStageExec takes the following to be created:
ShuffleQueryStageExec is created when:
- AdaptiveSparkPlanExec physical operator is requested to newQueryStage (for a ShuffleExchangeExec)
- ShuffleQueryStageExec physical operator is requested to newReuseInstance
ShuffleExchangeLike
shuffle: ShuffleExchangeLike
ShuffleQueryStageExec initializes the shuffle internal registry when created.
ShuffleQueryStageExec expects the given physical operator to be either a ShuffleExchangeLike or a ReusedExchangeExec (with a ShuffleExchangeLike child) and extracts the ShuffleExchangeLike from it.
Otherwise, ShuffleQueryStageExec throws an IllegalStateException:
wrong plan for shuffle stage:
[tree]
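The extraction rule can be modelled with a plain Scala pattern match. This is a minimal sketch under stated assumptions, not Spark's code: Plan, ShuffleExchange, ReusedExchange, Scan and extractShuffle are hypothetical, simplified stand-ins for the physical operators and for how the shuffle registry is initialized.

```scala
// Hypothetical, simplified plan nodes (not Spark's classes).
sealed trait Plan { def treeString: String }
final case class ShuffleExchange(id: Int) extends Plan {
  def treeString: String = s"ShuffleExchange($id)"
}
final case class ReusedExchange(child: Plan) extends Plan {
  def treeString: String = s"ReusedExchange\n+- ${child.treeString}"
}
final case class Scan(table: String) extends Plan {
  def treeString: String = s"Scan $table"
}

// Hypothetical helper mirroring the rule described above: accept a shuffle
// exchange directly or wrapped in a reused exchange, and reject anything else.
def extractShuffle(plan: Plan): ShuffleExchange = plan match {
  case s: ShuffleExchange                 => s
  case ReusedExchange(s: ShuffleExchange) => s
  case _ =>
    throw new IllegalStateException(s"wrong plan for shuffle stage:\n ${plan.treeString}")
}

extractShuffle(ShuffleExchange(1))                 // the exchange itself
extractShuffle(ReusedExchange(ShuffleExchange(2))) // the exchange under the reuse node
```

Passing any other operator (for example, Scan("t")) fails fast with the "wrong plan for shuffle stage" message followed by the offending tree.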
shuffle is used when:
- AQEShuffleReadExec unary physical operator is requested for the shuffleRDD
- CoalesceShufflePartitions physical optimization is executed
- OptimizeShuffleWithLocalRead physical optimization is executed
- OptimizeSkewedJoin physical optimization is executed
- OptimizeSkewInRebalancePartitions physical optimization is executed
- ShuffleQueryStageExec leaf physical operator is requested for the Shuffle MapOutputStatistics Future, newReuseInstance and getRuntimeStatistics
Shuffle MapOutputStatistics Future
shuffleFuture: Future[MapOutputStatistics]
shuffleFuture requests the ShuffleExchangeLike to submit a shuffle job (that eventually produces a MapOutputStatistics).
Lazy Value
shuffleFuture is a Scala lazy value, which guarantees that the code to initialize it is executed only once (when it is accessed for the first time) and that the computed value never changes afterwards.
Learn more in the Scala Language Specification.
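The lazy-value behaviour can be illustrated with the Scala standard library alone. A minimal sketch, assuming a hypothetical LazyStageModel and MapStats (not Spark's classes) where the "shuffle job" is simulated by a Future and a counter records how often it is submitted:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-in for MapOutputStatistics (not Spark's class).
final case class MapStats(bytesByPartitionId: Vector[Long])

// Hypothetical model of a query stage; only the lazy-future behaviour is modelled.
final class LazyStageModel {
  // Counts how many times the simulated shuffle job has been submitted.
  val submissions = new AtomicInteger(0)

  // Like shuffleFuture: the body runs on first access only, and every later
  // access returns the very same Future instance.
  lazy val shuffleFuture: Future[MapStats] = {
    submissions.incrementAndGet()
    Future(MapStats(Vector(10L, 20L, 30L)))
  }
}

val stage = new LazyStageModel
val first = stage.shuffleFuture
val second = stage.shuffleFuture
println(stage.submissions.get())                                // 1
println(first eq second)                                        // true
println(Await.result(first, 5.seconds).bytesByPartitionId.sum)  // 60
```

Because both materializing and cancelling go through the same lazy value, they can never trigger two separate shuffle jobs for one stage.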
shuffleFuture is used when:
- ShuffleQueryStageExec is requested to materialize or cancel
Materializing
doMaterialize(): Future[Any]
doMaterialize is part of the QueryStageExec abstraction.
doMaterialize returns the Shuffle MapOutputStatistics Future.
Cancelling
cancel(): Unit
cancel is part of the QueryStageExec abstraction.
cancel cancels the Shuffle MapOutputStatistics Future (unless it has already completed).
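The "cancel unless already completed" rule can be sketched with a Promise-backed handle. This is an illustration under assumptions: CancellableJob and cancelStage are hypothetical names, and the handle only stands in for a cancellable Spark job future; it is not Spark's API.

```scala
import java.util.concurrent.CancellationException
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Future, Promise}

// Hypothetical cancellable job handle (not Spark's API).
final class CancellableJob[T] {
  private val promise = Promise[T]()
  // Records whether cancel() was ever called on this job.
  val cancelled = new AtomicBoolean(false)

  def future: Future[T] = promise.future

  def complete(value: T): Unit = promise.trySuccess(value)

  def cancel(): Unit = {
    cancelled.set(true)
    promise.tryFailure(new CancellationException("job cancelled"))
  }
}

// Hypothetical helper mirroring the cancel logic described above: only a job
// that has not completed yet is cancelled; a finished job is left untouched.
def cancelStage[T](job: CancellableJob[T]): Unit =
  if (!job.future.isCompleted) job.cancel()

val running = new CancellableJob[Int]
cancelStage(running)   // still running, so it gets cancelled

val finished = new CancellableJob[Long]
finished.complete(42L)
cancelStage(finished)  // already completed, so cancel() is never called
```

Checking for completion first keeps an already-produced result intact instead of replacing it with a cancellation failure.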
New ShuffleQueryStageExec Instance for Reuse
newReuseInstance(
  newStageId: Int,
  newOutput: Seq[Attribute]): QueryStageExec
newReuseInstance is part of the QueryStageExec abstraction.
newReuseInstance creates a new ShuffleQueryStageExec with the following:
Attribute | Value |
---|---|
Query Stage ID | The given newStageId |
SparkPlan | A new ReusedExchangeExec with the given newOutput and the ShuffleExchangeLike |
newReuseInstance makes the new ShuffleQueryStageExec share this stage's _resultOption, so both stages see the same materialization result.
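The reuse step can be pictured as building a new stage over a reuse node and handing it the same result holder. A minimal sketch, assuming hypothetical Node, ShuffleExchangeNode, ReusedExchangeNode and ShuffleStageModel types (not Spark's classes) and an AtomicReference in the role of _resultOption:

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical, simplified plan nodes (not Spark's classes).
sealed trait Node
final case class ShuffleExchangeNode(name: String) extends Node
final case class ReusedExchangeNode(output: Seq[String], child: ShuffleExchangeNode) extends Node

// Hypothetical model of a shuffle query stage, reduced to what newReuseInstance touches.
final class ShuffleStageModel(val id: Int, val plan: Node) {
  // The shuffle exchange, whether the plan is the exchange itself or a reuse of it.
  val shuffle: ShuffleExchangeNode = plan match {
    case s: ShuffleExchangeNode   => s
    case ReusedExchangeNode(_, s) => s
  }

  // Holder for the materialized result (plays the role of _resultOption).
  var resultOption: AtomicReference[Option[Any]] = new AtomicReference[Option[Any]](None)

  def newReuseInstance(newStageId: Int, newOutput: Seq[String]): ShuffleStageModel = {
    val reuse = new ShuffleStageModel(newStageId, ReusedExchangeNode(newOutput, shuffle))
    // Share the same holder (not a copy), so both stages observe one result.
    reuse.resultOption = this.resultOption
    reuse
  }
}

val original = new ShuffleStageModel(1, ShuffleExchangeNode("exchange-1"))
val reused = original.newReuseInstance(7, Seq("a", "b"))
original.resultOption.set(Some("stats"))
// reused.resultOption.get() now returns Some("stats") as well
```

Sharing the holder (rather than copying its current value) means a result set after the reuse instance is created is still visible to both stages.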
MapOutputStatistics
mapStats: Option[MapOutputStatistics]
mapStats asserts that the MapOutputStatistics is already available (i.e., this query stage has already been materialized) and throws an AssertionError otherwise:
assertion failed: ShuffleQueryStageExec should already be ready
mapStats returns the MapOutputStatistics.
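Scala's Predef.assert produces exactly the "assertion failed: " prefix shown in the message above. A minimal sketch of the check, assuming hypothetical MapStatsModel and MaterializedStageModel types (not Spark's classes) and an AtomicReference standing in for the stage's materialized result:

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for MapOutputStatistics (not Spark's class).
final case class MapStatsModel(shuffleId: Int, bytesByPartitionId: Vector[Long])

final class MaterializedStageModel {
  // Holds the materialized result once the stage completes (like _resultOption).
  val resultOption = new AtomicReference[Option[Any]](None)

  // Mirrors mapStats: fail fast unless the stage has already been materialized.
  def mapStats: Option[MapStatsModel] = {
    assert(resultOption.get().isDefined, "ShuffleQueryStageExec should already be ready")
    Option(resultOption.get().get.asInstanceOf[MapStatsModel])
  }
}

val stage = new MaterializedStageModel
// Calling stage.mapStats here would throw an AssertionError, since nothing is materialized yet.
stage.resultOption.set(Some(MapStatsModel(0, Vector(100L, 200L))))
println(stage.mapStats.map(_.bytesByPartitionId.sum))  // Some(300)
```

Wrapping the result with Option(...) turns a null statistics object into None instead of Some(null).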
mapStats is used when:
- AQEShuffleReadExec unary physical operator is requested for the partitionDataSizes
- DynamicJoinSelection adaptive optimization is executed (in selectJoinStrategy)
- OptimizeShuffleWithLocalRead adaptive physical optimization is executed (in canUseLocalShuffleRead)
- CoalesceShufflePartitions, OptimizeSkewedJoin and OptimizeSkewInRebalancePartitions adaptive physical optimizations are executed
Runtime Statistics
getRuntimeStatistics: Statistics
getRuntimeStatistics is part of the QueryStageExec abstraction.
getRuntimeStatistics requests the ShuffleExchangeLike for the runtime statistics.
Logging
Enable ALL logging level for org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.ShuffleQueryStageExec.name = org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
logger.ShuffleQueryStageExec.level = all
Refer to Logging.