ShuffleMapStage

ShuffleMapStage (shuffle map stage or simply map stage) is one of the two types of stages in a physical execution DAG (the other being ResultStage).

The logical DAG or logical execution plan is the RDD lineage.

ShuffleMapStage corresponds to (and is associated with) a ShuffleDependency.

ShuffleMapStage is created when DAGScheduler is requested to plan a ShuffleDependency for execution.

A ShuffleMapStage produces the input for the stages that follow it in the DAG of stages, and is also called the map side of a shuffle dependency.

Creating Instance

ShuffleMapStage takes the following to be created:

ShuffleMapStage initializes the internal registries and counters.

Jobs Registry

ShuffleMapStage keeps track of jobs that were submitted to execute it independently (if any).

The registry is used when DAGScheduler is requested to markMapStageJobsAsFinished, that is, when DAGScheduler is notified that a ShuffleMapTask has finished successfully and that task completed the ShuffleMapStage, so any map-stage jobs waiting on this stage are marked as finished.

An active job is deregistered (removed) when DAGScheduler is requested to clean up after a job and independent stages.
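The registration and deregistration described above can be sketched as a small, Spark-free model. JobRef and MapStageJobsRegistry are hypothetical names introduced for illustration only; the real registry holds ActiveJob instances and lives inside ShuffleMapStage, so treat this as a sketch under those assumptions rather than the actual implementation.

```scala
// Hypothetical stand-in for Spark's ActiveJob; only the job ID matters here.
final case class JobRef(jobId: Int)

// Simplified model of the jobs registry of a map stage (not Spark's own class).
final class MapStageJobsRegistry {
  // Most recently registered job comes first.
  private var jobs: List[JobRef] = Nil

  // Jobs that were submitted to execute the map stage independently.
  def mapStageJobs: Seq[JobRef] = jobs

  // Registers a job that waits for the map stage to finish.
  def addActiveJob(job: JobRef): Unit = {
    jobs = job :: jobs
  }

  // Deregisters a job, e.g. while cleaning up after the job.
  def removeActiveJob(job: JobRef): Unit = {
    jobs = jobs.filter(_ != job)
  }
}

object MapStageJobsRegistryDemo {
  def main(args: Array[String]): Unit = {
    val registry = new MapStageJobsRegistry
    registry.addActiveJob(JobRef(0))
    registry.addActiveJob(JobRef(1))
    registry.removeActiveJob(JobRef(0))
    println(registry.mapStageJobs) // List(JobRef(1))
  }
}
```

An immutable list behind a private var keeps the model small; a job that is not registered can be removed without error, which makes cleanup safe to repeat.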

ShuffleMapStage is Available (Fully Computed)

When executed, a ShuffleMapStage saves map output files (for reduce tasks).

When all partitions have shuffle map outputs available, ShuffleMapStage is considered available (done or ready).

ShuffleMapStage is asked about its availability when DAGScheduler is requested to find the missing parent map stages of a stage, and in handleMapStageSubmitted, submitMissingTasks, handleTaskCompletion, markMapStageJobsAsFinished and stageDependsOn.

ShuffleMapStage uses the MapOutputTrackerMaster for the number of partitions with shuffle map outputs available (of the ShuffleDependency by the shuffle ID).
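The availability check boils down to comparing two numbers: the partitions with registered map output and the total number of partitions. The following is a minimal sketch of that comparison in plain Scala. OutputTrackerModel and MapStageModel are hypothetical stand-ins for MapOutputTrackerMaster and ShuffleMapStage, and their method names are assumptions made for this example.

```scala
import scala.collection.mutable

// Hypothetical stand-in for MapOutputTrackerMaster: for every shuffle ID it
// records which map partitions have output registered.
final class OutputTrackerModel {
  private val outputs = mutable.Map.empty[Int, Array[Boolean]]

  // Starts tracking a shuffle with no map output available yet.
  def registerShuffle(shuffleId: Int, numPartitions: Int): Unit = {
    outputs(shuffleId) = Array.fill(numPartitions)(false)
  }

  // Marks the map output of a single partition as available.
  def registerMapOutput(shuffleId: Int, partition: Int): Unit = {
    outputs(shuffleId)(partition) = true
  }

  // Number of partitions with map output available (0 for an unknown shuffle).
  def numAvailableOutputs(shuffleId: Int): Int =
    outputs.get(shuffleId).map(_.count(done => done)).getOrElse(0)
}

// Simplified model of a map stage that delegates to the tracker.
final class MapStageModel(
    shuffleId: Int,
    val numPartitions: Int,
    tracker: OutputTrackerModel) {

  def numAvailableOutputs: Int = tracker.numAvailableOutputs(shuffleId)

  // Available only when every partition has map output registered.
  def isAvailable: Boolean = numAvailableOutputs == numPartitions
}
```

In this model the stage keeps no output state of its own, so its availability always reflects what the tracker currently knows.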

Finding Missing Partitions

findMissingPartitions(): Seq[Int]

findMissingPartitions requests the MapOutputTrackerMaster for the missing partitions (of the ShuffleDependency by the shuffle ID) and returns them.

If MapOutputTrackerMaster does not track the ShuffleDependency yet, findMissingPartitions simply returns all the partitions as missing.

findMissingPartitions is part of the Stage abstraction.
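The fallback behaviour (every partition is missing when the shuffle is not tracked yet) can be modelled with an Option. This is a sketch in plain Scala, not Spark code: MissingPartitionsTracker and MissingPartitionsStage are hypothetical names, and the assumption is that the tracker answers with None for a shuffle it does not know.

```scala
import scala.collection.mutable

// Hypothetical tracker model: None means the shuffle is not tracked yet.
final class MissingPartitionsTracker {
  private val available = mutable.Map.empty[Int, Array[Boolean]]

  // Starts tracking a shuffle with all partitions missing.
  def registerShuffle(shuffleId: Int, numPartitions: Int): Unit = {
    available(shuffleId) = Array.fill(numPartitions)(false)
  }

  // Marks the map output of a single partition as available.
  def registerMapOutput(shuffleId: Int, partition: Int): Unit = {
    available(shuffleId)(partition) = true
  }

  // Partitions without map output, or None for an untracked shuffle.
  def findMissingPartitions(shuffleId: Int): Option[Seq[Int]] =
    available.get(shuffleId).map { flags =>
      flags.indices.filterNot(p => flags(p))
    }
}

// Simplified model of the stage-side lookup.
final class MissingPartitionsStage(
    shuffleId: Int,
    numPartitions: Int,
    tracker: MissingPartitionsTracker) {

  // Falls back to "all partitions are missing" for an untracked shuffle.
  def findMissingPartitions(): Seq[Int] =
    tracker.findMissingPartitions(shuffleId).getOrElse(0 until numPartitions)
}
```

Returning an Option from the tracker keeps "nothing is missing" (an empty sequence) distinct from "the shuffle is unknown" (None).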

ShuffleMapStage Sharing

A ShuffleMapStage can be shared across multiple jobs if these jobs reuse the same RDDs.

Figure 1. Skipped Stages are already-computed ShuffleMapStages
val rdd = sc.parallelize(0 to 5).map((_, 1)).sortByKey()  // (1)
rdd.count  // (2)
rdd.count  // (3)
(1) Shuffle at sortByKey()
(2) Submits a job with two stages, both of which are executed
(3) Repeats the same action on purpose: the new job again has two stages, but the ShuffleMapStage is shared (already computed) and so is skipped
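One way to picture the sharing is a lookup table keyed by shuffle ID: a job that needs a map stage first checks whether a stage for that shuffle already exists and creates one only if it does not. The sketch below models that idea in plain Scala. The names shuffleIdToMapStage and getOrCreateShuffleMapStage follow the ones used in DAGScheduler, but StageModel, StageRegistryModel and the method body are simplified assumptions for illustration.

```scala
import scala.collection.mutable

// Hypothetical, minimal description of a map stage.
final case class StageModel(stageId: Int, shuffleId: Int)

// Simplified model of a scheduler-side registry of map stages.
final class StageRegistryModel {
  private val shuffleIdToMapStage = mutable.HashMap.empty[Int, StageModel]
  private var nextStageId = 0

  // Returns the stage registered for the shuffle, creating it on first use.
  def getOrCreateShuffleMapStage(shuffleId: Int): StageModel =
    shuffleIdToMapStage.getOrElseUpdate(shuffleId, {
      val stage = StageModel(nextStageId, shuffleId)
      nextStageId += 1
      stage
    })
}

object StageRegistryDemo {
  def main(args: Array[String]): Unit = {
    val registry = new StageRegistryModel
    val first = registry.getOrCreateShuffleMapStage(shuffleId = 0)
    val second = registry.getOrCreateShuffleMapStage(shuffleId = 0)
    println(first eq second) // true
  }
}
```

Two jobs that ask for the same shuffle get the very same stage object in this model, which is why the second job can skip the stage once its map outputs are available.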