ShuffledRDD

ShuffledRDD is an RDD of key-value pairs that represents a shuffle step in an RDD lineage.

ShuffledRDD is given an RDD of key-value pairs of K and V types, respectively, when created and computes key-value pairs of K and C types, respectively.

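ShuffledRDD is created for the following RDD transformations:

  • OrderedRDDFunctions.sortByKey and OrderedRDDFunctions.repartitionAndSortWithinPartitions

  • PairRDDFunctions.combineByKeyWithClassTag and PairRDDFunctions.partitionBy

  • RDD.coalesce (with the shuffle flag enabled)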

ShuffledRDD uses custom ShuffledRDDPartition partitions.

ShuffledRDD always has the isBarrier flag disabled (false).

Creating Instance

ShuffledRDD takes the following to be created:

  • Previous RDD of key-value pairs (RDD[_ <: Product2[K, V]])

  • Partitioner
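
As a rough illustration of the two constructor arguments above, the following sketch creates a ShuffledRDD directly from a pair RDD and a HashPartitioner. It assumes Spark is on the classpath (or that the code is pasted into spark-shell); the application name, the sample data and the variable names are made up for the example.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.ShuffledRDD

// Reuse the active SparkContext (e.g. in spark-shell) or start a local one
val conf = new SparkConf().setMaster("local[2]").setAppName("shuffled-rdd-sketch")
val sc = SparkContext.getOrCreate(conf)

// Previous RDD of key-value pairs (K = Int, V = String)
val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c", 4 -> "d"), numSlices = 2)

// No aggregator is set, so values are not combined and C is the same type as V
val shuffled = new ShuffledRDD[Int, String, String](pairs, new HashPartitioner(3))

println(shuffled.getNumPartitions)
println(shuffled.partitioner)
println(shuffled.collect().sorted.mkString(", "))
```

Most applications never call the constructor themselves; the transformations listed earlier do it on their behalf.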

Map-Side Combine Flag

ShuffledRDD uses a map-side combine flag to create a ShuffleDependency when requested for the dependencies (there is always only one).

The flag is disabled (false) by default and can be changed using the setMapSideCombine method.

setMapSideCombine(
  mapSideCombine: Boolean): ShuffledRDD[K, V, C]

setMapSideCombine is used for the PairRDDFunctions.combineByKeyWithClassTag transformation (which enables the flag by default).
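
One way to observe the flag, sketched under the assumption that Spark is on the classpath, is to read mapSideCombine from the single ShuffleDependency of an RDD. reduceByKey keeps the default of combineByKeyWithClassTag, while groupByKey passes mapSideCombine = false. The helper mapSideCombineOf and the sample data are made up for the example.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Reuse the active SparkContext (e.g. in spark-shell) or start a local one
val conf = new SparkConf().setMaster("local[2]").setAppName("map-side-combine-sketch")
val sc = SparkContext.getOrCreate(conf)

val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3), numSlices = 2)

// Both transformations go through combineByKeyWithClassTag
val reduced = pairs.reduceByKey(_ + _) // default: map-side combine enabled
val grouped = pairs.groupByKey()       // explicitly disables map-side combine

// Hypothetical helper: reads the flag from the only (shuffle) dependency
def mapSideCombineOf(rdd: RDD[_]): Boolean =
  rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].mapSideCombine

println(s"reduceByKey: ${mapSideCombineOf(reduced)}")
println(s"groupByKey:  ${mapSideCombineOf(grouped)}")
```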

Computing Partition

compute(
  split: Partition,
  context: TaskContext): Iterator[(K, C)]

compute requests the only dependency (that is assumed to be a ShuffleDependency) for the ShuffleHandle.

compute uses the SparkEnv to access the ShuffleManager.

compute requests the ShuffleManager for the ShuffleReader (for the ShuffleHandle and the given partition).

In the end, compute requests the ShuffleReader to read the combined key-value pairs (of type (K, C)).

compute is part of the RDD abstraction.
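
The order of the steps above can be sketched with a toy model in plain Scala. None of the types below are Spark classes: ToyHandle, ToyReader, ToyShuffleManager and toyCompute are stand-ins invented for illustration, and the real ShuffleManager API takes more arguments than shown here.

```scala
// Toy stand-ins, NOT Spark classes: every name here is made up for illustration
final case class ToyHandle(shuffleId: Int)

trait ToyReader[K, C] {
  def read(): Iterator[(K, C)]
}

// Plays the role of the ShuffleManager: hands out a reader for one reduce partition
final class ToyShuffleManager[K, C](outputs: Map[(Int, Int), Seq[(K, C)]]) {
  def getReader(handle: ToyHandle, partition: Int): ToyReader[K, C] =
    new ToyReader[K, C] {
      def read(): Iterator[(K, C)] =
        outputs.getOrElse((handle.shuffleId, partition), Seq.empty).iterator
    }
}

// Mirrors the order of the steps: handle -> manager -> reader -> read
def toyCompute[K, C](
    handle: ToyHandle,
    manager: ToyShuffleManager[K, C],
    partition: Int): Iterator[(K, C)] =
  manager.getReader(handle, partition).read()

// Combined (K, C) pairs keyed by (shuffleId, reduce partition)
val manager = new ToyShuffleManager[String, Int](
  Map((0, 0) -> Seq("a" -> 4), (0, 1) -> Seq("b" -> 2)))

val partitionOne = toyCompute(ToyHandle(0), manager, partition = 1).toList
println(partitionOne)
```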

Placement Preferences of Partition

getPreferredLocations(
  partition: Partition): Seq[String]

getPreferredLocations requests MapOutputTrackerMaster for the preferred locations of the given partition (BlockManagers with the most map outputs).

getPreferredLocations uses SparkEnv to access the current MapOutputTrackerMaster.

getPreferredLocations is part of the RDD abstraction.
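
The preferences can be inspected from the driver through the public RDD.preferredLocations method, which delegates to getPreferredLocations. The sketch below assumes Spark is on the classpath; the sample data and names are made up. The lists may well be empty, for example before any map outputs have been registered.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the active SparkContext (e.g. in spark-shell) or start a local one
val conf = new SparkConf().setMaster("local[2]").setAppName("preferred-locations-sketch")
val sc = SparkContext.getOrCreate(conf)

val reduced = sc
  .parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3), numSlices = 2)
  .reduceByKey(_ + _)

// One (possibly empty) list of host names per partition of the shuffled RDD
val locations: Seq[Seq[String]] =
  reduced.partitions.toSeq.map(p => reduced.preferredLocations(p))

locations.zipWithIndex.foreach { case (hosts, i) =>
  println(s"partition $i -> ${hosts.mkString("[", ", ", "]")}")
}
```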

Dependencies

getDependencies: Seq[Dependency[_]]

getDependencies uses the user-specified Serializer if defined or requests the current SerializerManager for one.

When no Serializer is specified by the user, getDependencies uses the mapSideCombine internal flag to select the types of the keys and values that the Serializer is requested for: K and C when the flag is enabled, K and V otherwise.

In the end, getDependencies returns a single ShuffleDependency (with the previous RDD, the Partitioner, and the Serializer).

getDependencies is part of the RDD abstraction.
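
The result of getDependencies is visible through the public RDD.dependencies method. The following sketch, which assumes Spark is on the classpath and uses made-up sample data, checks that there is exactly one ShuffleDependency and that it carries the previous RDD and the Partitioner.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.ShuffledRDD

// Reuse the active SparkContext (e.g. in spark-shell) or start a local one
val conf = new SparkConf().setMaster("local[2]").setAppName("dependencies-sketch")
val sc = SparkContext.getOrCreate(conf)

val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c"), numSlices = 2)
val partitioner = new HashPartitioner(4)
val shuffled = new ShuffledRDD[Int, String, String](pairs, partitioner)

val deps = shuffled.dependencies
val dep = deps.head.asInstanceOf[ShuffleDependency[_, _, _]]

println(s"number of dependencies: ${deps.length}")
println(s"previous RDD id:        ${dep.rdd.id} (pairs has id ${pairs.id})")
println(s"partitioner:            ${dep.partitioner}")
println(s"serializer:             ${dep.serializer.getClass.getName}")
```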

ShuffledRDDPartition

ShuffledRDDPartition takes a single index to be created. The index is the partition index as calculated by the Partitioner of the ShuffledRDD.
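
The relationship between partition indices and the Partitioner can be checked with a short sketch. It assumes Spark is on the classpath; the data and names are made up. Each key is tagged with the index of the partition it was read from and then compared, on the driver, with what the Partitioner computes for that key.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Reuse the active SparkContext (e.g. in spark-shell) or start a local one
val conf = new SparkConf().setMaster("local[2]").setAppName("partition-index-sketch")
val sc = SparkContext.getOrCreate(conf)

val part = new HashPartitioner(3)
val shuffled = sc
  .parallelize((0 to 9).map(n => n -> n.toString), numSlices = 2)
  .partitionBy(part) // creates a ShuffledRDD

// Partition indices exposed through the generic Partition interface
val indices = shuffled.partitions.map(_.index).toSeq

// (partition index, key) for every record after the shuffle
val keysByPartition = shuffled
  .mapPartitionsWithIndex { (idx, it) => it.map { case (k, _) => (idx, k) } }
  .collect()

// Compare on the driver with the index the Partitioner computes for each key
val placedCorrectly = keysByPartition.forall { case (idx, k) => part.getPartition(k) == idx }

println(indices)
println(placedCorrectly)
```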

Demos

Demo: ShuffledRDD and coalesce

val data = sc.parallelize(0 to 9)
val coalesced = data.coalesce(numPartitions = 4, shuffle = true)
scala> println(coalesced.toDebugString)
(4) MapPartitionsRDD[9] at coalesce at <pastie>:75 []
 |  CoalescedRDD[8] at coalesce at <pastie>:75 []
 |  ShuffledRDD[7] at coalesce at <pastie>:75 []
 +-(16) MapPartitionsRDD[6] at coalesce at <pastie>:75 []
    |   ParallelCollectionRDD[5] at parallelize at <pastie>:74 []

Demo: ShuffledRDD and sortByKey

// coalesced comes from the previous demo (its lineage shows up in the output below)
val grouped = coalesced.groupBy(_ % 2)
val sorted = grouped.sortByKey(numPartitions = 2)
scala> println(sorted.toDebugString)
(2) ShuffledRDD[15] at sortByKey at <console>:74 []
 +-(4) ShuffledRDD[12] at groupBy at <console>:74 []
    +-(4) MapPartitionsRDD[11] at groupBy at <console>:74 []
       |  MapPartitionsRDD[9] at coalesce at <pastie>:75 []
       |  CoalescedRDD[8] at coalesce at <pastie>:75 []
       |  ShuffledRDD[7] at coalesce at <pastie>:75 []
       +-(16) MapPartitionsRDD[6] at coalesce at <pastie>:75 []
          |   ParallelCollectionRDD[5] at parallelize at <pastie>:74 []

Internal Properties

userSpecifiedSerializer

User-specified Serializer for the single ShuffleDependency

userSpecifiedSerializer: Option[Serializer] = None

userSpecifiedSerializer is undefined (None) by default and can be changed using the setSerializer method (that is used for the PairRDDFunctions.combineByKeyWithClassTag transformation).
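
As a small sketch of setSerializer, assuming Spark is on the classpath, the example below sets a JavaSerializer on a ShuffledRDD and reads it back from the ShuffleDependency. The choice of JavaSerializer, the sample data and the names are made up for the example.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.serializer.JavaSerializer

// Reuse the active SparkContext (e.g. in spark-shell) or start a local one
val conf = new SparkConf().setMaster("local[2]").setAppName("set-serializer-sketch")
val sc = SparkContext.getOrCreate(conf)

val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b"), numSlices = 2)

// setSerializer returns the same ShuffledRDD, so calls can be chained
val shuffled = new ShuffledRDD[Int, String, String](pairs, new HashPartitioner(2))
  .setSerializer(new JavaSerializer(sc.getConf))

// The user-specified Serializer ends up in the single ShuffleDependency
val dep = shuffled.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
println(dep.serializer.getClass.getSimpleName)
println(shuffled.collect().sorted.mkString(", "))
```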