DAGScheduler

The introduction that follows was highly influenced by the scaladoc of org.apache.spark.scheduler.DAGScheduler. As DAGScheduler is a private class, it does not appear in the official API documentation. You are strongly encouraged to read the sources first and only then this and the related pages.

Introduction

DAGScheduler is the scheduling layer of Apache Spark that implements stage-oriented scheduling.

DAGScheduler transforms a logical execution plan (i.e. RDD lineage of dependencies built using RDD transformations) to a physical execution plan (using stages).

dagscheduler rdd lineage stage dag
Figure 1. DAGScheduler Transforming RDD Lineage Into Stage DAG

After an action has been called, SparkContext hands over a logical plan to DAGScheduler, which in turn translates it to a set of stages that are submitted as TaskSets for execution.

dagscheduler rdd partitions job resultstage
Figure 2. Executing action leads to new ResultStage and ActiveJob in DAGScheduler

The fundamental concepts of DAGScheduler are jobs and stages (refer to Jobs and Stages respectively) that it tracks through internal registries and counters.

DAGScheduler works solely on the driver and is created as part of SparkContext’s initialization (right after TaskScheduler and SchedulerBackend are ready).

dagscheduler new instance
Figure 3. DAGScheduler as created by SparkContext with other services

DAGScheduler does three things in Spark (thorough explanations follow):

  • Computes an execution DAG, i.e. DAG of stages, for a job.

  • Determines the preferred locations to run each task on.

  • Handles failures due to shuffle output files being lost.

DAGScheduler computes a directed acyclic graph (DAG) of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run jobs. It then submits stages to TaskScheduler.

dagscheduler submitjob
Figure 4. DAGScheduler.submitJob

In addition to coming up with the execution DAG, DAGScheduler also determines the preferred locations to run each task on, based on the current cache status, and passes the information to TaskScheduler.

DAGScheduler tracks which RDDs are cached (or persisted) to avoid "recomputing" them, i.e. redoing the map side of a shuffle. DAGScheduler remembers what ShuffleMapStages have already produced output files (that are stored in BlockManagers).

DAGScheduler is only interested in cache location coordinates, i.e. host and executor id, per partition of a RDD.

Furthermore, it handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures within a stage that are not caused by shuffle file loss are handled by the TaskScheduler itself, which will retry each task a small number of times before cancelling the whole stage.

DAGScheduler uses an event queue architecture in which a thread can post DAGSchedulerEvent events, e.g. a new job or stage being submitted, that DAGScheduler reads and executes sequentially. See the section Internal Event Loop - dag-scheduler-event-loop.

DAGScheduler runs stages in topological order.

DAGScheduler uses SparkContext, TaskScheduler, LiveListenerBus, MapOutputTracker and BlockManager for its services. However, at the very minimum, DAGScheduler takes a SparkContext only (and requests SparkContext for the other services).

When DAGScheduler schedules a job as a result of executing an action on a RDD or calling SparkContext.runJob() method directly, it spawns parallel tasks to compute (partial) results per partition.
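The partial-results-per-partition idea can be sketched in plain Scala (a toy model, not Spark code; the partitions and func values below are made up for illustration):

```scala
// Toy model of an action: one task per partition, each producing a partial result.
val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3, 4), Seq(5))

// The per-partition function an action like RDD.sum would ship inside tasks.
val func: Iterator[Int] => Int = _.sum

// Tasks compute partial results per partition...
val partialResults: Seq[Int] = partitions.map(p => func(p.iterator))

// ...and a driver-side result handler combines them.
val total = partialResults.sum
```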

Creating Instance

DAGScheduler takes the following to be created:

While being created, DAGScheduler associates itself with the TaskScheduler and starts DAGScheduler Event Bus.

DAGScheduler Event Bus

DAGScheduler uses an event bus to process scheduling-related events on a separate thread (one by one and asynchronously).
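The queue-plus-single-consumer pattern behind the event bus can be sketched in plain Scala (the Event hierarchy and the post helper are illustrative stand-ins, not Spark's classes):

```scala
import java.util.concurrent.LinkedBlockingDeque

// Illustrative events; DAGSchedulerEvent has many more subtypes.
sealed trait Event
case class JobSubmitted(jobId: Int) extends Event
case object AllJobsCancelled extends Event

val queue = new LinkedBlockingDeque[Event]()

// Any thread may post; events are enqueued in arrival order.
def post(e: Event): Unit = queue.put(e)

post(JobSubmitted(0))
post(AllJobsCancelled)

// The event thread would block on take() in a loop; here we drain synchronously
// to show that events are handled one by one, in the order they were posted.
val handled = Iterator.continually(queue.poll()).takeWhile(_ != null).toList
```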

DAGScheduler starts the event bus when created and stops it when requested to stop.

DAGScheduler defines event-posting methods that allow posting DAGSchedulerEvent events to the event bus.

Table 1. DAGScheduler Event Posting Methods

| Method | Event Posted | Trigger |
|--------|--------------|---------|
| cancelAllJobs | AllJobsCancelled | SparkContext is requested to cancel all running or scheduled Spark jobs |
| cancelJob | JobCancelled | SparkContext or JobWaiter are requested to cancel a Spark job |
| cancelJobGroup | JobGroupCancelled | SparkContext is requested to cancel a job group |
| cancelStage | StageCancelled | SparkContext is requested to cancel a stage |
| executorAdded | ExecutorAdded | TaskSchedulerImpl is requested to handle resource offers (and a new executor is found in the resource offers) |
| executorLost | ExecutorLost | TaskSchedulerImpl is requested to handle a task status update (when a task is lost, which indicates that the executor is broken and should be considered lost) or executorLost |
| runApproximateJob | JobSubmitted | SparkContext is requested to run an approximate job |
| speculativeTaskSubmitted | SpeculativeTaskSubmitted | |
| submitJob | JobSubmitted | |
| submitMapStage | MapStageSubmitted | SparkContext is requested to submit a MapStage for execution |
| taskEnded | CompletionEvent | TaskSetManager is requested to handleSuccessfulTask, handleFailedTask, and executorLost |
| taskGettingResult | GettingResultEvent | TaskSetManager is requested to handle a task fetching result |
| taskSetFailed | TaskSetFailed | TaskSetManager is requested to abort |
| taskStarted | BeginEvent | TaskSetManager is requested to start a task |
| workerRemoved | WorkerRemoved | TaskSchedulerImpl is requested to handle a removed worker event |

DAGScheduler and TaskScheduler

Running Job

runJob[T, U](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  callSite: CallSite,
  resultHandler: (Int, U) => Unit,
  properties: Properties): Unit

runJob submits an action job to the DAGScheduler and waits for a result.

Internally, runJob executes submitJob and then waits until a result comes using JobWaiter.
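The submit-then-block contract can be sketched in plain Scala (ToyJobWaiter is a made-up stand-in for JobWaiter, assuming only that the waiter completes when the job's result arrives):

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// A stand-in for JobWaiter: completed by the scheduler, awaited by runJob.
class ToyJobWaiter[T] {
  private val promise = Promise[T]()
  def jobFinished(result: T): Unit = promise.trySuccess(result)
  def awaitResult(timeout: Duration): T = Await.result(promise.future, timeout)
}

val waiter = new ToyJobWaiter[Int]
// In Spark this happens asynchronously when the last task of the job completes.
waiter.jobFinished(42)
// runJob blocks here until the result (or a failure) arrives.
val result = waiter.awaitResult(1.second)
```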

When the job succeeds, you should see the following INFO message in the logs:

Job [jobId] finished: [callSite], took [time] s

When the job fails, you should see the following INFO message in the logs and the exception (that led to the failure) is thrown.

Job [jobId] failed: [callSite], took [time] s

runJob is used when SparkContext is requested to run a job.

Partition Placement Preferences

DAGScheduler keeps track of block locations per RDD and partition.

DAGScheduler uses TaskLocation that includes a host name and an executor id on that host (as ExecutorCacheTaskLocation).

The keys are RDDs (their ids) and the values are arrays indexed by partition numbers.

Each entry is a set of block locations where a RDD partition is cached, i.e. the BlockManagers of the blocks.

Initialized empty when DAGScheduler is created.

Used when DAGScheduler is requested for the locations of the cache blocks of a RDD or clear them.
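The shape of this registry can be sketched in plain Scala (TaskLoc is a made-up stand-in for TaskLocation):

```scala
import scala.collection.mutable

// Stand-in for TaskLocation: cache location coordinates, i.e. host and executor id.
case class TaskLoc(host: String, executorId: String)

// RDD id -> array indexed by partition number -> block locations of that partition.
val cacheLocs = mutable.HashMap.empty[Int, IndexedSeq[Seq[TaskLoc]]]

// RDD 0 has two partitions: partition 0 is cached on one executor,
// partition 1 is cached nowhere (an empty location list).
cacheLocs(0) = IndexedSeq(Seq(TaskLoc("host-a", "1")), Seq.empty)

// Partitions with no locations are the uncached ones.
val uncached = cacheLocs(0).zipWithIndex.collect { case (locs, i) if locs.isEmpty => i }
```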

ActiveJobs

DAGScheduler tracks ActiveJobs:

DAGScheduler uses ActiveJobs registry when requested to handle JobGroupCancelled or TaskCompletion events, to cleanUpAfterSchedulerStop and to abort a stage.

The number of ActiveJobs is available using job.activeJobs performance metric.

Creating ResultStage for RDD

createResultStage(
  rdd: RDD[_],
  func: (TaskContext, Iterator[_]) => _,
  partitions: Array[Int],
  jobId: Int,
  callSite: CallSite): ResultStage

createResultStage…​FIXME

createResultStage is used when DAGScheduler is requested to handle a JobSubmitted event.

Creating ShuffleMapStage for ShuffleDependency

createShuffleMapStage(
  shuffleDep: ShuffleDependency[_, _, _],
  jobId: Int): ShuffleMapStage

createShuffleMapStage creates a ShuffleMapStage for the given ShuffleDependency as follows:

createShuffleMapStage registers the ShuffleMapStage in the stageIdToStage and shuffleIdToMapStage internal registries.

createShuffleMapStage then updateJobIdStageIdMaps (to register the new stage with the job).

If the MapOutputTrackerMaster does not track the shuffle yet, createShuffleMapStage prints out the following INFO message to the logs and requests the MapOutputTrackerMaster to register the shuffle.

Registering RDD [id] ([creationSite]) as input to shuffle [shuffleId]

DAGScheduler MapOutputTrackerMaster containsShuffle
Figure 5. DAGScheduler Asks MapOutputTrackerMaster Whether Shuffle Map Output Is Already Tracked

createShuffleMapStage is used when DAGScheduler is requested to find or create a ShuffleMapStage for a given ShuffleDependency.

Cleaning Up After Job and Independent Stages

cleanupStateForJobAndIndependentStages(
  job: ActiveJob): Unit

cleanupStateForJobAndIndependentStages cleans up the state for job and any stages that are not part of any other job.

cleanupStateForJobAndIndependentStages looks the job up in the internal jobIdToStageIds registry.

If no stages are found, the following ERROR is printed out to the logs:

No stages registered for job [jobId]

Otherwise, cleanupStateForJobAndIndependentStages uses the stageIdToStage registry to find the stages (the stage objects, not just their ids).

For each stage, cleanupStateForJobAndIndependentStages reads the jobs the stage belongs to.

If the job does not belong to the jobs of the stage, the following ERROR is printed out to the logs:

Job [jobId] not registered for stage [stageId] even though that stage was registered for the job

If the job was the only job for the stage, the stage (and the stage id) gets cleaned up from the registries, i.e. runningStages, shuffleIdToMapStage, waitingStages, failedStages and stageIdToStage.

While removing from runningStages, you should see the following DEBUG message in the logs:

Removing running stage [stageId]

While removing from waitingStages, you should see the following DEBUG message in the logs:

Removing stage [stageId] from waiting set.

While removing from failedStages, you should see the following DEBUG message in the logs:

Removing stage [stageId] from failed set.

After all cleaning (using stageIdToStage as the source registry), if the stage belonged to the one and only job, you should see the following DEBUG message in the logs:

After removal of stage [stageId], remaining stages = [stageIdToStage.size]

The job is removed from jobIdToStageIds, jobIdToActiveJob, activeJobs registries.

The final stage of the job is removed, i.e. ResultStage or ShuffleMapStage.

Marking ShuffleMapStage Job Finished

markMapStageJobAsFinished(
  job: ActiveJob,
  stats: MapOutputStatistics): Unit

markMapStageJobAsFinished marks the active job finished and notifies Spark listeners.

Internally, markMapStageJobAsFinished marks the zeroth partition finished and increases the number of tasks finished in job.

Ultimately, SparkListenerJobEnd is posted to LiveListenerBus (as listenerBus) for the job, the current time (in millis) and JobSucceeded job result.

markMapStageJobAsFinished is used in handleMapStageSubmitted and handleTaskCompletion.

Finding Or Creating Missing Direct Parent ShuffleMapStages (For ShuffleDependencies) of RDD

getOrCreateParentStages(
  rdd: RDD[_],
  firstJobId: Int): List[Stage]

getOrCreateParentStages finds all direct parent ShuffleDependencies of the input rdd and then finds ShuffleMapStage stages for each ShuffleDependency.

getOrCreateParentStages is used when DAGScheduler is requested to create a ShuffleMapStage or a ResultStage.

Marking Stage Finished

markStageAsFinished(
  stage: Stage,
  errorMessage: Option[String] = None,
  willRetry: Boolean = false): Unit

markStageAsFinished…​FIXME

markStageAsFinished is used when…​FIXME

Finding or Creating ShuffleMapStage for ShuffleDependency

getOrCreateShuffleMapStage(
  shuffleDep: ShuffleDependency[_, _, _],
  firstJobId: Int): ShuffleMapStage

getOrCreateShuffleMapStage finds the ShuffleMapStage in the shuffleIdToMapStage internal registry and returns it if available.

If not found, getOrCreateShuffleMapStage finds all the missing ancestor shuffle dependencies and creates the ShuffleMapStage stages (including one for the input ShuffleDependency).

Finding Missing ShuffleDependencies For RDD

getMissingAncestorShuffleDependencies(
  rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]]

getMissingAncestorShuffleDependencies finds all missing shuffle dependencies for the given RDD traversing its RDD lineage.

A missing shuffle dependency of a RDD is a dependency not registered in shuffleIdToMapStage internal registry.

Internally, getMissingAncestorShuffleDependencies finds direct parent shuffle dependencies of the input RDD and collects the ones that are not registered in shuffleIdToMapStage internal registry. It repeats the process for the RDDs of the parent shuffle dependencies.

getMissingAncestorShuffleDependencies is used when DAGScheduler is requested to find all ShuffleMapStage stages for a ShuffleDependency.

Finding Direct Parent Shuffle Dependencies of RDD

getShuffleDependencies(
  rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]]

getShuffleDependencies finds direct parent shuffle dependencies for the given RDD.

spark DAGScheduler getShuffleDependencies
Figure 6. getShuffleDependencies Finds Direct Parent ShuffleDependencies (shuffle1 and shuffle2)

Internally, getShuffleDependencies takes the direct shuffle dependencies of the input RDD and direct shuffle dependencies of all the parent non-ShuffleDependencies in the dependency chain (aka RDD lineage).

getShuffleDependencies is used when DAGScheduler is requested to find or create missing direct parent ShuffleMapStages (for ShuffleDependencies of a RDD) and find all missing shuffle dependencies for a given RDD.
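The stop-at-the-first-shuffle traversal can be sketched in plain Scala (ToyRdd, NarrowDep and ShuffleDep are illustrative stand-ins for RDD and its Dependency types):

```scala
import scala.collection.mutable

// Minimal lineage model: an RDD with narrow and shuffle dependencies.
sealed trait Dep
case class NarrowDep(parent: ToyRdd) extends Dep
case class ShuffleDep(parent: ToyRdd, shuffleId: Int) extends Dep
case class ToyRdd(id: Int, deps: Seq[Dep])

def shuffleDependencies(rdd: ToyRdd): Set[Int] = {
  val parents = mutable.Set.empty[Int]
  val visited = mutable.Set.empty[Int]
  val waiting = mutable.Stack(rdd)
  while (waiting.nonEmpty) {
    val r = waiting.pop()
    if (visited.add(r.id)) {
      r.deps.foreach {
        case ShuffleDep(_, shuffleId) => parents += shuffleId // stop at a shuffle
        case NarrowDep(parent)        => waiting.push(parent) // walk past narrow deps
      }
    }
  }
  parents.toSet
}

// r3 <-narrow- r2 <-shuffle 1- r1 <-shuffle 0- r0: only shuffle 1 is direct.
val r0 = ToyRdd(0, Nil)
val r1 = ToyRdd(1, Seq(ShuffleDep(r0, 0)))
val r2 = ToyRdd(2, Seq(ShuffleDep(r1, 1)))
val r3 = ToyRdd(3, Seq(NarrowDep(r2)))
val direct = shuffleDependencies(r3)
```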

Failing Job and Independent Single-Job Stages

failJobAndIndependentStages(
  job: ActiveJob,
  failureReason: String,
  exception: Option[Throwable] = None): Unit

failJobAndIndependentStages fails the input job and all the stages that are only used by the job.

Internally, failJobAndIndependentStages uses jobIdToStageIds internal registry to look up the stages registered for the job.

If no stages could be found, you should see the following ERROR message in the logs:

No stages registered for job [id]

Otherwise, for every stage, failJobAndIndependentStages finds the job ids the stage belongs to.

If no stages could be found or the job is not referenced by the stages, you should see the following ERROR message in the logs:

Job [id] not registered for stage [id] even though that stage was registered for the job

Only when there is exactly one job registered for the stage and the stage is running (in the runningStages internal registry) does failJobAndIndependentStages request TaskScheduler to cancel the stage's tasks and mark the stage finished.

failJobAndIndependentStages uses jobIdToStageIds, stageIdToStage, and runningStages internal registries.

failJobAndIndependentStages is used when…​FIXME

Aborting Stage

abortStage(
  failedStage: Stage,
  reason: String,
  exception: Option[Throwable]): Unit

abortStage is an internal method that finds all the active jobs that depend on the failedStage stage and fails them.

Internally, abortStage looks the failedStage stage up in the internal stageIdToStage registry and exits if the stage was not registered earlier.

If it was, abortStage finds all the active jobs (in the internal activeJobs registry) with the final stage depending on the failedStage stage.

At this time, the completionTime property (of the failed stage’s StageInfo) is assigned to the current time (millis).

All the active jobs that depend on the failed stage (as calculated above) and the stages that do not belong to other jobs (aka independent stages) are failed (with the failure reason being "Job aborted due to stage failure: [reason]" and the input exception).

If there are no jobs depending on the failed stage, you should see the following INFO message in the logs:

Ignoring failure of [failedStage] because all jobs depending on it are done

Checking Out Stage Dependency on Given Stage

stageDependsOn(
  stage: Stage,
  target: Stage): Boolean

stageDependsOn compares two stages and returns whether the stage depends on target stage (i.e. true) or not (i.e. false).

A stage A depends on stage B if B is among the ancestors of A.

Internally, stageDependsOn walks through the graph of RDDs of the input stage. For every RDD in the RDD's dependencies (using RDD.dependencies), stageDependsOn adds the RDD of a NarrowDependency to a stack of RDDs to visit. For a ShuffleDependency, it finds the ShuffleMapStage stage for the dependency (and the stage's first job id) and adds the map stage's RDD to the stack of RDDs to visit only if the map stage is not ready, i.e. not all the partitions have shuffle outputs.

After all the RDDs of the input stage are visited, stageDependsOn checks if the target's RDD is among the RDDs of the stage, i.e. whether the stage depends on target stage.

stageDependsOn is used when DAGScheduler is requested to abort a stage.

Submitting Waiting Child Stages for Execution

submitWaitingChildStages(
  parent: Stage): Unit

submitWaitingChildStages submits for execution all waiting stages for which the input parent Stage is the direct parent.

Waiting stages are the stages registered in waitingStages internal registry.

When executed, you should see the following TRACE messages in the logs:

Checking if any dependencies of [parent] are now runnable
running: [runningStages]
waiting: [waitingStages]
failed: [failedStages]

submitWaitingChildStages finds child stages of the input parent stage, removes them from waitingStages internal registry, and submits one by one sorted by their job ids.

submitWaitingChildStages is used when DAGScheduler is requested to submit missing tasks for a stage and handle a successful ShuffleMapTask completion.

Submitting Stage (with Missing Parents) for Execution

submitStage(
  stage: Stage): Unit

submitStage submits the input stage or its missing parents (if there are any stages that have not been computed yet and are required before the input stage can run).

submitStage is also used to resubmit failed stages.

submitStage recursively submits any missing parents of the stage.

Internally, submitStage first finds the earliest-created job id that needs the stage.

A stage itself tracks the jobs (their ids) it belongs to (using the internal jobIds registry).

The following steps depend on whether there is a job or not.

If there are no jobs that require the stage, submitStage aborts it with the reason:

No active job for stage [id]

If however there is a job for the stage, you should see the following DEBUG message in the logs:

submitStage([stage])

submitStage checks the status of the stage and continues when it was not recorded in waiting, running or failed internal registries. It simply exits otherwise.

With the stage ready for submission, submitStage calculates the list of missing parent stages of the stage (sorted by their job ids). You should see the following DEBUG message in the logs:

missing: [missing]

When the stage has no parent stages missing, you should see the following INFO message in the logs:

Submitting [stage] ([stage.rdd]), which has no missing parents

submitStage submits the stage (with the earliest-created job id) and finishes.

If however there are missing parent stages for the stage, submitStage submits all the parent stages, and the stage is recorded in the internal waitingStages registry.
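The parents-first recursion can be sketched in plain Scala (Stg, waiting and submitted are illustrative; in DAGScheduler the waiting set is waitingStages and "submit" ends in submitMissingTasks):

```scala
import scala.collection.mutable

// Minimal stage model: a stage is runnable when all parents have their output.
case class Stg(id: Int, available: Boolean, parents: Seq[Stg])

val waiting   = mutable.Set.empty[Int]          // stands in for waitingStages
val submitted = mutable.ListBuffer.empty[Int]   // stages whose tasks were submitted

def submit(stage: Stg): Unit = {
  val missing = stage.parents.filterNot(_.available).sortBy(_.id)
  if (missing.isEmpty) {
    submitted += stage.id     // no missing parents: submit the stage's tasks
  } else {
    missing.foreach(submit)   // submit all missing parents first...
    waiting += stage.id       // ...and park the stage until they finish
  }
}

val shuffleStage = Stg(0, available = false, Nil)
val doneStage    = Stg(1, available = true, Nil)
val resultStage  = Stg(2, available = false, Seq(shuffleStage, doneStage))
submit(resultStage)
```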

submitStage is used recursively for missing parents of the given stage and when DAGScheduler is requested for the following:

Stage Attempts

A single stage can be re-executed in multiple attempts due to fault recovery. The number of attempts is configured (FIXME).

If TaskScheduler reports that a task failed because a map output file from a previous stage was lost, the DAGScheduler resubmits the lost stage. This is detected through a CompletionEvent with FetchFailed, or an ExecutorLost event. DAGScheduler will wait a small amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost stage(s) that compute the missing tasks.

Please note that tasks from the old attempts of a stage could still be running.

A stage object tracks multiple StageInfo objects to pass to Spark listeners or the web UI.

The latest StageInfo for the most recent attempt for a stage is accessible through latestInfo.

Preferred Locations

DAGScheduler computes where to run each task in a stage based on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.

Adaptive Query Planning / Adaptive Scheduling

See SPARK-9850 Adaptive execution in Spark for the design document. The work is currently in progress.

DAGScheduler.submitMapStage method is used for adaptive query planning, to run map stages and look at statistics about their outputs before submitting downstream stages.

ScheduledExecutorService daemon services

DAGScheduler uses the following ScheduledThreadPoolExecutors (with the policy of removing cancelled tasks from a work queue at time of cancellation):

They are created using ThreadUtils.newDaemonSingleThreadScheduledExecutor method that uses Guava DSL to instantiate a ThreadFactory.

Finding Missing Parent ShuffleMapStages For Stage

getMissingParentStages(
  stage: Stage): List[Stage]

getMissingParentStages finds missing parent ShuffleMapStages in the dependency graph of the input stage (using the breadth-first search algorithm).

Internally, getMissingParentStages starts with the stage's RDD and walks up the tree of all parent RDDs to find uncached partitions.

A Stage tracks the associated RDD using rdd property.

An uncached partition of a RDD is a partition that has Nil in the internal registry of partition locations per RDD (which results in no RDD blocks in any of the active BlockManagers on executors).

getMissingParentStages traverses the parent dependencies of the RDD and acts according to their type, i.e. ShuffleDependency or NarrowDependency.

ShuffleDependency and NarrowDependency are the main top-level Dependencies.

For each NarrowDependency, getMissingParentStages simply marks the corresponding RDD to visit and moves on to a next dependency of a RDD or works on another unvisited parent RDD.

NarrowDependency is a RDD dependency that allows for pipelined execution.

getMissingParentStages focuses on ShuffleDependency dependencies.

ShuffleDependency is a RDD dependency that represents a dependency on the output of a ShuffleMapStage, i.e. shuffle map stage.

For each ShuffleDependency, getMissingParentStages finds ShuffleMapStage stages. If the ShuffleMapStage is not available, it is added to the set of missing (map) stages.

A ShuffleMapStage is available when all its partitions are computed, i.e. results are available (as blocks).

FIXME…​IMAGE with ShuffleDependencies queried

getMissingParentStages is used when DAGScheduler is requested to submit a stage and handle JobSubmitted and MapStageSubmitted events.

Submitting Missing Tasks of Stage

submitMissingTasks(
  stage: Stage,
  jobId: Int): Unit

submitMissingTasks prints out the following DEBUG message to the logs:

submitMissingTasks([stage])

submitMissingTasks requests the given Stage for the missing partitions (partitions that need to be computed).

submitMissingTasks adds the stage to the runningStages internal registry.

submitMissingTasks notifies the OutputCommitCoordinator that stage execution started.

submitMissingTasks determines preferred locations (task locality preferences) of the missing partitions.

submitMissingTasks requests the stage for a new stage attempt.

submitMissingTasks requests the LiveListenerBus to post a SparkListenerStageSubmitted event.

submitMissingTasks uses the closure Serializer to serialize the stage and create a so-called task binary. submitMissingTasks serializes the RDD (of the stage) and either the ShuffleDependency or the compute function based on the type of the stage, i.e. ShuffleMapStage and ResultStage, respectively.

submitMissingTasks creates a broadcast variable for the task binary.

That shows how important broadcast variables are for Spark itself: it uses them to distribute data (here, the task binary) among executors in the most efficient way.

submitMissingTasks creates tasks for every missing partition (ShuffleMapTasks for a ShuffleMapStage and ResultTasks for a ResultStage).
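The per-partition task creation can be sketched in plain Scala (MapTask and ResTask are made-up stand-ins for Spark's ShuffleMapTask and ResultTask):

```scala
sealed trait ToyTask { def partitionId: Int }
case class MapTask(partitionId: Int) extends ToyTask  // stands in for ShuffleMapTask
case class ResTask(partitionId: Int) extends ToyTask  // stands in for ResultTask

// One task per missing partition; the task type follows the stage type.
def tasksFor(isShuffleMapStage: Boolean, missingPartitions: Seq[Int]): Seq[ToyTask] =
  missingPartitions.map { p =>
    if (isShuffleMapStage) MapTask(p) else ResTask(p)
  }

// A ShuffleMapStage with partitions 0 and 2 still missing.
val tasks = tasksFor(isShuffleMapStage = true, Seq(0, 2))
```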

If there are tasks to submit for execution (i.e. there are missing partitions in the stage), submitMissingTasks prints out the following INFO message to the logs:

Submitting [size] missing tasks from [stage] ([rdd]) (first 15 tasks are for partitions [partitionIds])

submitMissingTasks requests the TaskScheduler to submit the tasks for execution (as a new TaskSet).

With no tasks to submit for execution, submitMissingTasks marks the stage as finished successfully.

submitMissingTasks prints out the following DEBUG messages based on the type of the stage:

Stage [stage] is actually done; (available: [isAvailable],available outputs: [numAvailableOutputs],partitions: [numPartitions])

or

Stage [stage] is actually done; (partitions: [numPartitions])

for ShuffleMapStage and ResultStage, respectively.

In the end, with no tasks to submit for execution, submitMissingTasks submits waiting child stages for execution and exits.

submitMissingTasks is used when DAGScheduler is requested to submit a stage for execution.

Finding Preferred Locations for Missing Partitions

getPreferredLocs(
  rdd: RDD[_],
  partition: Int): Seq[TaskLocation]

getPreferredLocs is simply an alias for the internal (recursive) getPreferredLocsInternal.

getPreferredLocs is used when…​FIXME

Finding BlockManagers (Executors) for Cached RDD Partitions (aka Block Location Discovery)

getCacheLocs(
  rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]]

getCacheLocs gives TaskLocations (block locations) for the partitions of the input rdd. getCacheLocs caches lookup results in cacheLocs internal registry.

The size of the collection from getCacheLocs is exactly the number of partitions in rdd RDD.

The size of every TaskLocation collection (i.e. every entry in the result of getCacheLocs) is exactly the number of blocks managed using BlockManagers on executors.

Internally, getCacheLocs finds rdd in the cacheLocs internal registry (of partition locations per RDD).

If rdd is not in cacheLocs internal registry, getCacheLocs branches per its storage level.

For NONE storage level (i.e. no caching), the result is an empty locations (i.e. no location preference).

For other non-NONE storage levels, getCacheLocs requests BlockManagerMaster for block locations that are then mapped to TaskLocations with the hostname of the owning BlockManager for a block (of a partition) and the executor id.

getCacheLocs uses BlockManagerMaster that was defined when DAGScheduler was created.

getCacheLocs records the computed block locations per partition (as TaskLocation) in cacheLocs internal registry.

getCacheLocs requests locations from BlockManagerMaster using RDDBlockId with the RDD id and the partition indices (which implies that the order of the partitions matters to request proper blocks).

DAGScheduler uses TaskLocations (with host and executor) while BlockManagerMaster uses BlockManagerId (to track similar information, i.e. block locations).
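The naming scheme can be sketched with a simplified replica of RDDBlockId (the real class lives in org.apache.spark.storage and carries more behavior):

```scala
// Simplified replica of org.apache.spark.storage.RDDBlockId.
case class RDDBlockId(rddId: Int, splitIndex: Int) {
  def name: String = s"rdd_${rddId}_${splitIndex}"
}

// One block id per partition, in partition order, for an RDD with id 2.
val blockIds = (0 until 3).map(partition => RDDBlockId(2, partition))
val names = blockIds.map(_.name)
```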

getCacheLocs is used when DAGScheduler is requested to find missing parent stages (getMissingParentStages) and the preferred locations of a RDD partition (getPreferredLocsInternal).

Finding Placement Preferences for RDD Partition (recursively)

getPreferredLocsInternal(
  rdd: RDD[_],
  partition: Int,
  visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation]

getPreferredLocsInternal first finds the TaskLocations for the partition of the rdd (using cacheLocs internal cache) and returns them.

Otherwise, if not found, getPreferredLocsInternal requests rdd for the preferred locations of partition and returns them.

Preferred locations of the partitions of a RDD are also called placement preferences or locality preferences.

Otherwise, if not found, getPreferredLocsInternal finds the first parent NarrowDependency and (recursively) finds TaskLocations.

If all the attempts fail to yield any non-empty result, getPreferredLocsInternal returns an empty collection of TaskLocations.

getPreferredLocsInternal is used when DAGScheduler is requested for the preferred locations for missing partitions.
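The fallback chain (cached locations, then the RDD's own preferences, then a narrow parent, recursively) can be sketched in plain Scala (the Rdd type is illustrative, not Spark's):

```scala
// Toy RDD: cached block locations and preferred locations per partition,
// plus at most one narrow parent to recurse into.
case class Rdd(
    cached: Map[Int, Seq[String]],
    preferred: Map[Int, Seq[String]],
    narrowParent: Option[Rdd])

def preferredLocs(rdd: Rdd, partition: Int): Seq[String] =
  rdd.cached.getOrElse(partition,              // 1. cache locations first
    rdd.preferred.getOrElse(partition,         // 2. then the RDD's own preferences
      rdd.narrowParent                         // 3. then the first narrow parent
        .map(preferredLocs(_, partition))
        .getOrElse(Nil)))                      // 4. otherwise: no preference

val parent = Rdd(Map.empty, Map(0 -> Seq("host-a")), None)
val child  = Rdd(Map.empty, Map.empty, Some(parent))
val locs = preferredLocs(child, 0)
```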

Stopping DAGScheduler

stop(): Unit

stop stops the internal dag-scheduler-message thread pool, dag-scheduler-event-loop, and TaskScheduler.

stop is used when…​FIXME

Updating Accumulators with Partial Values from Completed Tasks

updateAccumulators(
  event: CompletionEvent): Unit

updateAccumulators merges the partial values of accumulators from a completed task into their "source" accumulators on the driver.

It is called by handleTaskCompletion.

For each AccumulableInfo in the CompletionEvent, a partial value from a task is obtained (from AccumulableInfo.update) and added to the driver’s accumulator (using Accumulable.++= method).
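The merge can be sketched in plain Scala (ToyAccumulator is illustrative; Spark's Accumulable carries more state):

```scala
// Driver-side accumulator that partial task updates are merged into.
final class ToyAccumulator(var value: Long) {
  def ++=(update: Long): Unit = value += update
}

val driverAcc = new ToyAccumulator(10L)

// The partial value shipped back with a CompletionEvent's AccumulableInfo.update.
val taskUpdate = 5L
driverAcc ++= taskUpdate
```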

For named accumulators with the update value being a non-zero value, i.e. not Accumulable.zero:

  • stage.latestInfo.accumulables for the AccumulableInfo.id is set

  • CompletionEvent.taskInfo.accumulables has a new AccumulableInfo added.

FIXME Where are Stage.latestInfo.accumulables and CompletionEvent.taskInfo.accumulables used?

updateAccumulators is used when DAGScheduler is requested to handle a task completion.

checkBarrierStageWithNumSlots Method

checkBarrierStageWithNumSlots(
  rdd: RDD[_]): Unit

checkBarrierStageWithNumSlots…​FIXME

checkBarrierStageWithNumSlots is used when DAGScheduler is requested to create ShuffleMapStage and ResultStage stages.

Killing Task

killTaskAttempt(
  taskId: Long,
  interruptThread: Boolean,
  reason: String): Boolean

killTaskAttempt requests the TaskScheduler to kill a task.

killTaskAttempt is used when SparkContext is requested to kill a task.

cleanUpAfterSchedulerStop Method

cleanUpAfterSchedulerStop(): Unit

cleanUpAfterSchedulerStop…​FIXME

cleanUpAfterSchedulerStop is used when DAGSchedulerEventProcessLoop is requested to onStop.

removeExecutorAndUnregisterOutputs Method

removeExecutorAndUnregisterOutputs(
  execId: String,
  fileLost: Boolean,
  hostToUnregisterOutputs: Option[String],
  maybeEpoch: Option[Long] = None): Unit

removeExecutorAndUnregisterOutputs…​FIXME

removeExecutorAndUnregisterOutputs is used when DAGScheduler is requested to handle task completion (due to a fetch failure) and executor lost events.

markMapStageJobsAsFinished Method

markMapStageJobsAsFinished(
  shuffleStage: ShuffleMapStage): Unit

markMapStageJobsAsFinished…​FIXME

markMapStageJobsAsFinished is used when DAGScheduler is requested to submit missing tasks (of a ShuffleMapStage that has just been computed) and handle a task completion (of a ShuffleMapStage).

updateJobIdStageIdMaps Method

updateJobIdStageIdMaps(
  jobId: Int,
  stage: Stage): Unit

updateJobIdStageIdMaps…​FIXME

updateJobIdStageIdMaps is used when DAGScheduler is requested to create ShuffleMapStage and ResultStage stages.

executorHeartbeatReceived Method

executorHeartbeatReceived(
  execId: String,
                // (taskId, stageId, stageAttemptId, accumUpdates)
  accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
  blockManagerId: BlockManagerId): Boolean

executorHeartbeatReceived posts a SparkListenerExecutorMetricsUpdate (to listenerBus) and informs BlockManagerMaster that blockManagerId block manager is alive (by posting BlockManagerHeartbeat).

executorHeartbeatReceived is used when TaskSchedulerImpl is requested to handle an executor heartbeat.

postTaskEnd Method

postTaskEnd(
  event: CompletionEvent): Unit

postTaskEnd…​FIXME

postTaskEnd is used when DAGScheduler is requested to handle a task completion.

Event Handlers

AllJobsCancelled Event Handler

doCancelAllJobs(): Unit

doCancelAllJobs…​FIXME

doCancelAllJobs is used when DAGSchedulerEventProcessLoop is requested to handle an AllJobsCancelled event and onError.

BeginEvent Event Handler

handleBeginEvent(
  task: Task[_],
  taskInfo: TaskInfo): Unit

handleBeginEvent…​FIXME

handleBeginEvent is used when DAGSchedulerEventProcessLoop is requested to handle a BeginEvent event.

CompletionEvent Event Handler

handleTaskCompletion(
  event: CompletionEvent): Unit

handleTaskCompletion…​FIXME

handleTaskCompletion is used when DAGSchedulerEventProcessLoop is requested to handle a CompletionEvent event.

ExecutorAdded Event Handler

handleExecutorAdded(
  execId: String,
  host: String): Unit

handleExecutorAdded…​FIXME

handleExecutorAdded is used when DAGSchedulerEventProcessLoop is requested to handle an ExecutorAdded event.

ExecutorLost Event Handler

handleExecutorLost(
  execId: String,
  workerLost: Boolean): Unit

handleExecutorLost…​FIXME

handleExecutorLost is used when DAGSchedulerEventProcessLoop is requested to handle an ExecutorLost event.

GettingResultEvent Event Handler

handleGetTaskResult(
  taskInfo: TaskInfo): Unit

handleGetTaskResult…​FIXME

handleGetTaskResult is used when DAGSchedulerEventProcessLoop is requested to handle a GettingResultEvent event.

JobCancelled Event Handler

handleJobCancellation(
  jobId: Int,
  reason: Option[String]): Unit

handleJobCancellation…​FIXME

handleJobCancellation is used when DAGScheduler is requested to handle a JobCancelled event, and in doCancelAllJobs, handleJobGroupCancelled and handleStageCancellation.

JobGroupCancelled Event Handler

handleJobGroupCancelled(
  groupId: String): Unit

handleJobGroupCancelled…​FIXME

handleJobGroupCancelled is used when DAGScheduler is requested to handle a JobGroupCancelled event.

JobSubmitted Event Handler

handleJobSubmitted(
  jobId: Int,
  finalRDD: RDD[_],
  func: (TaskContext, Iterator[_]) => _,
  partitions: Array[Int],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties): Unit

handleJobSubmitted creates a new ResultStage (as finalStage in the picture below) given the input finalRDD, func, partitions, jobId and callSite.

dagscheduler handleJobSubmitted
Figure 7. DAGScheduler.handleJobSubmitted Method

handleJobSubmitted creates an ActiveJob (with the input jobId, callSite, listener, properties, and the ResultStage).

FIXME Why is this clearing here so important?

You should see the following INFO messages in the logs:

Got job [id] ([callSite]) with [number] output partitions
Final stage: [stage] ([name])
Parents of final stage: [parents]
Missing parents: [missingStages]

handleJobSubmitted then registers the new job in jobIdToActiveJob and activeJobs internal registries, and with the final ResultStage.

ResultStage can only have one ActiveJob registered.

In the end, handleJobSubmitted posts a SparkListenerJobStart message to LiveListenerBus and submits the stage.

handleJobSubmitted is used when DAGSchedulerEventProcessLoop is requested to handle a JobSubmitted event.
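The bookkeeping steps above can be sketched in a few lines. This is an illustrative simplification: ResultStage and ActiveJob here are bare stand-ins for Spark's classes, and stage creation, the listener-bus post and stage submission are reduced to comments.

```scala
import scala.collection.mutable

final case class ResultStage(id: Int)
final case class ActiveJob(jobId: Int, finalStage: ResultStage)

class JobBookkeeping {
  val jobIdToActiveJob = mutable.Map.empty[Int, ActiveJob]
  val activeJobs = mutable.Set.empty[ActiveJob]

  def handleJobSubmitted(jobId: Int): ActiveJob = {
    val finalStage = ResultStage(jobId)    // 1. create the final ResultStage
    val job = ActiveJob(jobId, finalStage) // 2. create the ActiveJob
    jobIdToActiveJob(jobId) = job          // 3. register in jobIdToActiveJob...
    activeJobs += job                      //    ...and in activeJobs
    // 4. post SparkListenerJobStart and submit the stage (omitted here)
    job
  }
}
```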

MapStageSubmitted Event Handler

handleMapStageSubmitted(
  jobId: Int,
  dependency: ShuffleDependency[_, _, _],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties): Unit

handleMapStageSubmitted…​FIXME

handleMapStageSubmitted is used when DAGSchedulerEventProcessLoop is requested to handle a MapStageSubmitted event.

ResubmitFailedStages Event Handler

resubmitFailedStages(): Unit

resubmitFailedStages…​FIXME

resubmitFailedStages is used when DAGSchedulerEventProcessLoop is requested to handle a ResubmitFailedStages event.

SpeculativeTaskSubmitted Event Handler

handleSpeculativeTaskSubmitted(): Unit

handleSpeculativeTaskSubmitted…​FIXME

handleSpeculativeTaskSubmitted is used when DAGSchedulerEventProcessLoop is requested to handle a SpeculativeTaskSubmitted event.

StageCancelled Event Handler

handleStageCancellation(): Unit

handleStageCancellation…​FIXME

handleStageCancellation is used when DAGSchedulerEventProcessLoop is requested to handle a StageCancelled event.

TaskSetFailed Event Handler

handleTaskSetFailed(): Unit

handleTaskSetFailed…​FIXME

handleTaskSetFailed is used when DAGSchedulerEventProcessLoop is requested to handle a TaskSetFailed event.

WorkerRemoved Event Handler

handleWorkerRemoved(
  workerId: String,
  host: String,
  message: String): Unit

handleWorkerRemoved…​FIXME

handleWorkerRemoved is used when DAGSchedulerEventProcessLoop is requested to handle a WorkerRemoved event.

Logging

Enable ALL logging level for org.apache.spark.scheduler.DAGScheduler logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.DAGScheduler=ALL

Refer to Logging.

Internal Properties

Name Description

failedEpoch

The lookup table of lost executors and the epoch of the event.

failedStages

Stages that failed due to fetch failures (when a task fails with FetchFailed exception).

jobIdToActiveJob

The lookup table of ActiveJobs per job id.

jobIdToStageIds

The lookup table of all stages per ActiveJob id.

metricsSource

DAGSchedulerSource

nextJobId

The next job id counting from 0.

Used when DAGScheduler submits a job and a map stage, and runs an approximate job.

nextStageId

The next stage id counting from 0.

Used when DAGScheduler creates a shuffle map stage and a result stage. It is the key in stageIdToStage.

runningStages

The set of stages that are currently "running".

A stage is added when submitMissingTasks gets executed (without first checking whether the stage is already registered).

shuffleIdToMapStage

The lookup table of ShuffleMapStages per ShuffleDependency.

stageIdToStage

The lookup table for stages per their ids.

Used when DAGScheduler creates a shuffle map stage, creates a result stage, cleans up job state and independent stages, is informed that a task is started, a taskset has failed, a job is submitted (to compute a ResultStage), a map stage was submitted, a task has completed or a stage was cancelled, updates accumulators, aborts a stage and fails a job and independent stages.

waitingStages

The stages with parent stages that are still to be computed.
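
Taken together, the registries above are simple mutable lookup tables and counters. The sketch below shows a representative subset; the key and value types are inferred from the descriptions on this page and are simplifications of the real ones (for instance, Spark backs the id counters with atomic integers).

```scala
import scala.collection.mutable

class RegistriesSketch {
  val failedEpoch = mutable.Map.empty[String, Long]      // lost executor id -> epoch
  val jobIdToStageIds = mutable.Map.empty[Int, Set[Int]] // job id -> its stage ids
  val runningStages = mutable.Set.empty[Int]             // stages currently "running"
  val waitingStages = mutable.Set.empty[Int]             // stages awaiting parents
  val failedStages = mutable.Set.empty[Int]              // stages hit by fetch failures

  // nextJobId / nextStageId count from 0
  private var nextJobId = 0
  def newJobId(): Int = { val id = nextJobId; nextJobId += 1; id }
}
```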