
ShuffleMapStage

ShuffleMapStage (shuffle map stage or simply map stage) is a Stage.

ShuffleMapStage corresponds to (and is associated with) a ShuffleDependency.

ShuffleMapStage can be submitted independently but it is usually an intermediate step in a physical execution plan (with the final step being a ResultStage).

Creating Instance

ShuffleMapStage takes the following to be created:

ShuffleMapStage is created when:

Missing Partitions

findMissingPartitions(): Seq[Int]

findMissingPartitions requests the MapOutputTrackerMaster for the missing partitions (of the ShuffleDependency) and returns them.

If the MapOutputTrackerMaster does not track the ShuffleDependency (so it cannot report any missing partitions), findMissingPartitions assumes that all the partitions are missing.
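The fallback logic can be sketched as follows. This is a simplified model, not Spark's code: `SimpleMapOutputTracker` is a hypothetical stand-in for MapOutputTrackerMaster that only remembers which map partitions of each shuffle have an output, and returns `None` for a shuffle it does not track.

```scala
import scala.collection.mutable

// Hypothetical stand-in for MapOutputTrackerMaster (not Spark's API):
// per shuffleId, a flag for every map partition saying whether its output exists.
class SimpleMapOutputTracker {
  private val available = mutable.Map.empty[Int, Array[Boolean]]

  def registerShuffle(shuffleId: Int, numMaps: Int): Unit =
    available(shuffleId) = Array.fill(numMaps)(false)

  def registerMapOutput(shuffleId: Int, mapIndex: Int): Unit =
    available(shuffleId)(mapIndex) = true

  // None when the shuffle is not tracked, otherwise the partitions with no output yet
  def findMissingPartitions(shuffleId: Int): Option[Seq[Int]] =
    available.get(shuffleId).map { outputs =>
      outputs.indices.filterNot(i => outputs(i))
    }
}

// Stage-side logic (simplified): when the tracker has no answer,
// treat every partition of the stage as missing.
def findMissingPartitions(
    tracker: SimpleMapOutputTracker,
    shuffleId: Int,
    numPartitions: Int): Seq[Int] =
  tracker.findMissingPartitions(shuffleId).getOrElse(0 until numPartitions)
```

Using `Option.getOrElse` keeps the "not tracked" case explicit instead of encoding it as an empty sequence, which would wrongly mean "nothing is missing".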

findMissingPartitions is part of the Stage abstraction.

ShuffleMapStage Ready

When "executed", a ShuffleMapStage saves map output files (for reduce tasks).

When all partitions have shuffle map outputs available, ShuffleMapStage is considered ready (done or available).

isAvailable

isAvailable: Boolean

isAvailable is true when the ShuffleMapStage is ready, i.e. all partitions have shuffle map outputs (numAvailableOutputs equals numPartitions).
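The check boils down to a single comparison. In this minimal sketch, `MapStageStatus` is a hypothetical case class in which `numAvailableOutputs` is a plain field rather than a value obtained from MapOutputTrackerMaster.

```scala
// Hypothetical, simplified model of a map stage's availability (not Spark's class).
final case class MapStageStatus(numPartitions: Int, numAvailableOutputs: Int) {
  // Ready only when every partition has a shuffle map output
  def isAvailable: Boolean = numAvailableOutputs == numPartitions
}
```

For example, `MapStageStatus(4, 3).isAvailable` is `false` because one partition still lacks its map output.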

isAvailable is used when:

Available Outputs

numAvailableOutputs: Int

numAvailableOutputs requests the MapOutputTrackerMaster to getNumAvailableOutputs (for the shuffleId of the ShuffleDependency).
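The delegation can be modeled as below. `OutputCounter` and `MapStageView` are hypothetical names, and returning `0` for an unknown shuffle is an assumption of this sketch; the point is that the stage keeps no count of its own and simply asks the tracker using its shuffleId.

```scala
import scala.collection.mutable

// Hypothetical tracker (not Spark's API): per shuffleId, the set of map
// partitions that have registered an output.
class OutputCounter {
  private val outputs = mutable.Map.empty[Int, mutable.Set[Int]]

  def registerMapOutput(shuffleId: Int, mapIndex: Int): Unit = {
    outputs.getOrElseUpdate(shuffleId, mutable.Set.empty[Int]) += mapIndex
    ()
  }

  // 0 for a shuffle the tracker does not know (assumption of this sketch)
  def getNumAvailableOutputs(shuffleId: Int): Int =
    outputs.get(shuffleId).map(_.size).getOrElse(0)
}

// The stage side only forwards its shuffleId to the tracker.
class MapStageView(tracker: OutputCounter, shuffleId: Int, val numPartitions: Int) {
  def numAvailableOutputs: Int = tracker.getNumAvailableOutputs(shuffleId)
}
```

Because outputs are kept in a set, registering the same map partition twice counts it once.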

numAvailableOutputs is used when:

Active Jobs

ShuffleMapStage uses an internal _mapStageJobs registry of ActiveJobs to track the jobs that were submitted to execute the stage independently.

A new job is registered (added) in addActiveJob.

An active job is deregistered (removed) in removeActiveJob.

addActiveJob

addActiveJob(
  job: ActiveJob): Unit

addActiveJob adds the given ActiveJob to (the front of) the _mapStageJobs list.
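Adding to the front of an immutable List is a constant-time prepend with `::`. In this sketch `Job` is a hypothetical stand-in for ActiveJob and `MapStageJobs` is a hypothetical holder for the registry; it also exposes the list the way mapStageJobs does.

```scala
// Hypothetical stand-in for ActiveJob: only a job id.
final case class Job(jobId: Int)

// Hypothetical holder for the _mapStageJobs registry.
class MapStageJobs {
  // Immutable List in a var; the most recently added job comes first
  private var _mapStageJobs: List[Job] = Nil

  // Prepend with :: (O(1) for List)
  def addActiveJob(job: Job): Unit =
    _mapStageJobs = job :: _mapStageJobs

  def mapStageJobs: Seq[Job] = _mapStageJobs
}
```

After `addActiveJob(Job(1))` and then `addActiveJob(Job(2))`, `mapStageJobs` lists `Job(2)` before `Job(1)`.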

addActiveJob is used when:

removeActiveJob

removeActiveJob(
  job: ActiveJob): Unit

removeActiveJob removes the ActiveJob from the _mapStageJobs registry.
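Removal can be sketched by filtering the list. As before, `Job` and `MapStageJobs` are hypothetical stand-ins, not Spark's classes; removing a job that is not registered leaves the registry unchanged.

```scala
// Hypothetical stand-in for ActiveJob: only a job id.
final case class Job(jobId: Int)

// Hypothetical holder for the _mapStageJobs registry, seeded with some jobs.
class MapStageJobs(initial: List[Job]) {
  private var _mapStageJobs: List[Job] = initial

  // Keep every job except the given one; the order of the rest is preserved
  def removeActiveJob(job: Job): Unit =
    _mapStageJobs = _mapStageJobs.filter(_ != job)

  def mapStageJobs: Seq[Job] = _mapStageJobs
}
```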

removeActiveJob is used when:

mapStageJobs

mapStageJobs: Seq[ActiveJob]

mapStageJobs returns the _mapStageJobs list.

mapStageJobs is used when:

Demo: ShuffleMapStage Sharing

A ShuffleMapStage can be shared across multiple jobs (if these jobs reuse the same RDDs).

Figure: Skipped Stages are already-computed ShuffleMapStages

val keyValuePairs = sc.parallelize(0 to 5).map((_, 1))
val rdd = keyValuePairs.sortByKey()  // (1)

scala> println(rdd.toDebugString)
(6) ShuffledRDD[4] at sortByKey at <console>:39 []
 +-(16) MapPartitionsRDD[1] at map at <console>:39 []
    |   ParallelCollectionRDD[0] at parallelize at <console>:39 []

rdd.count  // (2)
rdd.count  // (3)
  1. Shuffle at sortByKey()
  2. Submits a job with two stages (both of which are executed)
  3. Intentionally repeats the last action, which submits a new job with two stages, one of which (the ShuffleMapStage) is shared and skipped as already computed
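Why the second `rdd.count` skips a stage can be modeled with a toy scheduler. Everything here is hypothetical: `MapStage`, `TinyScheduler` and `mapStagesByShuffleId` are illustrative names, every map task is assumed to succeed, and only the map stage is modeled (the result stage of each job always runs). A job that needs an already-available map stage reuses it and counts it as skipped.

```scala
import scala.collection.mutable

// Hypothetical, heavily simplified map stage (not Spark's class).
final class MapStage(val shuffleId: Int, val numPartitions: Int) {
  var numAvailableOutputs: Int = 0
  def isAvailable: Boolean = numAvailableOutputs == numPartitions
  // Pretend every map task succeeds and writes its output
  def run(): Unit = { numAvailableOutputs = numPartitions }
}

// Hypothetical scheduler: one map stage per shuffleId, shared by all jobs.
final class TinyScheduler {
  private val mapStagesByShuffleId = mutable.Map.empty[Int, MapStage]

  // Returns (executed, skipped) map stage counts for one job
  def submitJob(shuffleId: Int, numPartitions: Int): (Int, Int) = {
    val stage = mapStagesByShuffleId.getOrElseUpdate(
      shuffleId, new MapStage(shuffleId, numPartitions))
    if (stage.isAvailable) (0, 1)
    else { stage.run(); (1, 0) }
  }
}
```

Submitting the same shuffle twice yields `(1, 0)` for the first job and `(0, 1)` for the second, mirroring the executed and then skipped map stage in the demo.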

Map Output Files

ShuffleMapStage writes out map output files (for a shuffle).