ShuffleDependency
ShuffleDependency is a Dependency on the output of a ShuffleMapStage of a key-value RDD.
ShuffleDependency uses the RDD to know the number of map-side (pre-shuffle) partitions and the Partitioner to know the number of reduce-side (post-shuffle) partitions.
ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]
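The split between the two partition counts can be modeled without Spark. The following is a minimal Scala sketch in which ToyRDD, ToyPartitioner and ToyShuffleDependency are hypothetical stand-ins (not Spark classes); it assumes one map task per partition of the parent RDD.

```scala
// Hypothetical stand-ins (not Spark classes): just enough to model the two partition counts.
final case class ToyRDD[K, V](partitions: Seq[Seq[(K, V)]]) {
  def numPartitions: Int = partitions.size
}

final case class ToyPartitioner(numPartitions: Int)

final case class ToyShuffleDependency[K, V](rdd: ToyRDD[K, V], partitioner: ToyPartitioner) {
  // Map-side (pre-shuffle) partitions: one map task per partition of the parent RDD.
  def numMapPartitions: Int = rdd.numPartitions
  // Reduce-side (post-shuffle) partitions: decided by the Partitioner alone.
  def numReducePartitions: Int = partitioner.numPartitions
}

val parent = ToyRDD(Seq(Seq("a" -> 1, "b" -> 2), Seq("a" -> 3), Seq("c" -> 4)))
val dep = ToyShuffleDependency(parent, ToyPartitioner(numPartitions = 2))
println(s"map partitions: ${dep.numMapPartitions}, reduce partitions: ${dep.numReducePartitions}")
```

The two numbers are independent: repartitioning the output only requires a different Partitioner, not a different parent RDD.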
Creating Instance
ShuffleDependency takes the following to be created:
- RDD (RDD[_ <: Product2[K, V]])
- Partitioner
- Serializer (default: SparkEnv.get.serializer)
- Optional Key Ordering (default: undefined)
- Optional Aggregator (default: undefined)
- mapSideCombine flag (default: false)
- ShuffleWriteProcessor (default: a new ShuffleWriteProcessor)
ShuffleDependency is created when:
- CoGroupedRDD is requested for the dependencies (for RDDs with different partitioners)
- ShuffledRDD is requested for the dependencies
- ShuffleExchangeExec (Spark SQL) physical operator is requested to prepare a ShuffleDependency
When created, ShuffleDependency requests a new shuffle ID from SparkContext.
ShuffleDependency registers itself with the ShuffleManager and gets a ShuffleHandle (available as shuffleHandle). ShuffleDependency uses SparkEnv to access the ShuffleManager.
In the end, ShuffleDependency registers itself with the ContextCleaner (if configured) and the ShuffleDriverComponents.
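The creation steps above can be modeled as follows. This is a hypothetical sketch, not Spark's code: ToyDriver stands in for SparkContext, the ShuffleManager, the ContextCleaner and the ShuffleDriverComponents at once, and only records the order in which they are called.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins (not Spark classes) that record the order of the creation steps.
final case class ToyShuffleHandle(shuffleId: Int)

final class ToyDriver(val cleanerEnabled: Boolean) {
  val log = ArrayBuffer.empty[String]
  private var nextShuffleId = 0

  // Stands in for SparkContext handing out shuffle IDs.
  def newShuffleId(): Int = { val id = nextShuffleId; nextShuffleId += 1; id }

  // Stands in for ShuffleManager registration, which returns a handle.
  def registerWithShuffleManager(shuffleId: Int): ToyShuffleHandle = {
    log += s"shuffleManager:$shuffleId"
    ToyShuffleHandle(shuffleId)
  }
  def registerWithCleaner(shuffleId: Int): Unit = log += s"cleaner:$shuffleId"
  def registerWithDriverComponents(shuffleId: Int): Unit = log += s"driverComponents:$shuffleId"
}

final class ToyShuffleDependency(driver: ToyDriver) {
  // 1. Get a new shuffle ID.
  val shuffleId: Int = driver.newShuffleId()
  // 2. Register with the shuffle manager and keep the returned handle.
  val shuffleHandle: ToyShuffleHandle = driver.registerWithShuffleManager(shuffleId)
  // 3. Register with the cleaner (only if one is configured), then with the driver components.
  if (driver.cleanerEnabled) driver.registerWithCleaner(shuffleId)
  driver.registerWithDriverComponents(shuffleId)
}

val driver = new ToyDriver(cleanerEnabled = true)
val first = new ToyShuffleDependency(driver)
val second = new ToyShuffleDependency(driver)
println(driver.log.mkString(", "))

val noCleaner = new ToyDriver(cleanerEnabled = false)
new ToyShuffleDependency(noCleaner)
println(noCleaner.log.mkString(", "))
```

Cleanup registration is skipped when no cleaner is configured, whereas the ShuffleManager registration always happens because it is what yields the handle.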
Aggregator
aggregator: Option[Aggregator[K, V, C]]
ShuffleDependency can be given a map/reduce-side Aggregator when created.
ShuffleDependency asserts (when created) that an Aggregator is defined when the mapSideCombine flag is enabled.
aggregator is used when:
- SortShuffleWriter is requested to write records (for mapper tasks)
- BlockStoreShuffleReader is requested to read records (for reducer tasks)
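Spark's Aggregator bundles three functions: createCombiner, mergeValue and mergeCombiners. The sketch below re-implements the idea in plain Scala to show how map-side and reduce-side combining fit together. ToyAggregator and its two methods are simplified, hypothetical versions; Spark's own combining methods have different signatures.

```scala
import scala.collection.mutable

// Simplified, hypothetical model of an Aggregator[K, V, C] (same three functions as Spark's).
final case class ToyAggregator[K, V, C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {

  // Map side: fold the raw values of one partition into one combiner per key.
  def combineValuesByKey(records: Iterator[(K, V)]): Map[K, C] = {
    val acc = mutable.LinkedHashMap.empty[K, C]
    records.foreach { case (k, v) =>
      acc(k) = acc.get(k) match {
        case Some(c) => mergeValue(c, v)
        case None    => createCombiner(v)
      }
    }
    acc.toMap
  }

  // Reduce side: merge combiners for the same key coming from different map outputs.
  def combineCombinersByKey(combiners: Iterator[(K, C)]): Map[K, C] = {
    val acc = mutable.LinkedHashMap.empty[K, C]
    combiners.foreach { case (k, c) =>
      acc(k) = acc.get(k).map(mergeCombiners(_, c)).getOrElse(c)
    }
    acc.toMap
  }
}

// Per-key (sum, count): the combiner type C differs from the value type V.
val avgAgg = ToyAggregator[String, Int, (Int, Int)](
  v => (v, 1),                          // createCombiner: first value -> (sum, count)
  (c, v) => (c._1 + v, c._2 + 1),       // mergeValue: add one value to a combiner
  (a, b) => (a._1 + b._1, a._2 + b._2)) // mergeCombiners: merge two partial results

// Two map tasks combine their own records...
val map0 = avgAgg.combineValuesByKey(Iterator("a" -> 1, "b" -> 10, "a" -> 3))
val map1 = avgAgg.combineValuesByKey(Iterator("a" -> 5, "b" -> 20))
// ...and a reducer merges the partial combiners it fetches.
val reduced = avgAgg.combineCombinersByKey(map0.iterator ++ map1.iterator)
println(reduced)
```

Dividing sum by count in the reduced map gives per-key averages; only the small (sum, count) pairs would need to cross the shuffle.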
Map-Side Partial Aggregation Flag
ShuffleDependency uses a mapSideCombine flag that controls whether to perform map-side partial aggregation (map-side combine) using the Aggregator.
mapSideCombine is disabled (false) by default and can be enabled (true) for some uses of ShuffledRDD.
ShuffleDependency requires that the optional Aggregator be defined when the flag is enabled.
mapSideCombine is used when:
- BlockStoreShuffleReader is requested to read combined records for a reduce task
- SortShuffleManager is requested to register a shuffle
- SortShuffleWriter is requested to write records
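The constructor check can be sketched with Scala's require. ToyAggregator and ToyShuffleDependency are hypothetical stand-ins; the point is only that enabling mapSideCombine without an Aggregator fails at construction time rather than later in a map task.

```scala
import scala.util.Try

// Hypothetical, simplified stand-ins (not Spark's code).
final case class ToyAggregator[V, C](createCombiner: V => C)

final class ToyShuffleDependency[V, C](
    val aggregator: Option[ToyAggregator[V, C]] = None,
    val mapSideCombine: Boolean = false) {
  // Map-side combine needs the Aggregator's functions, so reject the combination early.
  if (mapSideCombine) {
    require(aggregator.isDefined, "mapSideCombine requires an Aggregator")
  }
}

val ok = Try(new ToyShuffleDependency[Int, Int](Some(ToyAggregator[Int, Int](v => v)), mapSideCombine = true))
val bad = Try(new ToyShuffleDependency[Int, Int](None, mapSideCombine = true))
println(s"with aggregator: ${ok.isSuccess}, without: ${bad.isSuccess}")
```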
Partitioner
ShuffleDependency is given a Partitioner (when created).
ShuffleDependency uses the Partitioner to partition the shuffle output.
The Partitioner is used when:
- SortShuffleWriter is requested to write records (and create an ExternalSorter)
- others (FIXME)
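HashPartitioner is a common Partitioner for shuffles. The sketch below re-implements its rule as commonly described (the key's hashCode taken non-negatively modulo the number of partitions, with null keys going to partition 0) in a hypothetical ToyHashPartitioner, so it runs without Spark.

```scala
// Hypothetical stand-in for a hash-based Partitioner, re-implemented without Spark.
final class ToyHashPartitioner(val numPartitions: Int) {
  require(numPartitions > 0, "numPartitions must be positive")

  // hashCode can be negative, so fold the remainder back into [0, numPartitions).
  private def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  def getPartition(key: Any): Int =
    if (key == null) 0 else nonNegativeMod(key.hashCode, numPartitions)
}

val partitioner = new ToyHashPartitioner(4)
// Every record with the same key lands in the same reduce-side partition.
val records = Seq("apple" -> 1, "kiwi" -> 2, "apple" -> 3, "plum" -> 4)
val byPartition = records.groupBy { case (k, _) => partitioner.getPartition(k) }
byPartition.toSeq.sortBy(_._1).foreach { case (p, rs) => println(s"partition $p: $rs") }
```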
ShuffleWriteProcessor
ShuffleDependency can be given a ShuffleWriteProcessor when created.
The ShuffleWriteProcessor is used when:
- ShuffleMapTask is requested to runTask (to write partition records out to the shuffle system)
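Conceptually, the ShuffleWriteProcessor takes the records of one map partition, writes them through a ShuffleWriter obtained for the dependency's shuffle handle, and returns a status describing the output. The following hypothetical sketch models that flow; ToyHandle, ToyShuffleWriter, ToyMapStatus and ToyShuffleWriteProcessor are not Spark APIs, and the partition function stands in for the dependency's Partitioner.

```scala
import scala.collection.mutable

// Hypothetical stand-ins (not Spark classes).
final case class ToyHandle(shuffleId: Int, numReducers: Int)
final case class ToyMapStatus(mapId: Int, recordsPerReducer: Seq[Int])

final class ToyShuffleWriter(handle: ToyHandle, mapId: Int, partition: Any => Int) {
  // One in-memory bucket per reduce-side partition.
  private val buckets = Array.fill(handle.numReducers)(mutable.ArrayBuffer.empty[(Any, Any)])
  def write(records: Iterator[(Any, Any)]): Unit =
    records.foreach { case r @ (k, _) => buckets(partition(k)) += r }
  // Finishing the write reports how many records went to each reducer.
  def stop(): ToyMapStatus = ToyMapStatus(mapId, buckets.map(_.size).toSeq)
}

final class ToyShuffleWriteProcessor {
  // What a map task would call for its partition: get a writer, write, stop.
  def write(records: Iterator[(Any, Any)], handle: ToyHandle, mapId: Int,
            partition: Any => Int): ToyMapStatus = {
    val writer = new ToyShuffleWriter(handle, mapId, partition)
    writer.write(records)
    writer.stop()
  }
}

val handle = ToyHandle(shuffleId = 0, numReducers = 2)
val status = new ToyShuffleWriteProcessor().write(
  Iterator[(Any, Any)](1 -> "a", 2 -> "b", 3 -> "c"),
  handle, mapId = 0, partition = k => Math.floorMod(k.hashCode, 2))
println(status)
```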
Shuffle ID
shuffleId: Int
ShuffleDependency is uniquely identified by an application-wide shuffle ID that it requests from SparkContext when created.
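Application-wide uniqueness only needs a single counter that is safe to use from several threads. A minimal sketch, assuming an AtomicInteger-based counter; ToyShuffleIdSource is a hypothetical name, not a Spark class.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the driver-side ID source: one counter per application,
// so IDs stay unique even when dependencies are created from several threads.
final class ToyShuffleIdSource {
  private val nextShuffleId = new AtomicInteger(0)
  def newShuffleId(): Int = nextShuffleId.getAndIncrement()
}

val ids = new ToyShuffleIdSource
// Four threads each take 250 IDs; every thread fills its own row of results.
val results = Array.fill(4)(new Array[Int](250))
val threads = results.map { row =>
  new Thread(() => for (i <- row.indices) row(i) = ids.newShuffleId())
}
threads.foreach(_.start())
threads.foreach(_.join())

val allIds = results.flatten.toSeq
println(s"${allIds.size} ids, ${allIds.distinct.size} distinct")
```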
ShuffleHandle
ShuffleDependency registers itself with the ShuffleManager when created and receives a ShuffleHandle in return (available as shuffleHandle).
The ShuffleHandle is used when:
- CoGroupedRDD, ShuffledRDD, and ShuffledRowRDD (Spark SQL) are requested to compute a partition (to get a ShuffleReader for a ShuffleDependency)
- ShuffleMapTask is requested to run (to get a ShuffleWriter for a ShuffleDependency)
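The handle is what lets map-side writers and reduce-side readers agree on which shuffle they are working with. The toy below is hypothetical: its getWriter and getReader are much simpler than the ShuffleManager methods of the same names, and all shuffle output is kept in memory.

```scala
import scala.collection.mutable

// Hypothetical toy (not Spark's ShuffleManager API).
final case class ToyShuffleHandle(shuffleId: Int, numReducers: Int)

final class ToyShuffleManager {
  // (shuffleId, reduceId) -> records written for that reduce partition
  private val store = mutable.Map.empty[(Int, Int), mutable.ArrayBuffer[(Int, String)]]

  def registerShuffle(shuffleId: Int, numReducers: Int): ToyShuffleHandle =
    ToyShuffleHandle(shuffleId, numReducers)

  // Writer side (what a map task would use): route each record to a reduce partition by key.
  def getWriter(handle: ToyShuffleHandle): Iterator[(Int, String)] => Unit =
    records => records.foreach { case r @ (k, _) =>
      val reduceId = Math.floorMod(k, handle.numReducers)
      store.getOrElseUpdate((handle.shuffleId, reduceId), mutable.ArrayBuffer.empty) += r
    }

  // Reader side (what a shuffled RDD would use to compute one partition).
  def getReader(handle: ToyShuffleHandle, reduceId: Int): Iterator[(Int, String)] =
    store.getOrElse((handle.shuffleId, reduceId), mutable.ArrayBuffer.empty[(Int, String)]).iterator
}

val manager = new ToyShuffleManager
val handle = manager.registerShuffle(shuffleId = 0, numReducers = 2)
// Two map tasks write through the same handle...
manager.getWriter(handle)(Iterator(1 -> "a", 2 -> "b"))
manager.getWriter(handle)(Iterator(3 -> "c", 4 -> "d"))
// ...and each reduce task reads its own partition through it.
val reduce0 = manager.getReader(handle, 0).toList
val reduce1 = manager.getReader(handle, 1).toList
println(s"reduce 0: $reduce0, reduce 1: $reduce1")
```

A reader holding a handle for a different shuffle ID sees none of this output, which is why the handle travels with the dependency to both sides.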