The introduction that follows was highly influenced by the scaladoc of org.apache.spark.scheduler.DAGScheduler. As DAGScheduler is a private class, it does not appear in the official API documentation. You are strongly encouraged to read the sources first and only then this and the related pages.
"Reading the sources", I say?! Yes, I am kidding!
DAGScheduler is the scheduling layer of Apache Spark that implements stage-oriented scheduling. It transforms a logical execution plan (i.e. RDD lineage of dependencies built using RDD transformations) to a physical execution plan (using stages).
Figure: DAGScheduler Transforming RDD Lineage Into Stage DAG
After an action has been called, SparkContext hands over a logical plan to DAGScheduler, which in turn translates it to a set of stages that are submitted as TaskSets for execution (see Execution Model).
DAGScheduler does three things in Spark (thorough explanations follow):
1. Computes an execution DAG, i.e. DAG of stages, for a job.
2. Determines the preferred locations to run each task on.
3. Handles failures due to shuffle output files being lost.
DAGScheduler computes a directed acyclic graph (DAG) of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run jobs. It then submits stages to TaskScheduler.
In addition to coming up with the execution DAG, DAGScheduler also determines the preferred locations to run each task on, based on the current cache status, and passes the information to TaskScheduler.
DAGScheduler tracks which RDDs are cached (or persisted) to avoid "recomputing" them, i.e. redoing the map side of a shuffle.
DAGScheduler remembers what ShuffleMapStages have already produced output files (that are stored in BlockManagers).
DAGScheduler is only interested in cache location coordinates, i.e. host and executor id, per partition of a RDD.
|FIXME: A diagram, please|
Furthermore, it handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures within a stage that are not caused by shuffle file loss are handled by the TaskScheduler itself, which will retry each task a small number of times before cancelling the whole stage.
DAGScheduler uses an event queue architecture in which a thread can post
DAGSchedulerEvent events, e.g. a new job or stage being submitted, that DAGScheduler reads and executes sequentially. See the section Internal Event Loop - dag-scheduler-event-loop.
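The event-queue mechanics can be sketched in a few lines of Scala. This is a simplified model, not Spark's EventLoop class: a blocking queue plus a single daemon consumer thread that handles posted events one at a time.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingDeque}

// A simplified event-queue sketch in the spirit of DAGScheduler's event loop:
// producers post events from any thread; a single daemon thread takes them off
// a blocking queue and handles them sequentially. Event, JobSubmitted and
// EventLoop below are illustrative stand-ins, not Spark's classes.
sealed trait Event
case class JobSubmitted(jobId: Int) extends Event
case object Stop extends Event

class EventLoop(handle: Event => Unit) {
  private val queue = new LinkedBlockingDeque[Event]()
  private val stopped = new CountDownLatch(1)
  private val thread = new Thread(() => {
    var running = true
    while (running) queue.take() match {
      case Stop  => running = false
      case event => handle(event)       // events are handled one at a time
    }
    stopped.countDown()
  })
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def post(event: Event): Unit = queue.put(event)         // returns immediately
  def stop(): Unit = { queue.put(Stop); stopped.await() } // drain, then stop
}
```

post returns as soon as the event is enqueued, which is why a posting thread (e.g. the one running an action) is never blocked by event handling.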
DAGScheduler runs stages in topological order.
cacheLocs — block locations per RDD and partition. Uses TaskLocation that includes a host name and an executor id on that host (as ExecutorCacheTaskLocation). The keys are RDDs (their ids) and the values are arrays indexed by partition numbers. Each entry is a set of block locations where a RDD partition is cached, i.e. the BlockManagers of the blocks. Initialized empty when DAGScheduler is created.

failedEpoch — the lookup table of lost executors and the epoch of the event.

failedStages — stages that failed due to fetch failures (when a task fails with FetchFailed).

jobIdToActiveJob — the lookup table of ActiveJobs per job id.

jobIdToStageIds — the lookup table of all stages per ActiveJob id.

nextJobId — the next job id counting from 0.

nextStageId — the next stage id counting from 0.

runningStages — the set of stages that are currently "running". A stage is added when submitMissingTasks gets executed (without first checking if the stage has not already been added).

stageIdToStage — the lookup table for stages per their ids.

waitingStages — the stages with parents to be computed.
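As a rough illustration, the shapes of a few of the registries above can be sketched in plain Scala (TaskLocation is reduced to a host/executor string here and stages to their ids; Spark's actual types are richer):

```scala
import scala.collection.mutable

// Illustrative Scala shapes of a few of DAGScheduler's internal registries.
// TaskLocation and StageId below are simplifications, not Spark's types.
type TaskLocation = String
type StageId = Int

// cacheLocs: keyed by RDD id; the value is an array indexed by partition
// number; each entry is the set of block locations caching that partition.
val cacheLocs = mutable.HashMap[Int, IndexedSeq[Seq[TaskLocation]]]()

val runningStages   = mutable.HashSet[StageId]()            // currently "running"
val waitingStages   = mutable.HashSet[StageId]()            // parents still to compute
val failedStages    = mutable.HashSet[StageId]()            // failed on fetch failures
val jobIdToStageIds = mutable.HashMap[Int, Set[StageId]]()  // all stages per job

var nextJobId   = 0  // the next job id, counting from 0
var nextStageId = 0  // the next stage id, counting from 0
```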
Add the following line to conf/log4j.properties to see what happens inside DAGScheduler:

log4j.logger.org.apache.spark.scheduler.DAGScheduler=DEBUG
Refer to Logging.
DAGScheduler uses SparkContext, TaskScheduler, LiveListenerBus, MapOutputTracker and BlockManager for its services. However, at the very minimum, DAGScheduler takes a SparkContext only (and requests SparkContext for the other services).
DAGScheduler reports metrics about its execution (refer to the section Metrics).
createResultStage(
  rdd: RDD[_],
  func: (TaskContext, Iterator[_]) => _,
  partitions: Array[Int],
  jobId: Int,
  callSite: CallSite): ResultStage
DAGScheduler takes the following when created:
DAGScheduler initializes the internal registries and counters.
executorHeartbeatReceived(
  execId: String,
  accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
  blockManagerId: BlockManagerId): Boolean
cleanupStateForJobAndIndependentStages(job: ActiveJob): Unit
cleanupStateForJobAndIndependentStages cleans up the state for the input job and any stages that are not part of any other job.

cleanupStateForJobAndIndependentStages looks the job up in the internal jobIdToStageIds registry.
If no stages are found, the following ERROR is printed out to the logs:
ERROR No stages registered for job [jobId]
cleanupStateForJobAndIndependentStages uses stageIdToStage registry to find the stages (the real objects not ids!).
For each stage, cleanupStateForJobAndIndependentStages reads the jobs the stage belongs to.

If the job does not belong to the jobs of the stage, the following ERROR is printed out to the logs:
ERROR Job [jobId] not registered for stage [stageId] even though that stage was registered for the job
While removing from runningStages, you should see the following DEBUG message in the logs:
DEBUG Removing running stage [stageId]
While removing from waitingStages, you should see the following DEBUG message in the logs:
DEBUG Removing stage [stageId] from waiting set.
While removing from failedStages, you should see the following DEBUG message in the logs:
DEBUG Removing stage [stageId] from failed set.
After all cleaning (using stageIdToStage as the source registry), if the stage belonged to the one and only job, you should see the following DEBUG message in the logs:
DEBUG After removal of stage [stageId], remaining stages = [stageIdToStage.size]
markMapStageJobAsFinished(job: ActiveJob, stats: MapOutputStatistics): Unit
markMapStageJobAsFinished marks the active job finished and notifies Spark listeners.

markMapStageJobAsFinished marks the zeroth partition as finished and increases the number of tasks finished in the job.
submitJob[T, U](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  callSite: CallSite,
  resultHandler: (Int, U) => Unit,
  properties: Properties): JobWaiter[U]
submitJob does the following:
You may see an IllegalArgumentException thrown when the input partitions reference partitions not in the input rdd:

Attempting to access a non-existent partition: [p]. Total number of partitions: [maxPartitions]
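The partition check can be sketched as a small standalone function (checkPartitions is a hypothetical name; Spark performs an equivalent validation inside submitJob):

```scala
// A hedged sketch of submitJob's partition validation: every requested
// partition index must exist in the target RDD, otherwise an
// IllegalArgumentException with the message above is thrown.
def checkPartitions(partitions: Seq[Int], maxPartitions: Int): Unit =
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      s"Attempting to access a non-existent partition: $p. " +
        s"Total number of partitions: $maxPartitions")
  }
```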
submitMapStage[K, V, C](
  dependency: ShuffleDependency[K, V, C],
  callback: MapOutputStatistics => Unit,
  callSite: CallSite,
  properties: Properties): JobWaiter[MapOutputStatistics]
submitMapStage uses the nextJobId internal counter to get the job id.
submitMapStage then creates a JobWaiter (with the job id and with one artificial task that will however get completed only when the entire stage finishes).
submitMapStage returns the JobWaiter.

If the number of partitions to compute is 0, submitMapStage throws a SparkException:

Can't run submitMapStage on RDD with 0 partitions
Relaying Stage Cancellation From SparkContext (by Posting StageCancelled to DAGScheduler Event Bus) — cancelStage Method
Relaying Job Group Cancellation From SparkContext (by Posting JobGroupCancelled to DAGScheduler Event Bus) — cancelJobGroup Method
cancelJobGroup(groupId: String): Unit
INFO Asked to cancel job group [groupId]
Relaying All Jobs Cancellation From SparkContext (by Posting AllJobsCancelled to DAGScheduler Event Bus) — cancelAllJobs Method
Relaying Task Started From TaskSetManager (by Posting BeginEvent to DAGScheduler Event Bus) — taskStarted Method
taskStarted(task: Task[_], taskInfo: TaskInfo)
Relaying Task Fetching/Getting Result From TaskSetManager (by Posting GettingResultEvent to DAGScheduler Event Bus) — taskGettingResult Method
Relaying Task End From TaskSetManager (by Posting CompletionEvent to DAGScheduler Event Bus) — taskEnded Method
taskEnded(
  task: Task[_],
  reason: TaskEndReason,
  result: Any,
  accumUpdates: Map[Long, Any],
  taskInfo: TaskInfo,
  taskMetrics: TaskMetrics): Unit
Relaying TaskSet Failed From TaskSetManager (by Posting TaskSetFailed to DAGScheduler Event Bus) — taskSetFailed Method
taskSetFailed( taskSet: TaskSet, reason: String, exception: Option[Throwable]): Unit
The input arguments of taskSetFailed become the arguments of the TaskSetFailed event that is posted.
Relaying Executor Lost From TaskSchedulerImpl (by Posting ExecutorLost to DAGScheduler Event Bus) — executorLost Method
executorLost(execId: String, reason: ExecutorLossReason): Unit
Relaying Executor Added From TaskSchedulerImpl (by Posting ExecutorAdded to DAGScheduler Event Bus) — executorAdded Method
executorAdded(execId: String, host: String): Unit
Relaying Job Cancellation From SparkContext or JobWaiter (by Posting JobCancelled to DAGScheduler Event Bus) — cancelJob Method
cancelJob(jobId: Int): Unit
INFO DAGScheduler: Asked to cancel job [id]
Finding Or Creating Missing Direct Parent ShuffleMapStages (For ShuffleDependencies of Input RDD) — getOrCreateParentStages Internal Method
getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage]
markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit
runJob[T, U](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  callSite: CallSite,
  resultHandler: (Int, U) => Unit,
  properties: Properties): Unit
runJob submits an action job to the
DAGScheduler and waits for a result.
When the job succeeds, you should see the following INFO message in the logs:
INFO Job [jobId] finished: [callSite], took [time] s
When the job fails, you should see the following INFO message in the logs and the exception (that led to the failure) is thrown.
INFO Job [jobId] failed: [callSite], took [time] s
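The submit-and-wait pattern behind runJob can be sketched with a promise-backed waiter (JobWaiterLike is illustrative; Spark's JobWaiter additionally collects per-task results via a result handler):

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// A sketch of submit-and-wait: the calling thread blocks on a waiter that
// task-completion events resolve from the scheduler side. Not Spark's API.
class JobWaiterLike[U](totalTasks: Int) {
  private val promise = Promise[Unit]()
  private var finished = 0

  def taskSucceeded(index: Int, result: U): Unit = synchronized {
    finished += 1
    if (finished == totalTasks) promise.success(()) // last task completes the job
  }

  def jobFailed(e: Exception): Unit = promise.tryFailure(e)

  def awaitResult(timeout: Duration): Unit = Await.result(promise.future, timeout)
}
```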
Finding or Creating New ShuffleMapStages for ShuffleDependency — getOrCreateShuffleMapStage Internal Method
getOrCreateShuffleMapStage(
  shuffleDep: ShuffleDependency[_, _, _],
  firstJobId: Int): ShuffleMapStage
getOrCreateShuffleMapStage finds the ShuffleMapStage for the input shuffleDep in the shuffleIdToMapStage internal registry and returns it when found.

Otherwise, getOrCreateShuffleMapStage finds the missing ancestor shuffle dependencies and creates the missing ShuffleMapStage stages for them (and for the input shuffleDep itself).
Creating ShuffleMapStage for ShuffleDependency (Copying Shuffle Map Output Locations From Previous Jobs) —
createShuffleMapStage( shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage
createShuffleMapStage creates a ShuffleMapStage for the input ShuffleDependency and jobId (of an ActiveJob) possibly copying shuffle map output locations from previous jobs to avoid recomputing records.

createShuffleMapStage first finds or creates missing parent ShuffleMapStage stages of the associated RDD.

A ShuffleDependency is associated with exactly one ShuffleMapStage.

The RDD of the new ShuffleMapStage is the RDD of the input ShuffleDependency.

When the ShuffleMapStage is created, it is registered in the shuffleIdToMapStage internal registry.

createShuffleMapStage calls updateJobIdStageIdMaps.
If MapOutputTrackerMaster tracks the input ShuffleDependency (because other jobs have already computed it), createShuffleMapStage requests the serialized ShuffleMapStage outputs, deserializes them and registers them with the new ShuffleMapStage.

Figure: MapOutputTrackerMaster — Whether Shuffle Map Output Is Already Tracked

If MapOutputTrackerMaster does not track the input ShuffleDependency, you should see the following INFO message in the logs and createShuffleMapStage registers the ShuffleDependency with MapOutputTrackerMaster:

INFO Registering RDD [id] ([creationSite])

createShuffleMapStage returns the new ShuffleMapStage.
clearCacheLocs clears the internal registry of the partition locations per RDD.
getMissingAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]]
A missing shuffle dependency of a RDD is a dependency not registered in the shuffleIdToMapStage internal registry.
getMissingAncestorShuffleDependencies finds direct parent shuffle dependencies of the input RDD and collects the ones that are not registered in
shuffleIdToMapStage internal registry. It repeats the process for the RDDs of the parent shuffle dependencies.
getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]]
getShuffleDependencies takes the direct shuffle dependencies of the input RDD and direct shuffle dependencies of all the parent non-ShuffleDependencies in the dependency chain (aka RDD lineage).
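A toy model of this traversal (RddNode, Narrow and Shuffle are illustrative stand-ins, not Spark's classes): a shuffle dependency is collected as a stage boundary and the walk stops there, while narrow dependencies are followed up the lineage.

```scala
import scala.collection.mutable

// Sketch of getShuffleDependencies: collect only the *direct* shuffle
// dependencies, walking through narrow dependencies but never descending
// past a shuffle boundary.
sealed trait Dep
case class Narrow(parent: RddNode) extends Dep
case class Shuffle(parent: RddNode) extends Dep
case class RddNode(id: Int, deps: Seq[Dep] = Nil)

def shuffleDependencies(rdd: RddNode): Set[Shuffle] = {
  val parents = mutable.Set[Shuffle]()
  val visited = mutable.Set[RddNode]()
  val waiting = mutable.Stack[RddNode](rdd)
  while (waiting.nonEmpty) {
    val current = waiting.pop()
    if (visited.add(current)) current.deps.foreach {
      case s: Shuffle     => parents += s          // stage boundary: collect, stop
      case Narrow(parent) => waiting.push(parent)  // same stage: keep walking
    }
  }
  parents.toSet
}
```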
failJobAndIndependentStages(
  job: ActiveJob,
  failureReason: String,
  exception: Option[Throwable] = None): Unit
failJobAndIndependentStages fails the input job and all the stages that are only used by the job.

failJobAndIndependentStages uses the jobIdToStageIds internal registry to look up the stages registered for the job.
If no stages could be found, you should see the following ERROR message in the logs:
ERROR No stages registered for job [id]
Otherwise, for every stage,
failJobAndIndependentStages finds the job ids the stage belongs to.
If no stages could be found or the job is not referenced by the stages, you should see the following ERROR message in the logs:
ERROR Job [id] not registered for stage [id] even though that stage was registered for the job
Only when there is exactly one job registered for the stage and the stage is in RUNNING state (in the runningStages internal registry) does failJobAndIndependentStages request TaskScheduler to cancel the stage's tasks and mark the stage finished.
abortStage(
  failedStage: Stage,
  reason: String,
  exception: Option[Throwable]): Unit
abortStage is an internal method that finds all the active jobs that depend on the
failedStage stage and fails them.
abortStage looks the failedStage stage up in the internal stageIdToStage registry and exits if the stage was not registered earlier.
At this time, the
completionTime property (of the failed stage’s StageInfo) is assigned to the current time (millis).
All the active jobs that depend on the failed stage (as calculated above) and the stages that do not belong to other jobs (aka independent stages) are failed (with the failure reason being "Job aborted due to stage failure: [reason]" and the input exception).
If there are no jobs depending on the failed stage, you should see the following INFO message in the logs:
INFO Ignoring failure of [failedStage] because all jobs depending on it are done
stageDependsOn(stage: Stage, target: Stage): Boolean
stageDependsOn compares two stages and returns whether the stage depends on the target stage (i.e. true) or not (i.e. false).
stageDependsOn walks through the graph of RDDs of the input stage. For every RDD in the RDD's dependencies (using RDD.dependencies), stageDependsOn adds the RDD of a NarrowDependency to a stack of RDDs to visit, while for a ShuffleDependency it finds the ShuffleMapStage stage (for the dependency and the stage's first job id) and adds the RDD of that map stage to the stack of RDDs to visit if the map stage is not yet ready, i.e. not all of its partitions have shuffle outputs.
After all the RDDs of the input stage are visited, stageDependsOn checks if the target's RDD is among the RDDs of the stage, i.e. whether the stage depends on the target stage.
eventProcessLoop is DAGScheduler’s event bus to which Spark (by submitJob) posts jobs to schedule their execution. Later on, TaskSetManager talks back to
DAGScheduler to inform about the status of the tasks using the same "communication channel".
It allows Spark to release the current thread when posting happens and let the event loop handle events on a separate thread - asynchronously.
submitWaitingChildStages(parent: Stage): Unit
submitWaitingChildStages submits for execution all waiting stages for which the input
parent Stage is the direct parent.
Waiting stages are the stages registered in the waitingStages internal registry.
When executed, you should see the following
TRACE messages in the logs:
TRACE DAGScheduler: Checking if any dependencies of [parent] are now runnable
TRACE DAGScheduler: running: [runningStages]
TRACE DAGScheduler: waiting: [waitingStages]
TRACE DAGScheduler: failed: [failedStages]
submitWaitingChildStages finds child stages of the input
parent stage, removes them from
waitingStages internal registry, and submits one by one sorted by their job ids.
submitStage(stage: Stage): Unit
submitStage is an internal method that DAGScheduler uses to submit the input stage or its missing parents (if there are any stages not computed yet before the input stage can run).

submitStage recursively submits any missing parents of the stage.
submitStage first finds the earliest-created job id that needs the stage.

A stage itself tracks the jobs (their ids) it belongs to (using the internal jobIds registry).
The following steps depend on whether there is a job or not.
If there are no jobs that require the stage, submitStage aborts it with the reason:
No active job for stage [id]
If however there is a job for the
stage, you should see the following DEBUG message in the logs:
DEBUG DAGScheduler: submitStage([stage])
With the stage ready for submission, submitStage calculates the list of missing parent stages of the stage (sorted by their job ids). You should see the following DEBUG message in the logs:
DEBUG DAGScheduler: missing: [missing]
If the stage has no parent stages missing, you should see the following INFO message in the logs:
INFO DAGScheduler: Submitting [stage] ([stage.rdd]), which has no missing parents
submitStage submits the
stage (with the earliest-created job id) and finishes.
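The recursion can be sketched with a toy stage graph (SimpleStage and the registries passed in are illustrative, not Spark's types): a stage runs only once all of its parent stages are available; otherwise its missing parents are submitted first and the stage itself is parked as waiting.

```scala
import scala.collection.mutable

// Sketch of submitStage's recursion over missing parent stages.
case class SimpleStage(id: Int, parents: Seq[SimpleStage], available: Boolean = false)

def submitStage(stage: SimpleStage,
                waiting: mutable.Set[Int],
                submitted: mutable.ArrayBuffer[Int]): Unit = {
  val missing = stage.parents.filterNot(_.available).sortBy(_.id)
  if (missing.isEmpty) {
    submitted += stage.id                               // all parents ready
  } else {
    missing.foreach(submitStage(_, waiting, submitted)) // parents go first
    waiting += stage.id                                 // park until parents finish
  }
}
```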
handleJobSubmitted(
  jobId: Int,
  finalRDD: RDD[_],
  func: (TaskContext, Iterator[_]) => _,
  partitions: Array[Int],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties): Unit
handleMapStageSubmitted(
  jobId: Int,
  dependency: ShuffleDependency[_, _, _],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties): Unit
handleTaskCompletion(event: CompletionEvent): Unit
A single stage can be re-executed in multiple attempts due to fault recovery. The number of attempts is configured (FIXME).
When TaskScheduler reports that a task failed because a map output file from a previous stage was lost, the DAGScheduler resubmits the lost stage. This is detected through a CompletionEvent with FetchFailed, or an ExecutorLost event.
DAGScheduler will wait a small amount of time to see whether other nodes or tasks fail, then resubmit
TaskSets for any lost stage(s) that compute the missing tasks.
Please note that tasks from the old attempts of a stage could still be running.
A stage object tracks multiple StageInfo objects to pass to Spark listeners or the web UI.
StageInfo for the most recent attempt for a stage is accessible through latestInfo.
See SPARK-9850 Adaptive execution in Spark for the design document. The work is currently in progress.
DAGScheduler.submitMapStage method is used for adaptive query planning, to run map stages and look at statistics about their outputs before submitting downstream stages.
DAGScheduler uses the following ScheduledThreadPoolExecutors (with the policy of removing cancelled tasks from a work queue at time of cancellation):
They are created using
ThreadUtils.newDaemonSingleThreadScheduledExecutor method that uses Guava DSL to instantiate a ThreadFactory.
getMissingParentStages(stage: Stage): List[Stage]
getMissingParentStages starts with the
stage's RDD and walks up the tree of all parent RDDs to find uncached partitions.
An uncached partition of a RDD is a partition that has no locations in the cacheLocs internal registry, i.e. no blocks in any of the active BlockManagers.

For a NarrowDependency, getMissingParentStages simply marks the corresponding RDD to visit and moves on to a next dependency of a RDD or works on another unvisited parent RDD.
|NarrowDependency is a RDD dependency that allows for pipelined execution.|
getMissingParentStages focuses on ShuffleDependency dependencies.
|ShuffleDependency is a RDD dependency that represents a dependency on the output of a ShuffleMapStage, i.e. shuffle map stage.|
For each ShuffleDependency, getMissingParentStages finds the corresponding ShuffleMapStage stages. If the ShuffleMapStage is not available, it is added to the set of missing (map) stages.
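A hedged sketch of the walk with a toy lineage model (all names are illustrative): cached data stops the descent, narrow dependencies are followed within the same (pipelined) stage, and shuffle dependencies whose map stage is unavailable are collected as missing.

```scala
import scala.collection.mutable

// Sketch of getMissingParentStages over a toy lineage. Real Spark checks
// cache status per partition; here a whole RDD is either cached or not.
sealed trait Dependency
case class NarrowDep(parent: LineageRdd) extends Dependency
case class ShuffleDep(mapStageId: Int, parent: LineageRdd) extends Dependency
case class LineageRdd(id: Int, deps: Seq[Dependency] = Nil, cached: Boolean = false)

def missingParentStages(rdd: LineageRdd, available: Set[Int]): Set[Int] = {
  val missing = mutable.Set[Int]()
  val visited = mutable.Set[LineageRdd]()
  def visit(r: LineageRdd): Unit =
    if (visited.add(r) && !r.cached) r.deps.foreach {
      case NarrowDep(parent) => visit(parent)  // same stage: keep walking up
      case ShuffleDep(stageId, _) if !available(stageId) =>
        missing += stageId                     // the map stage must run first
      case _ => ()                             // shuffle output already available
    }
  visit(rdd)
  missing.toSet
}
```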
|FIXME…IMAGE with ShuffleDependencies queried|
submitMissingTasks(stage: Stage, jobId: Int): Unit
When executed, you should see the following DEBUG message in the logs:
DEBUG DAGScheduler: submitMissingTasks([stage])
The stage's pendingPartitions internal field is cleared (it is later filled out with the partitions to run tasks for).
submitMissingTasks requests the
stage for missing partitions, i.e. the indices of the partitions to compute.
submitMissingTasks marks the
stage as running (i.e. adds it to runningStages internal registry).
submitMissingTasks notifies OutputCommitCoordinator that the stage is started.
For the missing partitions,
submitMissingTasks computes their task locality preferences, i.e. pairs of missing partition ids and their task locality information.
NOTE: The locality information of a RDD is called preferred locations.
In case of non-fatal exceptions at this time (while getting the locality information),
submitMissingTasks creates a new stage attempt.
|A stage attempt is an internal property of a stage.|
submitMissingTasks then aborts the stage (with the reason being "Task creation failed" followed by the exception).
The stage is removed from the internal runningStages collection of stages and submitMissingTasks exits.
When no exception was thrown (while computing the locality information for tasks),
submitMissingTasks creates a new stage attempt and announces it on LiveListenerBus by posting a SparkListenerStageSubmitted message.
Yes, that is correct. Whether there was a task submission failure or not, submitMissingTasks creates a new stage attempt.
At that time, submitMissingTasks serializes the RDD (of the stage for which tasks are submitted) and, depending on the type of the stage, the ShuffleDependency (for a ShuffleMapStage) or the function (for a ResultStage).
The serialized so-called task binary bytes are "wrapped" as a broadcast variable (to make it available for executors to execute later on).
|That exact moment should make clear how important broadcast variables are for Spark itself that you, a Spark developer, can use, too, to distribute data across the nodes in a Spark application in a very efficient way.|
NotSerializableException exceptions lead to aborting the stage (with the reason being "Task not serializable: [exception]") and removing the stage from the internal
runningStages collection of stages.
Any non-fatal exceptions lead to aborting the stage (with the reason being "Task serialization failed" followed by the exception) and removing the stage from the internal
runningStages collection of stages.
With no exceptions along the way, submitMissingTasks computes a collection of tasks to execute for the missing partitions (of the stage).

submitMissingTasks creates a ShuffleMapTask or ResultTask for every missing partition, depending on whether the stage is a ShuffleMapStage or a ResultStage, respectively.
submitMissingTasks uses the preferred locations (computed earlier) per partition.
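The task-creation step can be sketched as follows (the *Sketch types are illustrative stand-ins for Spark's ShuffleMapTask and ResultTask): one task per missing partition, carrying that partition's preferred locations computed earlier.

```scala
// Sketch of submitMissingTasks' per-partition task creation: the concrete
// task type depends on the stage type.
sealed trait TaskSketch { def partitionId: Int }
case class ShuffleMapTaskSketch(partitionId: Int, locs: Seq[String]) extends TaskSketch
case class ResultTaskSketch(partitionId: Int, locs: Seq[String]) extends TaskSketch

def tasksFor(isShuffleMapStage: Boolean,
             missingPartitions: Seq[Int],
             preferredLocs: Map[Int, Seq[String]]): Seq[TaskSketch] =
  missingPartitions.map { p =>
    val locs = preferredLocs.getOrElse(p, Nil)  // locality computed earlier
    if (isShuffleMapStage) ShuffleMapTaskSketch(p, locs) else ResultTaskSketch(p, locs)
  }
```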
|FIXME Image with creating tasks for partitions in the stage.|
Any non-fatal exceptions lead to aborting the stage (with the reason being "Task creation failed" followed by the exception) and removing the stage from the internal
runningStages collection of stages.
If there are tasks to submit for execution (i.e. there are missing partitions in the stage), you should see the following INFO message in the logs:
INFO DAGScheduler: Submitting [size] missing tasks from [stage] ([rdd])
submitMissingTasks records the partitions (of the tasks) in the stage's pendingPartitions internal registry.
You should see the following DEBUG message in the logs:
DEBUG DAGScheduler: New pending partitions: [pendingPartitions]
submitMissingTasks submits the tasks to TaskScheduler for execution (as a TaskSet with the id of the stage, the stage attempt id, the input jobId, and the properties of the stage's job).
FIXME What are the
submitMissingTasks records the submission time in the stage’s
StageInfo and exits.
If however there are no tasks to submit for execution, submitMissingTasks marks the stage as finished (with no errorMessage).

You should see a DEBUG message in the logs that varies per the type of the input stage:
DEBUG DAGScheduler: Stage [stage] is actually done; (available: [isAvailable],available outputs: [numAvailableOutputs],partitions: [numPartitions])
DEBUG DAGScheduler: Stage [stage] is actually done; (partitions: [numPartitions])
In the end, with no tasks to submit for execution,
submitMissingTasks submits waiting child stages for execution and exits.
getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation]
getPreferredLocs is simply an alias for the internal (recursive) getPreferredLocsInternal.
Finding BlockManagers (Executors) for Cached RDD Partitions (aka Block Location Discovery) — getCacheLocs Internal Method
getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]]
The size of the collection from getCacheLocs is exactly the number of partitions of the rdd.

The size of every TaskLocation collection (i.e. every entry in the result of getCacheLocs) is exactly the number of blocks cached for the corresponding partition.

Internally, getCacheLocs finds the rdd in the cacheLocs internal registry (of partition locations per RDD).
For the NONE storage level (i.e. no caching), the result is an empty locations (i.e. no location preference).

For other non-NONE storage levels, getCacheLocs requests BlockManagerMaster for block locations that are then mapped to TaskLocations with the hostname of the owning BlockManager for a block (of a partition) and the executor id.
Finding Placement Preferences for RDD Partition (recursively) — getPreferredLocsInternal Internal Method
getPreferredLocsInternal(
  rdd: RDD[_],
  partition: Int,
  visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation]
getPreferredLocsInternal first finds the
TaskLocations for the
partition of the
rdd (using cacheLocs internal cache) and returns them.
Otherwise, if not found, getPreferredLocsInternal requests the rdd for the preferred locations of the partition and returns them.
|Preferred locations of the partitions of a RDD are also called placement preferences or locality preferences.|
If all the attempts fail to yield any non-empty result,
getPreferredLocsInternal returns an empty collection of TaskLocations.
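A simplified model of the recursion (PlacedRdd and preferredLocs are illustrative names; among other simplifications it assumes a one-to-one partition mapping across narrow dependencies):

```scala
import scala.collection.mutable

// Sketch of getPreferredLocsInternal: cache locations first, then the RDD's
// own preferred locations, then the first narrow parent that yields a
// non-empty answer; a visited set guards against revisiting pairs.
case class PlacedRdd(id: Int,
                     prefs: Map[Int, Seq[String]] = Map.empty, // own preferences
                     narrowParents: Seq[PlacedRdd] = Nil)

def preferredLocs(rdd: PlacedRdd,
                  partition: Int,
                  cache: Map[(Int, Int), Seq[String]],
                  visited: mutable.Set[(Int, Int)] = mutable.Set.empty): Seq[String] = {
  if (!visited.add((rdd.id, partition))) return Nil  // already considered
  val cached = cache.getOrElse((rdd.id, partition), Nil)
  if (cached.nonEmpty) cached
  else {
    val own = rdd.prefs.getOrElse(partition, Nil)
    if (own.nonEmpty) own
    else rdd.narrowParents.iterator
      .map(p => preferredLocs(p, partition, cache, visited))
      .find(_.nonEmpty)
      .getOrElse(Nil)                                // no preference anywhere
  }
}
```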
updateAccumulators(event: CompletionEvent): Unit
updateAccumulators method merges the partial values of accumulators from a completed task into their "source" accumulators on the driver.
|It is called by handleTaskCompletion.|
For each AccumulableInfo in the CompletionEvent, a partial value from a task is obtained (from AccumulableInfo.update) and added to the driver's accumulator (using Accumulable.++=).
For named accumulators with the update value being a non-zero value, i.e. not Accumulable.zero, CompletionEvent.taskInfo.accumulables has a new AccumulableInfo added.
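The merge step can be sketched with plain Long values standing in for Spark's accumulators (mergeAccumulatorUpdates is a hypothetical name):

```scala
import scala.collection.mutable

// Sketch of updateAccumulators' merge: each completed task carries partial
// accumulator updates that are folded into driver-side accumulators by id.
def mergeAccumulatorUpdates(
    driverAccums: mutable.Map[Long, Long],   // accumulator id -> value
    taskUpdates: Seq[(Long, Long)]): Unit =  // partial values from one task
  taskUpdates.foreach { case (id, partial) =>
    driverAccums(id) = driverAccums.getOrElse(id, 0L) + partial
  }
```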
FIXME Where are
checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit
|Spark Property|Default Value|Description|
When enabled (i.e.
workerRemoved( workerId: String, host: String, message: String): Unit
workerRemoved simply requests the DAGSchedulerEventProcessLoop to post a WorkerRemoved event.
postTaskEnd(event: CompletionEvent): Unit