DAGSchedulerEventProcessLoop is an event processing thread to handle DAGSchedulerEvents asynchronously and serially (one by one).

DAGSchedulerEventProcessLoop is registered under the name of dag-scheduler-event-loop.

The purpose of the DAGSchedulerEventProcessLoop is to have a separate thread to process events asynchronously alongside DAGScheduler.

When created, DAGSchedulerEventProcessLoop gets the reference to the owning DAGScheduler that it uses to call event handler methods on.

DAGSchedulerEventProcessLoop uses a java.util.concurrent.LinkedBlockingDeque, a blocking deque that is effectively unbounded (it can hold up to Integer.MAX_VALUE events).
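
The single-threaded, queue-backed design described above can be sketched roughly as follows. This is a simplified model, not Spark's code: SimpleEventLoop and its members are names assumed for illustration only.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical, simplified event loop (an illustration, not Spark's implementation).
abstract class SimpleEventLoop[E](name: String) {
  // Without a capacity argument the deque accepts up to Integer.MAX_VALUE elements.
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread: Thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get()) {
          val event = eventQueue.take() // blocks until an event is posted
          onReceive(event)              // one event at a time, in posting order
        }
      } catch {
        case _: InterruptedException => () // raised by stop() while waiting in take()
      }
    }
  }

  def start(): Unit = eventThread.start()

  // Can be called from any thread; the caller does not wait for the event to be handled.
  def post(event: E): Unit = eventQueue.put(event)

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      eventThread.join()
    }
  }

  // Runs on the event thread only.
  protected def onReceive(event: E): Unit
}
```

A concrete loop would subclass SimpleEventLoop and implement onReceive; callers only ever use post, so event handling never runs on the posting thread.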

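Table 1. DAGSchedulerEvents and Event Handlers

DAGSchedulerEvent: AllJobsCancelled
Event handler: doCancelAllJobs
Trigger: DAGScheduler was requested to cancel all running or waiting jobs.

DAGSchedulerEvent: BeginEvent
Event handler: handleBeginEvent
Trigger: TaskSetManager informs DAGScheduler that a task is starting (through taskStarted).

DAGSchedulerEvent: CompletionEvent
Event handler: handleTaskCompletion
Trigger: Posted exclusively when DAGScheduler is requested to taskEnded.

CompletionEvent holds contextual information about the completed task (see Table 2. CompletionEvent Properties).

DAGSchedulerEvent: ExecutorAdded
Event handler: handleExecutorAdded
Trigger: DAGScheduler was informed (through executorAdded) that an executor was spun up on a host.

DAGSchedulerEvent: ExecutorLost
Event handler: handleExecutorLost
Trigger: Posted to notify DAGScheduler that an executor was lost.

ExecutorLost conveys the following information:

1. execId

2. ExecutorLossReason

NOTE: The input filesLost for handleExecutorLost is enabled when ExecutorLossReason is SlaveLost with workerLost enabled (it is disabled by default).

NOTE: handleExecutorLost is also called when DAGScheduler is informed that a task has failed due to FetchFailed exception.

DAGSchedulerEvent: GettingResultEvent
Event handler: handleGetTaskResult
Trigger: TaskSetManager informs DAGScheduler (through taskGettingResult) that a task has completed and results are being fetched remotely.

DAGSchedulerEvent: JobCancelled
Event handler: handleJobCancellation
Trigger: DAGScheduler was requested to cancel a job.

DAGSchedulerEvent: JobGroupCancelled
Event handler: handleJobGroupCancelled
Trigger: DAGScheduler was requested to cancel a job group.

DAGSchedulerEvent: MapStageSubmitted
Event handler: handleMapStageSubmitted
Trigger: Posted to inform DAGScheduler that SparkContext submitted a MapStage for execution (through submitMapStage).

MapStageSubmitted conveys the following information:

1. A job identifier (as jobId)

2. The ShuffleDependency

3. A CallSite (as callSite)

4. The JobListener to inform about the status of the stage.

5. Properties of the execution

DAGSchedulerEvent: ResubmitFailedStages
Event handler: resubmitFailedStages
Trigger: DAGScheduler was informed that a task has failed due to FetchFailed exception.

DAGSchedulerEvent: StageCancelled
Event handler: handleStageCancellation
Trigger: DAGScheduler was requested to cancel a stage.

DAGSchedulerEvent: TaskSetFailed
Event handler: handleTaskSetFailed
Trigger: DAGScheduler was requested to cancel a TaskSet.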
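
The mapping from events to handlers amounts to a pattern match on the event type. The sketch below illustrates that idea for a handful of events; the event and scheduler classes are simplified stand-ins assumed for illustration (the real events carry Spark types and more fields), and only the dispatch shape is the point.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for a handful of events; the real ones carry Spark types.
sealed trait SchedulerEvent
case object AllJobsCancelled extends SchedulerEvent
final case class JobCancelled(jobId: Int) extends SchedulerEvent
final case class JobGroupCancelled(groupId: String) extends SchedulerEvent
final case class ExecutorAdded(execId: String, host: String) extends SchedulerEvent
case object ResubmitFailedStages extends SchedulerEvent

// Stand-in for the owning scheduler: every handler just records that it ran.
class RecordingScheduler {
  val calls = mutable.ArrayBuffer.empty[String]
  def doCancelAllJobs(): Unit = { calls += "doCancelAllJobs" }
  def handleJobCancellation(jobId: Int): Unit = { calls += s"handleJobCancellation($jobId)" }
  def handleJobGroupCancelled(groupId: String): Unit = { calls += s"handleJobGroupCancelled($groupId)" }
  def handleExecutorAdded(execId: String, host: String): Unit = { calls += s"handleExecutorAdded($execId, $host)" }
  def resubmitFailedStages(): Unit = { calls += "resubmitFailedStages" }
}

// The loop holds a reference to the scheduler and calls one handler per event.
class EventDispatcher(scheduler: RecordingScheduler) {
  def onReceive(event: SchedulerEvent): Unit = event match {
    case AllJobsCancelled            => scheduler.doCancelAllJobs()
    case JobCancelled(jobId)         => scheduler.handleJobCancellation(jobId)
    case JobGroupCancelled(groupId)  => scheduler.handleJobGroupCancelled(groupId)
    case ExecutorAdded(execId, host) => scheduler.handleExecutorAdded(execId, host)
    case ResubmitFailedStages        => scheduler.resubmitFailedStages()
  }
}
```

Because SchedulerEvent is sealed, the compiler can warn when a new event type is added without a matching case.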

GettingResultEvent Event and handleGetTaskResult Handler

GettingResultEvent(taskInfo: TaskInfo) extends DAGSchedulerEvent

GettingResultEvent is a DAGSchedulerEvent that triggers handleGetTaskResult (on a separate thread).

GettingResultEvent is posted to inform DAGScheduler (through taskGettingResult) that a task fetches results.

handleGetTaskResult Handler

handleGetTaskResult(taskInfo: TaskInfo): Unit

handleGetTaskResult merely posts SparkListenerTaskGettingResult (to LiveListenerBus Event Bus).

BeginEvent Event and handleBeginEvent Handler

BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent

BeginEvent is a DAGSchedulerEvent that triggers handleBeginEvent (on a separate thread).

BeginEvent is posted to inform DAGScheduler (through taskStarted) that a TaskSetManager starts a task.

JobGroupCancelled Event and handleJobGroupCancelled Handler

JobGroupCancelled(groupId: String) extends DAGSchedulerEvent

JobGroupCancelled is a DAGSchedulerEvent that triggers handleJobGroupCancelled (on a separate thread).

JobGroupCancelled is posted when DAGScheduler is informed (through cancelJobGroup) that SparkContext was requested to cancel a job group.

handleJobGroupCancelled Handler

handleJobGroupCancelled(groupId: String): Unit

handleJobGroupCancelled finds active jobs in a group and cancels them.

Internally, handleJobGroupCancelled computes all the active jobs (registered in the internal collection of active jobs) that have spark.jobGroup.id scheduling property set to groupId.

handleJobGroupCancelled then cancels every active job in the group one by one with the cancellation reason: "part of cancelled job group [groupId]".
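
A minimal sketch of the selection step, assuming a pared-down Job type in place of ActiveJob (Job, props and jobsToCancel are hypothetical names; only the spark.jobGroup.id key and the reason text come from the description above):

```scala
import java.util.Properties

object JobGroupSketch {
  val JobGroupIdKey = "spark.jobGroup.id"

  // Hypothetical, minimal stand-in for ActiveJob; properties may be null.
  final case class Job(jobId: Int, properties: Properties)

  // Helper for building scheduling properties with a job group set.
  def props(groupId: String): Properties = {
    val p = new Properties()
    p.setProperty(JobGroupIdKey, groupId)
    p
  }

  // Returns (jobId, cancellation reason) for every active job in the group.
  def jobsToCancel(activeJobs: Set[Job], groupId: String): Seq[(Int, String)] =
    activeJobs.toSeq
      .filter(job => Option(job.properties).exists(_.getProperty(JobGroupIdKey) == groupId))
      .map(_.jobId)
      .sorted
      .map(jobId => (jobId, s"part of cancelled job group $groupId"))
}
```

Jobs without properties, or with a different group id, are left alone.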

Getting Notified that ShuffleDependency Was Submitted — handleMapStageSubmitted Handler

handleMapStageSubmitted(
  jobId: Int,
  dependency: ShuffleDependency[_, _, _],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties): Unit
Figure 1. MapStageSubmitted Event Handling

handleMapStageSubmitted finds or creates a new ShuffleMapStage for the input ShuffleDependency and jobId.

handleMapStageSubmitted creates an ActiveJob (with the input jobId, callSite, listener and properties, and the ShuffleMapStage).

handleMapStageSubmitted clears the internal cache of RDD partition locations.

FIXME Why is this clearing here so important?

You should see the following INFO messages in the logs:

INFO DAGScheduler: Got map stage job [id] ([callSite]) with [number] output partitions
INFO DAGScheduler: Final stage: [stage] ([name])
INFO DAGScheduler: Parents of final stage: [parents]
INFO DAGScheduler: Missing parents: [missingStages]

handleMapStageSubmitted registers the new job in jobIdToActiveJob and activeJobs internal registries, and with the final ShuffleMapStage.

ShuffleMapStage can have multiple ActiveJobs registered.

In the end, handleMapStageSubmitted posts SparkListenerJobStart message to LiveListenerBus and submits the ShuffleMapStage.

If the ShuffleMapStage is already available, handleMapStageSubmitted marks the job as finished.

DAGScheduler requests statistics for the ShuffleDependency from MapOutputTrackerMaster and uses them when marking the job as finished.
MapOutputTrackerMaster is passed in when DAGScheduler is created.

When handleMapStageSubmitted could not find or create a ShuffleMapStage, you should see the following WARN message in the logs.

WARN Creating new stage failed due to exception - job: [id]

handleMapStageSubmitted notifies listener about the job failure and exits.

MapStageSubmitted event processing is very similar to JobSubmitted events.

The difference between handleMapStageSubmitted and handleJobSubmitted:

  • handleMapStageSubmitted has a ShuffleDependency among the input parameters while handleJobSubmitted has finalRDD, func, and partitions.

  • handleMapStageSubmitted initializes finalStage as getShuffleMapStage(dependency, jobId) while handleJobSubmitted initializes it as newResultStage(finalRDD, func, partitions, jobId, callSite).

  • handleMapStageSubmitted logs the INFO message "Got map stage job %s (%s) with %d output partitions" (using dependency.rdd.partitions.length) while handleJobSubmitted logs "Got job %s (%s) with %d output partitions" (using partitions.length).

  • FIXME: Could the above be cut to ActiveJob.numPartitions?

  • handleMapStageSubmitted adds a new job with finalStage.addActiveJob(job) while handleJobSubmitted sets with finalStage.setActiveJob(job).

  • handleMapStageSubmitted checks if the final stage has already finished, tells the listener and removes it using the code:

    if (finalStage.isAvailable) {
      markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency))
    }
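
The addActiveJob versus setActiveJob difference, and the isAvailable check, can be pictured with two toy stage classes. This is only a sketch under simplifying assumptions: MapStage, ResultStage and Job below are hypothetical stand-ins, and registerOutput is an invented helper that marks a partition's shuffle output as present.

```scala
object StageJobsSketch {
  // Hypothetical, minimal stand-in for ActiveJob.
  final case class Job(jobId: Int)

  // A map stage may serve several jobs at once, so it keeps a collection.
  class MapStage(val numPartitions: Int) {
    private var jobs: List[Job] = Nil
    private val hasOutput = Array.fill(numPartitions)(false)

    def addActiveJob(job: Job): Unit = { jobs = job :: jobs }
    def removeActiveJob(job: Job): Unit = { jobs = jobs.filterNot(_ == job) }
    def mapStageJobs: Seq[Job] = jobs

    // Invented helper: records that a partition has a shuffle output.
    def registerOutput(partition: Int): Unit = { hasOutput(partition) = true }
    // Available when every partition has a shuffle output.
    def isAvailable: Boolean = hasOutput.forall(identity)
  }

  // A result stage has at most one active job, so setting replaces it.
  class ResultStage {
    private var job: Option[Job] = None
    def setActiveJob(j: Job): Unit = { job = Some(j) }
    def removeActiveJob(): Unit = { job = None }
    def activeJob: Option[Job] = job
  }
}
```

With this shape, a map stage that is already available can finish a newly added job immediately, without running any tasks.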

resubmitFailedStages Handler

resubmitFailedStages(): Unit

resubmitFailedStages iterates over the internal collection of failed stages and submits them.

resubmitFailedStages does nothing when there are no failed stages reported.

You should see the following INFO message in the logs:

INFO Resubmitting failed stages

resubmitFailedStages clears the internal cache of RDD partition locations first. It then makes a copy of the collection of failed stages and clears the collection so DAGScheduler can track failed stages afresh.

At this point DAGScheduler has no failed stages reported.

The previously-reported failed stages are sorted by the corresponding job ids in ascending order and resubmitted.
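
The steps above can be sketched as follows. FailedStage and ResubmitSketch are hypothetical stand-ins (a stage is reduced to its id and the id of its first job), and submitStage merely records the submission order.

```scala
import scala.collection.mutable

// Hypothetical, minimal stand-in for a Stage.
final case class FailedStage(id: Int, firstJobId: Int)

class ResubmitSketch {
  val failedStages = mutable.HashSet.empty[FailedStage]
  val cacheLocs = mutable.HashMap.empty[Int, Seq[String]] // stand-in for cached partition locations
  val submitted = mutable.ArrayBuffer.empty[FailedStage]  // records submission order

  private def submitStage(stage: FailedStage): Unit = { submitted += stage }

  def resubmitFailedStages(): Unit = {
    if (failedStages.nonEmpty) {
      cacheLocs.clear()                         // locations may be stale after failures
      val toResubmit = failedStages.toArray     // copy first...
      failedStages.clear()                      // ...so new failures are tracked afresh
      toResubmit.sortBy(_.firstJobId).foreach(submitStage)
    }
  }
}
```

Copying before clearing matters: a stage that fails again while being resubmitted lands in the now-empty collection instead of being lost.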

Getting Notified that Executor Is Lost — handleExecutorLost Handler

handleExecutorLost(
  execId: String,
  filesLost: Boolean,
  maybeEpoch: Option[Long] = None): Unit

handleExecutorLost checks whether the input optional maybeEpoch is defined and if not requests the current epoch from MapOutputTrackerMaster.

MapOutputTrackerMaster is passed in (as mapOutputTracker) when DAGScheduler is created.
FIXME When is maybeEpoch passed in?
Figure 2. DAGScheduler.handleExecutorLost

Recurring ExecutorLost events lead to the following repeating DEBUG message in the logs:

DEBUG Additional executor lost message for [execId] (epoch [currentEpoch])
handleExecutorLost handler uses DAGScheduler’s failedEpoch and FIXME internal registries.

Otherwise, when the executor execId is not in the list of lost executors or the executor failure’s epoch is smaller than the input maybeEpoch, the lost-executor event is recorded in the failedEpoch internal registry.

FIXME Describe the case above in simpler non-technical words. Perhaps change the order, too.

You should see the following INFO message in the logs:

INFO Executor lost: [execId] (epoch [epoch])
FIXME Review what’s filesLost.

handleExecutorLost exits unless the ExecutorLost event was for a map output fetch operation (and the input filesLost is true) or the external shuffle service is not used.

In such a case, you should see the following INFO message in the logs:

INFO Shuffle files lost for executor: [execId] (epoch [epoch])

handleExecutorLost walks over all ShuffleMapStages in DAGScheduler’s shuffleToMapStage internal registry and does the following (in order):

  1. ShuffleMapStage.removeOutputsOnExecutor(execId) is called.

  2. MapOutputTrackerMaster.registerMapOutputs is called (with the shuffleId).
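
The epoch bookkeeping described in this section can be sketched as below. It is a simplified model: ExecutorLostSketch, trackerEpoch and the externalShuffleService parameter are assumptions for illustration (the parameter stands in for however the scheduler learns whether the external shuffle service is in use), and the per-stage output cleanup is left out.

```scala
import scala.collection.mutable

class ExecutorLostSketch {
  val failedEpoch = mutable.HashMap.empty[String, Long]
  var trackerEpoch: Long = 0L                     // stand-in for MapOutputTrackerMaster's current epoch
  val log = mutable.ArrayBuffer.empty[String]

  def handleExecutorLost(
      execId: String,
      filesLost: Boolean,
      maybeEpoch: Option[Long] = None,
      externalShuffleService: Boolean = false): Unit = {
    // Use the given epoch if defined, otherwise ask the tracker.
    val currentEpoch = maybeEpoch.getOrElse(trackerEpoch)
    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
      failedEpoch(execId) = currentEpoch
      log += s"INFO Executor lost: $execId (epoch $currentEpoch)"
      if (filesLost || !externalShuffleService) {
        log += s"INFO Shuffle files lost for executor: $execId (epoch $currentEpoch)"
        // Here every ShuffleMapStage would drop its outputs on execId (omitted).
      }
    } else {
      log += s"DEBUG Additional executor lost message for $execId (epoch $currentEpoch)"
    }
  }
}
```

The epoch comparison is what makes repeated ExecutorLost events for the same failure harmless: only the first one (or one with a newer epoch) has any effect.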

handleJobCancellation Handler

handleJobCancellation(jobId: Int, reason: String = "")

handleJobCancellation first makes sure that the input jobId has been registered earlier (using jobIdToStageIds internal registry).

If the input jobId is not known to DAGScheduler, you should see the following DEBUG message in the logs:

DEBUG DAGScheduler: Trying to cancel unregistered job [jobId]

Otherwise, handleJobCancellation fails the active job and all independent stages (by looking up the active job using jobIdToActiveJob) with failure reason:

Job [jobId] cancelled [reason]
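
A rough sketch of the two branches, assuming a toy scheduler that only records what it would log or fail (JobCancellationSketch and its failures buffer are hypothetical; the registry name and the messages come from the text above):

```scala
import scala.collection.mutable

class JobCancellationSketch {
  val jobIdToStageIds = mutable.HashMap.empty[Int, Set[Int]]
  val log = mutable.ArrayBuffer.empty[String]
  val failures = mutable.ArrayBuffer.empty[(Int, String)] // (jobId, failure reason)

  def handleJobCancellation(jobId: Int, reason: String = ""): Unit = {
    if (!jobIdToStageIds.contains(jobId)) {
      log += s"DEBUG DAGScheduler: Trying to cancel unregistered job $jobId"
    } else {
      // The real handler would fail the active job and its independent stages here.
      failures += ((jobId, "Job %d cancelled %s".format(jobId, reason)))
    }
  }
}
```

With the default empty reason, the failure message ends right after "cancelled" (followed by a trailing space).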

Getting Notified That Task Has Finished — handleTaskCompletion Handler

handleTaskCompletion(event: CompletionEvent): Unit
Figure 3. DAGScheduler and CompletionEvent
CompletionEvent holds contextual information about the completed task.
Table 2. CompletionEvent Properties
Property        Description
task            Completed Task instance for a stage, partition and stage attempt.
result          Result of the task
accumUpdates    Accumulators with…FIXME

TaskMetrics can be empty when the task has failed.

handleTaskCompletion announces task completion application-wide (by posting a SparkListenerTaskEnd to LiveListenerBus).

handleTaskCompletion checks the stage of the task out in the stageIdToStage internal registry and if not found, it simply exits.

handleTaskCompletion branches off per TaskEndReason (as event.reason).

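Table 3. handleTaskCompletion Branches per TaskEndReason
TaskEndReason          Description
Success                Acts according to the type of the task that completed, i.e. ShuffleMapTask and ResultTask.
Resubmitted            Adds the partition of the task back to the pending partitions of the stage.
FetchFailed            Adds the failed stage and the map stage to the internal registry of failed stages.
ExceptionFailure       Updates accumulators (with partial values from the task).
ExecutorLostFailure    Does nothing
TaskCommitDenied       Does nothing
TaskKilled             Does nothing
TaskResultLost         Does nothing
UnknownReason          Does nothing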

Handling Successful Task Completion

When a task has finished successfully (i.e. Success end reason), handleTaskCompletion marks the partition as no longer pending (i.e. the partition the task worked on is removed from pendingPartitions of the stage).

A Stage tracks its own pending partitions using pendingPartitions property.

handleTaskCompletion branches off given the type of the task that completed, i.e. ShuffleMapTask and ResultTask.

Handling Successful ResultTask Completion

For ResultTask, the stage is assumed to be a ResultStage.

handleTaskCompletion finds the ActiveJob associated with the ResultStage.

ResultStage tracks the optional ActiveJob as activeJob property. There could only be one active job for a ResultStage.

If there is no job for the ResultStage, you should see the following INFO message in the logs:

INFO DAGScheduler: Ignoring result from [task] because its job has finished

Otherwise, when the ResultStage has an ActiveJob, handleTaskCompletion checks the status of the partition output for the partition the ResultTask ran for.

ActiveJob tracks task completions in finished property with flags for every partition in a stage. When the flag for a partition is enabled (i.e. true), it is assumed that the partition has been computed (and no results from any ResultTask are expected and hence simply ignored).
FIXME Describe why a partition could have more than one ResultTask running.

handleTaskCompletion ignores the CompletionEvent when the partition has already been marked as completed for the stage and simply exits.

handleTaskCompletion updates accumulators.

The partition for the ActiveJob (of the ResultStage) is marked as computed and the number of computed partitions is incremented.

ActiveJob tracks what partitions have already been computed and their number.

If the ActiveJob has finished (when the number of partitions computed is exactly the number of partitions in a stage) handleTaskCompletion does the following (in order):

  1. Marks ResultStage computed.

  2. Cleans up after ActiveJob and independent stages.

  3. Announces the job completion application-wide (by posting a SparkListenerJobEnd to LiveListenerBus).

A task succeeded notification holds the output index and the result.

When the notification throws an exception (because it runs user code), handleTaskCompletion notifies JobListener about the failure (wrapping it inside a SparkDriverExecutionException exception).
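
The per-partition bookkeeping for a successful ResultTask can be sketched like this. Job is a hypothetical, minimal stand-in for ActiveJob, and the Outcome values are invented labels for the three situations described above (duplicate result, progress, job finished).

```scala
object ResultTaskSketch {
  sealed trait Outcome
  case object Ignored extends Outcome      // partition was already computed
  case object InProgress extends Outcome   // recorded, more partitions outstanding
  case object JobFinished extends Outcome  // this result completed the job

  // Hypothetical, minimal stand-in for ActiveJob.
  class Job(val numPartitions: Int) {
    val finished: Array[Boolean] = Array.fill(numPartitions)(false)
    var numFinished: Int = 0
  }

  def onResultTaskSuccess(job: Job, outputId: Int): Outcome =
    if (job.finished(outputId)) {
      Ignored
    } else {
      job.finished(outputId) = true
      job.numFinished += 1
      if (job.numFinished == job.numPartitions) JobFinished else InProgress
    }
}
```

On JobFinished, the real handler goes on to mark the stage as finished, clean up and post SparkListenerJobEnd; the sketch stops at the decision.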

Handling Successful ShuffleMapTask Completion

For ShuffleMapTask, the stage is assumed to be a ShuffleMapStage.

handleTaskCompletion updates accumulators.

The task’s result is assumed to be a MapStatus that knows the executor where the task has finished.

You should see the following DEBUG message in the logs:

DEBUG DAGScheduler: ShuffleMapTask finished on [execId]

If the executor is registered in failedEpoch internal registry and the epoch of the completed task is not greater than that of the executor (as in failedEpoch registry), you should see the following INFO message in the logs:

INFO DAGScheduler: Ignoring possibly bogus [task] completion from executor [executorId]

Otherwise, handleTaskCompletion registers the MapStatus result for the partition with the stage (of the completed task).
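
The "possibly bogus" check boils down to one comparison against the failedEpoch registry. A minimal sketch, with isPossiblyBogus as a hypothetical helper name and an immutable Map standing in for the registry:

```scala
object MapTaskCompletionSketch {
  // A completion is suspect when the executor is known to have failed and the
  // task's epoch is not newer than the epoch recorded for that failure.
  def isPossiblyBogus(failedEpoch: Map[String, Long], execId: String, taskEpoch: Long): Boolean =
    failedEpoch.get(execId).exists(failureEpoch => taskEpoch <= failureEpoch)
}
```

Only completions that pass this check have their MapStatus registered with the stage.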

handleTaskCompletion does more processing only if the ShuffleMapStage is registered as still running (in runningStages internal registry) and the ShuffleMapStage stage has no pending partitions to compute.

The ShuffleMapStage is marked as finished.

You should see the following INFO messages in the logs:

INFO DAGScheduler: looking for newly runnable stages
INFO DAGScheduler: running: [runningStages]
INFO DAGScheduler: waiting: [waitingStages]
INFO DAGScheduler: failed: [failedStages]
A ShuffleMapStage stage is ready (aka available) when all partitions have shuffle outputs, i.e. when their tasks have completed.

If however the ShuffleMapStage is not ready, you should see the following INFO message in the logs:

INFO DAGScheduler: Resubmitting [shuffleStage] ([shuffleStage.name]) because some of its tasks had failed: [missingPartitions]

In the end, handleTaskCompletion submits the ShuffleMapStage for execution.

TaskEndReason: Resubmitted

For the Resubmitted case, you should see the following INFO message in the logs:

INFO Resubmitted [task], so marking it as still running

The task (by task.partitionId) is added to the collection of pending partitions of the stage (using stage.pendingPartitions).

A stage knows how many partitions are yet to be calculated. A task knows about the partition id for which it was launched.
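
How pendingPartitions shrinks on success and grows again on Resubmitted can be sketched as follows. PendingPartitionsSketch is a hypothetical stand-in for a Stage, and it assumes for simplicity that every partition starts out pending.

```scala
import scala.collection.mutable

// Hypothetical, minimal stand-in for a Stage; assumes all partitions start as pending.
class PendingPartitionsSketch(numPartitions: Int) {
  val pendingPartitions: mutable.HashSet[Int] = mutable.HashSet.empty[Int]
  pendingPartitions ++= (0 until numPartitions)

  // Success: the partition the task worked on is no longer pending.
  def onSuccess(partitionId: Int): Unit = { pendingPartitions -= partitionId }

  // Resubmitted: the task runs again, so its partition is pending once more.
  def onResubmitted(partitionId: Int): Unit = { pendingPartitions += partitionId }

  def isDone: Boolean = pendingPartitions.isEmpty
}
```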

Task Failed with FetchFailed Exception — TaskEndReason: FetchFailed

FetchFailed(
  bmAddress: BlockManagerId,
  shuffleId: Int,
  mapId: Int,
  reduceId: Int,
  message: String)
extends TaskFailedReason
Table 4. FetchFailed Properties
Name         Description
bmAddress    BlockManagerId
shuffleId    Used when…
mapId        Used when…
reduceId     Used when…
message      Used when…

A task knows about the id of the stage it belongs to.

When FetchFailed happens, stageIdToStage is used to access the failed stage (using task.stageId; the task is available through the event in handleTaskCompletion(event: CompletionEvent)). shuffleToMapStage is used to access the map stage (using shuffleId).

If failedStage.latestInfo.attemptId != task.stageAttemptId, you should see the following INFO in the logs:

INFO Ignoring fetch failure from [task] as it's from [failedStage] attempt [task.stageAttemptId] and there is a more recent attempt for that stage (attempt ID [failedStage.latestInfo.attemptId]) running
FIXME What does failedStage.latestInfo.attemptId != task.stageAttemptId mean?

In that case, the handling of FetchFailed finishes. Otherwise, it continues.

If the failed stage is in runningStages, the following INFO message shows in the logs:

INFO Marking [failedStage] ([failedStage.name]) as failed due to a fetch failure from [mapStage] ([mapStage.name])

markStageAsFinished(failedStage, Some(failureMessage)) is called.

FIXME What does markStageAsFinished do?

If the failed stage is not in runningStages, the following DEBUG message shows in the logs:

DEBUG Received fetch failure from [task], but its from [failedStage] which is no longer running

When disallowStageRetryForTest is set, abortStage(failedStage, "Fetch failure will not retry stage due to testing config", None) is called.

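FIXME Describe disallowStageRetryForTest and abortStage.

When the failed stage has failed the maximum allowable number of times, it is aborted with the following reason:

[failedStage] ([name]) has failed the maximum allowable number of times: 4. Most recent failure reason: [failureMessage]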

If there are no failed stages reported (DAGScheduler.failedStages is empty), the following INFO shows in the logs:

INFO Resubmitting [mapStage] ([mapStage.name]) and [failedStage] ([failedStage.name]) due to fetch failure

And the following code is executed:

  messageScheduler.schedule(new Runnable {
    override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
FIXME What does the above code do?
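
One way to read a Runnable like this is as a delayed post: the event is not handled right away but put on the event queue after a short wait, which gives other fetch failures from the same stage time to arrive first. The sketch below shows that pattern with a java.util.concurrent scheduled executor; DelayedPostSketch, the events queue and the delay value are assumptions for illustration.

```scala
import java.util.concurrent.{Executors, LinkedBlockingDeque, TimeUnit}

object DelayedPostSketch {
  case object ResubmitFailedStages

  // Stand-in for the event loop's queue.
  val events = new LinkedBlockingDeque[Any]()

  def scheduleResubmit(delayMs: Long): Unit = {
    val messageScheduler = Executors.newSingleThreadScheduledExecutor()
    messageScheduler.schedule(new Runnable {
      // Runs on the scheduler's thread after the delay and only posts the event;
      // the actual resubmission happens later, on the event loop thread.
      override def run(): Unit = events.put(ResubmitFailedStages)
    }, delayMs, TimeUnit.MILLISECONDS)
    // With the default policy, an already-scheduled delayed task still runs after shutdown().
    messageScheduler.shutdown()
  }
}
```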

For all the cases, the failed stage and map stages are both added to the internal registry of failed stages.

If mapId (in the FetchFailed object for the case) is provided, the map stage output is cleaned up (as it is broken) using mapStage.removeOutputLoc(mapId, bmAddress) and MapOutputTrackerMaster.unregisterMapOutput(shuffleId, mapId, bmAddress) methods.

FIXME What does mapStage.removeOutputLoc do?

If BlockManagerId (as bmAddress in the FetchFailed object) is defined, handleTaskCompletion notifies DAGScheduler that an executor was lost (with filesLost enabled and maybeEpoch from the Task that completed).