ShuffleExchangeExec Unary Physical Operator¶
ShuffleExchangeExec is an Exchange (indirectly, as a ShuffleExchangeLike) unary physical operator that is used to perform a shuffle.
Creating Instance¶
ShuffleExchangeExec takes the following to be created:
- Output Partitioning
- Child physical operator
- ShuffleOrigin (default: ENSURE_REQUIREMENTS)
ShuffleExchangeExec is created when (see the snippet after this list):
- BasicOperators execution planning strategy is executed and plans the following:
  - Repartition with the shuffle flag enabled
  - RepartitionByExpression
- EnsureRequirements physical optimization is executed
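To see the operator being created by the planner, repartition a query and look for ShuffleExchangeExec in the physical plan. A minimal spark-shell sketch (it assumes adaptive query execution is disabled, e.g. spark.sql.adaptive.enabled=false, so the operator is directly visible; shuffleOrigin is available in Spark 3.1 and later):

```scala
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Repartition (with the shuffle flag enabled) is planned as a ShuffleExchangeExec
val q = spark.range(6).repartition(2)

val exchanges = q.queryExecution.executedPlan.collect {
  case e: ShuffleExchangeExec => e
}
assert(exchanges.nonEmpty)

// the Output Partitioning and the ShuffleOrigin the operator was created with
exchanges.foreach(e => println((e.outputPartitioning, e.shuffleOrigin)))
```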
Node Name¶
nodeName: String
nodeName is part of the TreeNode abstraction.

nodeName is Exchange.
Performance Metrics¶
Key | Name (in web UI) |
---|---|
dataSize | data size |
fetchWaitTime | fetch wait time |
localBlocksFetched | local blocks read |
localBytesRead | local bytes read |
recordsRead | records read |
remoteBlocksFetched | remote blocks read |
remoteBytesRead | remote bytes read |
remoteBytesReadToDisk | remote bytes read to disk |
shuffleBytesWritten | shuffle bytes written |
shuffleRecordsWritten | shuffle records written |
shuffleWriteTime | shuffle write time |
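The metrics can also be inspected programmatically once a query with a shuffle has been executed. A minimal sketch (assuming adaptive query execution is disabled so ShuffleExchangeExec is directly visible in the executed plan):

```scala
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

val q = spark.range(100).repartition(4)
q.collect()  // execute the query so the shuffle metrics are populated

val exchange = q.queryExecution.executedPlan.collectFirst {
  case e: ShuffleExchangeExec => e
}.get

// dataSize, shuffleRecordsWritten, records read, etc.
exchange.metrics.foreach { case (name, metric) =>
  println(s"$name = ${metric.value}")
}
```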
Executing Physical Operator¶
doExecute(): RDD[InternalRow]
doExecute is part of the SparkPlan abstraction.

doExecute gives a ShuffledRowRDD (with the ShuffleDependency and read performance metrics).

doExecute uses cachedShuffleRDD to avoid multiple executions.
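The gist of doExecute is a simple memoization on cachedShuffleRDD. An abridged sketch based on the Apache Spark sources (exact details may differ across versions):

```scala
private var cachedShuffleRDD: ShuffledRowRDD = null

protected override def doExecute(): RDD[InternalRow] = {
  // return the same ShuffledRowRDD if this operator is shared by multiple plans
  if (cachedShuffleRDD == null) {
    cachedShuffleRDD = new ShuffledRowRDD(shuffleDependency, readMetrics)
  }
  cachedShuffleRDD
}
```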
Creating ShuffleDependency¶
prepareShuffleDependency(
rdd: RDD[InternalRow],
outputAttributes: Seq[Attribute],
newPartitioning: Partitioning,
serializer: Serializer,
writeMetrics: Map[String, SQLMetric]): ShuffleDependency[Int, InternalRow, InternalRow]
prepareShuffleDependency creates a ShuffleDependency (Apache Spark) with an RDD[Product2[Int, InternalRow]] (where the Ints are the partition IDs of the InternalRow values) and the given Serializer (e.g. the Serializer of the ShuffleExchangeExec physical operator).
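The dependency is built on top of that keyed RDD with a pass-through partitioner that simply trusts the precomputed partition IDs. An abridged sketch based on the Apache Spark sources (rddWithPartitionIds and the write metrics are described below; exact details may differ across versions):

```scala
val dependency = new ShuffleDependency[Int, InternalRow, InternalRow](
  rddWithPartitionIds,                               // rows already keyed by partition ID
  new PartitionIdPassthrough(part.numPartitions),    // getPartition is the identity
  serializer,
  shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics))
```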
Partitioner¶
prepareShuffleDependency determines a Partitioner based on the given newPartitioning Partitioning (as sketched after this list):

- For RoundRobinPartitioning, prepareShuffleDependency creates a HashPartitioner for the same number of partitions
- For HashPartitioning, prepareShuffleDependency creates a Partitioner for the same number of partitions and a getPartition that is an "identity"
- For RangePartitioning, prepareShuffleDependency creates a RangePartitioner for the same number of partitions and samplePointsPerPartitionHint based on the spark.sql.execution.rangeExchange.sampleSizePerPartition configuration property
- For SinglePartition, prepareShuffleDependency creates a Partitioner with 1 for the number of partitions and a getPartition that always gives 0
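An abridged sketch of that selection, based on the Apache Spark sources. rddForSampling stands in for the sampling RDD of sort keys that the real code derives from the input RDD for RangePartitioning; exact details may differ across versions:

```scala
val part: Partitioner = newPartitioning match {
  case RoundRobinPartitioning(numPartitions) =>
    new HashPartitioner(numPartitions)
  case HashPartitioning(_, n) =>
    new Partitioner {
      override def numPartitions: Int = n
      // HashPartitioning.partitionIdExpression already produces a valid partition ID,
      // so getPartition is effectively the identity
      override def getPartition(key: Any): Int = key.asInstanceOf[Int]
    }
  case RangePartitioning(sortingExpressions, numPartitions) =>
    // the real code also provides an implicit row ordering (LazilyGeneratedOrdering)
    new RangePartitioner(
      numPartitions,
      rddForSampling,
      ascending = true,
      samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition)
  case SinglePartition =>
    new Partitioner {
      override def numPartitions: Int = 1
      override def getPartition(key: Any): Int = 0
    }
}
```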
getPartitionKeyExtractor Internal Method¶
prepareShuffleDependency defines a getPartitionKeyExtractor internal method.
getPartitionKeyExtractor(): InternalRow => Any
getPartitionKeyExtractor uses the given newPartitioning Partitioning:

- For RoundRobinPartitioning,...FIXME
- For HashPartitioning,...FIXME
- For RangePartitioning,...FIXME
- For SinglePartition,...FIXME
isRoundRobin Internal Flag¶
prepareShuffleDependency determines the isRoundRobin flag based on the given newPartitioning partitioning: it is enabled when the partitioning is a RoundRobinPartitioning with more than one partition.
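In code form, the flag mirrors the description above:

```scala
val isRoundRobin = newPartitioning.isInstanceOf[RoundRobinPartitioning] &&
  newPartitioning.numPartitions > 1
```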
rddWithPartitionIds RDD¶
prepareShuffleDependency creates a rddWithPartitionIds (as sketched after this list):

- Firstly, prepareShuffleDependency determines a newRdd based on the isRoundRobin flag and the spark.sql.execution.sortBeforeRepartition configuration property. When both are enabled (true), prepareShuffleDependency sorts partitions (using an UnsafeExternalRowSorter). Otherwise, prepareShuffleDependency returns the given RDD[InternalRow] (unchanged).
- Secondly, prepareShuffleDependency determines whether this is isOrderSensitive or not. It is isOrderSensitive when the isRoundRobin flag is enabled (true) while the spark.sql.execution.sortBeforeRepartition configuration property is not (false).

prepareShuffleDependency...FIXME
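An abridged sketch of those two steps and the resulting RDD, based on the Apache Spark sources. sortPartitionWithUnsafeExternalRowSorter is a hypothetical helper standing in for the UnsafeExternalRowSorter-based sorting; exact details may differ across versions:

```scala
// Step 1: optionally sort every partition so a retried round-robin map task
// emits its rows in the same order (spark.sql.execution.sortBeforeRepartition)
val newRdd = if (isRoundRobin && SQLConf.get.sortBeforeRepartition) {
  rdd.mapPartitionsInternal(sortPartitionWithUnsafeExternalRowSorter)
} else {
  rdd
}

// Step 2: the mapping below is order-sensitive only when round-robin rows were NOT sorted
val isOrderSensitive = isRoundRobin && !SQLConf.get.sortBeforeRepartition

// Key every row by the ID of the partition it should be shuffled to
val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] =
  newRdd.mapPartitionsWithIndexInternal((_, iter) => {
    val getPartitionKey = getPartitionKeyExtractor()
    iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
  }, isOrderSensitive = isOrderSensitive)
```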
Usage¶
prepareShuffleDependency is used when:

- CollectLimitExec, ShuffleExchangeExec and TakeOrderedAndProjectExec physical operators are executed
UnsafeRowSerializer¶
serializer: Serializer
serializer is an UnsafeRowSerializer with the following properties:

- Number of fields is the number of the output attributes of the child physical operator
- dataSize performance metric

serializer is used when the ShuffleExchangeExec operator is requested for a ShuffleDependency.
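A sketch of how the serializer is put together, based on the Apache Spark sources:

```scala
private lazy val serializer: Serializer =
  new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))
```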
ShuffledRowRDD¶
cachedShuffleRDD: ShuffledRowRDD
cachedShuffleRDD is an internal registry for the ShuffledRowRDD that the ShuffleExchangeExec operator creates when executed.

The purpose of cachedShuffleRDD is to avoid multiple executions of the ShuffleExchangeExec operator when it is reused in a query plan (as the snippet after this list demonstrates):

- cachedShuffleRDD is uninitialized (null) when the ShuffleExchangeExec operator is created
- cachedShuffleRDD is assigned a ShuffledRowRDD when the ShuffleExchangeExec operator is executed for the first time
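The caching can be observed from spark-shell: executing the operator twice returns the very same ShuffledRowRDD instance. A minimal sketch (assuming adaptive query execution is disabled):

```scala
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

val q = spark.range(6).repartition(2)
val exchange = q.queryExecution.executedPlan.collectFirst {
  case e: ShuffleExchangeExec => e
}.get

// the second execute() returns the cached ShuffledRowRDD
val rdd1 = exchange.execute()
val rdd2 = exchange.execute()
assert(rdd1 eq rdd2)
```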
ShuffleDependency¶
shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow]
shuffleDependency is a ShuffleDependency (Spark Core).
Lazy Value
shuffleDependency
is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.
Learn more in the Scala Language Specification.
The ShuffleExchangeExec operator creates a ShuffleDependency for the following (as sketched after this list):
- Input RDD
- Output attributes of the child physical operator
- Output partitioning
- UnsafeRowSerializer
- writeMetrics
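A sketch of the lazy value, based on the Apache Spark sources (inputRDD is the RDD produced by executing the child physical operator):

```scala
@transient
lazy val shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow] =
  ShuffleExchangeExec.prepareShuffleDependency(
    inputRDD,
    child.output,
    outputPartitioning,
    serializer,
    writeMetrics)
```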
shuffleDependency is used when:

- CustomShuffleReaderExec physical operator is executed
- OptimizeShuffleWithLocalRead is requested to getPartitionSpecs
- OptimizeSkewedJoin physical optimization is executed
- ShuffleExchangeExec physical operator is executed and requested for MapOutputStatistics
mapOutputStatisticsFuture¶
mapOutputStatisticsFuture: Future[MapOutputStatistics]
mapOutputStatisticsFuture requests the inputRDD for the number of partitions (as sketched below):

- If there are zero partitions, mapOutputStatisticsFuture simply creates an already-completed Future (Scala) with a null value
- Otherwise, mapOutputStatisticsFuture requests the operator's SparkContext to submitMapStage (Spark Core) with the ShuffleDependency
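An abridged sketch based on the Apache Spark sources (exact details may differ across versions):

```scala
@transient
override lazy val mapOutputStatisticsFuture: Future[MapOutputStatistics] = {
  if (inputRDD.getNumPartitions == 0) {
    // nothing to shuffle, so no map output statistics either
    Future.successful(null)
  } else {
    // submit the shuffle map stage; the caller can wait for the MapOutputStatistics
    sparkContext.submitMapStage(shuffleDependency)
  }
}
```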
Lazy Value
mapOutputStatisticsFuture
is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.
Learn more in the Scala Language Specification.
mapOutputStatisticsFuture is part of the ShuffleExchangeLike abstraction.
createShuffleWriteProcessor¶
createShuffleWriteProcessor(
metrics: Map[String, SQLMetric]): ShuffleWriteProcessor
createShuffleWriteProcessor creates a ShuffleWriteProcessor (Spark Core) for the sole purpose of plugging in a custom ShuffleWriteMetricsReporter (SQLShuffleWriteMetricsReporter).
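A sketch of what that looks like, based on the Apache Spark sources:

```scala
def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor =
  new ShuffleWriteProcessor {
    // the only customization: report shuffle writes as SQL metrics
    override protected def createMetricsReporter(
        context: TaskContext): ShuffleWriteMetricsReporter =
      new SQLShuffleWriteMetricsReporter(context.taskMetrics().shuffleWriteMetrics, metrics)
  }
```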
createShuffleWriteProcessor is used when the ShuffleExchangeExec operator is executed (and requested to prepareShuffleDependency).
Demo¶
ShuffleExchangeExec and Repartition Logical Operator¶
val q = spark.range(6).repartition(2)
scala> q.explain(extended = true)
== Parsed Logical Plan ==
Repartition 2, true
+- Range (0, 6, step=1, splits=Some(16))
== Analyzed Logical Plan ==
id: bigint
Repartition 2, true
+- Range (0, 6, step=1, splits=Some(16))
== Optimized Logical Plan ==
Repartition 2, true
+- Range (0, 6, step=1, splits=Some(16))
== Physical Plan ==
Exchange RoundRobinPartitioning(2), false, [id=#8]
+- *(1) Range (0, 6, step=1, splits=16)
ShuffleExchangeExec and RepartitionByExpression Logical Operator¶
val q = spark.range(6).repartition(2, 'id % 2)
scala> q.explain(extended = true)
== Parsed Logical Plan ==
'RepartitionByExpression [('id % 2)], 2
+- Range (0, 6, step=1, splits=Some(16))
== Analyzed Logical Plan ==
id: bigint
RepartitionByExpression [(id#4L % cast(2 as bigint))], 2
+- Range (0, 6, step=1, splits=Some(16))
== Optimized Logical Plan ==
RepartitionByExpression [(id#4L % 2)], 2
+- Range (0, 6, step=1, splits=Some(16))
== Physical Plan ==
Exchange hashpartitioning((id#4L % 2), 2), false, [id=#17]
+- *(1) Range (0, 6, step=1, splits=16)