
= [[ShuffledRDD]] ShuffledRDD

ShuffledRDD is an RDD.md[RDD] of key-value pairs that represents a shuffle step in a spark-rdd-lineage.md[RDD lineage].

ShuffledRDD is given a <<prev, parent RDD>> of key-value pairs of K and V types when <<creating-instance, created>>, and <<compute, computes>> key-value pairs of K and C types.

ShuffledRDD is <<creating-instance, created>> for the following RDD transformations:

  • spark-rdd-OrderedRDDFunctions.md#sortByKey[OrderedRDDFunctions.sortByKey] and spark-rdd-OrderedRDDFunctions.md#repartitionAndSortWithinPartitions[OrderedRDDFunctions.repartitionAndSortWithinPartitions]

  • PairRDDFunctions.md#combineByKeyWithClassTag[PairRDDFunctions.combineByKeyWithClassTag] and PairRDDFunctions.md#partitionBy[PairRDDFunctions.partitionBy]

  • spark-rdd-transformations.md#coalesce[RDD.coalesce] (with shuffle flag enabled)

ShuffledRDD uses custom <<ShuffledRDDPartition, ShuffledRDDPartition>> partitions.
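As a stand-alone illustration of how keys end up in those shuffle partitions, the sketch below mimics Spark's default HashPartitioner behavior (a non-negative modulo of the key's hashCode). nonNegativeMod and partitionFor are illustrative names, not Spark's API.

```scala
// Sketch of default hash partitioning, assuming HashPartitioner semantics:
// partition = nonNegativeMod(key.hashCode, numPartitions).
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0) // shift negative remainders into [0, mod)
}

def partitionFor(key: Any, numPartitions: Int): Int =
  nonNegativeMod(key.hashCode, numPartitions)

// Every key lands in one of the numPartitions shuffle partitions.
val assignments = (0 to 9).map(k => partitionFor(k, 4))
```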

[[isBarrier]] ShuffledRDD has RDD.md#isBarrier[isBarrier] flag always disabled (false).

== [[creating-instance]] Creating Instance

ShuffledRDD takes the following to be created:

  • [[prev]] Previous RDD.md[RDD] of key-value pairs (RDD[_ <: Product2[K, V]])
  • [[part]] Partitioner.md[Partitioner]

== [[mapSideCombine]][[setMapSideCombine]] Map-Side Combine Flag

ShuffledRDD uses a map-side combine flag to create a ShuffleDependency when requested for the <<getDependencies, dependencies>> (there is always exactly one).

The flag is disabled (false) by default and can be changed using the setMapSideCombine method.

[source,scala]
----
setMapSideCombine(
  mapSideCombine: Boolean): ShuffledRDD[K, V, C]
----


setMapSideCombine is used for the PairRDDFunctions.md#combineByKeyWithClassTag[PairRDDFunctions.combineByKeyWithClassTag] transformation (which enables the flag by default).
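To see what the flag changes, here is a hand-rolled sketch (not Spark code) of a map-side combine: values are pre-aggregated per key within a single map task before the shuffle, so fewer records cross the network. The createCombiner and mergeValue names mirror combineByKeyWithClassTag's parameters.

```scala
// Sketch: pre-aggregate one map task's records (K, V) into combiners (K, C)
// before they are written out for the shuffle.
def mapSideCombineSketch[K, V, C](
    records: Iterator[(K, V)],
    createCombiner: V => C,
    mergeValue: (C, V) => C): Map[K, C] =
  records.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
    acc.updated(k, acc.get(k).fold(createCombiner(v))(mergeValue(_, v)))
  }

// Sum the values per key within one partition's worth of records.
val combined = mapSideCombineSketch[Int, Int, Int](
  Iterator((0, 1), (1, 2), (0, 3)),
  createCombiner = identity,
  mergeValue = _ + _)
```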

== [[compute]] Computing Partition

[source, scala]
----
compute(
  split: Partition,
  context: TaskContext): Iterator[(K, C)]
----


compute requests the only RDD.md#dependencies[dependency] (which is assumed to be a ShuffleDependency) for the ShuffleHandle.

compute uses the SparkEnv to access the ShuffleManager.

compute requests the shuffle:ShuffleManager.md#shuffleManager[ShuffleManager] for a shuffle:ShuffleManager.md#getReader[ShuffleReader] for the ShuffleHandle and the spark-rdd-Partition.md[partition].

In the end, compute requests the ShuffleReader to shuffle:spark-shuffle-ShuffleReader.md#read[read] the combined key-value pairs (of type (K, C)).

compute is part of the RDD.md#compute[RDD] abstraction.
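The delegation chain above can be modeled with stand-in types (ReaderSketch, ManagerSketch, and computeSketch are illustrative names, not Spark's classes):

```scala
// Stand-in types modeling compute's control flow: the dependency's handle
// goes to a shuffle manager, whose reader produces the (K, C) records.
trait ReaderSketch[K, C] { def read(): Iterator[(K, C)] }

final class ManagerSketch(data: Seq[(String, Int)]) {
  def getReader(handle: AnyRef, partitionId: Int): ReaderSketch[String, Int] =
    new ReaderSketch[String, Int] {
      def read(): Iterator[(String, Int)] = data.iterator
    }
}

// compute delegates: fetch a reader for the handle and partition, then read.
def computeSketch(
    manager: ManagerSketch,
    handle: AnyRef,
    split: Int): Iterator[(String, Int)] =
  manager.getReader(handle, split).read()

val shuffled =
  computeSketch(new ManagerSketch(Seq("a" -> 1, "b" -> 2)), new AnyRef, 0).toList
```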

== [[getPreferredLocations]] Placement Preferences of Partition

[source, scala]
----
getPreferredLocations(
  partition: Partition): Seq[String]
----


getPreferredLocations requests MapOutputTrackerMaster for the scheduler:MapOutputTrackerMaster.md#getPreferredLocationsForShuffle[preferred locations] of the given spark-rdd-Partition.md[partition] (storage:BlockManager.md[BlockManagers] with the most map outputs).

getPreferredLocations uses SparkEnv to access the current core:SparkEnv.md#mapOutputTracker[MapOutputTrackerMaster].

getPreferredLocations is part of the RDD.md#getPreferredLocations[RDD] abstraction.
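A stand-alone model of the placement heuristic: prefer the locations that hold at least a fixed fraction of the total map output for a partition. The 0.2 threshold mirrors MapOutputTrackerMaster's REDUCER_PREF_LOCS_FRACTION constant; the function name and data are illustrative.

```scala
// Stand-alone model: prefer locations that hold at least `fraction` of the
// total map output bytes for a reduce partition, so the reader moves less data.
def preferredLocationsSketch(
    bytesByLocation: Map[String, Long],
    fraction: Double = 0.2): Seq[String] = {
  val total = bytesByLocation.values.sum.toDouble
  bytesByLocation.collect {
    case (loc, bytes) if bytes / total >= fraction => loc
  }.toSeq
}

// host1 holds 80% of the map output, so it is the only preferred location.
val prefs = preferredLocationsSketch(Map("host1" -> 80L, "host2" -> 15L, "host3" -> 5L))
```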

== [[getDependencies]] Dependencies

[source, scala]
----
getDependencies: Seq[Dependency[_]]
----

getDependencies uses the <<userSpecifiedSerializer, user-specified Serializer>> if defined or requests the current serializer:SerializerManager.md[SerializerManager] for serializer:SerializerManager.md#getSerializer[one].

getDependencies uses the <<mapSideCombine, mapSideCombine>> internal flag for the types of the keys and values (K and C with the flag enabled, or K and V otherwise).

In the end, getDependencies returns a single ShuffleDependency (with the <<prev, previous RDD>>, the <<part, Partitioner>>, and the Serializer).

getDependencies is part of the RDD.md#getDependencies[RDD] abstraction.
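The serializer and value-type choice can be sketched as follows (DependencySketch and the string stand-ins are illustrative only; the real code builds a ShuffleDependency):

```scala
// Sketch of getDependencies' choices: pick the user serializer if set,
// and let the mapSideCombine flag decide whether the shuffle carries
// (K, C) or (K, V) records.
final case class DependencySketch(serializer: String, valueType: String)

def dependenciesSketch(
    userSerializer: Option[String],
    mapSideCombine: Boolean): Seq[DependencySketch] = {
  val serializer = userSerializer.getOrElse("default")
  val valueType = if (mapSideCombine) "C" else "V"
  Seq(DependencySketch(serializer, valueType)) // always exactly one dependency
}
```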

== [[ShuffledRDDPartition]] ShuffledRDDPartition

ShuffledRDDPartition is given an index when created (the index of a partition as calculated by the <<part, Partitioner>> of a ShuffledRDD).
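A minimal stand-in (not Spark's class) that captures all the state ShuffledRDDPartition carries:

```scala
// Minimal stand-in: a shuffled partition carries nothing but its index.
final case class PartitionSketch(index: Int)

// One partition per Partitioner slot, assuming a 4-partition shuffle.
val shuffledPartitions = (0 until 4).map(PartitionSketch(_))
```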

== Demos

=== Demo: ShuffledRDD and coalesce

[source,plaintext]
----
val data = sc.parallelize(0 to 9)
val coalesced = data.coalesce(numPartitions = 4, shuffle = true)

scala> println(coalesced.toDebugString)
(4) MapPartitionsRDD[9] at coalesce at <console>:75 []
 |  CoalescedRDD[8] at coalesce at <console>:75 []
 |  ShuffledRDD[7] at coalesce at <console>:75 []
 +-(16) MapPartitionsRDD[6] at coalesce at <console>:75 []
    |  ParallelCollectionRDD[5] at parallelize at <console>:74 []
----


=== Demo: ShuffledRDD and sortByKey

[source,plaintext]
----
// Continued from the previous demo: coalesced is the 4-partition RDD
val grouped = coalesced.groupBy(_ % 2)
val sorted = grouped.sortByKey(numPartitions = 2)

scala> println(sorted.toDebugString)
(2) ShuffledRDD[15] at sortByKey at <console>:74 []
 +-(4) ShuffledRDD[12] at groupBy at <console>:74 []
    +-(4) MapPartitionsRDD[11] at groupBy at <console>:74 []
       |  MapPartitionsRDD[9] at coalesce at <console>:75 []
       |  CoalescedRDD[8] at coalesce at <console>:75 []
       |  ShuffledRDD[7] at coalesce at <console>:75 []
       +-(16) MapPartitionsRDD[6] at coalesce at <console>:75 []
          |  ParallelCollectionRDD[5] at parallelize at <console>:74 []
----


== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"]
|===
| Name | Description

| userSpecifiedSerializer
a| [[userSpecifiedSerializer]] User-specified Serializer for the single ShuffleDependency dependency

[source, scala]
----
userSpecifiedSerializer: Option[Serializer] = None
----

userSpecifiedSerializer is undefined (None) by default and can be changed using the setSerializer method (that is used for the PairRDDFunctions.md#combineByKeyWithClassTag[PairRDDFunctions.combineByKeyWithClassTag] transformation).

|===


Last update: 2020-10-09