The introduction that follows was highly influenced by the scaladoc of org.apache.spark.scheduler.DAGScheduler. As DAGScheduler is a private class, it does not appear in the official API documentation. You are strongly encouraged to read the sources first and only then this and the related pages.
DAGScheduler is the scheduling layer of Apache Spark that implements stage-oriented scheduling.
DAGScheduler does three things in Spark (thorough explanations follow):
Computes an execution DAG, i.e. DAG of stages, for a job.
Determines the preferred locations to run each task on.
Handles failures due to shuffle output files being lost.
DAGScheduler computes a directed acyclic graph (DAG) of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run jobs. It then submits stages to TaskScheduler.
In addition to coming up with the execution DAG, DAGScheduler also determines the preferred locations to run each task on, based on the current cache status, and passes the information to TaskScheduler.
DAGScheduler tracks which RDDs are cached (or persisted) to avoid "recomputing" them, i.e. redoing the map side of a shuffle. DAGScheduler remembers what ShuffleMapStages have already produced output files (that are stored in BlockManagers).
DAGScheduler is only interested in cache location coordinates, i.e. host and executor id, per partition of a RDD.
Furthermore, it handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures within a stage that are not caused by shuffle file loss are handled by the TaskScheduler itself, which will retry each task a small number of times before cancelling the whole stage.
DAGScheduler uses an event queue architecture in which a thread can post DAGSchedulerEvent events, e.g. a new job or stage being submitted, that DAGScheduler reads and executes sequentially. See the section Internal Event Loop - dag-scheduler-event-loop.
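The following is a minimal, self-contained sketch of that event-queue pattern: producers post events to a queue and a single daemon thread drains and handles them one by one. It is an illustration only; DAGSchedulerEventProcessLoop and DAGSchedulerEvent are internal Spark classes, and the event and thread names below are made up.

```scala
import java.util.concurrent.LinkedBlockingDeque

// Hypothetical events standing in for DAGSchedulerEvent subtypes.
sealed trait SchedulerEvent
final case class JobSubmitted(jobId: Int) extends SchedulerEvent
case object Stop extends SchedulerEvent

val eventQueue = new LinkedBlockingDeque[SchedulerEvent]()

// A single daemon thread reads and handles the posted events sequentially.
val eventThread = new Thread("demo-dag-scheduler-event-loop") {
  setDaemon(true)
  override def run(): Unit = {
    var running = true
    while (running) {
      eventQueue.take() match {           // blocks until an event is available
        case JobSubmitted(id) => println(s"handling job $id")
        case Stop             => running = false
      }
    }
  }
}
eventThread.start()

eventQueue.put(JobSubmitted(0))           // any thread can post events
eventQueue.put(Stop)
```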
DAGScheduler runs stages in topological order.
DAGScheduler uses SparkContext, TaskScheduler, LiveListenerBus, MapOutputTracker and BlockManager for its services. However, at the very minimum, DAGScheduler takes a SparkContext only (and requests SparkContext for the other services).
DAGScheduler uses an event bus to process scheduling-related events on a separate thread (one by one and asynchronously).
DAGScheduler starts the event bus when created and stops it when requested to stop.
DAGScheduler defines event-posting methods that allow posting DAGSchedulerEvent events to the event bus. They are used when:
SparkContext is requested to cancel all running or scheduled Spark jobs
SparkContext is requested to cancel a job group
SparkContext is requested to cancel a stage
TaskSchedulerImpl is requested to handle resource offers (and a new executor is found in the resource offers)
SparkContext is requested to run an approximate job
SparkContext is requested to submit a MapStage for execution.
TaskSetManager is requested to handle a task fetching result
TaskSetManager is requested to abort
TaskSetManager is requested to start a task
TaskSchedulerImpl is requested to handle a removed worker event
runJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): Unit
runJob submits an action job to the DAGScheduler and waits for a result.
When the job succeeds, you should see the following INFO message in the logs:
Job [jobId] finished: [callSite], took [time] s
When the job fails, you should see the following INFO message in the logs and the exception (that led to the failure) is thrown.
Job [jobId] failed: [callSite], took [time] s
runJob is used when SparkContext is requested to run a job.
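Every RDD action eventually goes through SparkContext.runJob, which hands the job over to DAGScheduler.runJob. A small, self-contained example of triggering such a job from user code (the master URL and application name are arbitrary):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("runJob-demo").getOrCreate()
val sc = spark.sparkContext

val rdd = sc.parallelize(1 to 100, numSlices = 4)

// Submits an action job (one task per partition) and blocks until the result is available.
val perPartitionSums: Array[Int] = sc.runJob(rdd, (it: Iterator[Int]) => it.sum)

println(perPartitionSums.mkString(", "))   // 325, 950, 1575, 2200
spark.stop()
```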
DAGScheduler keeps track of block locations per RDD and partition.
DAGScheduler uses TaskLocation that includes a host name and an executor id on that host.
The keys are RDDs (their ids) and the values are arrays indexed by partition numbers.
Each entry is a set of block locations where a RDD partition is cached, i.e. the BlockManagers of the blocks.
Initialized empty when DAGScheduler is created.
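A toy model of the registry's shape can make this concrete. In the sketch below, plain (host, executor id) tuples stand in for Spark's TaskLocation (an internal class), and the RDD id and locations are made up:

```scala
import scala.collection.mutable

// RDD id -> array indexed by partition number -> cache locations (host, executor id) of the blocks
val cacheLocs = mutable.HashMap.empty[Int, IndexedSeq[Seq[(String, String)]]]

// e.g. RDD 3 with two partitions: partition 0 cached on two executors, partition 1 not cached
cacheLocs(3) = IndexedSeq(
  Seq(("host1", "1"), ("host2", "7")),
  Seq.empty
)
```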
DAGScheduler tracks ActiveJobs:
Removes an ActiveJob when requested to clean up after an ActiveJob and independent stages.
Removes all ActiveJobs when requested to doCancelAllJobs.
The number of ActiveJobs is available using job.activeJobs performance metric.
createResultStage( rdd: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage
createResultStage is used when DAGScheduler is requested to handle a JobSubmitted event.
createShuffleMapStage( shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage
Registering RDD [id] ([creationSite]) as input to shuffle [shuffleId]
createShuffleMapStage requests the MapOutputTrackerMaster to check whether the shuffle map output is already tracked and, if not, registers the shuffle.
createShuffleMapStage is used when DAGScheduler is requested to find or create a ShuffleMapStage for a given ShuffleDependency.
cleanupStateForJobAndIndependentStages( job: ActiveJob): Unit
cleanupStateForJobAndIndependentStages cleans up the state for the job and any stages that are not part of any other job. cleanupStateForJobAndIndependentStages looks the job up in the internal jobIdToStageIds registry.
If no stages are found, the following ERROR is printed out to the logs:
No stages registered for job [jobId]
Otherwise, cleanupStateForJobAndIndependentStages uses the stageIdToStage registry to find the stages (the actual objects, not ids!).
For each stage, cleanupStateForJobAndIndependentStages reads the jobs the stage belongs to.
If the job does not belong to the jobs of the stage, the following ERROR is printed out to the logs:
Job [jobId] not registered for stage [stageId] even though that stage was registered for the job
While removing from runningStages, you should see the following DEBUG message in the logs:
Removing running stage [stageId]
While removing from waitingStages, you should see the following DEBUG message in the logs:
Removing stage [stageId] from waiting set.
While removing from failedStages, you should see the following DEBUG message in the logs:
Removing stage [stageId] from failed set.
After all cleaning (using stageIdToStage as the source registry), if the stage belonged to the one and only job, you should see the following DEBUG message in the logs:
After removal of stage [stageId], remaining stages = [stageIdToStage.size]
cleanupStateForJobAndIndependentStages is used in handleTaskCompletion (when a ResultTask has completed successfully), failJobAndIndependentStages and markMapStageJobAsFinished.
markMapStageJobAsFinished( job: ActiveJob, stats: MapOutputStatistics): Unit
markMapStageJobAsFinished marks the active job finished and notifies Spark listeners. Internally, markMapStageJobAsFinished marks the zeroth partition finished and increases the number of tasks finished in the job.
getOrCreateParentStages( rdd: RDD[_], firstJobId: Int): List[Stage]
markStageAsFinished( stage: Stage, errorMessage: Option[String] = None, willRetry: Boolean = false): Unit
markStageAsFinished is used when…FIXME
getOrCreateShuffleMapStage( shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int): ShuffleMapStage
If not found, getOrCreateShuffleMapStage finds all the missing ancestor shuffle dependencies and creates the ShuffleMapStage stages (including one for the input ShuffleDependency).
getOrCreateShuffleMapStage is used when DAGScheduler is requested to find or create missing direct parent ShuffleMapStages of an RDD, find missing parent ShuffleMapStages for a stage, handle a MapStageSubmitted event, and check whether a stage depends on another stage.
getMissingAncestorShuffleDependencies( rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]]
A missing shuffle dependency of a RDD is a dependency not registered in the shuffleIdToMapStage internal registry. Internally, getMissingAncestorShuffleDependencies finds direct parent shuffle dependencies of the input RDD and collects the ones that are not registered in the shuffleIdToMapStage internal registry. It repeats the process for the RDDs of the parent shuffle dependencies.
getMissingAncestorShuffleDependencies is used when DAGScheduler is requested to find all ShuffleMapStage stages for a ShuffleDependency.
getShuffleDependencies( rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]]
Internally, getShuffleDependencies takes the direct shuffle dependencies of the input RDD and the direct shuffle dependencies of all the parent RDDs reachable through non-shuffle dependencies in the dependency chain (aka RDD lineage).
getShuffleDependencies is used when DAGScheduler is requested to find or create missing direct parent ShuffleMapStages (for ShuffleDependencies of a RDD) and find all missing shuffle dependencies for a given RDD.
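A simplified, self-contained sketch of that traversal is shown below. It mirrors the behaviour described above (walk up the lineage, stop at shuffle boundaries, keep walking through narrow dependencies) but is an illustration, not the Spark source:

```scala
import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.{Dependency, ShuffleDependency}

def directShuffleDependencies(rdd: RDD[_]): mutable.HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new mutable.HashSet[ShuffleDependency[_, _, _]]
  val visited = new mutable.HashSet[RDD[_]]
  val waitingForVisit = new mutable.Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep                   // a shuffle boundary marks a direct parent stage
        case dependency: Dependency[_] =>
          waitingForVisit.push(dependency.rdd)    // narrow dependency: keep walking up the lineage
      }
    }
  }
  parents
}
```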
failJobAndIndependentStages( job: ActiveJob, failureReason: String, exception: Option[Throwable] = None): Unit
failJobAndIndependentStages fails the input job and all the stages that are only used by the job. Internally, failJobAndIndependentStages uses the jobIdToStageIds internal registry to look up the stages registered for the job.
If no stages could be found, you should see the following ERROR message in the logs:
No stages registered for job [id]
Otherwise, for every stage, failJobAndIndependentStages finds the job ids the stage belongs to.
If no stages could be found or the job is not referenced by the stages, you should see the following ERROR message in the logs:
Job [id] not registered for stage [id] even though that stage was registered for the job
Only when there is exactly one job registered for the stage and the stage is in RUNNING state (in the runningStages internal registry) does failJobAndIndependentStages request TaskScheduler to cancel the stage's tasks and mark the stage finished.
failJobAndIndependentStages uses jobIdToStageIds, stageIdToStage, and runningStages internal registries.
failJobAndIndependentStages is used when…FIXME
abortStage( failedStage: Stage, reason: String, exception: Option[Throwable]): Unit
abortStage is an internal method that finds all the active jobs that depend on the failedStage stage and fails them. Internally, abortStage looks the failedStage stage up in the internal stageIdToStage registry and exits if the stage was not registered earlier.
At this time, the completionTime property (of the failed stage's StageInfo) is assigned to the current time (millis). All the active jobs that depend on the failed stage (as calculated above) and the stages that do not belong to other jobs (aka independent stages) are failed (with the failure reason being "Job aborted due to stage failure: [reason]" and the input exception).
If there are no jobs depending on the failed stage, you should see the following INFO message in the logs:
Ignoring failure of [failedStage] because all jobs depending on it are done
stageDependsOn( stage: Stage, target: Stage): Boolean
stageDependsOn compares two stages and returns whether the stage depends on the target stage (i.e. true) or not (i.e. false).
Internally, stageDependsOn walks through the graph of RDDs of the input stage. For every RDD in the RDD's dependencies (using RDD.dependencies), stageDependsOn adds the RDD of a NarrowDependency to a stack of RDDs to visit, while for a ShuffleDependency it finds the ShuffleMapStage stage (for the dependency and the stage's first job id) and adds its RDD to the stack of RDDs to visit only if the map stage is not yet available, i.e. not all the partitions have shuffle outputs.
After all the RDDs of the input stage are visited, stageDependsOn checks if the target's RDD is among the RDDs of the stage, i.e. whether the stage depends on the target stage.
stageDependsOn is used when DAGScheduler is requested to abort a stage.
submitWaitingChildStages( parent: Stage): Unit
submitWaitingChildStages submits for execution all waiting stages for which the input parent Stage is the direct parent. Waiting stages are the stages registered in the waitingStages internal registry.
When executed, you should see the following TRACE messages in the logs:
Checking if any dependencies of [parent] are now runnable
running: [runningStages]
waiting: [waitingStages]
failed: [failedStages]
submitWaitingChildStages finds child stages of the input parent stage, removes them from the waitingStages internal registry, and submits them one by one sorted by their job ids.
submitStage( stage: Stage): Unit
submitStage submits the input stage or its missing parents (if there are any stages that have not been computed yet before the input stage).
submitStage is also used to resubmit failed stages.
submitStage recursively submits any missing parents of the stage. Internally, submitStage first finds the earliest-created job id that needs the stage. A stage itself tracks the jobs (their ids) it belongs to (using an internal registry).
The following steps depend on whether there is a job or not.
If there are no jobs that require the stage, submitStage aborts it with the reason:
No active job for stage [id]
If however there is a job for the stage, you should see a DEBUG message in the logs. With the stage ready for submission, submitStage calculates the list of missing parent stages of the stage (sorted by their job ids) and prints out another DEBUG message to the logs.
When the stage has no parent stages missing, you should see the following INFO message in the logs:
Submitting [stage] ([stage.rdd]), which has no missing parents
submitStage submits the stage (with the earliest-created job id) and finishes.
submitStage is used recursively for missing parents of the given stage and when DAGScheduler is requested to submit a stage for execution.
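The recursion can be pictured with a toy model. The sketch below uses simplified stand-ins (ToyStage and the two sets) rather than the Spark classes, but follows the same submit-parents-first logic:

```scala
import scala.collection.mutable

// A toy stage: parents must have produced their output before the stage itself can run.
final case class ToyStage(id: Int, parents: Seq[ToyStage], available: Boolean = false)

val waitingStages = mutable.LinkedHashSet.empty[ToyStage]
val runningStages = mutable.LinkedHashSet.empty[ToyStage]

def missingParents(stage: ToyStage): Seq[ToyStage] = stage.parents.filterNot(_.available)

def submitStage(stage: ToyStage): Unit = {
  val missing = missingParents(stage).sortBy(_.id)
  if (missing.isEmpty) runningStages += stage   // no missing parents: submit its tasks now
  else {
    missing.foreach(submitStage)                // recursively submit the missing parents first
    waitingStages += stage                      // park this stage until the parents finish
  }
}

// Example: a final stage with one computed and one missing parent.
val shuffleA = ToyStage(0, Nil, available = true)
val shuffleB = ToyStage(1, Nil)
submitStage(ToyStage(2, Seq(shuffleA, shuffleB)))
println(s"running=$runningStages waiting=$waitingStages")  // stage 1 runs, stage 2 waits
```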
A single stage can be re-executed in multiple attempts due to fault recovery. The number of attempts is configured (FIXME).
When TaskScheduler reports that a task failed because a map output file from a previous stage was lost, the DAGScheduler resubmits the lost stage. This is detected through a FetchFailed or an ExecutorLost event. DAGScheduler will wait a small amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost stage(s) that compute the missing tasks.
Please note that tasks from the old attempts of a stage could still be running.
A stage object tracks multiple StageInfo objects to pass to Spark listeners or the web UI.
The StageInfo for the most recent attempt of a stage is accessible through the stage's latestInfo.
DAGScheduler uses ScheduledThreadPoolExecutors (with the policy of removing cancelled tasks from a work queue at time of cancellation). They are created using the ThreadUtils.newDaemonSingleThreadScheduledExecutor method that uses Guava DSL to instantiate a ThreadFactory.
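A sketch of how such a single-threaded daemon scheduled executor can be built with Guava's ThreadFactoryBuilder is shown below; the thread-name format is illustrative:

```scala
import java.util.concurrent.ScheduledThreadPoolExecutor
import com.google.common.util.concurrent.ThreadFactoryBuilder

val threadFactory = new ThreadFactoryBuilder()
  .setDaemon(true)
  .setNameFormat("dag-scheduler-message")   // illustrative thread name
  .build()

val executor = new ScheduledThreadPoolExecutor(1, threadFactory)
// The policy mentioned above: cancelled tasks are removed from the work queue at cancellation time.
executor.setRemoveOnCancelPolicy(true)
```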
getMissingParentStages( stage: Stage): List[Stage]
Internally, getMissingParentStages starts with the stage's RDD and walks up the tree of all parent RDDs to find uncached partitions. An uncached partition of a RDD is a partition for which no cache locations are registered (in the cacheLocs internal registry).
For a NarrowDependency, getMissingParentStages simply marks the corresponding RDD to visit and moves on to the next dependency of a RDD or works on another unvisited parent RDD.
NarrowDependency is a RDD dependency that allows for pipelined execution.
getMissingParentStages focuses on ShuffleDependency dependencies.
ShuffleDependency is a RDD dependency that represents a dependency on the output of a ShuffleMapStage, i.e. a shuffle map stage.
For a ShuffleDependency, getMissingParentStages finds the corresponding ShuffleMapStage stages. If a ShuffleMapStage is not available, it is added to the set of missing (map) stages.
FIXME…IMAGE with ShuffleDependencies queried
submitMissingTasks( stage: Stage, jobId: Int): Unit
submitMissingTasks prints out a DEBUG message to the logs.
submitMissingTasks adds the stage to the runningStages internal registry.
submitMissingTasks determines preferred locations (task locality preferences) of the missing partitions.
submitMissingTasks requests the stage for a new stage attempt.
submitMissingTasks uses the closure Serializer to serialize the stage and create a so-called task binary. submitMissingTasks serializes the RDD (of the stage) and either the ShuffleDependency or the compute function based on the type of the stage, i.e. ShuffleMapStage and ResultStage, respectively.
submitMissingTasks creates a broadcast variable for the task binary.
That shows how important broadcast variables are for Spark itself to distribute data among executors in a Spark application in the most efficient way.
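The same mechanism is available to user code through SparkContext.broadcast. A short example of the pattern (the lookup table and names are made up):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("broadcast-demo").getOrCreate()
val sc = spark.sparkContext

val lookup = Map("a" -> 1, "b" -> 2)          // data every task needs
val lookupBc = sc.broadcast(lookup)           // shipped to executors once, not once per task

val total = sc.parallelize(Seq("a", "b", "a"))
  .map(k => lookupBc.value.getOrElse(k, 0))   // each task reads the broadcast value locally
  .sum()

println(total)                                // 4.0
spark.stop()
```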
submitMissingTasks creates tasks for every missing partition: ShuffleMapTasks for a ShuffleMapStage and ResultTasks for a ResultStage.
If there are tasks to submit for execution (i.e. there are missing partitions in the stage), submitMissingTasks prints out the following INFO message to the logs:
Submitting [size] missing tasks from [stage] ([rdd]) (first 15 tasks are for partitions [partitionIds])
With no tasks to submit for execution, submitMissingTasks marks the stage as finished successfully.
submitMissingTasks prints out the following DEBUG messages based on the type of the stage:
Stage [stage] is actually done; (available: [isAvailable],available outputs: [numAvailableOutputs],partitions: [numPartitions])
Stage [stage] is actually done; (partitions: [numPartitions])
In the end, with no tasks to submit for execution, submitMissingTasks submits waiting child stages for execution and exits.
submitMissingTasks is used when DAGScheduler is requested to submit a stage for execution.
getPreferredLocs( rdd: RDD[_], partition: Int): Seq[TaskLocation]
getPreferredLocs is simply an alias for the internal (recursive) getPreferredLocsInternal.
getPreferredLocs is used when…FIXME
getCacheLocs( rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]]
The size of the collection from getCacheLocs is exactly the number of partitions in rdd.
The size of every TaskLocation collection (i.e. every entry in the result of getCacheLocs) is exactly the number of blocks managed using BlockManagers on executors.
Internally, getCacheLocs finds rdd in the cacheLocs internal registry (of partition locations per RDD).
For an RDD with NONE storage level (i.e. no caching), the result is an empty collection of locations (i.e. no location preference).
For other non-NONE storage levels, getCacheLocs requests BlockManagerMaster for block locations that are then mapped to TaskLocations with the hostname of the owning BlockManager for a block (of a partition) and the executor id.
getCacheLocs requests locations from BlockManagerMaster (using RDDBlockId with the RDD id and the partition indices).
DAGScheduler uses TaskLocations (with host and executor) while BlockManagerMaster uses BlockManagerId (to track similar information, i.e. block locations).
getPreferredLocsInternal( rdd: RDD[_], partition: Int, visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation]
getPreferredLocsInternal first finds the TaskLocations for the partition of the rdd (using the cacheLocs internal cache) and returns them.
Otherwise, if not found, getPreferredLocsInternal requests rdd for the preferred locations of partition and returns them.
Preferred locations of the partitions of a RDD are also called placement preferences or locality preferences.
If all the attempts fail to yield any non-empty result, getPreferredLocsInternal returns an empty collection of TaskLocations.
getPreferredLocsInternal is used when DAGScheduler is requested for the preferred locations for missing partitions.
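At the RDD level, placement preferences are visible through the public API; for example, SparkContext.makeRDD accepts explicit location preferences per element (the host names below are made up):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("prefs-demo").getOrCreate()
val sc = spark.sparkContext

// Each element comes with its preferred host names (this overload creates one partition per element).
val rdd = sc.makeRDD(Seq(
  (1, Seq("host1")),
  (2, Seq("host2", "host3"))
))

rdd.partitions.foreach { p =>
  println(s"partition ${p.index} -> ${rdd.preferredLocations(p)}")
}
spark.stop()
```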
updateAccumulators( event: CompletionEvent): Unit
updateAccumulators merges the partial values of accumulators from a completed task into their "source" accumulators on the driver.
It is called by handleTaskCompletion.
For each AccumulableInfo in the CompletionEvent, a partial value from a task is obtained (from AccumulableInfo.update) and added to the driver's accumulator.
For named accumulators with a non-zero update value, CompletionEvent.taskInfo.accumulables has a new AccumulableInfo added.
FIXME Where are
updateAccumulators is used when DAGScheduler is requested to handle a task completion.
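From the user's perspective this is the familiar accumulator behaviour, e.g. with a named long accumulator (all names below are arbitrary):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("accumulator-demo").getOrCreate()
val sc = spark.sparkContext

val errorCount = sc.longAccumulator("errorCount")   // named, so it also shows up in the web UI
sc.parallelize(1 to 1000, 4).foreach { n =>
  if (n % 100 == 0) errorCount.add(1)               // partial updates happen on executors
}

// The driver-side value reflects all per-task partial values merged on task completion.
println(errorCount.value)                           // 10
spark.stop()
```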
cleanUpAfterSchedulerStop is used when DAGSchedulerEventProcessLoop is requested to onStop.
removeExecutorAndUnregisterOutputs( execId: String, fileLost: Boolean, hostToUnregisterOutputs: Option[String], maybeEpoch: Option[Long] = None): Unit
executorHeartbeatReceived( execId: String, // (taskId, stageId, stageAttemptId, accumUpdates) accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])], blockManagerId: BlockManagerId): Boolean
executorHeartbeatReceived is used when TaskSchedulerImpl is requested to handle an executor heartbeat.
postTaskEnd( event: CompletionEvent): Unit
postTaskEnd is used when DAGScheduler is requested to handle a task completion.
handleBeginEvent( task: Task[_], taskInfo: TaskInfo): Unit
handleBeginEvent is used when DAGSchedulerEventProcessLoop is requested to handle a BeginEvent event.
handleTaskCompletion( event: CompletionEvent): Unit
handleTaskCompletion is used when DAGSchedulerEventProcessLoop is requested to handle a CompletionEvent event.
handleExecutorAdded( execId: String, host: String): Unit
handleExecutorAdded is used when DAGSchedulerEventProcessLoop is requested to handle an ExecutorAdded event.
handleExecutorLost( execId: String, workerLost: Boolean): Unit
handleExecutorLost is used when DAGSchedulerEventProcessLoop is requested to handle an ExecutorLost event.
handleGetTaskResult( taskInfo: TaskInfo): Unit
handleGetTaskResult is used when DAGSchedulerEventProcessLoop is requested to handle a GettingResultEvent event.
handleJobCancellation( jobId: Int, reason: Option[String]): Unit
handleJobGroupCancelled( groupId: String): Unit
handleJobGroupCancelled is used when DAGSchedulerEventProcessLoop is requested to handle a JobGroupCancelled event.
handleJobSubmitted( jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties): Unit
handleJobSubmitted creates a new ResultStage (the final stage of the job) given the input finalRDD, func, partitions, jobId and callSite.
handleJobSubmitted clears the internal cache of RDD partition locations.
FIXME Why is this clearing here so important?
You should see the following INFO messages in the logs:
Got job [id] ([callSite]) with [number] output partitions
Final stage: [stage] ([name])
Parents of final stage: [parents]
Missing parents: [missingStages]
handleJobSubmitted finds all the registered stages for the input jobId and collects their latest StageInfos.
handleJobSubmitted is used when DAGSchedulerEventProcessLoop is requested to handle a JobSubmitted event.
handleMapStageSubmitted( jobId: Int, dependency: ShuffleDependency[_, _, _], callSite: CallSite, listener: JobListener, properties: Properties): Unit
handleMapStageSubmitted is used when DAGSchedulerEventProcessLoop is requested to handle a MapStageSubmitted event.
resubmitFailedStages is used when DAGSchedulerEventProcessLoop is requested to handle a ResubmitFailedStages event.
handleSpeculativeTaskSubmitted is used when DAGSchedulerEventProcessLoop is requested to handle a SpeculativeTaskSubmitted event.
handleStageCancellation is used when DAGSchedulerEventProcessLoop is requested to handle a StageCancelled event.
handleTaskSetFailed is used when DAGSchedulerEventProcessLoop is requested to handle a TaskSetFailed event.
handleWorkerRemoved( workerId: String, host: String, message: String): Unit
handleWorkerRemoved is used when DAGSchedulerEventProcessLoop is requested to handle a WorkerRemoved event.
Enable ALL logging level for org.apache.spark.scheduler.DAGScheduler logger to see what happens inside. Add the following line to your logging configuration (e.g. conf/log4j.properties):
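```
# assumes log4j 1.x properties syntax (adjust if your Spark build uses Log4j 2)
log4j.logger.org.apache.spark.scheduler.DAGScheduler=ALL
```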
Refer to Logging.
Stages that failed due to fetch failures (when a task fails with FetchFailed).
A stage is added when submitMissingTasks gets executed (without first checking if the stage has not already been added).
Used when DAGScheduler creates a shuffle map stage, creates a result stage, cleans up job state and independent stages, is informed that a task is started, a taskset has failed, or a job is submitted (to compute a ResultStage), among other occasions.