ShuffleDependency is a Dependency on the output of a ShuffleMapStage of a key-value RDD.
ShuffleDependency uses the RDD to know the number of (map-side/pre-shuffle) partitions and the Partitioner for the number of (reduce-side/post-shuffle) partitions.
ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]
ShuffleDependency takes the following to be created:
- RDD (RDD[_ <: Product2[K, V]])
- Serializer (default: SparkEnv.get.serializer)
- Optional Key Ordering (default: undefined)
- Optional Aggregator
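
As a rough illustration of these arguments, the sketch below creates a ShuffleDependency directly over a small key-value RDD and reads back the map-side and reduce-side partition counts. It assumes spark-core is on the classpath (for example in spark-shell) and a local master; the app name, the sample data and the partition counts are made up for the example.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}

// Assumption: local mode with an illustrative app name. getOrCreate reuses
// an already-running SparkContext (e.g. the one provided by spark-shell).
val conf = new SparkConf().setMaster("local[2]").setAppName("shuffle-dependency-demo")
val sc = SparkContext.getOrCreate(conf)

// A key-value RDD with 3 (map-side) partitions
val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c"), numSlices = 3)

// Only the RDD and a Partitioner are given; the other arguments keep their defaults
val dep = new ShuffleDependency[Int, String, String](pairs, new HashPartitioner(5))

println(dep.rdd.getNumPartitions)       // map-side (pre-shuffle) partitions
println(dep.partitioner.numPartitions)  // reduce-side (post-shuffle) partitions
println(dep.keyOrdering)                // key ordering left undefined
println(dep.aggregator)                 // aggregator left undefined
```

Creating a ShuffleDependency by hand like this is rarely needed in application code; it is shown only to make the constructor arguments and their defaults visible.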
ShuffleDependency is created when:
- CoGroupedRDD is requested for the dependencies (for RDDs with different partitioners)
- ShuffledRDD is requested for the dependencies
- SubtractedRDD is requested for the dependencies (for an RDD with a different partitioner)
- ShuffleExchangeExec (Spark SQL) physical operator is requested to prepare a ShuffleDependency
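
The ShuffledRDD case can be observed from user code. The following is a minimal sketch, assuming spark-core on the classpath and a local master; the app name and the sample data are illustrative only. It relies on reduceByKey producing a ShuffledRDD when the parent RDD has no partitioner.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}

// Assumption: local mode; getOrCreate reuses an existing SparkContext if there is one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffled-rdd-deps"))

val words = sc.parallelize(Seq("a" -> 1, "b" -> 1, "a" -> 1), numSlices = 2)

// The parent RDD has no partitioner, so reduceByKey has to shuffle
val counts = words.reduceByKey(_ + _, 4)

// Requesting the dependencies of the shuffled RDD yields the ShuffleDependency
val shuffleDeps = counts.dependencies.collect {
  case dep: ShuffleDependency[_, _, _] => dep
}

println(shuffleDeps.size)
println(shuffleDeps.head.rdd.getNumPartitions)       // partitions of the parent RDD
println(shuffleDeps.head.partitioner.numPartitions)  // partitions after the shuffle
```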
When created, ShuffleDependency gets the shuffle id from the SparkContext.
ShuffleDependency registers itself with the ShuffleManager and gets a ShuffleHandle (available as shuffleHandle).
ShuffleDependency uses SparkEnv to access the ShuffleManager.
In the end, ShuffleDependency registers itself with the ContextCleaner (if configured) and the ShuffleDriverComponents.
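
The effect of these creation-time steps can be checked with a small sketch. It assumes spark-core on the classpath and a local master (app name and data are illustrative), and it only inspects the public shuffleId and shuffleHandle fields; the registration with the ContextCleaner and the ShuffleDriverComponents is not visible from here.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}

// Assumption: local mode; getOrCreate reuses an existing SparkContext if there is one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffle-id-demo"))

val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b"), numSlices = 2)

// Two dependencies over the same RDD still get their own shuffle ids
val first = new ShuffleDependency[Int, String, String](pairs, new HashPartitioner(2))
val second = new ShuffleDependency[Int, String, String](pairs, new HashPartitioner(2))

println(first.shuffleId != second.shuffleId)

// The handle returned by the ShuffleManager carries the same shuffle id
println(first.shuffleHandle.shuffleId == first.shuffleId)
```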
aggregator: Option[Aggregator[K, V, C]]
ShuffleDependency can be given a map/reduce-side Aggregator when created.
ShuffleDependency asserts (when created) that an Aggregator is defined when the mapSideCombine flag is enabled.
aggregator is used when:
- SortShuffleWriter is requested to write records (for mapper tasks)
- BlockStoreShuffleReader is requested to read records (for reducer tasks)
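
The creation-time assertion on the Aggregator can be provoked deliberately. This is a sketch under assumptions: spark-core on the classpath, a local master, illustrative names and data, and a Spark version that performs the check as described above. The exact exception type and message are not relied upon, so the failure is simply captured in a Try.

```scala
import scala.util.Try
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}

// Assumption: local mode; getOrCreate reuses an existing SparkContext if there is one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("aggregator-check"))

val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b"), numSlices = 2)

// mapSideCombine is enabled but no Aggregator is given, so creation is expected to fail
val attempt = Try {
  new ShuffleDependency[Int, String, String](
    pairs, new HashPartitioner(2), mapSideCombine = true)
}

println(attempt.isFailure)
```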
Map-Side Partial Aggregation Flag
ShuffleDependency uses a mapSideCombine flag that controls whether to perform map-side partial aggregation (map-side combine) using the Aggregator.
mapSideCombine is disabled (false) by default and can be enabled (true) for some uses of ShuffledRDD.
ShuffleDependency requires that the optional Aggregator is actually defined when the flag is enabled.
mapSideCombine is used when:
- BlockStoreShuffleReader is requested to read combined records for a reduce task
- SortShuffleManager is requested to register a shuffle
- SortShuffleWriter is requested to write records
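
To see the flag on real shuffles, the sketch below compares the ShuffleDependency behind reduceByKey with the one behind groupByKey. It assumes spark-core on the classpath and a local master; shuffleDepOf is a hypothetical helper written for this example, and the observation that reduceByKey enables the flag while groupByKey leaves it disabled reflects the Spark versions I am familiar with.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Assumption: local mode; getOrCreate reuses an existing SparkContext if there is one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("map-side-combine-demo"))

val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3), numSlices = 2)

// Hypothetical helper: the first ShuffleDependency among an RDD's dependencies
def shuffleDepOf(rdd: RDD[_]): ShuffleDependency[_, _, _] =
  rdd.dependencies.collectFirst { case d: ShuffleDependency[_, _, _] => d }.get

val reduced = shuffleDepOf(pairs.reduceByKey(_ + _))
val grouped = shuffleDepOf(pairs.groupByKey())

println(reduced.mapSideCombine)        // partial aggregation on the map side
println(grouped.mapSideCombine)        // values are only combined after the shuffle
println(grouped.aggregator.isDefined)  // an Aggregator is present in both cases
```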
ShuffleDependency is given a Partitioner (when created).
ShuffleDependency uses the Partitioner to partition the shuffle output.
Partitioner is used when:
- SortShuffleWriter is requested to write records (and create an ExternalSorter)
- others (FIXME)
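
How a Partitioner assigns records to reduce-side partitions can be sketched without running a job. The example uses HashPartitioner purely for illustration (any Partitioner could be given to a ShuffleDependency), assumes spark-core on the classpath, and uses made-up integer keys.

```scala
import org.apache.spark.HashPartitioner

// 4 reduce-side (post-shuffle) partitions
val partitioner = new HashPartitioner(4)

// Each record key is mapped to the partition that will receive it.
// HashPartitioner takes a non-negative modulo of the key's hashCode.
val keys = Seq(0, 1, 5, -3)
val targets = keys.map(k => k -> partitioner.getPartition(k))

targets.foreach { case (k, p) => println(s"key $k -> partition $p") }
```

Keys 1, 5 and -3 all land in partition 1 here, which is the kind of grouping a shuffle writer relies on when laying out its output per reduce partition.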
ShuffleDependency can be given a ShuffleWriteProcessor when created.
ShuffleWriteProcessor is used when:
- ShuffleMapTask is requested to runTask (to write partition records out to the shuffle system)
ShuffleDependency is identified uniquely by an application-wide shuffle ID (that is requested from SparkContext when created).
ShuffleDependency registers itself with the ShuffleManager when created.
ShuffleHandle is used when:
- CoGroupedRDD, ShuffledRDD, SubtractedRDD, and ShuffledRowRDD (Spark SQL) are requested to compute a partition (to get a ShuffleReader for a ShuffleDependency)
- ShuffleMapTask is requested to run (to get a ShuffleWriter for a ShuffleDependency)