ShuffleMapStage¶
ShuffleMapStage (shuffle map stage or simply map stage) is a Stage.
ShuffleMapStage corresponds to (and is associated with) a ShuffleDependency.
ShuffleMapStage can be submitted independently but it is usually an intermediate step in a physical execution plan (with the final step being a ResultStage).
Creating Instance¶
ShuffleMapStage takes the following to be created:
- Stage ID
- RDD (of the ShuffleDependency)
- Number of tasks
- Parent Stages
- First Job ID (of the ActiveJob that created it)
- CallSite
- ShuffleDependency
- MapOutputTrackerMaster
- Resource Profile ID
ShuffleMapStage is created when:
- DAGScheduler is requested to plan a ShuffleDependency for execution
Missing Partitions¶
findMissingPartitions(): Seq[Int]
findMissingPartitions requests the MapOutputTrackerMaster for the missing partitions (of the ShuffleDependency) and returns them.
If not available (MapOutputTrackerMaster does not track the ShuffleDependency), findMissingPartitions simply assumes that all the partitions are missing.
findMissingPartitions is part of the Stage abstraction.
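The fallback described above amounts to an Option lookup with a default of "every partition". The following is a minimal sketch under stated assumptions: SimpleTracker stands in for MapOutputTrackerMaster and SimpleShuffleMapStage stands in for ShuffleMapStage. Both names are hypothetical simplifications, not Spark's API (the real classes are Spark-internal).

```scala
import scala.collection.mutable

// Hypothetical stand-in for MapOutputTrackerMaster (not Spark's API):
// records, per shuffle ID, which map partitions have registered output.
final class SimpleTracker {
  private val outputs = mutable.Map.empty[Int, Array[Boolean]]

  def registerShuffle(shuffleId: Int, numMaps: Int): Unit = {
    outputs(shuffleId) = Array.fill(numMaps)(false)
  }

  def registerMapOutput(shuffleId: Int, mapIndex: Int): Unit = {
    outputs(shuffleId)(mapIndex) = true
  }

  // None when the shuffle is not tracked at all; otherwise the indices without output
  def findMissingPartitions(shuffleId: Int): Option[Seq[Int]] =
    outputs.get(shuffleId).map(done => done.indices.filterNot(i => done(i)))
}

// Hypothetical simplified stage that mirrors the fallback described above
final class SimpleShuffleMapStage(shuffleId: Int, numPartitions: Int, tracker: SimpleTracker) {
  // Ask the tracker first; if it knows nothing about this shuffle, treat every partition as missing
  def findMissingPartitions(): Seq[Int] =
    tracker.findMissingPartitions(shuffleId).getOrElse(0 until numPartitions)
}
```

An untracked shuffle yields all partitions (0 until numPartitions), while a tracked one yields only the partitions that have not registered map output yet.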
ShuffleMapStage Ready¶
When "executed", a ShuffleMapStage saves map output files (for reduce tasks).
When all partitions have shuffle map outputs available, ShuffleMapStage is considered ready (done or available).
isAvailable¶
isAvailable: Boolean
isAvailable is true when the ShuffleMapStage is ready, i.e. all partitions have shuffle map outputs (numAvailableOutputs equals numPartitions).
isAvailable is used when:
- DAGScheduler is requested to getMissingParentStages, handleMapStageSubmitted, submitMissingTasks, processShuffleMapStageCompletion, markMapStageJobsAsFinished and stageDependsOn
Available Outputs¶
numAvailableOutputs: Int
numAvailableOutputs requests the MapOutputTrackerMaster to getNumAvailableOutputs (for the shuffleId of the ShuffleDependency).
numAvailableOutputs is used when:
- DAGScheduler is requested to submitMissingTasks
- ShuffleMapStage is requested to isAvailable
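numAvailableOutputs and isAvailable fit together as a delegation followed by a comparison. The sketch below assumes a hypothetical OutputCounter in place of MapOutputTrackerMaster and a hypothetical AvailabilityStage in place of ShuffleMapStage; only the method names numAvailableOutputs, isAvailable and getNumAvailableOutputs come from the text above.

```scala
import scala.collection.mutable

// Hypothetical stand-in for MapOutputTrackerMaster: counts map outputs registered per shuffle ID
final class OutputCounter {
  private val registered = mutable.Map.empty[Int, Set[Int]]

  def registerMapOutput(shuffleId: Int, mapIndex: Int): Unit = {
    // A Set keeps re-registering the same map index from double-counting it
    registered(shuffleId) = registered.getOrElse(shuffleId, Set.empty[Int]) + mapIndex
  }

  def getNumAvailableOutputs(shuffleId: Int): Int =
    registered.get(shuffleId).map(_.size).getOrElse(0)
}

// Hypothetical simplified stage
final class AvailabilityStage(shuffleId: Int, numPartitions: Int, tracker: OutputCounter) {
  // Delegates to the tracker for this stage's shuffle ID
  def numAvailableOutputs: Int = tracker.getNumAvailableOutputs(shuffleId)

  // Ready (available) only when every partition has a map output
  def isAvailable: Boolean = numAvailableOutputs == numPartitions
}
```

The stage itself holds no output state in this sketch; readiness is always derived from the tracker, which is why a lost map output (fewer available outputs) immediately makes the stage unavailable again.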
Active Jobs¶
ShuffleMapStage defines an internal _mapStageJobs registry of ActiveJobs to track the jobs that were submitted to execute the stage independently.
A new job is registered (added) in addActiveJob.
An active job is deregistered (removed) in removeActiveJob.
addActiveJob¶
addActiveJob(
job: ActiveJob): Unit
addActiveJob adds the given ActiveJob to (the front of) the _mapStageJobs list.
addActiveJob is used when:
- DAGScheduler is requested to handleMapStageSubmitted
removeActiveJob¶
removeActiveJob(
job: ActiveJob): Unit
removeActiveJob removes the ActiveJob from the _mapStageJobs registry.
removeActiveJob is used when:
- DAGScheduler is requested to cleanupStateForJobAndIndependentStages
mapStageJobs¶
mapStageJobs: Seq[ActiveJob]
mapStageJobs returns the _mapStageJobs list.
mapStageJobs is used when:
- DAGScheduler is requested to markMapStageJobsAsFinished
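The three operations on _mapStageJobs (prepend, filter out, read) can be sketched with an immutable List held in a var. Job is a hypothetical stand-in for ActiveJob and MapStageJobsRegistry is a hypothetical holder for the registry; Job is a case class here, so equality is by jobId, which is an assumption of this sketch rather than how ActiveJob compares.

```scala
// Hypothetical stand-in for ActiveJob; equality is by jobId because it is a case class
final case class Job(jobId: Int)

// Hypothetical holder for the _mapStageJobs registry
final class MapStageJobsRegistry {
  // Immutable list held in a var; the most recently added job sits at the head
  private var _mapStageJobs: List[Job] = Nil

  // Prepend, as addActiveJob does (adds to the front of the list)
  def addActiveJob(job: Job): Unit = {
    _mapStageJobs = job :: _mapStageJobs
  }

  // Drop every entry equal to the given job
  def removeActiveJob(job: Job): Unit = {
    _mapStageJobs = _mapStageJobs.filter(_ != job)
  }

  // Read-only view of the registry
  def mapStageJobs: Seq[Job] = _mapStageJobs
}
```

Prepending to a List is constant time, and since the registry is replaced rather than mutated in place, a Seq previously returned by mapStageJobs is not affected by later additions or removals.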
Demo: ShuffleMapStage Sharing¶
A ShuffleMapStage can be shared across multiple jobs (if these jobs reuse the same RDDs).

```scala
// In spark-shell, sc is the SparkContext
val keyValuePairs = sc.parallelize(0 to 5).map((_, 1))
val rdd = keyValuePairs.sortByKey() // (1)

println(rdd.toDebugString)
// (6) ShuffledRDD[4] at sortByKey at <console>:39 []
//  +-(16) MapPartitionsRDD[1] at map at <console>:39 []
//     |   ParallelCollectionRDD[0] at parallelize at <console>:39 []

rdd.count // (2)
rdd.count // (3)
```
(1) Shuffle at sortByKey()
(2) Submits a job with two stages (and two to be executed)
(3) Intentionally repeats the last action, which submits a new job with two stages, one of which is shared and already computed
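The sharing in the demo relies on the scheduler reusing one ShuffleMapStage per shuffle. Spark's DAGScheduler keeps a registry of map stages keyed by shuffle ID for this purpose; the sketch below is a heavily simplified, hypothetical model of that idea (SharedMapStage, MapStageRegistry and submitJob are illustrative names, not Spark's API). The second submission finds the stage already available and skips it, mirroring the second rdd.count.

```scala
import scala.collection.mutable

// Hypothetical, heavily simplified map stage (not Spark's ShuffleMapStage)
final class SharedMapStage(val shuffleId: Int, val numPartitions: Int) {
  private var numAvailableOutputs = 0
  // Stand-in for running the stage's tasks and registering all map outputs
  def run(): Unit = { numAvailableOutputs = numPartitions }
  def isAvailable: Boolean = numAvailableOutputs == numPartitions
}

// Hypothetical registry of map stages keyed by shuffle ID (not DAGScheduler's code)
final class MapStageRegistry {
  private val shuffleIdToMapStage = mutable.Map.empty[Int, SharedMapStage]
  private var created = 0

  // Reuse the stage already registered for a shuffle ID; create it only the first time
  def getOrCreate(shuffleId: Int, numPartitions: Int): SharedMapStage =
    shuffleIdToMapStage.getOrElseUpdate(shuffleId, {
      created += 1
      new SharedMapStage(shuffleId, numPartitions)
    })

  def stagesCreated: Int = created

  // Submit a job depending on the shuffle: run the map stage only if its outputs are missing.
  // Returns true when the map stage was executed, false when it was skipped.
  def submitJob(shuffleId: Int, numPartitions: Int): Boolean = {
    val stage = getOrCreate(shuffleId, numPartitions)
    if (stage.isAvailable) false
    else { stage.run(); true }
  }
}
```

For example, two calls to submitJob(0, 16) create a single stage, execute it on the first call, and skip it on the second.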
Map Output Files¶
ShuffleMapStage writes out map output files (for a shuffle).