When requested to compute a partition, ShuffledRDD uses its one and only ShuffleDependency to get the ShuffleHandle, requests the system ShuffleManager for a ShuffleReader for that handle, and uses the reader to read the (combined) key-value pairs.
ShuffledRDD takes the following to be created:
ShuffledRDD is created with the following Partitioner, depending on the RDD operator:
- RangePartitioner for sortByKey
- HashPartitioner for coalesce
- Whatever is passed in to the following high-level RDD operators when different from the current Partitioner (of the RDD):
The given Partitioner is the partitioner of this ShuffledRDD.
Partitioner is also used when:
- getDependencies (to create the only ShuffleDependency)
- getPartitions (to create as many ShuffledRDDPartitions as the numPartitions of the Partitioner)
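The role of the Partitioner can be checked from the public RDD API. The following is a minimal sketch, assuming a local SparkContext (or the `sc` of spark-shell); the sample data and the application name are made up for illustration.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.ShuffledRDD

// In spark-shell this returns the existing context; otherwise it starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffled-rdd-partitioner-sketch"))

val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3), numSlices = 4)

// `pairs` has no partitioner yet, so partitionBy has to create a ShuffledRDD.
val partitioner = new HashPartitioner(3)
val shuffled = pairs.partitionBy(partitioner)

// The given Partitioner is the partitioner of the ShuffledRDD.
val isShuffled = shuffled.isInstanceOf[ShuffledRDD[_, _, _]]
val samePartitioner = shuffled.partitioner.contains(partitioner)

// getPartitions creates one partition per partition of the Partitioner.
val partitionIndices = shuffled.partitions.map(_.index).toList
```

With 3 partitions in the Partitioner, `partitionIndices` should hold the indices 0, 1 and 2, regardless of the 4 partitions of the parent RDD.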
getDependencies is part of the RDD abstraction.
getDependencies uses the mapSideCombine internal flag for the types of the keys and values (i.e. K and C or K and V when the flag is enabled or not, respectively).
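The single ShuffleDependency and its map-side combine flag can be inspected through `RDD.dependencies`. This is a sketch only: `shuffleDep` is a hypothetical helper introduced for illustration, and the choice of `reduceByKey` and `groupByKey` as sample operators is an assumption of the example, not something this page prescribes.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// In spark-shell this returns the existing context; otherwise it starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffled-rdd-dependencies-sketch"))

val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3), numSlices = 2)

// Hypothetical helper: returns the one and only dependency as a ShuffleDependency.
def shuffleDep(rdd: RDD[_]): ShuffleDependency[_, _, _] = {
  require(rdd.dependencies.size == 1, "expected exactly one dependency")
  rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
}

// reduceByKey goes through combineByKeyWithClassTag with the flag enabled.
val reducedFlag = shuffleDep(pairs.reduceByKey(_ + _)).mapSideCombine

// groupByKey turns map-side combine off explicitly.
val groupedFlag = shuffleDep(pairs.groupByKey()).mapSideCombine
```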
compute(split: Partition, context: TaskContext): Iterator[(K, C)]
compute is part of the RDD abstraction.
compute requests the ShuffleManager for a ShuffleReader using the ShuffleHandle of the ShuffleDependency and the index of the given partition.
In the end, compute requests the ShuffleReader to read the (combined) key-value pairs (of type Iterator[(K, C)]).
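compute is not meant to be called directly, but its per-partition behaviour can be observed by running a job over a single partition. The sketch below assumes a local SparkContext and made-up integer keys; it uses `SparkContext.runJob` to read partition 0 of a ShuffledRDD only.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// In spark-shell this returns the existing context; otherwise it starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffled-rdd-compute-sketch"))

// Keys 0..5 with arbitrary values
val pairs = sc.parallelize((0 to 5).map(i => i -> i * 10), numSlices = 3)
val shuffled = pairs.partitionBy(new HashPartitioner(2))

// Run a job over partition 0 only; the task computes just that partition.
val keysInPartition0 = sc
  .runJob(shuffled, (it: Iterator[(Int, Int)]) => it.map(_._1).toList, Seq(0))
  .head
  .sorted
```

Since HashPartitioner places an Int key by its value modulo the number of partitions, `keysInPartition0` should contain the even keys only.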
Key, Value and Combiner Types
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]
ShuffledRDD produces pairs of K keys and C values.
ShuffledRDD has the isBarrier flag always disabled (false).
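To see the three type parameters side by side, consider an aggregation whose combiner type differs from the value type. The following sketch assumes a local SparkContext and made-up data; it uses `combineByKey` with K = String, V = Int and C = (Int, Int) as a (sum, count) pair.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// In spark-shell this returns the existing context; otherwise it starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffled-rdd-types-sketch"))

// Input pairs of K = String keys and V = Int values
val scores: RDD[(String, Int)] =
  sc.parallelize(Seq("a" -> 1, "b" -> 4, "a" -> 3), numSlices = 2)

// Output pairs of K = String keys and C = (Int, Int) combiners
val sumAndCount: RDD[(String, (Int, Int))] = scores.combineByKey(
  (v: Int) => (v, 1),                                  // create a combiner from a value
  (c: (Int, Int), v: Int) => (c._1 + v, c._2 + 1),     // merge a value into a combiner
  (c1: (Int, Int), c2: (Int, Int)) => (c1._1 + c2._1, c1._2 + c2._2)) // merge combiners

val result = sumAndCount.collect().toMap
```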
Map-Side Combine Flag
The flag is disabled (false) by default and can be changed using the setMapSideCombine method.
setMapSideCombine(mapSideCombine: Boolean): ShuffledRDD[K, V, C]
setMapSideCombine is used for the PairRDDFunctions.combineByKeyWithClassTag transformation (which enables the flag by default).
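For illustration, a ShuffledRDD can also be created directly and have the flag enabled by hand. This is a sketch under assumptions: the data and names are made up, and an Aggregator is set with `setAggregator` because, as far as I know, Spark refuses map-side combine without one.

```scala
import org.apache.spark.{Aggregator, HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.ShuffledRDD

// In spark-shell this returns the existing context; otherwise it starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("map-side-combine-sketch"))

val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3), numSlices = 2)

// K = String, V = Int, C = Int: sum the values of every key
val sumAggregator = new Aggregator[String, Int, Int](
  (v: Int) => v,
  (c: Int, v: Int) => c + v,
  (c1: Int, c2: Int) => c1 + c2)

val shuffled = new ShuffledRDD[String, Int, Int](pairs, new HashPartitioner(2))
  .setAggregator(sumAggregator)
  .setMapSideCombine(true) // partial sums are computed before the shuffle

val sums = shuffled.collect().toMap
```

In application code the same effect is usually reached through `reduceByKey` or `combineByKey`, which configure the ShuffledRDD on the caller's behalf.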
Placement Preferences of Partition
getPreferredLocations(partition: Partition): Seq[String]
getPreferredLocations is part of the RDD abstraction.
getPreferredLocations uses SparkEnv to access the current MapOutputTrackerMaster.
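Placement preferences are exposed through the public `RDD.preferredLocations` method. The sketch below assumes a local SparkContext and made-up data; what it returns depends on the deployment and on the map outputs registered so far, so no particular result is claimed.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// In spark-shell this returns the existing context; otherwise it starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("preferred-locations-sketch"))

val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3), numSlices = 2)
val shuffled = pairs.partitionBy(new HashPartitioner(2))

// Run the shuffle first so that map outputs are known to the tracker.
shuffled.count()

// Host names (possibly none) that are preferred for the first partition
val locations: Seq[String] = shuffled.preferredLocations(shuffled.partitions(0))
```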
ShuffledRDDPartition takes an index to be created (which in turn is the index of a partition as calculated by the Partitioner).
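How a Partitioner calculates partition indices can be tried without a SparkContext. This minimal sketch uses HashPartitioner with made-up integer keys; other partitioners map keys to indices differently.

```scala
import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(3)

// Every key is mapped to a partition index in the range [0, numPartitions).
val keyToIndex = Seq(0, 1, 2, 3, 4, 5).map(k => k -> partitioner.getPartition(k))
// Int keys hash to their own value, so the index is the key modulo 3:
// (0,0), (1,1), (2,2), (3,0), (4,1), (5,2)
```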
userSpecifiedSerializer: Option[Serializer] = None
userSpecifiedSerializer is undefined (None) by default and can be changed using the setSerializer method (which is used for the PairRDDFunctions.combineByKeyWithClassTag transformation).
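A user-specified serializer can be observed on the ShuffleDependency of a ShuffledRDD. The sketch below is illustrative only: it assumes a local SparkContext and made-up data, and picks KryoSerializer merely as an example of a Serializer.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.serializer.KryoSerializer

// In spark-shell this returns the existing context; otherwise it starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffled-rdd-serializer-sketch"))

val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3), numSlices = 2)

// No aggregator is set, so the values pass through the shuffle as they are (C = V).
val shuffled = new ShuffledRDD[String, Int, Int](pairs, new HashPartitioner(2))
  .setSerializer(new KryoSerializer(sc.getConf))

// The serializer ends up in the one and only ShuffleDependency.
val dep = shuffled.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
val usesKryo = dep.serializer.isInstanceOf[KryoSerializer]

val records = shuffled.collect().toList.sorted
```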
ShuffledRDD and coalesce

    val data = sc.parallelize(0 to 9)
    val coalesced = data.coalesce(numPartitions = 4, shuffle = true)

    scala> println(coalesced.toDebugString)
    (4) MapPartitionsRDD at coalesce at <pastie>:75
     |  CoalescedRDD at coalesce at <pastie>:75
     |  ShuffledRDD at coalesce at <pastie>:75
     +-(16) MapPartitionsRDD at coalesce at <pastie>:75
        |   ParallelCollectionRDD at parallelize at <pastie>:74
ShuffledRDD and sortByKey

    // `coalesced` is the RDD from the coalesce example (its lineage shows up below)
    val grouped = coalesced.groupBy(_ % 2)
    val sorted = grouped.sortByKey(numPartitions = 2)

    scala> println(sorted.toDebugString)
    (2) ShuffledRDD at sortByKey at <console>:74
     +-(4) ShuffledRDD at groupBy at <console>:74
        +-(4) MapPartitionsRDD at groupBy at <console>:74
           |  MapPartitionsRDD at coalesce at <pastie>:75
           |  CoalescedRDD at coalesce at <pastie>:75
           |  ShuffledRDD at coalesce at <pastie>:75
           +-(16) MapPartitionsRDD at coalesce at <pastie>:75
              |   ParallelCollectionRDD at parallelize at <pastie>:74