
ShuffleExchangeExec Physical Operator

ShuffleExchangeExec is an Exchange unary physical operator (indirectly, as a ShuffleExchangeLike) that is used to perform a shuffle.

Creating Instance

ShuffleExchangeExec takes the following to be created:

ShuffleExchangeExec is created when:

Node Name

nodeName: String

nodeName is part of the TreeNode abstraction.


nodeName is Exchange.

Performance Metrics

ShuffleExchangeExec in web UI (Details for Query)

  • data size
  • number of partitions: the number of partitions of the Partitioner of this ShuffleDependency. Posted as the only entry in accumUpdates of a SparkListenerDriverAccumUpdates.

Read Metrics

The read metrics are the following:

  • fetch wait time
  • local blocks read
  • local bytes read
  • records read
  • remote blocks read
  • remote bytes read
  • remote bytes read to disk

The read metrics are used to create a ShuffledRowRDD when ShuffleExchangeExec is executed.

Write Metrics

The write metrics are the following:

  • shuffle bytes written
  • shuffle records written
  • shuffle write time

The write metrics are passed directly to create a ShuffleDependency (that in turn uses them to create a ShuffleWriteProcessor).

Executing Physical Operator

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan abstraction.


doExecute gives a ShuffledRowRDD (with the ShuffleDependency and read performance metrics).

doExecute uses cachedShuffleRDD to avoid multiple executions.

UnsafeRowSerializer

serializer: Serializer

serializer is an UnsafeRowSerializer with the following properties:

serializer is used when ShuffleExchangeExec operator is requested for a ShuffleDependency.
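A minimal sketch of how the serializer could be defined (assuming it is built from the number of the child operator's output attributes and the data size metric, which is not spelled out above):

// Sketch only, not the exact Spark source:
// an UnsafeRowSerializer for rows with as many fields as the child's output,
// reporting serialized sizes to the data size metric
private lazy val serializer: Serializer =
  new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))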

ShuffledRowRDD

cachedShuffleRDD: ShuffledRowRDD

cachedShuffleRDD is an internal registry for the ShuffledRowRDD that ShuffleExchangeExec operator creates when executed.

The purpose of cachedShuffleRDD is to avoid multiple executions of ShuffleExchangeExec operator when it is reused in a query plan:

  • cachedShuffleRDD is uninitialized (null) when ShuffleExchangeExec operator is created
  • cachedShuffleRDD is assigned a ShuffledRowRDD when ShuffleExchangeExec operator is executed for the first time (see the sketch below)
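A minimal sketch of the caching pattern (assuming the ShuffledRowRDD is created from the ShuffleDependency and the read metrics, as described in Executing Physical Operator):

// Sketch only, not the exact Spark source
private var cachedShuffleRDD: ShuffledRowRDD = null

protected override def doExecute(): RDD[InternalRow] = {
  if (cachedShuffleRDD == null) {
    // first execution: create the ShuffledRowRDD and cache it for later reuse
    cachedShuffleRDD = new ShuffledRowRDD(shuffleDependency, readMetrics)
  }
  cachedShuffleRDD
}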

ShuffleDependency

shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow]

shuffleDependency is a ShuffleDependency (Spark Core).

Lazy Value

shuffleDependency is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the Scala Language Specification.
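A quick standalone illustration of the behaviour (plain Scala, unrelated to Spark):

lazy val expensive: Int = {
  println("initializing")  // printed only on the first access
  42
}

expensive  // prints "initializing" and returns 42
expensive  // returns 42 without re-running the initializer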

ShuffleExchangeExec operator creates a ShuffleDependency for the following:


shuffleDependency is used when:
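A minimal sketch of how the lazy value could be initialized (assuming it simply delegates to prepareShuffleDependency with the operator's input RDD, output attributes, output partitioning, serializer and write metrics):

// Sketch only, not the exact Spark source
lazy val shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow] =
  ShuffleExchangeExec.prepareShuffleDependency(
    inputRDD,
    child.output,
    outputPartitioning,
    serializer,
    writeMetrics)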

mapOutputStatisticsFuture

mapOutputStatisticsFuture: Future[MapOutputStatistics]

mapOutputStatisticsFuture requests the inputRDD for the number of partitions:

  • If there are zero partitions, mapOutputStatisticsFuture simply creates an already completed Future (Scala) with null value

  • Otherwise, mapOutputStatisticsFuture requests the operator's SparkContext to submitMapStage (Spark Core) with the ShuffleDependency (see the sketch below).
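A minimal sketch of the two branches above:

// Sketch only, not the exact Spark source
lazy val mapOutputStatisticsFuture: Future[MapOutputStatistics] = {
  if (inputRDD.getNumPartitions == 0) {
    // nothing to shuffle, so no map stage to submit
    Future.successful(null)
  } else {
    sparkContext.submitMapStage(shuffleDependency)
  }
}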

Lazy Value

mapOutputStatisticsFuture is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the Scala Language Specification.

mapOutputStatisticsFuture is part of the ShuffleExchangeLike abstraction.

Creating ShuffleWriteProcessor

createShuffleWriteProcessor(
  metrics: Map[String, SQLMetric]): ShuffleWriteProcessor

createShuffleWriteProcessor creates a ShuffleWriteProcessor (Spark Core) to plug in SQLShuffleWriteMetricsReporter (as a ShuffleWriteMetricsReporter (Spark Core)) with the write metrics.
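A minimal sketch (assuming createMetricsReporter of ShuffleWriteProcessor (Spark Core) is the extension point being overridden, which is not stated above):

// Sketch only, not the exact Spark source
def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor = {
  new ShuffleWriteProcessor {
    override protected def createMetricsReporter(
        context: TaskContext): ShuffleWriteMetricsReporter = {
      // wrap the task-level shuffle write metrics with the SQL write metrics
      new SQLShuffleWriteMetricsReporter(context.taskMetrics().shuffleWriteMetrics, metrics)
    }
  }
}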


createShuffleWriteProcessor is used when:

Runtime Statistics

runtimeStatistics: Statistics

runtimeStatistics is part of the ShuffleExchangeLike abstraction.


runtimeStatistics creates a Statistics with the value of the following metrics.

  • Output size: data size
  • Number of rows: shuffle records written
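A minimal sketch (assuming the metric keys are dataSize and shuffleRecordsWritten, which is not stated above):

// Sketch only, not the exact Spark source
def runtimeStatistics: Statistics = {
  val dataSize = metrics("dataSize").value
  val rowCount = metrics("shuffleRecordsWritten").value
  Statistics(sizeInBytes = dataSize, rowCount = Some(rowCount))
}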

Creating ShuffleDependency

prepareShuffleDependency(
  rdd: RDD[InternalRow],
  outputAttributes: Seq[Attribute],
  newPartitioning: Partitioning,
  serializer: Serializer,
  writeMetrics: Map[String, SQLMetric]): ShuffleDependency[Int, InternalRow, InternalRow]

prepareShuffleDependency is used when:


prepareShuffleDependency creates a ShuffleDependency (Apache Spark) with the following:

Write Metrics for ShuffleExchangeExec

For a ShuffleExchangeExec, the write metrics are the operator's own write metrics (described above).

Partitioner

prepareShuffleDependency determines a Partitioner based on the given newPartitioning Partitioning (see the sketch after this list):

  • For RoundRobinPartitioning, prepareShuffleDependency creates a HashPartitioner for the same number of partitions
  • For HashPartitioning, prepareShuffleDependency creates a Partitioner for the same number of partitions and getPartition that is an "identity"
  • For RangePartitioning, prepareShuffleDependency creates a RangePartitioner for the same number of partitions and samplePointsPerPartitionHint based on spark.sql.execution.rangeExchange.sampleSizePerPartition configuration property
  • For SinglePartition, prepareShuffleDependency creates a Partitioner with 1 for the number of partitions and getPartition that always gives 0
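A minimal sketch of the selection above (not the exact Spark source; rddForSampling and SQLConf.get.rangeExchangeSampleSizePerPartition stand in for the sampled RDD of sort keys and the configuration lookup that the real code uses):

// Sketch only
val part: Partitioner = newPartitioning match {
  case RoundRobinPartitioning(numPartitions) =>
    new HashPartitioner(numPartitions)
  case HashPartitioning(_, n) =>
    new Partitioner {
      override def numPartitions: Int = n
      // the partition key is already a valid partition ID
      override def getPartition(key: Any): Int = key.asInstanceOf[Int]
    }
  case RangePartitioning(sortingExpressions, numPartitions) =>
    new RangePartitioner(
      numPartitions,
      rddForSampling,  // hypothetical stand-in for the RDD of sort keys to sample
      ascending = true,
      samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition)
  case SinglePartition =>
    new Partitioner {
      override def numPartitions: Int = 1
      override def getPartition(key: Any): Int = 0
    }
}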

getPartitionKeyExtractor

getPartitionKeyExtractor(): InternalRow => Any

getPartitionKeyExtractor uses the given newPartitioning Partitioning:
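The per-partitioning behaviour is not spelled out above. A minimal sketch of the shape it could take (details such as the random starting position for round-robin and the use of partitionIdExpression are assumptions, not confirmed by this page):

// Sketch only, not the exact Spark source
def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
  case RoundRobinPartitioning(numPartitions) =>
    // assumption: start at a pseudo-random position and assign rows round-robin
    var position = new java.util.Random(TaskContext.get().partitionId()).nextInt(numPartitions)
    (row: InternalRow) => { position += 1; position }
  case h: HashPartitioning =>
    // assumption: evaluate the partition ID expression of the hash partitioning per row
    val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
    (row: InternalRow) => projection(row).getInt(0)
  case RangePartitioning(sortingExpressions, _) =>
    // assumption: project each row onto the sort expressions; the RangePartitioner
    // then maps that key to a partition
    val projection = UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
    (row: InternalRow) => projection(row)
  case SinglePartition =>
    // every row goes to the one and only partition
    (row: InternalRow) => 0
}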

isRoundRobin Flag

prepareShuffleDependency determines whether this is a round-robin exchange (isRoundRobin) based on the given newPartitioning partitioning. It is isRoundRobin when the partitioning is a RoundRobinPartitioning with more than one partition.
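A minimal sketch of the check:

// Sketch only
val isRoundRobin = newPartitioning.isInstanceOf[RoundRobinPartitioning] &&
  newPartitioning.numPartitions > 1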

rddWithPartitionIds RDD

prepareShuffleDependency creates a rddWithPartitionIds:

  1. Firstly, prepareShuffleDependency determines a newRdd based on the isRoundRobin flag and the spark.sql.execution.sortBeforeRepartition configuration property. When both are enabled (true), prepareShuffleDependency sorts every partition (using an UnsafeExternalRowSorter). Otherwise, prepareShuffleDependency returns the given RDD[InternalRow] (unchanged).
  2. Secondly, prepareShuffleDependency determines whether this is isOrderSensitive or not. It is isOrderSensitive when the isRoundRobin flag is enabled (true) while the spark.sql.execution.sortBeforeRepartition configuration property is not (false). See the sketch after this list.
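A minimal sketch of the two decisions above (sortPartition is a hypothetical helper standing in for the UnsafeExternalRowSorter-based sort, and SQLConf.get.sortBeforeRepartition is assumed to read spark.sql.execution.sortBeforeRepartition; not the exact Spark source):

// Sketch only
val newRdd = if (isRoundRobin && SQLConf.get.sortBeforeRepartition) {
  // sort every partition locally so that round-robin assignment stays
  // deterministic across task retries
  rdd.mapPartitions { iter => sortPartition(iter) }
} else {
  rdd
}

// the record-to-partition mapping depends on the row order only when rows are
// distributed round-robin without the sort above
val isOrderSensitive = isRoundRobin && !SQLConf.get.sortBeforeRepartition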

prepareShuffleDependency...FIXME

Demo

ShuffleExchangeExec and Repartition Logical Operator

val q = spark.range(6).repartition(2)
scala> q.explain(extended = true)
== Parsed Logical Plan ==
Repartition 2, true
+- Range (0, 6, step=1, splits=Some(16))

== Analyzed Logical Plan ==
id: bigint
Repartition 2, true
+- Range (0, 6, step=1, splits=Some(16))

== Optimized Logical Plan ==
Repartition 2, true
+- Range (0, 6, step=1, splits=Some(16))

== Physical Plan ==
Exchange RoundRobinPartitioning(2), false, [id=#8]
+- *(1) Range (0, 6, step=1, splits=16)

ShuffleExchangeExec and RepartitionByExpression Logical Operator

val q = spark.range(6).repartition(2, 'id % 2)
scala> q.explain(extended = true)
== Parsed Logical Plan ==
'RepartitionByExpression [('id % 2)], 2
+- Range (0, 6, step=1, splits=Some(16))

== Analyzed Logical Plan ==
id: bigint
RepartitionByExpression [(id#4L % cast(2 as bigint))], 2
+- Range (0, 6, step=1, splits=Some(16))

== Optimized Logical Plan ==
RepartitionByExpression [(id#4L % 2)], 2
+- Range (0, 6, step=1, splits=Some(16))

== Physical Plan ==
Exchange hashpartitioning((id#4L % 2), 2), false, [id=#17]
+- *(1) Range (0, 6, step=1, splits=16)