ShuffledRDD¶
ShuffledRDD
is an RDD of key-value pairs that represents a shuffle step in a RDD lineage (and indicates start of a new stage).
When requested to compute a partition, ShuffledRDD
uses the one and only ShuffleDependency for a ShuffleHandle for a ShuffleReader (from the system ShuffleManager) that is used to read the (combined) key-value pairs.
Creating Instance¶
ShuffledRDD
takes the following to be created:
- RDD (of
K
keys andV
values) - Partitioner
ShuffledRDD
is created for the following RDD operators:
-
OrderedRDDFunctions.sortByKey and OrderedRDDFunctions.repartitionAndSortWithinPartitions
-
PairRDDFunctions.combineByKeyWithClassTag and PairRDDFunctions.partitionBy
-
RDD.coalesce (with
shuffle
flag enabled)
Partitioner¶
ShuffledRDD
is given a Partitioner when created:
- RangePartitioner for sortByKey
- HashPartitioner for coalesce
- Whatever passed in to the following high-level RDD operators when different from the current
Partitioner
(of the RDD):
The given Partitioner
is the partitioner of this ShuffledRDD
.
The Partitioner
is also used when:
- getDependencies (to create the only ShuffleDependency)
- getPartitions (to create as many
ShuffledRDDPartition
s as the numPartitions of thePartitioner
)
Dependencies¶
getDependencies
uses the user-specified Serializer, if defined, or requests the current SerializerManager for one.
getDependencies
uses the mapSideCombine internal flag for the types of the keys and values (i.e. K
and C
or K
and V
when the flag is enabled or not, respectively).
In the end, getDependencies
creates a single ShuffleDependency (with the previous RDD, the Partitioner, and the Serializer
).
Computing Partition¶
Signature
compute(
split: Partition,
context: TaskContext): Iterator[(K, C)]
compute
is part of the RDD abstraction.
compute
assumes that ShuffleDependency is the first dependency among the dependencies (and the only one per getDependencies).
compute
uses the SparkEnv to access the ShuffleManager. compute
requests the ShuffleManager for the ShuffleReader based on the following:
ShuffleReader | Value |
---|---|
ShuffleHandle | ShuffleHandle of the ShuffleDependency |
startPartition | The index of the given split partition |
endPartition | index + 1 |
In the end, compute
requests the ShuffleReader
to read the (combined) key-value pairs (of type (K, C)
).
Key, Value and Combiner Types¶
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]
ShuffledRDD
is given an RDD of K
keys and V
values to be created.
When computed, ShuffledRDD
produces pairs of K
keys and C
values.
isBarrier Flag¶
ShuffledRDD
has isBarrier flag always disabled (false
).
Map-Side Combine Flag¶
ShuffledRDD
uses a map-side combine flag to create a ShuffleDependency when requested for the dependencies (there is always only one).
The flag is disabled (false
) by default and can be changed using setMapSideCombine
method.
setMapSideCombine(
mapSideCombine: Boolean): ShuffledRDD[K, V, C]
setMapSideCombine
is used for PairRDDFunctions.combineByKeyWithClassTag transformation (which defaults to the flag enabled).
Placement Preferences of Partition¶
Signature
getPreferredLocations(
partition: Partition): Seq[String]
getPreferredLocations
is part of the RDD abstraction.
getPreferredLocations
requests MapOutputTrackerMaster
for the preferred locations of the given partition (BlockManagers with the most map outputs).
getPreferredLocations
uses SparkEnv
to access the current MapOutputTrackerMaster.
ShuffledRDDPartition¶
ShuffledRDDPartition
gets an index
to be created (that in turn is the index of partitions as calculated by the Partitioner).
User-Specified Serializer¶
User-specified Serializer for the single ShuffleDependency dependency
userSpecifiedSerializer: Option[Serializer] = None
userSpecifiedSerializer
is undefined (None
) by default and can be changed using setSerializer
method (that is used for PairRDDFunctions.combineByKeyWithClassTag transformation).
Demos¶
ShuffledRDD and coalesce¶
val data = sc.parallelize(0 to 9)
val coalesced = data.coalesce(numPartitions = 4, shuffle = true)
scala> println(coalesced.toDebugString)
(4) MapPartitionsRDD[9] at coalesce at <pastie>:75 []
| CoalescedRDD[8] at coalesce at <pastie>:75 []
| ShuffledRDD[7] at coalesce at <pastie>:75 []
+-(16) MapPartitionsRDD[6] at coalesce at <pastie>:75 []
| ParallelCollectionRDD[5] at parallelize at <pastie>:74 []
ShuffledRDD and sortByKey¶
val data = sc.parallelize(0 to 9)
val grouped = rdd.groupBy(_ % 2)
val sorted = grouped.sortByKey(numPartitions = 2)
scala> println(sorted.toDebugString)
(2) ShuffledRDD[15] at sortByKey at <console>:74 []
+-(4) ShuffledRDD[12] at groupBy at <console>:74 []
+-(4) MapPartitionsRDD[11] at groupBy at <console>:74 []
| MapPartitionsRDD[9] at coalesce at <pastie>:75 []
| CoalescedRDD[8] at coalesce at <pastie>:75 []
| ShuffledRDD[7] at coalesce at <pastie>:75 []
+-(16) MapPartitionsRDD[6] at coalesce at <pastie>:75 []
| ParallelCollectionRDD[5] at parallelize at <pastie>:74 []