ShuffleDependency — Shuffle Dependency

ShuffleDependency is an RDD Dependency on the output of a ShuffleMapStage for a key-value pair RDD.

ShuffleDependency uses the parent RDD for the number of (map-side/pre-shuffle) partitions and a Partitioner for the number of (reduce-side/post-shuffle) partitions.
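The two partition counts can be seen directly on the RDDs on either side of a shuffle. A minimal sketch, assuming a local SparkContext (the application name is arbitrary):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("shuffle-partitions-demo"))

// 4 map-side (pre-shuffle) partitions
val pairs = sc.parallelize(0 to 8, numSlices = 4).map(n => (n % 3, n))

// groupByKey(2) uses a HashPartitioner(2), so 2 reduce-side (post-shuffle) partitions
val grouped = pairs.groupByKey(numPartitions = 2)

val mapSideParts = pairs.getNumPartitions     // pre-shuffle partitions
val reduceSideParts = grouped.getNumPartitions // post-shuffle partitions

sc.stop()
```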

ShuffleDependency is a dependency of ShuffledRDD. It is also a dependency of CoGroupedRDD and SubtractedRDD, but only when the partitioners (of the input RDDs and the resulting RDD) are different.

A ShuffleDependency is created for a key-value pair RDD, i.e. RDD[Product2[K, V]] with K and V being the types of keys and values, respectively.

Use the dependencies method on an RDD to see its dependencies.
scala> val rdd = sc.parallelize(0 to 8).groupBy(_ % 3)
rdd: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:24

scala> rdd.dependencies
res0: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@454f6cc5)

Every ShuffleDependency has a unique application-wide shuffleId number that is assigned when ShuffleDependency is created (and is used throughout Spark’s code to reference a ShuffleDependency).

Shuffle ids are tracked by SparkContext.
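The shuffleId can be read off the dependency itself. A sketch, assuming a fresh local SparkContext (so the first shuffle gets id 0):

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("shuffleId-demo"))

val rdd = sc.parallelize(0 to 8).groupBy(_ % 3)

// The lone dependency of a ShuffledRDD is a ShuffleDependency
val dep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int, _]]

// shuffleId was assigned by SparkContext when the dependency was created
val id = dep.shuffleId

sc.stop()
```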

keyOrdering Property

FIXME

serializer Property

FIXME

Creating ShuffleDependency Instance

ShuffleDependency takes the following when created:

  1. A single key-value pair RDD, i.e. RDD[Product2[K, V]],

  2. Partitioner (available as partitioner property),

  3. Serializer,

  4. Optional key ordering (of Scala’s scala.math.Ordering type),

  5. Optional Aggregator,

  6. mapSideCombine flag, which is disabled (i.e. false) by default.
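The constructor above can be exercised directly since ShuffleDependency is a DeveloperApi. A hedged sketch, assuming Spark 2.x defaults (serializer defaults to SparkEnv's serializer; keyOrdering and aggregator default to None, mapSideCombine to false):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("shuffle-dep-demo"))

val pairs: RDD[(Int, Int)] = sc.parallelize(0 to 8).map(n => (n % 3, n))

// Only the key-value RDD and the Partitioner are required;
// the serializer, key ordering, aggregator and mapSideCombine use defaults
val dep = new ShuffleDependency[Int, Int, Int](pairs, new HashPartitioner(2))

val numPost = dep.partitioner.numPartitions // reduce-side partitions
val combine = dep.mapSideCombine            // false by default

sc.stop()
```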

When created, ShuffleDependency is assigned a shuffle id (as shuffleId).

ShuffleDependency uses the input RDD to access SparkContext and hence the shuffleId.

ShuffleDependency registers itself with ShuffleManager and gets a ShuffleHandle (available as shuffleHandle property).

ShuffleDependency accesses ShuffleManager using SparkEnv.

In the end, ShuffleDependency registers itself for cleanup with ContextCleaner.

ShuffleDependency accesses the optional ContextCleaner through SparkContext.

ShuffleDependency is created when ShuffledRDD, CoGroupedRDD, and SubtractedRDD return their RDD dependencies.

partitioner Property

partitioner property is a Partitioner that is used to partition the shuffle output.

partitioner is specified when ShuffleDependency is created.

shuffleHandle Property

shuffleHandle: ShuffleHandle

shuffleHandle is the ShuffleHandle of a ShuffleDependency, assigned eagerly when the ShuffleDependency is created.

shuffleHandle is used when computing CoGroupedRDD, ShuffledRDD, SubtractedRDD, and ShuffledRowRDD (to get a ShuffleReader for a ShuffleDependency) and when a ShuffleMapTask runs (to get a ShuffleWriter for a ShuffleDependency).

Map-Side Combine Flag — mapSideCombine Attribute

mapSideCombine is a flag to control whether to use partial aggregation (aka map-side combine).

mapSideCombine is by default disabled (i.e. false) when creating a ShuffleDependency.

When enabled, SortShuffleWriter and BlockStoreShuffleReader assume that an Aggregator is also defined.

mapSideCombine is exclusively set (and hence can be enabled) when ShuffledRDD returns its dependencies (a single ShuffleDependency).
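The flag can be observed on the shuffle dependencies of transformations that do and do not combine on the map side. A sketch, assuming a local SparkContext:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("map-side-combine-demo"))

val pairs = sc.parallelize(0 to 8).map(n => (n % 3, n))

// helper: the single shuffle dependency of a ShuffledRDD
def shuffleDep(rdd: RDD[_]): ShuffleDependency[_, _, _] =
  rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]

// reduceByKey can merge partial sums before the shuffle
val withCombine = shuffleDep(pairs.reduceByKey(_ + _)).mapSideCombine

// groupByKey cannot reduce records on the map side, so the flag stays off
val withoutCombine = shuffleDep(pairs.groupByKey()).mapSideCombine

sc.stop()
```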

aggregator Property

aggregator: Option[Aggregator[K, V, C]] = None

aggregator is a map/reduce-side Aggregator (for an RDD's shuffle).

aggregator is by default undefined (i.e. None) when ShuffleDependency is created.
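Whether an Aggregator is defined depends on the transformation that created the dependency. A sketch, assuming a local SparkContext:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("aggregator-demo"))

val pairs = sc.parallelize(0 to 8).map(n => (n % 3, n))

def shuffleDep(rdd: RDD[_]): ShuffleDependency[_, _, _] =
  rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]

// reduceByKey installs an Aggregator (createCombiner/mergeValue/mergeCombiners)
val aggregated = shuffleDep(pairs.reduceByKey(_ + _)).aggregator.isDefined

// partitionBy only redistributes records, so no Aggregator is set
val plain = shuffleDep(pairs.partitionBy(new HashPartitioner(2))).aggregator.isDefined

sc.stop()
```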

Usage

The places where ShuffleDependency is used:

  • ShuffledRDD, CoGroupedRDD, and SubtractedRDD (when they return their RDD dependencies)

The RDD operations that may or may not use the above RDDs and hence shuffling:

  • coalesce

  • cogroup

    • intersection

  • subtractByKey

    • subtract

  • sortByKey

    • sortBy

  • repartitionAndSortWithinPartitions

  • combineByKeyWithClassTag

    • combineByKey

    • aggregateByKey

    • foldByKey

    • reduceByKey

    • countApproxDistinctByKey

    • groupByKey

  • partitionBy

There may be other dependent methods that use the above.
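Whether one of the operations above actually introduced a shuffle can be checked by looking for a ShuffleDependency among an RDD's dependencies. A sketch, assuming a local SparkContext:

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("usage-demo"))

val pairs = sc.parallelize(0 to 8).map(n => (n % 3, n))

// sortByKey builds a ShuffledRDD, so its dependencies include a ShuffleDependency
val sorted = pairs.sortByKey()
val shuffles = sorted.dependencies.exists(_.isInstanceOf[ShuffleDependency[_, _, _]])

// a plain map is a narrow (one-to-one) dependency -- no shuffle
val mapped = pairs.map(identity)
val narrow = mapped.dependencies.exists(_.isInstanceOf[ShuffleDependency[_, _, _]])

sc.stop()
```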