ShuffleMapStage¶
ShuffleMapStage (shuffle map stage or simply map stage) is a Stage.
ShuffleMapStage corresponds to (and is associated with) a ShuffleDependency.
ShuffleMapStage can be submitted independently, but it is usually an intermediate step in a physical execution plan (with the final step being a ResultStage).
Creating Instance¶
ShuffleMapStage takes the following to be created:
- Stage ID
- RDD (of the ShuffleDependency)
- Number of tasks
- Parent Stages
- First Job ID (of the ActiveJob that created it)
- CallSite
- ShuffleDependency
- MapOutputTrackerMaster
- Resource Profile ID
ShuffleMapStage is created when:
- DAGScheduler is requested to plan a ShuffleDependency for execution
Missing Partitions¶
findMissingPartitions(): Seq[Int]
findMissingPartitions requests the MapOutputTrackerMaster for the missing partitions (of the ShuffleDependency) and returns them.
If not available (the MapOutputTrackerMaster does not track the ShuffleDependency), findMissingPartitions simply assumes that all the partitions are missing.
findMissingPartitions is part of the Stage abstraction.
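The fallback described above can be sketched as follows. This is a minimal model, not Spark's code: `TrackerSketch` and `StageSketch` are hypothetical stand-ins for MapOutputTrackerMaster and ShuffleMapStage, and the tracker is assumed to answer with an `Option` that is empty when the shuffle is not tracked.

```scala
object MissingPartitionsSketch {
  // Hypothetical stand-in for MapOutputTrackerMaster:
  // shuffle ID -> per-partition flags (true = map output registered).
  final class TrackerSketch(outputs: Map[Int, Vector[Boolean]]) {
    // None when the shuffle is not tracked at all (an assumption of this sketch)
    def findMissingPartitions(shuffleId: Int): Option[Seq[Int]] =
      outputs.get(shuffleId).map { flags =>
        flags.indices.filterNot(i => flags(i))
      }
  }

  // Hypothetical stand-in for ShuffleMapStage
  final case class StageSketch(shuffleId: Int, numPartitions: Int, tracker: TrackerSketch) {
    // Ask the tracker first; if the shuffle is unknown, assume every partition is missing
    def findMissingPartitions(): Seq[Int] =
      tracker.findMissingPartitions(shuffleId).getOrElse(0 until numPartitions)
  }
}
```

In this model, a stage over an untracked shuffle reports all of `0 until numPartitions` as missing, while a tracked shuffle reports only the partitions without a registered output.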
ShuffleMapStage Ready¶
When "executed", a ShuffleMapStage saves map output files (for reduce tasks).
When all partitions have shuffle map outputs available, ShuffleMapStage is considered ready (done or available).
isAvailable¶
isAvailable: Boolean
isAvailable is true when the ShuffleMapStage is ready and all partitions have shuffle outputs (i.e. numAvailableOutputs is exactly numPartitions).
isAvailable is used when:
- DAGScheduler is requested to getMissingParentStages, handleMapStageSubmitted, submitMissingTasks, processShuffleMapStageCompletion, markMapStageJobsAsFinished and stageDependsOn
Available Outputs¶
numAvailableOutputs: Int
numAvailableOutputs requests the MapOutputTrackerMaster to getNumAvailableOutputs (for the shuffleId of the ShuffleDependency).
numAvailableOutputs is used when:
- DAGScheduler is requested to submitMissingTasks
- ShuffleMapStage is requested to isAvailable
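The relationship between numAvailableOutputs and isAvailable can be illustrated with a small model. This is a sketch under stated assumptions: `TrackerSketch` and `StageSketch` are hypothetical stand-ins (not Spark classes), and an untracked shuffle is assumed to have zero available outputs.

```scala
object AvailabilitySketch {
  // Hypothetical stand-in for MapOutputTrackerMaster:
  // shuffle ID -> per-partition flags (true = map output registered).
  final class TrackerSketch(outputs: Map[Int, Vector[Boolean]]) {
    // Number of partitions with a registered map output (0 if untracked, by assumption)
    def getNumAvailableOutputs(shuffleId: Int): Int =
      outputs.get(shuffleId).map(_.count(flag => flag)).getOrElse(0)
  }

  // Hypothetical stand-in for ShuffleMapStage
  final case class StageSketch(shuffleId: Int, numPartitions: Int, tracker: TrackerSketch) {
    // Delegates to the tracker on every call, so the value is never cached here
    def numAvailableOutputs: Int = tracker.getNumAvailableOutputs(shuffleId)

    // Ready only when every partition has a shuffle output
    def isAvailable: Boolean = numAvailableOutputs == numPartitions
  }
}
```

Delegating on every call means the stage reflects the tracker's current state, which is one way to keep availability consistent if map outputs are later lost.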
Active Jobs¶
ShuffleMapStage defines the _mapStageJobs internal registry of ActiveJobs to track jobs that were submitted to execute the stage independently.
A new job is registered (added) in addActiveJob.
An active job is deregistered (removed) in removeActiveJob.
addActiveJob¶
addActiveJob(
  job: ActiveJob): Unit
addActiveJob adds the given ActiveJob to (the front of) the _mapStageJobs list.
addActiveJob is used when:
- DAGScheduler is requested to handleMapStageSubmitted
removeActiveJob¶
removeActiveJob(
  job: ActiveJob): Unit
removeActiveJob removes the given ActiveJob from the _mapStageJobs registry.
removeActiveJob is used when:
- DAGScheduler is requested to cleanupStateForJobAndIndependentStages
mapStageJobs¶
mapStageJobs: Seq[ActiveJob]
mapStageJobs returns the _mapStageJobs list.
mapStageJobs is used when:
- DAGScheduler is requested to markMapStageJobsAsFinished
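The three registry operations above can be modelled with an immutable list held in a private variable. This is an illustrative sketch only: `JobSketch` and `StageSketch` are hypothetical stand-ins for ActiveJob and ShuffleMapStage, and the removal strategy (filtering by equality) is an assumption of the sketch.

```scala
object MapStageJobsSketch {
  // Hypothetical stand-in for ActiveJob; only the job ID matters here
  final case class JobSketch(jobId: Int)

  // Hypothetical stand-in for ShuffleMapStage
  final class StageSketch {
    // Internal registry of jobs submitted to run this stage independently
    private[this] var _mapStageJobs: List[JobSketch] = Nil

    // Read-only view of the registry
    def mapStageJobs: Seq[JobSketch] = _mapStageJobs

    // Registers a job at the front of the list
    def addActiveJob(job: JobSketch): Unit =
      _mapStageJobs = job :: _mapStageJobs

    // Deregisters a job by dropping every entry equal to it
    def removeActiveJob(job: JobSketch): Unit =
      _mapStageJobs = _mapStageJobs.filter(_ != job)
  }
}
```

Because new jobs are prepended, the most recently registered job appears first in `mapStageJobs`.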
Demo: ShuffleMapStage Sharing¶
A ShuffleMapStage can be shared across multiple jobs (if these jobs reuse the same RDDs).
val keyValuePairs = sc.parallelize(0 to 5).map((_, 1))
val rdd = keyValuePairs.sortByKey() // (1)
scala> println(rdd.toDebugString)
(6) ShuffledRDD[4] at sortByKey at <console>:39 []
 +-(16) MapPartitionsRDD[1] at map at <console>:39 []
    |   ParallelCollectionRDD[0] at parallelize at <console>:39 []
rdd.count // (2)
rdd.count // (3)
- (1) Shuffle at sortByKey()
- (2) Submits a job with two stages (both of which are executed)
- (3) Intentionally repeats the last action, which submits a new job with two stages, one of which is shared as already computed
Map Output Files¶
ShuffleMapStage writes out map output files (for a shuffle).