ShuffledRowRDD¶
ShuffledRowRDD is an RDD (Spark Core) of InternalRows (RDD[InternalRow]) used for execution of the following physical operators:
- AQEShuffleReadExec (Adaptive Query Execution)
- CollectLimitExec
- ShuffleExchangeExec
- TakeOrderedAndProjectExec
ShuffledRDD¶
ShuffledRowRDD is similar to ShuffledRDD (Spark Core), the difference being the type of the values processed: InternalRows and (K, C) key-value pairs, respectively.
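A type-level sketch of the difference (the rowsOf helper below is made up for illustration; it compiles because ShuffledRowRDD extends RDD[InternalRow], whereas ShuffledRDD[K, V, C] extends RDD[(K, C)]):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.ShuffledRowRDD

// ShuffledRowRDD produces rows only; the Int key used for shuffle
// partitioning is dropped when the shuffle output is read.
def rowsOf(rdd: ShuffledRowRDD): RDD[InternalRow] = rdd
```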
Creating Instance¶
ShuffledRowRDD takes the following to be created:
- ShuffleDependency[Int, InternalRow, InternalRow] (Spark Core)
- Write Metrics
- ShufflePartitionSpecs (default: CoalescedPartitionSpecs)
When created, ShuffledRowRDD uses the spark.sql.adaptive.fetchShuffleBlocksInBatch configuration property and, when it is enabled, sets the __fetch_continuous_blocks_in_batch_enabled local property to true.
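A minimal sketch of that side effect (a constructor fragment, not a complete program; it assumes dependency is the ShuffleDependency the RDD was created with, and the local property key is the literal string above):

```scala
import org.apache.spark.sql.internal.SQLConf

// When AQE batch fetching is enabled, mark it via a SparkContext local
// property so the shuffle reader can fetch continuous shuffle blocks
// in a single request.
if (SQLConf.get.fetchShuffleBlocksInBatch) {
  dependency.rdd.context.setLocalProperty(
    "__fetch_continuous_blocks_in_batch_enabled", "true")
}
```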
ShuffledRowRDD is created when:
- CollectLimitExec, ShuffleExchangeExec and TakeOrderedAndProjectExec physical operators are executed
- ShuffleExchangeExec is requested for a shuffle RDD (for AQEShuffleReadExec)
Write Metrics¶
metrics: Map[String, SQLMetric]
ShuffledRowRDD is given a collection of SQLMetrics by name.
metrics is used to create a SQLShuffleReadMetricsReporter while computing a partition (to create a ShuffleReader (Spark Core)).
Note
SQLShuffleReadMetricsReporter is a ShuffleReadMetricsReporter (Spark Core).
Computing Partition¶
compute(
split: Partition,
context: TaskContext): Iterator[InternalRow]
compute requests the given TaskContext (Spark Core) for the TaskMetrics (Spark Core) that is in turn requested for a TempShuffleReadMetrics.
compute creates a SQLShuffleReadMetricsReporter (with the TempShuffleReadMetrics and the SQL Metrics).
compute assumes that the given Partition (Spark Core) is a ShuffledRowRDDPartition and requests it for the ShufflePartitionSpec.
compute requests the ShuffleManager (Spark Core) for a ShuffleReader (Spark Core) based on the type of ShufflePartitionSpec.
| ShufflePartitionSpec | startPartition | endPartition |
|---|---|---|
| CoalescedPartitionSpec | startReducerIndex | endReducerIndex |

| ShufflePartitionSpec | startMapIndex | endMapIndex | startPartition | endPartition |
|---|---|---|---|---|
| PartialReducerPartitionSpec | startMapIndex | endMapIndex | reducerIndex | reducerIndex + 1 |
| PartialMapperPartitionSpec | mapIndex | mapIndex + 1 | startReducerIndex | endReducerIndex |
| CoalescedMapperPartitionSpec | startMapIndex | endMapIndex | 0 | numReducers |
In the end, compute requests the ShuffleReader to read combined records (Iterator[Product2[Int, InternalRow]]) and takes out the InternalRow values only (ignoring the keys).
compute is part of RDD (Spark Core) abstraction.
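A condensed sketch of compute following the description and the tables above (imports and class context omitted; the spec field name and the exact case-class shapes vary slightly across Spark versions):

```scala
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
  val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
  val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
  // Choose the ShuffleReader based on the ShufflePartitionSpec of the partition.
  val reader = split.asInstanceOf[ShuffledRowRDDPartition].spec match {
    case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
      SparkEnv.get.shuffleManager.getReader(
        dependency.shuffleHandle, startReducerIndex, endReducerIndex,
        context, sqlMetricsReporter)
    case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex, _) =>
      SparkEnv.get.shuffleManager.getReader(
        dependency.shuffleHandle, startMapIndex, endMapIndex,
        reducerIndex, reducerIndex + 1, context, sqlMetricsReporter)
    case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) =>
      SparkEnv.get.shuffleManager.getReader(
        dependency.shuffleHandle, mapIndex, mapIndex + 1,
        startReducerIndex, endReducerIndex, context, sqlMetricsReporter)
    case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, numReducers) =>
      SparkEnv.get.shuffleManager.getReader(
        dependency.shuffleHandle, startMapIndex, endMapIndex,
        0, numReducers, context, sqlMetricsReporter)
  }
  // Read (partitionId, row) pairs and keep the rows only.
  reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)
}
```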
Partition Specs¶
ShuffledRowRDD can be given Partition Specs when created.
When not given, ShuffledRowRDD defaults to as many CoalescedPartitionSpecs as the number of partitions of the ShuffleDependency (based on its Partitioner), as sketched below.
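A sketch of that default (one single-reducer CoalescedPartitionSpec per partition of the ShuffleDependency's Partitioner; the defaultSpecs name is illustrative):

```scala
val defaultSpecs: Array[ShufflePartitionSpec] =
  Array.tabulate(dependency.partitioner.numPartitions) { i =>
    // Each spec covers exactly one reducer: [i, i + 1)
    CoalescedPartitionSpec(i, i + 1)
  }
```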
RDD Dependencies¶
getDependencies: Seq[Dependency[_]]
getDependencies is part of RDD (Spark Core) abstraction.
getDependencies is a single-element collection with the given ShuffleDependency[Int, InternalRow, InternalRow].
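A minimal sketch of that body (assuming dependency is the ShuffleDependency the RDD was created with):

```scala
override def getDependencies: Seq[Dependency[_]] = List(dependency)
```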
Partitioner¶
partitioner: Option[Partitioner]
partitioner is part of RDD (Spark Core) abstraction.
partitioner is CoalescedPartitioner when the following all hold:
- Partition Specs are all CoalescedPartitionSpecs
- The startReducerIndexes of the CoalescedPartitionSpecs are all unique
Otherwise, partitioner is undefined (None).
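A sketch of this logic following the two conditions above (imports and class context omitted):

```scala
override val partitioner: Option[Partitioner] =
  if (partitionSpecs.forall(_.isInstanceOf[CoalescedPartitionSpec])) {
    val indices = partitionSpecs.map(_.asInstanceOf[CoalescedPartitionSpec].startReducerIndex)
    // Only expose a partitioner when every spec starts at a distinct reducer
    // index (no reducer is read by two output partitions).
    if (indices.toSet.size == partitionSpecs.length) {
      Some(new CoalescedPartitioner(dependency.partitioner, indices))
    } else {
      None
    }
  } else {
    None
  }
```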
Partitions¶
getPartitions: Array[Partition]
getPartitions is part of RDD (Spark Core) abstraction.
getPartitions...FIXME
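Pending the description above, a plausible sketch (one ShuffledRowRDDPartition per Partition Spec, indexed by position; an assumption based on how compute uses ShuffledRowRDDPartition):

```scala
override def getPartitions: Array[Partition] =
  Array.tabulate[Partition](partitionSpecs.length) { i =>
    // Pair every partition index with its ShufflePartitionSpec.
    ShuffledRowRDDPartition(i, partitionSpecs(i))
  }
```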
Preferred Locations of Partition¶
getPreferredLocations(
partition: Partition): Seq[String]
getPreferredLocations is part of RDD (Spark Core) abstraction.
getPreferredLocations...FIXME
Clearing Dependencies¶
clearDependencies(): Unit
clearDependencies is part of RDD (Spark Core) abstraction.
clearDependencies requests the parent RDD to clearDependencies and then clears the given ShuffleDependency (i.e. sets it to null).
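A minimal sketch of the two steps:

```scala
override def clearDependencies(): Unit = {
  super.clearDependencies()
  // Drop the reference to the ShuffleDependency so it can be garbage-collected.
  dependency = null
}
```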