ShuffleMapStage (shuffle map stage or simply map stage) is a Stage.
ShuffleMapStage corresponds to (and is associated with) a ShuffleDependency.
ShuffleMapStage takes the following to be created:
- Stage ID
- RDD (of the ShuffleDependency)
- Number of tasks
- Parent Stages
- First Job ID (of the ActiveJob that created it)
- CallSite
- ShuffleDependency
- MapOutputTrackerMaster
- Resource Profile ID
ShuffleMapStage is created when:
- DAGScheduler is requested to plan a ShuffleDependency for execution
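findMissingPartitions requests the MapOutputTrackerMaster for the missing partitions of the ShuffleDependency. If not available (MapOutputTrackerMaster does not track the ShuffleDependency), findMissingPartitions simply assumes that all the partitions are missing.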
findMissingPartitions is part of the Stage abstraction.
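The fallback in findMissingPartitions (use the tracked missing partitions if there are any, otherwise treat every partition as missing) can be sketched in plain Scala. This is a minimal model, not Spark code: ToyMapOutputTracker and ToyShuffleMapStage are hypothetical stand-ins, and the tracker is assumed to be a simple map from a shuffle ID to per-partition "output registered" flags.

```scala
// Plain-Scala model of the findMissingPartitions fallback (not Spark code).
// ToyMapOutputTracker is a hypothetical stand-in for MapOutputTrackerMaster:
// it maps a shuffle ID to per-partition "map output registered" flags.
final class ToyMapOutputTracker(registered: Map[Int, Vector[Boolean]]) {
  // Some(missing partition IDs) if the shuffle is tracked, None otherwise
  def findMissingPartitions(shuffleId: Int): Option[Seq[Int]] =
    registered.get(shuffleId).map { outputs =>
      outputs.indices.filterNot(i => outputs(i))
    }
}

// Hypothetical, trimmed-down stage with only what the fallback needs
final case class ToyShuffleMapStage(
    shuffleId: Int,
    numPartitions: Int,
    tracker: ToyMapOutputTracker) {

  // An untracked shuffle means every partition is considered missing
  def findMissingPartitions(): Seq[Int] =
    tracker.findMissingPartitions(shuffleId).getOrElse(0 until numPartitions)
}

val tracker = new ToyMapOutputTracker(Map(0 -> Vector(true, false, true, false)))
val tracked = ToyShuffleMapStage(shuffleId = 0, numPartitions = 4, tracker = tracker)
val untracked = ToyShuffleMapStage(shuffleId = 1, numPartitions = 3, tracker = tracker)

println(tracked.findMissingPartitions().toList)   // List(1, 3)
println(untracked.findMissingPartitions().toList) // List(0, 1, 2)
```

Modeling the lookup as an Option keeps the "not tracked" case explicit, so the fallback is a single getOrElse.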
When "executed", a ShuffleMapStage saves map output files (for reduce tasks).
When all partitions have shuffle map outputs available, ShuffleMapStage is considered ready (done or available). isAvailable is true exactly when numAvailableOutputs equals the number of partitions.
isAvailable is used when:
- DAGScheduler is requested to getMissingParentStages, handleMapStageSubmitted, submitMissingTasks, processShuffleMapStageCompletion, markMapStageJobsAsFinished and stageDependsOn
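The readiness rule (available only when every partition has a map output) can be modeled with a counter of available outputs. This is a sketch under assumptions: ToyStage and its registerOutput/unregisterOutput methods are hypothetical, and a Set of partition IDs stands in for what MapOutputTrackerMaster reports in Spark.

```scala
// Plain-Scala model of isAvailable (not Spark code). In Spark, numAvailableOutputs
// is answered by MapOutputTrackerMaster; here a Set of partition IDs with a
// registered map output is a hypothetical stand-in.
final class ToyStage(val numPartitions: Int) {
  private var registered = Set.empty[Int]

  // A map task for partition p finished and its output was registered
  def registerOutput(p: Int): Unit = {
    require(p >= 0 && p < numPartitions, s"invalid partition $p")
    registered += p // a Set, so re-registering a partition is not double-counted
  }

  // The map output for partition p was lost (e.g. its executor is gone)
  def unregisterOutput(p: Int): Unit = registered -= p

  def numAvailableOutputs: Int = registered.size

  // Ready only when every partition has a map output
  def isAvailable: Boolean = numAvailableOutputs == numPartitions
}

val stage = new ToyStage(numPartitions = 2)
stage.registerOutput(0)
println(stage.isAvailable) // false
stage.registerOutput(1)
println(stage.isAvailable) // true
stage.unregisterOutput(0)
println(stage.isAvailable) // false
```

The last step shows why availability is a live check rather than a one-time flag: losing a single map output makes the stage not ready again.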
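numAvailableOutputs is the number of partitions with map outputs available, as reported by MapOutputTrackerMaster for the shuffle ID of the ShuffleDependency.
numAvailableOutputs is used when:
- ShuffleMapStage is requested for isAvailable
- DAGScheduler is requested to submitMissingTasks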
ShuffleMapStage uses the _mapStageJobs internal registry of ActiveJobs to track the jobs that were submitted to execute the stage independently.
A new job is registered (added) in addActiveJob.
An active job is deregistered (removed) in removeActiveJob.
addActiveJob(job: ActiveJob): Unit
addActiveJob is used when:
- DAGScheduler is requested to handleMapStageSubmitted
removeActiveJob(job: ActiveJob): Unit
removeActiveJob is used when:
- DAGScheduler is requested to cleanupStateForJobAndIndependentStages
mapStageJobs returns the _mapStageJobs list.
mapStageJobs is used when:
- DAGScheduler is requested to markMapStageJobsAsFinished
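The add/remove/read cycle of the _mapStageJobs registry can be sketched as a small class around an immutable List. ToyActiveJob and ToyMapStageJobs are hypothetical names for illustration; the sketch prepends new jobs (newest first), and that ordering should be treated as an assumption of the sketch.

```scala
// Hypothetical stand-in for ActiveJob; only an ID is modeled
final case class ToyActiveJob(jobId: Int)

// Plain-Scala model of the _mapStageJobs registry (not Spark code)
final class ToyMapStageJobs {
  // Registry of jobs submitted to run this map stage independently
  private var _mapStageJobs: List[ToyActiveJob] = Nil

  // Read-only view of the registry
  def mapStageJobs: Seq[ToyActiveJob] = _mapStageJobs

  // Register (add) a job; assumed newest-first ordering
  def addActiveJob(job: ToyActiveJob): Unit =
    _mapStageJobs = job :: _mapStageJobs

  // Deregister (remove) every occurrence of the job
  def removeActiveJob(job: ToyActiveJob): Unit =
    _mapStageJobs = _mapStageJobs.filter(_ != job)
}

val registry = new ToyMapStageJobs
registry.addActiveJob(ToyActiveJob(1))
registry.addActiveJob(ToyActiveJob(2))
registry.removeActiveJob(ToyActiveJob(1))
println(registry.mapStageJobs) // List(ToyActiveJob(2))
```

Swapping an immutable List on each change means callers of mapStageJobs get a stable snapshot that later registrations cannot mutate.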
Demo: ShuffleMapStage Sharing
ShuffleMapStage can be shared across multiple jobs (if these jobs reuse the same RDDs).
    val keyValuePairs = sc.parallelize(0 to 5).map((_, 1))
    val rdd = keyValuePairs.sortByKey() // (1)

    scala> println(rdd.toDebugString)
    (6) ShuffledRDD at sortByKey at <console>:39
     +-(16) MapPartitionsRDD at map at <console>:39
        |   ParallelCollectionRDD at parallelize at <console>:39

    rdd.count // (2)
    rdd.count // (3)
- (1) Shuffle at sortByKey
- (2) Submits a job with two stages, both of which are executed
- (3) Intentionally repeats the last action, which submits a new job with two stages, one of which (the ShuffleMapStage) is shared as already computed and therefore skipped
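Why the second rdd.count runs only one stage can be illustrated with a toy scheduler that caches map stages by shuffle ID. This is a heavily simplified model under stated assumptions, not DAGScheduler's code: ToyMapStage, ToyScheduler, runJob and mapStagesByShuffleId are hypothetical, and each job is assumed to have exactly one map stage plus one result stage, as in the demo.

```scala
import scala.collection.mutable

// Plain-Scala model of ShuffleMapStage sharing (not DAGScheduler code).
// All names here are hypothetical. Map stages are cached by shuffle ID, so a
// second job over the same shuffle reuses the stage and skips it once its
// map outputs are available.
final class ToyMapStage(val shuffleId: Int) {
  var available = false
}

final class ToyScheduler {
  private val mapStagesByShuffleId = mutable.Map.empty[Int, ToyMapStage]
  var mapStagesCreated = 0

  // The block is by-name, so it only runs (and counts) on a cache miss
  private def getOrCreateMapStage(shuffleId: Int): ToyMapStage =
    mapStagesByShuffleId.getOrElseUpdate(shuffleId, {
      mapStagesCreated += 1
      new ToyMapStage(shuffleId)
    })

  // A job with one map stage (for shuffleId) and one result stage.
  // Returns the number of stages that actually executed.
  def runJob(shuffleId: Int): Int = {
    val mapStage = getOrCreateMapStage(shuffleId)
    val mapStageRuns = if (mapStage.available) 0 else { mapStage.available = true; 1 }
    mapStageRuns + 1 // a job's final (result) stage always runs
  }
}

val scheduler = new ToyScheduler
println(scheduler.runJob(shuffleId = 0)) // 2: map stage + result stage
println(scheduler.runJob(shuffleId = 0)) // 1: shared map stage skipped
println(scheduler.mapStagesCreated)      // 1
```

The two runJob calls mirror rdd.count (2) and (3): both jobs see two stages, but only the first one executes the map stage.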
Map Output Files
ShuffleMapStage writes out map output files (for a shuffle).
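What "map output for reduce tasks" means can be sketched as splitting one map task's records into one bucket per reduce partition. This is an illustration, not Spark's shuffle writer: partitionMapOutput and reducePartitionOf are hypothetical helpers, file layout and serialization are left out, and the partitioning mimics HashPartitioner (non-negative hashCode modulo the number of reducers, null keys to partition 0). Note that sortByKey in the demo uses a RangePartitioner, so the hash scheme here is only illustrative.

```scala
// Hypothetical sketch (not Spark's shuffle writer): split one map task's records
// into buckets, one per reduce partition, the way map output is organized for
// reduce tasks. Partitioning mimics HashPartitioner: non-negative hashCode
// modulo the number of reducers, with null keys going to partition 0.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

def reducePartitionOf(key: Any, numReducers: Int): Int =
  if (key == null) 0 else nonNegativeMod(key.hashCode, numReducers)

// Group records by target reduce partition (record order is kept within a bucket)
def partitionMapOutput[K, V](records: Seq[(K, V)], numReducers: Int): Map[Int, Seq[(K, V)]] =
  records.groupBy { case (k, _) => reducePartitionOf(k, numReducers) }

val buckets = partitionMapOutput((0 to 5).map((_, 1)), numReducers = 2)
buckets.toSeq.sortBy(_._1).foreach { case (reducer, recs) =>
  println(s"reduce partition $reducer <- ${recs.mkString(", ")}")
}
```

Each reduce task then needs only its own bucket from every map task, which is why a ShuffleMapStage must have outputs for all of its partitions before the next stage can run.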