

ShuffleMapStage (shuffle map stage or simply map stage) is one of the two types of stages in a physical execution DAG (besides ResultStage).

NOTE: The logical DAG or logical execution plan is the RDD lineage.

ShuffleMapStage corresponds to (and is associated with) a <<shuffleDep, ShuffleDependency>>.

ShuffleMapStage is created when DAGScheduler is requested to plan a ShuffleDependency for execution.

ShuffleMapStage can also be submitted independently as a Spark job for Adaptive Query Planning / Adaptive Scheduling.

ShuffleMapStage is an input for the stages that follow it in the DAG of stages and is also called the map side of a shuffle dependency.

== Creating Instance

ShuffleMapStage takes the following to be created:

  • [[id]] Stage ID
  • [[rdd]] RDD of the <<shuffleDep, ShuffleDependency>>
  • [[numTasks]] Number of tasks
  • [[parents]] Parent stages
  • [[firstJobId]] ID of the ActiveJob that created it
  • [[callSite]] CallSite
  • [[shuffleDep]] ShuffleDependency
  • [[mapOutputTrackerMaster]] MapOutputTrackerMaster
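The parameter list above can be pictured with the following minimal sketch. It assumes hypothetical stand-in types (`RddStub`, `CallSiteStub`, `ShuffleDependencyStub`, `MapOutputTrackerMasterStub`, `StageSketch`) in place of Spark's own `private[spark]` classes, so it only models the shape of the constructor, not the real implementation. The number of tasks is taken from the number of partitions of the map-side RDD.

```scala
// Hypothetical stand-ins for Spark's RDD, CallSite, ShuffleDependency and
// MapOutputTrackerMaster; only the shape of the parameter list is modeled.
final case class RddStub(id: Int, numPartitions: Int)
final case class CallSiteStub(shortForm: String)
final case class ShuffleDependencyStub(shuffleId: Int, rdd: RddStub)
final class MapOutputTrackerMasterStub

// Parameters shared by all stages (a simplified stand-in for Spark's Stage).
abstract class StageSketch(
    val id: Int,
    val rdd: RddStub,
    val numTasks: Int,
    val parents: List[StageSketch],
    val firstJobId: Int,
    val callSite: CallSiteStub)

// A map stage additionally holds its ShuffleDependency and the tracker of map outputs.
final class ShuffleMapStageSketch(
    id: Int,
    rdd: RddStub,
    numTasks: Int,
    parents: List[StageSketch],
    firstJobId: Int,
    callSite: CallSiteStub,
    val shuffleDep: ShuffleDependencyStub,
    val mapOutputTrackerMaster: MapOutputTrackerMasterStub)
  extends StageSketch(id, rdd, numTasks, parents, firstJobId, callSite)

// Usage: the map-side RDD is assumed to have 4 partitions, so 4 tasks.
val mapSideRdd = RddStub(id = 1, numPartitions = 4)
val dep = ShuffleDependencyStub(shuffleId = 0, rdd = mapSideRdd)
val stage = new ShuffleMapStageSketch(
  id = 0,
  rdd = dep.rdd,                      // the RDD of the ShuffleDependency
  numTasks = dep.rdd.numPartitions,   // one task per partition
  parents = Nil,
  firstJobId = 0,
  callSite = CallSiteStub("hypothetical call site"),
  shuffleDep = dep,
  mapOutputTrackerMaster = new MapOutputTrackerMasterStub)
```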

== [[_mapStageJobs]][[mapStageJobs]][[addActiveJob]][[removeActiveJob]] Jobs Registry

ShuffleMapStage keeps track of jobs that were submitted to execute it independently (if any).

The registry is used when DAGScheduler is requested to markMapStageJobsAsFinished, i.e. when DAGScheduler is notified that a ShuffleMapTask has finished successfully and that task has completed the ShuffleMapStage, so any map-stage jobs waiting on this stage are marked as finished.

A new job is registered (added) when DAGScheduler is notified that a ShuffleDependency was submitted for execution (as a MapStageSubmitted event).

An active job is deregistered (removed) when DAGScheduler is requested to clean up after a job and independent stages.
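A minimal sketch of such a registry, assuming it is kept as an immutable `List` that is replaced on every change (prepend on add, filter on remove). `ActiveJobStub` and `MapStageJobsRegistry` are hypothetical names standing in for Spark's `ActiveJob` and the corresponding part of ShuffleMapStage.

```scala
// Hypothetical stand-in for Spark's ActiveJob.
final case class ActiveJobStub(jobId: Int)

// Sketch of the map-stage jobs registry of a ShuffleMapStage.
final class MapStageJobsRegistry {
  private[this] var _mapStageJobs: List[ActiveJobStub] = Nil

  /** Jobs submitted to compute this stage independently (if any). */
  def mapStageJobs: Seq[ActiveJobStub] = _mapStageJobs

  /** Registers a job, e.g. on a MapStageSubmitted event. */
  def addActiveJob(job: ActiveJobStub): Unit =
    _mapStageJobs = job :: _mapStageJobs

  /** Deregisters a job, e.g. while cleaning up after a job. */
  def removeActiveJob(job: ActiveJobStub): Unit =
    _mapStageJobs = _mapStageJobs.filter(_ != job)
}

val registry = new MapStageJobsRegistry
registry.addActiveJob(ActiveJobStub(1))
registry.addActiveJob(ActiveJobStub(2))
registry.removeActiveJob(ActiveJobStub(1))
```

Replacing an immutable list keeps readers from ever observing a half-updated collection; whether that matters depends on how the scheduler's event loop accesses the stage.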

== [[isAvailable]][[numAvailableOutputs]] ShuffleMapStage is Available (Fully Computed)

When executed, a ShuffleMapStage saves map output files (for reduce tasks).

When all partitions have shuffle map outputs available (numAvailableOutputs is equal to the number of partitions), ShuffleMapStage is considered available (done or ready).

ShuffleMapStage is asked about its availability when DAGScheduler is requested for the missing parent map stages of a stage, and in handleMapStageSubmitted, submitMissingTasks, handleTaskCompletion, markMapStageJobsAsFinished and stageDependsOn.

ShuffleMapStage uses the <<mapOutputTrackerMaster, MapOutputTrackerMaster>> for the number of partitions with shuffle map outputs available (of the <<shuffleDep, ShuffleDependency>> by the shuffle ID).
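The availability check can be sketched as follows. `OutputTrackerStub` is a hypothetical stand-in for MapOutputTrackerMaster that simply records, per shuffle ID, which map partitions have registered their output; its method names are modeled on, but not guaranteed to match, the real tracker. `AvailabilitySketch` is likewise hypothetical.

```scala
import scala.collection.mutable

// Hypothetical tracker: shuffle ID -> map partitions whose output is registered.
final class OutputTrackerStub {
  private val outputs = mutable.Map.empty[Int, Set[Int]]

  def registerMapOutput(shuffleId: Int, mapIndex: Int): Unit =
    outputs(shuffleId) = outputs.getOrElse(shuffleId, Set.empty[Int]) + mapIndex

  def getNumAvailableOutputs(shuffleId: Int): Int =
    outputs.getOrElse(shuffleId, Set.empty[Int]).size
}

// The stage asks the tracker (by shuffle ID) and compares with its partition count.
final class AvailabilitySketch(shuffleId: Int, numPartitions: Int, tracker: OutputTrackerStub) {
  def numAvailableOutputs: Int = tracker.getNumAvailableOutputs(shuffleId)
  def isAvailable: Boolean = numAvailableOutputs == numPartitions
}

val tracker = new OutputTrackerStub
val stage = new AvailabilitySketch(shuffleId = 0, numPartitions = 2, tracker = tracker)
val availableBefore = stage.isAvailable   // no map outputs yet
tracker.registerMapOutput(0, 0)
tracker.registerMapOutput(0, 1)
```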

== [[findMissingPartitions]] Finding Missing Partitions

[source, scala]
----
findMissingPartitions(): Seq[Int]
----

findMissingPartitions requests the <<mapOutputTrackerMaster, MapOutputTrackerMaster>> for the missing partitions (of the <<shuffleDep, ShuffleDependency>> by the shuffle ID) and returns them.

If MapOutputTrackerMaster does not track the ShuffleDependency yet, findMissingPartitions simply returns all the partitions as missing.

findMissingPartitions is part of the Stage abstraction.
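The fallback to all partitions can be sketched with an `Option`: the tracker answers `None` for a shuffle it does not know, and `getOrElse` turns that into every partition. `MissingTrackerStub` and the free-standing `findMissingPartitions` function below are hypothetical, written only to illustrate the behavior described above.

```scala
import scala.collection.mutable

// Hypothetical tracker: shuffle ID -> per-partition flag (true = map output available).
final class MissingTrackerStub {
  private val statuses = mutable.Map.empty[Int, Array[Boolean]]

  def registerShuffle(shuffleId: Int, numMaps: Int): Unit =
    statuses(shuffleId) = Array.fill(numMaps)(false)

  def registerMapOutput(shuffleId: Int, mapIndex: Int): Unit =
    statuses(shuffleId)(mapIndex) = true

  // None if the shuffle is not tracked, otherwise the partitions without output.
  def findMissingPartitions(shuffleId: Int): Option[Seq[Int]] =
    statuses.get(shuffleId).map(s => s.indices.filterNot(i => s(i)))
}

// Untracked shuffle => every partition is missing.
def findMissingPartitions(tracker: MissingTrackerStub, shuffleId: Int, numPartitions: Int): Seq[Int] =
  tracker.findMissingPartitions(shuffleId).getOrElse(0 until numPartitions)

val tracker = new MissingTrackerStub
val missingBefore = findMissingPartitions(tracker, shuffleId = 0, numPartitions = 3)
tracker.registerShuffle(0, 3)
tracker.registerMapOutput(0, 1)
val missingAfter = findMissingPartitions(tracker, shuffleId = 0, numPartitions = 3)
```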

== [[stage-sharing]] ShuffleMapStage Sharing

A ShuffleMapStage can be shared across multiple jobs if these jobs reuse the same RDDs.

.Skipped Stages are already-computed ShuffleMapStages
image::dagscheduler-webui-skipped-stages.png[align="center"]

[source, scala]
----
// sc is the SparkContext (e.g. in spark-shell)
val rdd = sc.parallelize(0 to 5).map((_, 1)).sortByKey() // <1>
rdd.count() // <2>
rdd.count() // <3>
----
<1> Shuffle at sortByKey()
<2> Submits a job with two stages, both of which are executed
<3> Intentionally repeats the last action, which submits a new job with two stages, one of which is shared because it has already been computed (and is shown as skipped)
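Why the second `count` skips a stage can be modeled with a cache of map stages keyed by shuffle ID: the second job looks up the same stage, finds all its map outputs available, and does not run it again. The sketch below is a simplified, hypothetical model (`MapStageSketch`, `SchedulerSketch` and `runJob` are illustrative names); it mirrors the idea of DAGScheduler reusing a ShuffleMapStage per shuffle, not its actual code.

```scala
import scala.collection.mutable

// Hypothetical map stage with a count of available map outputs.
final class MapStageSketch(val shuffleId: Int, val numPartitions: Int) {
  var availableOutputs: Int = 0
  def isAvailable: Boolean = availableOutputs == numPartitions
}

// Hypothetical scheduler that caches map stages by shuffle ID.
final class SchedulerSketch {
  private val shuffleIdToMapStage = mutable.Map.empty[Int, MapStageSketch]
  private var created = 0

  def getOrCreateShuffleMapStage(shuffleId: Int, numPartitions: Int): MapStageSketch =
    shuffleIdToMapStage.getOrElseUpdate(shuffleId, {
      created += 1
      new MapStageSketch(shuffleId, numPartitions)
    })

  def stagesCreated: Int = created

  /** Runs a job over the shuffle; reports whether its map stage ran or was skipped. */
  def runJob(shuffleId: Int, numPartitions: Int): String = {
    val stage = getOrCreateShuffleMapStage(shuffleId, numPartitions)
    if (stage.isAvailable) "skipped"
    else { stage.availableOutputs = stage.numPartitions; "executed" }
  }
}

val scheduler = new SchedulerSketch
val firstJob = scheduler.runJob(shuffleId = 0, numPartitions = 2)
val secondJob = scheduler.runJob(shuffleId = 0, numPartitions = 2)
```

In this model the stage object is created once and shared by both jobs, which is the situation the web UI reports as a skipped stage.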

Last update: 2020-10-10