ShuffledRDD

ShuffledRDD is an RDD of key-value pairs that represents a shuffle step in an RDD lineage (and marks the start of a new stage).

When requested to compute a partition, ShuffledRDD takes the ShuffleHandle of its one and only ShuffleDependency and requests the system ShuffleManager for a ShuffleReader, which is then used to read the (combined) key-value pairs.

Creating Instance

ShuffledRDD takes the following to be created: the previous RDD (of K keys and V values) and a Partitioner.

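ShuffledRDD is created for RDD operators such as RDD.coalesce (with shuffle enabled), OrderedRDDFunctions.sortByKey and PairRDDFunctions.combineByKeyWithClassTag.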

Partitioner

ShuffledRDD is given a Partitioner when created. The given Partitioner is the partitioner of this ShuffledRDD.

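The Partitioner is also used when ShuffledRDD is requested for the dependencies (to create the single ShuffleDependency) and for the partitions (one ShuffledRDDPartition per partition of the Partitioner).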
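The role of the Partitioner can be checked with a minimal sketch like the one below. It assumes a spark-shell (or any classpath with Spark available); the application name and the sample data are made up for illustration.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.ShuffledRDD

// Reuses the spark-shell context if there is one, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffled-rdd-partitioner"))

val pairs = sc.parallelize(Seq(1 -> "one", 2 -> "two", 3 -> "three"))
val part = new HashPartitioner(4)

// No aggregator is set, so the combiner type C is the same as the value type V.
val shuffled = new ShuffledRDD[Int, String, String](pairs, part)

// The given Partitioner is exposed as the partitioner of the ShuffledRDD...
val samePartitioner = shuffled.partitioner == Some(part)

// ...and determines how many partitions the ShuffledRDD has.
val numPartitions = shuffled.getNumPartitions
```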

Dependencies

Signature
getDependencies: Seq[Dependency[_]]

getDependencies is part of the RDD abstraction.

getDependencies uses the user-specified Serializer, if defined, or requests the current SerializerManager for one.

When requesting a Serializer from the SerializerManager, getDependencies uses the mapSideCombine internal flag to select the key and value types: K and C when the flag is enabled, K and V otherwise.

In the end, getDependencies creates a single ShuffleDependency (with the previous RDD, the Partitioner, and the Serializer).
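The single ShuffleDependency can be inspected through the public dependencies method of an RDD. The following is a sketch only, assuming a spark-shell or a local Spark setup; the sample data and the partition counts are arbitrary.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}

// Reuses the spark-shell context if there is one, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffled-rdd-dependencies"))

val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3), numSlices = 3)
val reduced = pairs.reduceByKey(_ + _, numPartitions = 2)

// dependencies is backed by getDependencies (computed once and cached by the RDD).
val deps = reduced.dependencies
val shuffleDep = deps.head.asInstanceOf[ShuffleDependency[String, Int, Int]]

val depCount = deps.size                                    // exactly one dependency
val reducePartitions = shuffleDep.partitioner.numPartitions // from the Partitioner
val parentIsPairs = shuffleDep.rdd eq pairs                 // the previous RDD

// The Serializer depends on the configuration, so it is only printed here.
println(shuffleDep.serializer.getClass.getName)
```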

Computing Partition

Signature
compute(
  split: Partition,
  context: TaskContext): Iterator[(K, C)]

compute is part of the RDD abstraction.

compute assumes that ShuffleDependency is the first dependency among the dependencies (and the only one per getDependencies).

compute uses the SparkEnv to access the ShuffleManager. compute requests the ShuffleManager for the ShuffleReader based on the following:

ShuffleReader argument | Value
ShuffleHandle | ShuffleHandle of the ShuffleDependency
startPartition | The index of the given split partition
endPartition | index + 1

In the end, compute requests the ShuffleReader to read the (combined) key-value pairs (of type (K, C)).
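Since the ShuffleManager and ShuffleReader are internal to Spark, the sketch below does not call them directly. Instead it observes the effect of reading only the range from index to index + 1: every record a partition produces should have a key that the Partitioner maps to that partition. It assumes a spark-shell or a local Spark setup, and the data is made up.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Reuses the spark-shell context if there is one, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffled-rdd-compute"))

val part = new HashPartitioner(3)

// partitionBy returns a ShuffledRDD here (the parent has no partitioner).
val shuffled = sc.parallelize(0 until 12).map(n => (n, n * n)).partitionBy(part)

// For every computed partition, check that each key it reads belongs to it.
val perRecordChecks = shuffled.mapPartitionsWithIndex { (index, records) =>
  records.map { case (key, _) => part.getPartition(key) == index }
}
val allKeysInPlace = perRecordChecks.collect().forall(identity)
```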

Key, Value and Combiner Types

class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]

ShuffledRDD is given an RDD of K keys and V values to be created.

When computed, ShuffledRDD produces pairs of K keys and C values.
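To make the difference between V and C concrete, the sketch below creates a ShuffledRDD directly with an Aggregator that combines Int values into a List[Int]. It assumes a spark-shell or a local Spark setup; the aggregator name toList and the sample data are made up for illustration.

```scala
import org.apache.spark.{Aggregator, HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.ShuffledRDD

// Reuses the spark-shell context if there is one, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffled-rdd-types"))

// K = String, V = Int
val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3))

// C = List[Int]: the values of a key are combined into a list.
val toList = new Aggregator[String, Int, List[Int]](
  createCombiner = v => List(v),
  mergeValue = (acc, v) => v :: acc,
  mergeCombiners = (left, right) => left ::: right)

val part = new HashPartitioner(2)
val shuffled = new ShuffledRDD[String, Int, List[Int]](pairs, part).setAggregator(toList)

// The computed pairs are (K, C), i.e. (String, List[Int]).
// The order within a list is not guaranteed, hence the sorting.
val result = shuffled.collect().toMap.map { case (k, vs) => k -> vs.sorted }
```

Without an Aggregator there is nothing to turn V values into C values, so C is then expected to be the same type as V.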

isBarrier Flag

The isBarrier flag of ShuffledRDD is always disabled (false).

Map-Side Combine Flag

ShuffledRDD uses a map-side combine flag when creating the ShuffleDependency (there is always exactly one) as it is requested for the dependencies.

The flag is disabled (false) by default and can be changed using the setMapSideCombine method.

setMapSideCombine(
  mapSideCombine: Boolean): ShuffledRDD[K, V, C]

setMapSideCombine is used for the PairRDDFunctions.combineByKeyWithClassTag transformation (which enables the flag by default).
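The effect of the flag can be observed on the ShuffleDependency of a few common transformations. This is a sketch under the assumption of a spark-shell or a local Spark setup; mapSideCombineOf is a hypothetical helper defined only for this example.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Reuses the spark-shell context if there is one, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffled-rdd-map-side-combine"))

// Hypothetical helper: reads the flag off the first (and only) dependency.
def mapSideCombineOf(rdd: RDD[_]): Boolean =
  rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].mapSideCombine

val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3))

// reduceByKey goes through combineByKeyWithClassTag with the default (enabled).
val viaReduceByKey = mapSideCombineOf(pairs.reduceByKey(_ + _))

// groupByKey turns map-side combine off explicitly.
val viaGroupByKey = mapSideCombineOf(pairs.groupByKey())

// partitionBy creates a ShuffledRDD and leaves the flag at its default (disabled).
val viaPartitionBy = mapSideCombineOf(pairs.partitionBy(new HashPartitioner(2)))
```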

Placement Preferences of Partition

Signature
getPreferredLocations(
  partition: Partition): Seq[String]

getPreferredLocations is part of the RDD abstraction.

getPreferredLocations uses SparkEnv to access the current MapOutputTrackerMaster and requests it for the preferred locations of the given partition (the BlockManagers with the most map outputs).
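The placement preferences can be looked up on the driver with the public preferredLocations method of an RDD. The sketch below assumes a spark-shell or a local Spark setup; the hosts it prints depend on the environment (and may be empty), so no output is shown.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the spark-shell context if there is one, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffled-rdd-locations"))

val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3))
val reduced = pairs.reduceByKey(_ + _)

// Run a job first so that map outputs of the shuffle are registered.
reduced.count()

// One (possibly empty) list of preferred hosts per partition.
val locations = reduced.partitions.map(p => reduced.preferredLocations(p))
locations.zipWithIndex.foreach { case (hosts, index) =>
  println(s"partition $index: ${hosts.mkString(", ")}")
}
```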

ShuffledRDDPartition

ShuffledRDDPartition takes an index to be created (the index of a partition as calculated by the Partitioner).
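The partitions of a ShuffledRDD and their indices can be listed as in the following sketch, which assumes a spark-shell or a local Spark setup and uses made-up data.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Reuses the spark-shell context if there is one, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffled-rdd-partitions"))

val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3))
val shuffled = pairs.partitionBy(new HashPartitioner(3))

// One partition per partition of the Partitioner, indexed from 0.
val indices = shuffled.partitions.map(_.index).toSeq

// The class of the partitions (printed only, as it is internal to Spark).
println(shuffled.partitions.head.getClass.getSimpleName)
```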

User-Specified Serializer

ShuffledRDD can be given a user-specified Serializer for the single ShuffleDependency.

userSpecifiedSerializer: Option[Serializer] = None

userSpecifiedSerializer is undefined (None) by default and can be changed using the setSerializer method (which is used for the PairRDDFunctions.combineByKeyWithClassTag transformation).
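A user-specified Serializer ends up in the ShuffleDependency, which the sketch below checks with a KryoSerializer. It assumes a spark-shell or a local Spark setup; the choice of KryoSerializer and the sample data are for illustration only.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.serializer.KryoSerializer

// Reuses the spark-shell context if there is one, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffled-rdd-serializer"))

val pairs = sc.parallelize(Seq(1 -> "one", 2 -> "two", 3 -> "three"))
val part = new HashPartitioner(2)
val kryo = new KryoSerializer(sc.getConf)

// setSerializer returns the ShuffledRDD itself, so calls can be chained.
val shuffled = new ShuffledRDD[Int, String, String](pairs, part).setSerializer(kryo)

// The dependency is expected to carry the very same Serializer instance.
val dep = shuffled.dependencies.head.asInstanceOf[ShuffleDependency[Int, String, String]]
val usesKryo = dep.serializer eq kryo
```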

Demos

ShuffledRDD and coalesce

val data = sc.parallelize(0 to 9)
val coalesced = data.coalesce(numPartitions = 4, shuffle = true)
scala> println(coalesced.toDebugString)
(4) MapPartitionsRDD[9] at coalesce at <pastie>:75 []
 |  CoalescedRDD[8] at coalesce at <pastie>:75 []
 |  ShuffledRDD[7] at coalesce at <pastie>:75 []
 +-(16) MapPartitionsRDD[6] at coalesce at <pastie>:75 []
    |   ParallelCollectionRDD[5] at parallelize at <pastie>:74 []

ShuffledRDD and sortByKey

// Reuses coalesced from the previous demo (hence the coalesce steps in the lineage below)
val grouped = coalesced.groupBy(_ % 2)
val sorted = grouped.sortByKey(numPartitions = 2)
scala> println(sorted.toDebugString)
(2) ShuffledRDD[15] at sortByKey at <console>:74 []
 +-(4) ShuffledRDD[12] at groupBy at <console>:74 []
    +-(4) MapPartitionsRDD[11] at groupBy at <console>:74 []
       |  MapPartitionsRDD[9] at coalesce at <pastie>:75 []
       |  CoalescedRDD[8] at coalesce at <pastie>:75 []
       |  ShuffledRDD[7] at coalesce at <pastie>:75 []
       +-(16) MapPartitionsRDD[6] at coalesce at <pastie>:75 []
          |   ParallelCollectionRDD[5] at parallelize at <pastie>:74 []