ShuffledRowRDD

ShuffledRowRDD is an RDD (Spark Core) of InternalRows (RDD[InternalRow]) that is used to execute physical operators that shuffle rows (e.g., ShuffleExchangeExec).

ShuffledRDD

ShuffledRowRDD is similar to ShuffledRDD (Spark Core); they differ only in the type of the values they process: InternalRows and (K, C) key-value pairs, respectively.
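
At the type level, the difference shows up in the class signatures. The following is a paraphrased, signature-only sketch (constructor parameters elided, so not compilable as-is):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

class ShuffledRowRDD /* (...) */ extends RDD[InternalRow]   // Spark SQL
class ShuffledRDD[K, V, C] /* (...) */ extends RDD[(K, C)]  // Spark Core
```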

Creating Instance

ShuffledRowRDD takes the following to be created:

* ShuffleDependency[Int, InternalRow, InternalRow] (Spark Core)
* SQL Metrics (Map[String, SQLMetric])
* Partition Specs (optional)

When created, ShuffledRowRDD sets the __fetch_continuous_blocks_in_batch_enabled local property to true when the spark.sql.adaptive.fetchShuffleBlocksInBatch configuration property is enabled.
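
A minimal sketch of that initialization (paraphrasing the Spark sources; the local-property key is the literal value mentioned above):

```scala
import org.apache.spark.sql.internal.SQLConf

// Set only when spark.sql.adaptive.fetchShuffleBlocksInBatch is enabled.
if (SQLConf.get.fetchShuffleBlocksInBatch) {
  dependency.rdd.context.setLocalProperty(
    "__fetch_continuous_blocks_in_batch_enabled", "true")
}
```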

ShuffledRowRDD is created when ShuffleExchangeExec physical operator is executed or requested for a shuffle RDD (for Adaptive Query Execution).

SQL Metrics

metrics: Map[String, SQLMetric]

ShuffledRowRDD is given a collection of SQLMetrics by name.

metrics is used to create a SQLShuffleReadMetricsReporter while computing a partition (to create a ShuffleReader (Spark Core)).

Note

SQLShuffleReadMetricsReporter is a ShuffleReadMetricsReporter (Spark Core).
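
A hedged sketch of how such a metrics map can be obtained and handed over to a ShuffledRowRDD (readMetrics is an illustrative name; SQLShuffleReadMetricsReporter.createShuffleReadMetrics creates the read-side shuffle metrics):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter}

// Read-side shuffle metrics, keyed by name, to be given to a ShuffledRowRDD.
def readMetrics(sc: SparkContext): Map[String, SQLMetric] =
  SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sc)
```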

Computing Partition

compute(
  split: Partition,
  context: TaskContext): Iterator[InternalRow]

compute requests the given TaskContext (Spark Core) for the TaskMetrics (Spark Core) that is in turn requested for a TempShuffleReadMetrics.

compute creates a SQLShuffleReadMetricsReporter (with the TempShuffleReadMetrics and the SQL Metrics).

compute assumes that the given Partition (Spark Core) is a ShuffledRowRDDPartition and requests it for the ShufflePartitionSpec.

compute requests the ShuffleManager (Spark Core) for a ShuffleReader (Spark Core) based on the type of ShufflePartitionSpec.

| ShufflePartitionSpec | startMapIndex | endMapIndex | startPartition | endPartition |
|----------------------|---------------|-------------|----------------|--------------|
| CoalescedPartitionSpec | | | startReducerIndex | endReducerIndex |
| PartialReducerPartitionSpec | startMapIndex | endMapIndex | reducerIndex | reducerIndex + 1 |
| PartialMapperPartitionSpec | mapIndex | mapIndex + 1 | startReducerIndex | endReducerIndex |
| CoalescedMapperPartitionSpec | startMapIndex | endMapIndex | 0 | numReducers |

In the end, compute requests the ShuffleReader to read the combined records (Iterator[Product2[Int, InternalRow]]) and keeps the InternalRow values only (ignoring the keys).

compute is part of RDD (Spark Core) abstraction.
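
The following is a hedged sketch of the overall compute flow (paraphrasing the Spark sources) for the CoalescedPartitionSpec case; the remaining spec types follow the table above:

```scala
import org.apache.spark.{Partition, SparkEnv, TaskContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.CoalescedPartitionSpec
import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter

def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
  // TempShuffleReadMetrics from the TaskMetrics of the given TaskContext
  val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
  val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
  val reader = split.asInstanceOf[ShuffledRowRDDPartition].spec match {
    case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
      SparkEnv.get.shuffleManager.getReader(
        dependency.shuffleHandle,
        startReducerIndex,
        endReducerIndex,
        context,
        sqlMetricsReporter)
    case other =>
      // Partial{Reducer,Mapper}PartitionSpec and CoalescedMapperPartitionSpec
      // use the getReader variant with explicit map-index boundaries (elided).
      ???
  }
  reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)
}
```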

Partition Specs

ShuffledRowRDD can be given Partition Specs when created.

When not given, it defaults to one CoalescedPartitionSpec per partition of the ShuffleDependency (based on its Partitioner), as the sketch below shows.
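
A minimal sketch of the defaults (paraphrasing the Spark sources; defaultSpecs is an illustrative name):

```scala
import org.apache.spark.sql.execution.CoalescedPartitionSpec

// One CoalescedPartitionSpec(i, i + 1) per shuffle partition.
val defaultSpecs = Array.tabulate(dependency.partitioner.numPartitions) { i =>
  CoalescedPartitionSpec(i, i + 1)
}
```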

RDD Dependencies

getDependencies: Seq[Dependency[_]]

getDependencies is part of RDD (Spark Core) abstraction.

getDependencies returns a single-element collection with the given ShuffleDependency[Int, InternalRow, InternalRow].
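
A minimal sketch of what that amounts to (paraphrasing the Spark sources):

```scala
import org.apache.spark.Dependency

override def getDependencies: Seq[Dependency[_]] = List(dependency)
```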

Partitioner

partitioner: Option[Partitioner]

partitioner is part of RDD (Spark Core) abstraction.

partitioner is a CoalescedPartitioner when the following all hold:

  1. The Partition Specs are all CoalescedPartitionSpecs
  2. The startReducerIndex values of the CoalescedPartitionSpecs are all distinct

Otherwise, partitioner is undefined (None).
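
A hedged sketch of that logic (paraphrasing the Spark sources):

```scala
import org.apache.spark.Partitioner
import org.apache.spark.sql.execution.{CoalescedPartitioner, CoalescedPartitionSpec}

override val partitioner: Option[Partitioner] =
  if (partitionSpecs.forall(_.isInstanceOf[CoalescedPartitionSpec])) {
    val indices =
      partitionSpecs.map(_.asInstanceOf[CoalescedPartitionSpec].startReducerIndex)
    // Use a CoalescedPartitioner only when the start indices are all distinct.
    if (indices.toSet.size == partitionSpecs.length) {
      Some(new CoalescedPartitioner(dependency.partitioner, indices))
    } else {
      None
    }
  } else {
    None
  }
```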

Partitions

getPartitions: Array[Partition]

getPartitions is part of RDD (Spark Core) abstraction.

getPartitions creates a ShuffledRowRDDPartition for every Partition Spec (with the position in the Partition Specs as the partition index).
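
A minimal sketch of that (paraphrasing the Spark sources):

```scala
import org.apache.spark.Partition

override def getPartitions: Array[Partition] =
  Array.tabulate[Partition](partitionSpecs.length) { i =>
    ShuffledRowRDDPartition(i, partitionSpecs(i))
  }
```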

Preferred Locations of Partition

getPreferredLocations(
  partition: Partition): Seq[String]

getPreferredLocations is part of RDD (Spark Core) abstraction.

getPreferredLocations requests the MapOutputTrackerMaster (Spark Core) for the preferred locations of the given partition based on its ShufflePartitionSpec (reducer locations for CoalescedPartitionSpecs and map output locations for the map-range specs).
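
A hedged sketch (paraphrasing the Spark sources) for two of the spec types:

```scala
import org.apache.spark.{MapOutputTrackerMaster, Partition, SparkEnv}
import org.apache.spark.sql.execution.{CoalescedPartitionSpec, PartialMapperPartitionSpec}

override def getPreferredLocations(partition: Partition): Seq[String] = {
  val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
  partition.asInstanceOf[ShuffledRowRDDPartition].spec match {
    case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
      // Locations of the reducer partitions in the coalesced range
      startReducerIndex.until(endReducerIndex).flatMap { reducerIndex =>
        tracker.getPreferredLocationsForShuffle(dependency, reducerIndex)
      }
    case PartialMapperPartitionSpec(mapIndex, _, _) =>
      // Location of the single map output
      tracker.getMapLocation(dependency, mapIndex, mapIndex + 1)
    case _ =>
      Nil // the remaining spec types are elided in this sketch
  }
}
```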

Clearing Dependencies

clearDependencies(): Unit

clearDependencies is part of RDD (Spark Core) abstraction.

clearDependencies requests the parent RDD to clearDependencies and then clears the given ShuffleDependency (sets it to null).
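
A minimal sketch of that (paraphrasing the Spark sources):

```scala
override def clearDependencies(): Unit = {
  super.clearDependencies()
  dependency = null // release the ShuffleDependency
}
```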