A stage is a physical unit of execution, i.e. a step in a physical execution plan.
A stage is a set of parallel tasks, one task per partition of an RDD, that computes partial results of a function executed as part of a Spark job.
In other words, a Spark job is a computation sliced into stages.
A stage can only work on the partitions of a single RDD (identified by
rdd), but can be associated with many other dependent parent stages (via internal field
parents), with the boundary of a stage marked by shuffle dependencies.
Submitting a stage can therefore trigger execution of a series of dependent parent stages (refer to RDDs, Job Execution, Stages, and Partitions).
Finally, every stage has a
firstJobId that is the id of the job that submitted the stage.
There are two types of stages:
ShuffleMapStage is an intermediate stage (in the execution DAG) that produces data for other stage(s). It writes map output files for a shuffle. It can also be the final stage in a job in Adaptive Query Planning / Adaptive Scheduling.
ResultStage is the final stage in a job that applies a function to one or more partitions of the target RDD to compute the result of an action.
When a job is submitted, a new stage is created with its parent ShuffleMapStages linked; they can be created from scratch or linked to, i.e. shared, if other jobs already use them.
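For example, in a minimal sketch (assuming a Spark shell with a SparkContext sc), two actions on the same shuffled RDD give two jobs, and the second job can reuse the map output files of the ShuffleMapStage computed by the first (the shared stage then shows up as skipped in the web UI):

```scala
val counts = sc
  .parallelize(Seq("a", "b", "a", "c"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)   // shuffle dependency: a ShuffleMapStage is created

counts.count()     // job 1: runs the ShuffleMapStage and a ResultStage
counts.collect()   // job 2: the map output files already exist, so the
                   // ShuffleMapStage is shared and only a ResultStage runs
```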
A stage tracks the jobs (their ids) it belongs to (using the internal jobIds registry).
DAGScheduler splits up a job into a collection of stages. Each stage contains a sequence of narrow transformations that can be completed without shuffling the entire data set, and stages are separated at shuffle boundaries, i.e. where a shuffle occurs. Stages are thus the result of breaking the RDD graph at shuffle boundaries.
Shuffle boundaries introduce a barrier where stages/tasks must wait for the previous stage to finish before they fetch map outputs.
RDD operations with narrow dependencies, like
filter(), are pipelined together into one set of tasks in each stage, but operations with shuffle dependencies require multiple stages, i.e. one to write a set of map output files, and another to read those files after a barrier.
In the end, every stage will have only shuffle dependencies on other stages, and may compute multiple operations inside it. The actual pipelining of these operations happens in the
RDD.compute() functions of various RDDs.
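As an illustration (a minimal sketch, again assuming a SparkContext sc), the narrow map and filter below are pipelined into the tasks of one stage, while reduceByKey introduces a shuffle dependency and hence a second stage:

```scala
val result = sc
  .parallelize(1 to 1000, numSlices = 4)
  .map(n => (n % 10, n))                   // narrow: pipelined into the same tasks
  .filter { case (_, n) => n % 2 == 0 }    // narrow: still the same stage
  .reduceByKey(_ + _)                      // shuffle boundary: a new stage begins

result.count()   // the job runs two stages: a ShuffleMapStage and a ResultStage
// result.toDebugString prints the RDD lineage, with the shuffle dependency visible
```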
Partitions are computed in jobs, and result stages may not always need to compute all partitions in their target RDD, e.g. for actions like first().
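For instance (a minimal sketch, assuming a SparkContext sc):

```scala
val rdd = sc.parallelize(1 to 100, numSlices = 10)

// first() is built on take(1), so the ResultStage of this job typically
// runs a task for only the first partition rather than all 10 of them.
rdd.first()
```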
DAGScheduler prints the following INFO message when there are tasks to submit:
Submitting 1 missing tasks from ResultStage 36 (ShuffledRDD at reduceByKey at <console>:24)
There is also the following DEBUG message with pending partitions:
New pending partitions: Set(0)
Tasks are later submitted to the TaskScheduler (via submitTasks).
When there are no tasks in a stage to submit, a DEBUG message is printed out to the logs instead.
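To see these messages in the Spark shell, raise the logging level of the DAGScheduler logger, e.g. as follows (a minimal sketch, assuming the log4j 1.x backend bundled with Spark 2.x):

```scala
import org.apache.log4j.{Level, Logger}

// DAGScheduler logs under the org.apache.spark.scheduler.DAGScheduler logger.
Logger.getLogger("org.apache.spark.scheduler.DAGScheduler").setLevel(Level.DEBUG)
```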
Stage.failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean checks whether the number of fetch-failed attempts (recorded in
fetchFailedAttemptIds) reaches the number of consecutive failures allowed for a given stage (in which case the stage should be aborted).
Note: The number of consecutive failures allowed for a stage is not configurable.
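In other words, the check counts the distinct stage attempts that failed on fetch and compares the count against a fixed limit. A self-contained sketch of that logic (the limit of 4 and the standalone helper below are illustrative assumptions, not the verbatim Spark implementation):

```scala
import scala.collection.mutable

// Stage attempts that failed because of a fetch failure.
val fetchFailedAttemptIds = mutable.HashSet.empty[Int]

// Fixed (non-configurable) number of consecutive failures allowed; 4 is assumed here.
val maxConsecutiveFetchFailures = 4

def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
  fetchFailedAttemptIds += stageAttemptId
  // Abort once the number of failed attempts reaches the allowed maximum.
  fetchFailedAttemptIds.size >= maxConsecutiveFetchFailures
}
```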
latestInfo simply returns the most recent
StageInfo (i.e. makes it accessible).
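The StageInfo that latestInfo tracks is also what reaches SparkListeners when stage-level events are posted, so it can be inspected from user code, e.g. as in this minimal sketch (assuming a SparkContext sc):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Print a short summary of every completed stage, based on its StageInfo.
sc.addSparkListener(new SparkListener {
  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    val info = event.stageInfo
    println(s"Stage ${info.stageId} (${info.name}) ran ${info.numTasks} tasks")
  }
})
```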
makeNewStageAttempt(numPartitionsToCompute: Int, taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit
makeNewStageAttempt creates a new
TaskMetrics and registers its internal accumulators (using the RDD's SparkContext).
makeNewStageAttempt then increments the nextAttemptId counter.
makeNewStageAttempt is used when
DAGScheduler is requested to submit the missing tasks of a stage.
A stage also maintains the following internal registries and counters:
- details: Long description of the stage
- jobIds: Set of jobs the stage belongs to
- name: Name of the stage
- nextAttemptId: The ID for the next attempt of the stage
- numPartitions: Number of partitions
- pendingPartitions: Set of pending partitions
- Internal cache with…FIXME