ShuffleQueryStageExec Adaptive Leaf Physical Operator

ShuffleQueryStageExec is a QueryStageExec with either ShuffleExchangeExec or ReusedExchangeExec child operators.

Creating Instance

ShuffleQueryStageExec takes the following to be created:

ShuffleQueryStageExec is created when:

ShuffleExchangeLike

shuffle: ShuffleExchangeLike

ShuffleQueryStageExec initializes the shuffle internal registry when created.

ShuffleQueryStageExec assumes that the given physical operator is either a ShuffleExchangeLike or a ReusedExchangeExec and extracts the ShuffleExchangeLike.

If not, ShuffleQueryStageExec throws an IllegalStateException:

wrong plan for shuffle stage:
[tree]
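The extraction step described above can be sketched with a pattern match. This is a minimal, Spark-free illustration: Plan, ShuffleExchange, ReusedExchange and OtherPlan are hypothetical stand-ins for the real physical operators, and extractShuffle is an assumed helper name, not Spark's API.

```scala
object ShuffleExtraction {
  // Hypothetical stand-ins for Spark's physical operators (not the real classes).
  sealed trait Plan
  final case class ShuffleExchange(name: String) extends Plan // plays the role of a ShuffleExchangeLike
  final case class ReusedExchange(child: Plan) extends Plan   // plays the role of a ReusedExchangeExec
  final case class OtherPlan(name: String) extends Plan       // any other operator

  // Returns the shuffle exchange behind the given plan,
  // or fails with the error message quoted in the text.
  def extractShuffle(plan: Plan): ShuffleExchange = plan match {
    case s: ShuffleExchange                 => s
    case ReusedExchange(s: ShuffleExchange) => s
    case other =>
      throw new IllegalStateException(s"wrong plan for shuffle stage:\n$other")
  }
}
```

A reused exchange is unwrapped one level only in this sketch; anything else is rejected eagerly, at construction time.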

shuffle is used when:

Shuffle MapOutputStatistics Future

shuffleFuture: Future[MapOutputStatistics]

shuffleFuture requests the ShuffleExchangeLike to submit a shuffle job that eventually produces a MapOutputStatistics (Apache Spark).

Lazy Value

shuffleFuture is a Scala lazy value, which guarantees that its initialization code is executed only once (on first access) and that the computed value never changes afterwards.

Learn more in the Scala Language Specification.
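The once-only behaviour of a lazy value can be seen in a small standalone sketch. LazyValDemo, initCount and submitted are hypothetical names chosen for illustration; the string merely stands in for the submitted shuffle job.

```scala
object LazyValDemo {
  // Counts how many times the initializer below runs.
  var initCount = 0

  // Hypothetical stand-in for shuffleFuture: the body runs on first access only,
  // and every later access returns the cached value.
  lazy val submitted: String = {
    initCount += 1
    "shuffle job submitted"
  }
}
```

Reading LazyValDemo.submitted any number of times leaves initCount at 1, which is why a job submitted from a lazy value is not submitted twice.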

shuffleFuture is used when:

Materializing

doMaterialize(): Future[Any]

doMaterialize is part of the QueryStageExec abstraction.


doMaterialize returns the Shuffle MapOutputStatistics Future.

Cancelling

cancel(): Unit

cancel is part of the QueryStageExec abstraction.


cancel cancels the Shuffle MapOutputStatistics Future (unless already completed).
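The "unless already completed" guard can be sketched as follows. Scala's Future has no cancel method of its own, so CancellableAction below is a hypothetical stand-in for a cancellable job handle; it is not Spark's API, and cancelUnlessCompleted is an assumed helper name.

```scala
import java.util.concurrent.CancellationException
import scala.concurrent.{Future, Promise}

object CancelDemo {
  // Hypothetical stand-in for a cancellable job handle backed by a Promise.
  final class CancellableAction[T] {
    private val promise = Promise[T]()
    def future: Future[T] = promise.future
    def isCompleted: Boolean = promise.isCompleted
    // Marks the job as finished successfully (no effect if already completed).
    def complete(value: T): Unit = { promise.trySuccess(value); () }
    // Fails the future; returns false if it was already completed.
    def cancel(): Boolean = promise.tryFailure(new CancellationException("cancelled"))
  }

  // Cancels only when the action has not completed yet; reports whether it cancelled.
  def cancelUnlessCompleted[T](action: CancellableAction[T]): Boolean =
    if (!action.isCompleted) action.cancel() else false
}
```

Skipping the cancel call for a completed action keeps an already available result intact.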

New ShuffleQueryStageExec Instance for Reuse

newReuseInstance(
  newStageId: Int,
  newOutput: Seq[Attribute]): QueryStageExec

newReuseInstance is part of the QueryStageExec abstraction.


newReuseInstance creates a new ShuffleQueryStageExec with the following:

Attribute | Value
Query Stage ID | The given newStageId
SparkPlan | A new ReusedExchangeExec with the given newOutput and the ShuffleExchangeLike

newReuseInstance requests the new ShuffleQueryStageExec to use the _resultOption of this ShuffleQueryStageExec, so both stages refer to the same materialization result.
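A rough sketch of the reuse step, under the assumption that the new instance shares the result holder of the original rather than copying it. Shuffle, Reused and Stage are hypothetical stand-ins, and attributes are reduced to plain strings.

```scala
import java.util.concurrent.atomic.AtomicReference

object ReuseDemo {
  final case class Shuffle(name: String)                       // stand-in for ShuffleExchangeLike
  final case class Reused(output: Seq[String], child: Shuffle) // stand-in for ReusedExchangeExec

  // Hypothetical, simplified query stage.
  final class Stage(val id: Int, val plan: Any, val shuffle: Shuffle) {
    // Empty until the stage is materialized (assumed to mirror _resultOption).
    var resultOption = new AtomicReference[Option[Any]](None)

    def newReuseInstance(newStageId: Int, newOutput: Seq[String]): Stage = {
      val reuse = new Stage(newStageId, Reused(newOutput, shuffle), shuffle)
      // Share the holder so a result set on either stage is visible to both.
      reuse.resultOption = this.resultOption
      reuse
    }
  }
}
```

Because the holder itself is shared, a result that arrives after the reuse instance was created is still visible through it.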

MapOutputStatistics

mapStats: Option[MapOutputStatistics]

mapStats assumes that the MapOutputStatistics is already available or throws an AssertionError:

assertion failed: ShuffleQueryStageExec should already be ready

mapStats returns the MapOutputStatistics.
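The readiness check can be illustrated with Scala's assert, which prefixes the message with "assertion failed: " as in the error above. Stats and Stage are hypothetical stand-ins, and wrapping the result in Option(...) is an assumption about how a missing (null) result would map to None.

```scala
import java.util.concurrent.atomic.AtomicReference

object MapStatsDemo {
  // Hypothetical stand-in for MapOutputStatistics.
  final case class Stats(shuffleId: Int, bytesByPartitionId: Seq[Long])

  // Hypothetical, simplified query stage.
  final class Stage {
    // Empty until the stage is materialized (assumed to mirror _resultOption).
    val resultOption = new AtomicReference[Option[Any]](None)

    def mapStats: Option[Stats] = {
      // Fails with an AssertionError when the stage has not been materialized yet.
      assert(resultOption.get().isDefined, "ShuffleQueryStageExec should already be ready")
      // Option(...) turns a null result into None.
      Option(resultOption.get().get.asInstanceOf[Stats])
    }
  }
}
```

Callers are therefore expected to ask for mapStats only after the stage has finished materializing.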


mapStats is used when:

Runtime Statistics

getRuntimeStatistics: Statistics

getRuntimeStatistics is part of the QueryStageExec abstraction.


getRuntimeStatistics requests the ShuffleExchangeLike for the runtime statistics.

Logging

Enable ALL logging level for org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

logger.ShuffleQueryStageExec.name = org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
logger.ShuffleQueryStageExec.level = all

Refer to Logging.