ShuffledRowRDD¶
ShuffledRowRDD is an RDD (Spark Core) of InternalRows (RDD[InternalRow]) used for execution of the following physical operators:
- AQEShuffleReadExec (Adaptive Query Execution)
- CollectLimitExec
- ShuffleExchangeExec
- TakeOrderedAndProjectExec
ShuffledRDD¶
ShuffledRowRDD is similar to ShuffledRDD (Spark Core), the difference being the type of the values processed, i.e. InternalRows and (K, C) key-value pairs, respectively.
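For comparison, a simplified view of the two class headers (paraphrased from the Spark 3 sources; constructor parameters are abridged and may differ across versions):

```scala
// ShuffledRowRDD (Spark SQL): the values are InternalRows, the keys are Int partition IDs
class ShuffledRowRDD(
    var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
    metrics: Map[String, SQLMetric],
    partitionSpecs: Array[ShufflePartitionSpec])
  extends RDD[InternalRow](dependency.rdd.context, Nil)

// ShuffledRDD (Spark Core): the values are (K, C) key-value pairs
class ShuffledRDD[K, V, C](
    var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil)
```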
Creating Instance¶
ShuffledRowRDD
takes the following to be created:
- ShuffleDependency[Int, InternalRow, InternalRow] (Spark Core)
- Write Metrics
- ShufflePartitionSpecs (default: CoalescedPartitionSpecs)
When created, ShuffledRowRDD sets the __fetch_continuous_blocks_in_batch_enabled local property to true when the spark.sql.adaptive.fetchShuffleBlocksInBatch configuration property is enabled.
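A condensed sketch of that initialization step (paraphrasing the Spark 3 sources, where the local property key is a constant on SortShuffleManager):

```scala
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.sql.internal.SQLConf

// In the ShuffledRowRDD constructor: advertise batch fetching of continuous
// shuffle blocks only when spark.sql.adaptive.fetchShuffleBlocksInBatch is enabled.
if (SQLConf.get.fetchShuffleBlocksInBatch) {
  dependency.rdd.context.setLocalProperty(
    SortShuffleManager.FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED_KEY, // "__fetch_continuous_blocks_in_batch_enabled"
    "true")
}
```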
ShuffledRowRDD
is created when:
- CollectLimitExec, ShuffleExchangeExec and TakeOrderedAndProjectExec physical operators are executed
- ShuffleExchangeExec is requested for a shuffle RDD (for AQEShuffleReadExec)
Write Metrics¶
metrics: Map[String, SQLMetric]
ShuffledRowRDD
is given a collection of SQLMetrics by name.
metrics
is used to create a SQLShuffleReadMetricsReporter
while computing a partition (to create a ShuffleReader
(Spark Core)).
Note
SQLShuffleReadMetricsReporter
is a ShuffleReadMetricsReporter
(Spark Core).
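A short sketch of how the metrics are wired in while computing a partition (paraphrasing the Spark 3 sources; context and metrics are the compute argument and the Write Metrics above, respectively):

```scala
// Inside compute: the task-level TempShuffleReadMetrics and the SQL metrics of the
// exchange operator are both updated through a single SQLShuffleReadMetricsReporter.
val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
```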
Computing Partition¶
compute(
split: Partition,
context: TaskContext): Iterator[InternalRow]
compute
requests the given TaskContext
(Spark Core) for the TaskMetrics
(Spark Core) that is in turn requested for a TempShuffleReadMetrics
.
compute
creates a SQLShuffleReadMetricsReporter
(with the TempShuffleReadMetrics
and the SQL Metrics).
compute
assumes that the given Partition
(Spark Core) is a ShuffledRowRDDPartition
and requests it for the ShufflePartitionSpec
.
compute
requests the ShuffleManager
(Spark Core) for a ShuffleReader
(Spark Core) based on the type of ShufflePartitionSpec
.
| ShufflePartitionSpec | startPartition | endPartition |
|---|---|---|
| CoalescedPartitionSpec | startReducerIndex | endReducerIndex |

| ShufflePartitionSpec | startMapIndex | endMapIndex | startPartition | endPartition |
|---|---|---|---|---|
| PartialReducerPartitionSpec | startMapIndex | endMapIndex | reducerIndex | reducerIndex + 1 |
| PartialMapperPartitionSpec | mapIndex | mapIndex + 1 | startReducerIndex | endReducerIndex |
| CoalescedMapperPartitionSpec | startMapIndex | endMapIndex | 0 | numReducers |
In the end, compute requests the ShuffleReader to read the combined records (Iterator[Product2[Int, InternalRow]]) and takes only the InternalRow values (the keys are ignored).
compute is part of the RDD (Spark Core) abstraction.
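The compute flow above could be condensed as follows (paraphrasing the Spark 3 sources; ShuffleManager.getReader argument lists and the exact shapes of the ShufflePartitionSpec case classes may differ across Spark versions):

```scala
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
  val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
  val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)

  // The ShufflePartitionSpec of the ShuffledRowRDDPartition decides
  // which (map, reduce) ranges the ShuffleReader is created for.
  val reader = split.asInstanceOf[ShuffledRowRDDPartition].spec match {
    case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
      SparkEnv.get.shuffleManager.getReader(
        dependency.shuffleHandle, startReducerIndex, endReducerIndex,
        context, sqlMetricsReporter)
    case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex, _) =>
      SparkEnv.get.shuffleManager.getReader(
        dependency.shuffleHandle, startMapIndex, endMapIndex,
        reducerIndex, reducerIndex + 1, context, sqlMetricsReporter)
    case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) =>
      SparkEnv.get.shuffleManager.getReader(
        dependency.shuffleHandle, mapIndex, mapIndex + 1,
        startReducerIndex, endReducerIndex, context, sqlMetricsReporter)
    case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, numReducers) =>
      SparkEnv.get.shuffleManager.getReader(
        dependency.shuffleHandle, startMapIndex, endMapIndex,
        0, numReducers, context, sqlMetricsReporter)
  }

  // Only the InternalRow values are kept; the Int keys are discarded.
  reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)
}
```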
Partition Specs¶
ShuffledRowRDD can be given Partition Specs when created. When not given, ShuffledRowRDD defaults to as many CoalescedPartitionSpecs as there are partitions of the ShuffleDependency (based on its Partitioner).
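A sketch of that default (paraphrasing the auxiliary constructor in the Spark 3 sources):

```scala
// One CoalescedPartitionSpec per post-shuffle partition, i.e. a [i, i + 1)
// reducer range for every partition of the ShuffleDependency's Partitioner.
val defaultSpecs: Array[ShufflePartitionSpec] =
  Array.tabulate[ShufflePartitionSpec](dependency.partitioner.numPartitions) { i =>
    CoalescedPartitionSpec(i, i + 1)
  }
```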
RDD Dependencies¶
getDependencies: Seq[Dependency[_]]
getDependencies is part of the RDD (Spark Core) abstraction.

getDependencies is a single-element collection with the ShuffleDependency[Int, InternalRow, InternalRow].
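In code, getDependencies boils down to the following (a sketch based on the Spark 3 sources):

```scala
// The one and only dependency is the ShuffleDependency the RDD was created with.
override def getDependencies: Seq[Dependency[_]] = List(dependency)
```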
Partitioner¶
partitioner: Option[Partitioner]
partitioner is part of the RDD (Spark Core) abstraction.

partitioner is a CoalescedPartitioner when the following all hold:

- Partition Specs are all CoalescedPartitionSpecs
- The startReducerIndexes of the CoalescedPartitionSpecs are all unique

Otherwise, partitioner is undefined (None).
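A condensed sketch of that decision (paraphrasing the Spark 3 sources):

```scala
override val partitioner: Option[Partitioner] =
  if (partitionSpecs.forall(_.isInstanceOf[CoalescedPartitionSpec])) {
    val indices = partitionSpecs.map(_.asInstanceOf[CoalescedPartitionSpec].startReducerIndex)
    // A CoalescedPartitioner is only used when every spec starts at a distinct
    // reducer index, i.e. the startReducerIndexes are all unique.
    if (indices.toSet.size == partitionSpecs.length) {
      Some(new CoalescedPartitioner(dependency.partitioner, indices))
    } else {
      None
    }
  } else {
    None
  }
```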
Partitions¶
getPartitions: Array[Partition]
getPartitions is part of the RDD (Spark Core) abstraction.
getPartitions
...FIXME
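Until the section is filled in, a likely sketch (based on the Spark 3 sources, where there is one ShuffledRowRDDPartition per Partition Spec):

```scala
// One ShuffledRowRDDPartition per ShufflePartitionSpec, indexed by position.
override def getPartitions: Array[Partition] =
  Array.tabulate[Partition](partitionSpecs.length) { i =>
    ShuffledRowRDDPartition(i, partitionSpecs(i))
  }
```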
Preferred Locations of Partition¶
getPreferredLocations(
partition: Partition): Seq[String]
getPreferredLocations is part of the RDD (Spark Core) abstraction.
getPreferredLocations
...FIXME
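Until the section is filled in, a rough sketch (paraphrasing the Spark 3 sources; the MapOutputTrackerMaster methods shown are what the implementation relies on there and may differ across versions):

```scala
override def getPreferredLocations(partition: Partition): Seq[String] = {
  val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
  partition.asInstanceOf[ShuffledRowRDDPartition].spec match {
    // Reducer-oriented specs: hosts holding most of the reducers' shuffle data
    case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
      (startReducerIndex until endReducerIndex).flatMap { reducerIndex =>
        tracker.getPreferredLocationsForShuffle(dependency, reducerIndex)
      }
    // Mapper-oriented specs: locations of the given map outputs
    case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex, _) =>
      tracker.getMapLocation(dependency, startMapIndex, endMapIndex)
    case PartialMapperPartitionSpec(mapIndex, _, _) =>
      tracker.getMapLocation(dependency, mapIndex, mapIndex + 1)
    case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, _) =>
      tracker.getMapLocation(dependency, startMapIndex, endMapIndex)
  }
}
```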
Clearing Dependencies¶
clearDependencies(): Unit
clearDependencies is part of the RDD (Spark Core) abstraction.

clearDependencies requests the parent RDD to clearDependencies and then clears the given ShuffleDependency (i.e. sets it to null).
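In code, this is roughly (a sketch based on the Spark 3 sources):

```scala
override def clearDependencies(): Unit = {
  super.clearDependencies() // requests the parent RDD to clear its dependencies
  dependency = null         // drops the reference to the ShuffleDependency
}
```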