TaskSchedulerImpl

TaskSchedulerImpl is a TaskScheduler that uses a SchedulerBackend to schedule tasks (for execution on a cluster manager).

When a Spark application starts (and so an instance of SparkContext is created), a TaskSchedulerImpl is created together with a SchedulerBackend and a DAGScheduler, and they are started soon after.

TaskSchedulerImpl and Other Services

TaskSchedulerImpl generates tasks based on executor resource offers.

TaskSchedulerImpl can track racks per host and port (this is, however, only used with the Hadoop YARN cluster manager).

Use the spark.scheduler.mode configuration property to select the scheduling policy.

TaskSchedulerImpl submits tasks using SchedulableBuilders.

Creating Instance

TaskSchedulerImpl takes the following to be created:

While being created, TaskSchedulerImpl sets schedulingMode to the value of spark.scheduler.mode configuration property.

Note

schedulingMode is part of the TaskScheduler abstraction.

TaskSchedulerImpl throws a SparkException for an unrecognized scheduling mode:

Unrecognized spark.scheduler.mode: [schedulingModeConf]
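
The mode lookup described above can be sketched as follows. This is a minimal, self-contained illustration rather than Spark's own code: the SchedulingModeSketch object and its nested SchedulingMode enumeration are stand-ins defined here, IllegalArgumentException stands in for SparkException, and the case-insensitive lookup is an assumption of the sketch.

```scala
import java.util.Locale

object SchedulingModeSketch {
  // Stand-in for a scheduling-mode enumeration (illustrative only).
  object SchedulingMode extends Enumeration {
    val FAIR, FIFO, NONE = Value
  }

  // Resolves the configured mode name or fails with the message quoted above.
  // IllegalArgumentException stands in for SparkException in this sketch.
  def parseSchedulingMode(schedulingModeConf: String): SchedulingMode.Value =
    try {
      SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
    } catch {
      case _: NoSuchElementException =>
        throw new IllegalArgumentException(
          s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
    }
}
```

Failing fast here means that a misspelled mode is reported when the scheduler is created instead of surfacing later when pools are built.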

In the end, TaskSchedulerImpl creates a TaskResultGetter.

TaskSchedulerImpl is created when:

  • SparkContext is requested for a TaskScheduler (for local and spark master URLs)
  • KubernetesClusterManager and MesosClusterManager are requested for a TaskScheduler

Maximum Number of Task Failures

TaskSchedulerImpl can be given the maximum number of task failures when created; otherwise it defaults to the spark.task.maxFailures configuration property.

The number of task failures is used when submitting tasks (to create a TaskSetManager).

spark.task.cpus

TaskSchedulerImpl uses spark.task.cpus configuration property for...FIXME

SchedulerBackend

backend: SchedulerBackend

TaskSchedulerImpl is given a SchedulerBackend when requested to initialize.

The lifecycle of the SchedulerBackend is tightly coupled to the lifecycle of the TaskSchedulerImpl:

TaskSchedulerImpl waits until the SchedulerBackend is ready before requesting it for the following:

Unique Identifier of Spark Application

applicationId(): String

applicationId is part of the TaskScheduler abstraction.

applicationId simply requests the SchedulerBackend for the applicationId.

Cancelling All Tasks of Stage

cancelTasks(
  stageId: Int,
  interruptThread: Boolean): Unit

cancelTasks is part of the TaskScheduler abstraction.

cancelTasks cancels all tasks submitted for execution in a stage stageId.

cancelTasks is used when:

handleSuccessfulTask

handleSuccessfulTask(
  taskSetManager: TaskSetManager,
  tid: Long,
  taskResult: DirectTaskResult[_]): Unit

handleSuccessfulTask requests the given TaskSetManager to handleSuccessfulTask (with the given tid and taskResult).

handleSuccessfulTask is used when:

handleTaskGettingResult

handleTaskGettingResult(
  taskSetManager: TaskSetManager,
  tid: Long): Unit

handleTaskGettingResult requests the given TaskSetManager to handleTaskGettingResult.

handleTaskGettingResult is used when:

Initializing

initialize(
  backend: SchedulerBackend): Unit

initialize initializes the TaskSchedulerImpl with the given SchedulerBackend.

TaskSchedulerImpl initialization

initialize saves the given SchedulerBackend.

initialize then sets rootPool as an empty-named Pool (passing in schedulingMode, and initMinShare and initWeight as 0).

NOTE: rootPool and schedulingMode are a part of the TaskScheduler Contract.

initialize sets schedulableBuilder (based on schedulingMode):

  • FIFOSchedulableBuilder for FIFO scheduling mode
  • FairSchedulableBuilder for FAIR scheduling mode

initialize requests the SchedulableBuilder to build pools.

CAUTION: FIXME Why are rootPool and schedulableBuilder created only now? What do they need that is not available when TaskSchedulerImpl is created?

NOTE: initialize is called while SparkContext is created and creates SchedulerBackend and TaskScheduler.
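
The initialization steps above can be sketched roughly as follows. All types here (Pool, the two builders, the SchedulingMode hierarchy) are simplified stand-ins defined for the example and are not the real Spark classes; only the shape of the decision is taken from the text.

```scala
object InitializeSketch {
  sealed trait SchedulingMode
  case object FIFO extends SchedulingMode
  case object FAIR extends SchedulingMode

  // Simplified stand-in for a pool: name, scheduling mode, minimum share, weight.
  final case class Pool(name: String, mode: SchedulingMode, minShare: Int, weight: Int)

  sealed trait SchedulableBuilder { def rootPool: Pool }
  final case class FIFOSchedulableBuilder(rootPool: Pool) extends SchedulableBuilder
  final case class FairSchedulableBuilder(rootPool: Pool) extends SchedulableBuilder

  // Creates an empty-named root pool (minShare and weight of 0) and
  // picks the builder that matches the scheduling mode.
  def initialize(mode: SchedulingMode): SchedulableBuilder = {
    val rootPool = Pool("", mode, 0, 0)
    mode match {
      case FIFO => FIFOSchedulableBuilder(rootPool)
      case FAIR => FairSchedulableBuilder(rootPool)
    }
  }
}
```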

Starting TaskSchedulerImpl

start(): Unit

start starts the SchedulerBackend and the task-scheduler-speculation executor service.

Starting TaskSchedulerImpl in Spark Standalone

Handling Task Status Update

statusUpdate(
  tid: Long,
  state: TaskState,
  serializedData: ByteBuffer): Unit

statusUpdate finds the TaskSetManager for the input tid task (in an internal registry).

When state is LOST, statusUpdate...FIXME

NOTE: TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode.

When state is one of the finished states, i.e. FINISHED, FAILED, KILLED or LOST, statusUpdate cleans up the state tracked for the input tid.

statusUpdate requests the TaskSetManager to unregister tid from running tasks.

statusUpdate requests the TaskResultGetter to schedule an asynchronous task to deserialize the task result (and notify TaskSchedulerImpl back) for tid in FINISHED state, or to schedule an asynchronous task to deserialize the TaskFailedReason (and notify TaskSchedulerImpl back) for tid in the other finished states (i.e. FAILED, KILLED, LOST).

If a task is in LOST state, statusUpdate notifies DAGScheduler that the executor was lost (with SlaveLost and the reason "Task [tid] was lost", thereby marking the executor as lost as well) and requests SchedulerBackend to revive offers.

In case the TaskSetManager for tid could not be found (in the registry), you should see the following ERROR message in the logs:

Ignoring update with state [state] for TID [tid] because its task set is gone (this is likely the result of receiving duplicate task finished status updates)

Any exception is caught and reported as ERROR message in the logs:

Exception in statusUpdate
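
The branching described in this section can be summarized in a small decision function. This is only a sketch: the Action values, the taskIdToManager parameter and the use of a manager name (a String) in place of a TaskSetManager are hypothetical names introduced for the example, and the side effects (clean-up, logging, notifying DAGScheduler) are left out.

```scala
object StatusUpdateSketch {
  object TaskState extends Enumeration {
    val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
    private val finished = Set(FINISHED, FAILED, KILLED, LOST)
    def isFinished(state: Value): Boolean = finished.contains(state)
  }

  // What the scheduler would do next (hypothetical names for this sketch).
  sealed trait Action
  case object IgnoreTaskSetGone extends Action     // logs the ERROR quoted above
  case object NothingToDo extends Action           // task is still running
  case object EnqueueSuccessfulTask extends Action // FINISHED
  case object EnqueueFailedTask extends Action     // FAILED, KILLED or LOST

  // taskIdToManager maps a task id to the name of its task set manager.
  def statusUpdate(
      tid: Long,
      state: TaskState.Value,
      taskIdToManager: Map[Long, String]): Action =
    taskIdToManager.get(tid) match {
      case None => IgnoreTaskSetGone
      case Some(_) if !TaskState.isFinished(state) => NothingToDo
      case Some(_) if state == TaskState.FINISHED => EnqueueSuccessfulTask
      case Some(_) => EnqueueFailedTask
    }
}
```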

CAUTION: FIXME image with scheduler backends calling TaskSchedulerImpl.statusUpdate.

statusUpdate is used when:

task-scheduler-speculation Scheduled Executor Service

speculationScheduler is a java.util.concurrent.ScheduledExecutorService with the name task-scheduler-speculation for Speculative Execution of Tasks.

When TaskSchedulerImpl is requested to start (in non-local run mode) with spark.speculation enabled, speculationScheduler is used to schedule checkSpeculatableTasks to execute periodically every spark.speculation.interval.

speculationScheduler is shut down when TaskSchedulerImpl is requested to stop.
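
A periodic check like this one can be set up with the JDK's ScheduledExecutorService as sketched below. The SpeculationSchedulerSketch object, the start helper and its check callback are hypothetical names for this example, and using a fixed delay between runs is an assumption of the sketch.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

object SpeculationSchedulerSketch {
  // Names the single worker thread after the executor service described above.
  private val threadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val thread = new Thread(r, "task-scheduler-speculation")
      thread.setDaemon(true)
      thread
    }
  }

  // Runs `check` every `intervalMs` milliseconds until the returned
  // executor is shut down.
  def start(intervalMs: Long)(check: () => Unit): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory)
    scheduler.scheduleWithFixedDelay(
      new Runnable { def run(): Unit = check() },
      intervalMs, intervalMs, TimeUnit.MILLISECONDS)
    scheduler
  }
}
```

Shutting the returned executor down (for example with shutdownNow()) stops the periodic checks, which mirrors what happens on stop.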

Checking for Speculatable Tasks

checkSpeculatableTasks(): Unit

checkSpeculatableTasks requests rootPool to check for speculatable tasks (if they ran for more than 100 ms) and, if there are any, requests the SchedulerBackend to revive offers.

NOTE: checkSpeculatableTasks is executed periodically as part of Speculative Execution of Tasks.

Cleaning up After Removing Executor

removeExecutor(
  executorId: String,
  reason: ExecutorLossReason): Unit

removeExecutor removes the executorId executor from the internal registries, including executorIdToHost, executorsByHost, and hostsByRack. If the affected hosts and racks are the last entries in executorsByHost and hostsByRack, respectively, they are removed from the registries.

Unless reason is LossReasonPending, the executor is removed from the executorIdToHost registry and the TaskSetManagers get notified.

NOTE: The internal removeExecutor is called as part of statusUpdate and executorLost.
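
The registry clean-up can be sketched with plain mutable maps. The Registries class, the lossPending flag (standing in for a LossReasonPending reason) and the rackForHost lookup are hypothetical helpers for this example; notifying the TaskSetManagers is omitted.

```scala
import scala.collection.mutable

object RemoveExecutorSketch {
  final class Registries {
    val executorIdToHost = mutable.HashMap.empty[String, String]
    val executorsByHost = mutable.HashMap.empty[String, mutable.HashSet[String]]
    val hostsByRack = mutable.HashMap.empty[String, mutable.HashSet[String]]
  }

  def removeExecutor(
      r: Registries,
      executorId: String,
      lossPending: Boolean,
      rackForHost: String => Option[String]): Unit =
    r.executorIdToHost.get(executorId).foreach { host =>
      r.executorsByHost.get(host).foreach { execs =>
        execs -= executorId
        if (execs.isEmpty) {
          // Last executor on the host: drop the host, and the rack if it is now empty.
          r.executorsByHost -= host
          for (rack <- rackForHost(host); hosts <- r.hostsByRack.get(rack)) {
            hosts -= host
            if (hosts.isEmpty) r.hostsByRack -= rack
          }
        }
      }
      // The executor-to-host entry is kept while the loss reason is still pending.
      if (!lossPending) r.executorIdToHost -= executorId
    }
}
```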

Handling Nearly-Completed SparkContext Initialization

postStartHook(): Unit

postStartHook is part of the TaskScheduler abstraction.

postStartHook waits until a scheduler backend is ready.

Waiting Until SchedulerBackend is Ready

waitBackendReady(): Unit

waitBackendReady waits until the SchedulerBackend is ready. If it is, waitBackendReady returns immediately. Otherwise, waitBackendReady keeps checking every 100 milliseconds (hardcoded) until the SchedulerBackend is ready or the SparkContext is stopped.

Note

A SchedulerBackend is ready by default.

If the SparkContext happens to be stopped while waiting, waitBackendReady throws an IllegalStateException:

Spark context stopped while waiting for backend
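
The polling behaviour can be sketched as a simple loop. The isReady and isStopped callbacks and the pollMs parameter are hypothetical stand-ins for the backend and SparkContext checks, and Thread.sleep is used here purely for illustration of the 100 ms interval.

```scala
object WaitBackendReadySketch {
  // Returns as soon as isReady() is true; otherwise polls every pollMs
  // milliseconds and fails if the context is stopped in the meantime.
  def waitBackendReady(
      isReady: () => Boolean,
      isStopped: () => Boolean,
      pollMs: Long = 100L): Unit = {
    while (!isReady()) {
      if (isStopped()) {
        throw new IllegalStateException(
          "Spark context stopped while waiting for backend")
      }
      Thread.sleep(pollMs)
    }
  }
}
```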

Stopping TaskSchedulerImpl

stop(): Unit

stop stops all the internal services, i.e. the task-scheduler-speculation executor service, SchedulerBackend, TaskResultGetter, and the starvationTimer timer.

Default Level of Parallelism

defaultParallelism(): Int

defaultParallelism is part of the TaskScheduler abstraction.

defaultParallelism requests the SchedulerBackend for the default level of parallelism.

Note

Default level of parallelism is a hint for sizing jobs that SparkContext uses to create RDDs with the right number of partitions unless specified explicitly.

Submitting Tasks (of TaskSet) for Execution

submitTasks(
  taskSet: TaskSet): Unit

submitTasks is part of the TaskScheduler abstraction.

In essence, submitTasks registers a new TaskSetManager (for the given TaskSet) and requests the SchedulerBackend to handle resource allocation offers (from the scheduling system).

TaskSchedulerImpl.submitTasks

Internally, submitTasks prints out the following INFO message to the logs:

Adding task set [id] with [length] tasks

submitTasks then creates a TaskSetManager (for the given TaskSet and the maximum number of task failures).

submitTasks registers (adds) the TaskSetManager per stage and stage attempt IDs (of the TaskSet) in the taskSetsByStageIdAndAttempt internal registry.

NOTE: The taskSetsByStageIdAndAttempt internal registry tracks the TaskSetManagers (that represent TaskSets) per stage and stage attempts. In other words, there could be many TaskSetManagers for a single stage, each representing a unique stage attempt.

NOTE: Not only could a task be retried (cf. Maximum Number of Task Failures), but also a single stage.

submitTasks makes sure that there is exactly one active TaskSetManager (with different TaskSet) across all the managers (for the stage). Otherwise, submitTasks throws an IllegalStateException:

more than one active taskSet for stage [stage]: [TaskSet ids]

NOTE: TaskSetManager is considered active when it is not a zombie.
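
The registration step and the "exactly one active TaskSetManager per stage" check can be sketched as below. The TaskSetManager class here is a tiny stand-in with only the fields the check needs, the Registry wrapper is a hypothetical helper, and the "stageId.stageAttemptId" format used for the task set ids in the message is an assumption of the sketch.

```scala
import scala.collection.mutable

object SubmitTasksSketch {
  // Minimal stand-in: only what the check needs.
  final class TaskSetManager(
      val stageId: Int,
      val stageAttemptId: Int,
      var isZombie: Boolean = false)

  final class Registry {
    // stage id -> (stage attempt id -> manager)
    private val taskSetsByStageIdAndAttempt =
      mutable.HashMap.empty[Int, mutable.HashMap[Int, TaskSetManager]]

    def register(manager: TaskSetManager): Unit = {
      val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(
        manager.stageId, mutable.HashMap.empty[Int, TaskSetManager])
      stageTaskSets(manager.stageAttemptId) = manager
      // A manager is active when it is not a zombie.
      val active = stageTaskSets.values.filter(m => !m.isZombie).toList
      if (active.size > 1) {
        val ids = active.map(m => s"${m.stageId}.${m.stageAttemptId}").mkString(",")
        throw new IllegalStateException(
          s"more than one active taskSet for stage ${manager.stageId}: $ids")
      }
    }

    def attempts(stageId: Int): Set[Int] =
      taskSetsByStageIdAndAttempt.get(stageId).fold(Set.empty[Int])(_.keys.toSet)
  }
}
```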

submitTasks requests the SchedulableBuilder to add the TaskSetManager to the schedulable pool.

NOTE: The schedulable pool can be a single flat linked queue (in FIFO scheduling mode) or a hierarchy of pools of Schedulables (in FAIR scheduling mode).

submitTasks schedules a starvation task to make sure that the requested resources (i.e. CPU and memory) are assigned to the Spark application when it does not run in local mode (the very first time the Spark application is started, per an internal flag).

NOTE: The very first time (the internal flag is false) in cluster mode only (i.e. isLocal of the TaskSchedulerImpl is false), starvationTimer is scheduled to execute after spark.starvation.timeout to ensure that the requested resources, i.e. CPUs and memory, were assigned by a cluster manager.

NOTE: After the first spark.starvation.timeout passes, the internal flag is true.

In the end, submitTasks requests the SchedulerBackend to reviveOffers.

TIP: Use dag-scheduler-event-loop thread to step through the code in a debugger.

Scheduling Starvation Task

Every time the starvation timer thread is executed and hasLaunchedTask flag is false, the following WARN message is printed out to the logs:

Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

Otherwise, when the hasLaunchedTask flag is true the timer thread cancels itself.
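
The behaviour of the starvation task can be sketched as a java.util.TimerTask. The StarvationTask class and its hasLaunchedTask and warn callbacks are hypothetical names for this example; in the sketch the warning is handed to a callback instead of a logger.

```scala
import java.util.TimerTask

object StarvationTaskSketch {
  val Warning: String =
    "Initial job has not accepted any resources; check your cluster UI " +
      "to ensure that workers are registered and have sufficient resources"

  // Warns on every run until a task has been launched, then cancels itself.
  final class StarvationTask(
      hasLaunchedTask: () => Boolean,
      warn: String => Unit) extends TimerTask {
    override def run(): Unit = {
      if (!hasLaunchedTask()) {
        warn(Warning)
      } else {
        cancel() // no further executions of this task
        ()
      }
    }
  }
}
```

Such a task could be registered with `new java.util.Timer(true).scheduleAtFixedRate(task, timeoutMs, timeoutMs)`, where timeoutMs is a placeholder for the configured timeout.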

Creating TaskSetManager

createTaskSetManager(
  taskSet: TaskSet,
  maxTaskFailures: Int): TaskSetManager

createTaskSetManager creates a TaskSetManager (with this TaskSchedulerImpl, the given TaskSet and the maxTaskFailures).


createTaskSetManager is used when:

Notifying TaskSetManager that Task Failed

handleFailedTask(
  taskSetManager: TaskSetManager,
  tid: Long,
  taskState: TaskState,
  reason: TaskFailedReason): Unit

handleFailedTask notifies taskSetManager that the tid task has failed and, only when taskSetManager is not in zombie state and tid is not in KILLED state, requests SchedulerBackend to revive offers.

NOTE: handleFailedTask is called when TaskResultGetter deserializes a TaskFailedReason for a failed task.

taskSetFinished

taskSetFinished(
  manager: TaskSetManager): Unit

taskSetFinished looks up all the TaskSets by the stage id (in the taskSetsByStageIdAndAttempt registry) and removes the stage attempt from them, removing the entire stage record from the registry if there are no other attempts registered.

TaskSchedulerImpl.taskSetFinished is called when all tasks are finished

taskSetFinished then removes manager from the parent's schedulable pool.

You should see the following INFO message in the logs:

Removed TaskSet [id], whose tasks have all completed, from pool [name]

taskSetFinished is used when:

Notifying DAGScheduler About New Executor

executorAdded(
  execId: String,
  host: String)

executorAdded just notifies DAGScheduler that an executor was added.

NOTE: executorAdded uses the DAGScheduler that TaskSchedulerImpl was given earlier.

Creating TaskDescriptions For Available Executor Resource Offers

resourceOffers(
  offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]]

resourceOffers takes the resource offers and generates a collection of tasks (as TaskDescriptions) to launch (given the resources available).

Note

A WorkerOffer represents a resource offer with CPU cores free to use on an executor.

Processing Executor Resource Offers


Internally, resourceOffers first updates the internal lookup tables to record new hosts and executors (given the input offers).

For new executors (not yet in the lookup tables), resourceOffers notifies DAGScheduler that an executor was added.

NOTE: TaskSchedulerImpl uses resourceOffers to track active executors.

CAUTION: FIXME a picture with executorAdded call from TaskSchedulerImpl to DAGScheduler.

resourceOffers requests BlacklistTracker to applyBlacklistTimeout and filters out offers on blacklisted nodes and executors.

NOTE: resourceOffers uses the optional BlacklistTracker that TaskSchedulerImpl was given.

CAUTION: FIXME Expand on blacklisting

resourceOffers then randomly shuffles offers (to evenly distribute tasks across executors and avoid over-utilizing some executors) and initializes the local data structures tasks and availableCpus (as shown in the figure below).

Internal Structures of resourceOffers with 5 WorkerOffers (with 4, 2, 0, 3, 2 free cores)

resourceOffers takes TaskSets in scheduling order from the top-level Schedulable Pool.

TaskSchedulerImpl Requesting TaskSets (as TaskSetManagers) from Root Pool

Note

rootPool is configured when TaskSchedulerImpl is initialized.

rootPool is part of the TaskScheduler Contract and exclusively managed by SchedulableBuilders, i.e. FIFOSchedulableBuilder and FairSchedulableBuilder (that manage registering TaskSetManagers with the root pool).

TaskSetManager manages execution of the tasks in a single TaskSet that represents a single Stage.

For every TaskSetManager (in scheduling order), you should see the following DEBUG message in the logs:

parentName: [name], name: [name], runningTasks: [count]

Only if a new executor was added, resourceOffers notifies every TaskSetManager about the change (to recompute locality preferences).

resourceOffers then takes every TaskSetManager (in scheduling order) and offers them each node in increasing order of locality levels (per the TaskSetManager's valid locality levels).

NOTE: A TaskSetManager computes locality levels of the tasks it manages.

For every TaskSetManager and the TaskSetManager's valid locality level, resourceOffers tries to find tasks to schedule on executors as long as the TaskSetManager manages to launch a task (given the locality level).

If resourceOffers did not manage to offer resources to a TaskSetManager so it could launch any task, resourceOffers requests the TaskSetManager to abort the TaskSet if completely blacklisted.

When resourceOffers managed to launch a task, the internal hasLaunchedTask flag gets enabled (which effectively means what the name says: "there were executors and I managed to launch a task").
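
Two pieces of the flow above lend themselves to a short sketch: preparing the local structures from the shuffled offers, and repeating an offer round at one locality level for as long as a task gets launched. The helper names prepare and offerWhileLaunching, the offerOnce callback, and the use of String in place of TaskDescription are assumptions made for the example.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object ResourceOffersSketch {
  final case class WorkerOffer(executorId: String, host: String, cores: Int)

  // Shuffles the offers and creates, per offer, an empty task buffer
  // and the number of free CPU cores.
  def prepare(
      offers: Seq[WorkerOffer],
      rng: Random): (IndexedSeq[WorkerOffer], IndexedSeq[ArrayBuffer[String]], Array[Int]) = {
    val shuffledOffers = rng.shuffle(offers).toIndexedSeq
    val tasks = shuffledOffers.map(_ => ArrayBuffer.empty[String])
    val availableCpus = shuffledOffers.map(_.cores).toArray
    (shuffledOffers, tasks, availableCpus)
  }

  // Repeats offerOnce while it reports that a task was launched and
  // returns the number of successful rounds.
  def offerWhileLaunching(offerOnce: () => Boolean): Int = {
    var rounds = 0
    var launched = true
    while (launched) {
      launched = offerOnce()
      if (launched) rounds += 1
    }
    rounds
  }
}
```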


resourceOffers is used when:

maybeInitBarrierCoordinator

maybeInitBarrierCoordinator(): Unit

Unless a BarrierCoordinator has already been registered, maybeInitBarrierCoordinator creates a BarrierCoordinator and registers it to be known as barrierSync.

In the end, maybeInitBarrierCoordinator prints out the following INFO message to the logs:

Registered BarrierCoordinator endpoint

Finding Tasks from TaskSetManager to Schedule on Executors

resourceOfferSingleTaskSet(
  taskSet: TaskSetManager,
  maxLocality: TaskLocality,
  shuffledOffers: Seq[WorkerOffer],
  availableCpus: Array[Int],
  availableResources: Array[Map[String, Buffer[String]]],
  tasks: IndexedSeq[ArrayBuffer[TaskDescription]]): (Boolean, Option[TaskLocality])

resourceOfferSingleTaskSet takes every WorkerOffer (from the input shuffledOffers) and, only if the number of available CPU cores (using the input availableCpus) is at least spark.task.cpus, requests the TaskSetManager (the input taskSet) to find a Task to execute given the resource offer (as an executor, a host, and the input maxLocality).

resourceOfferSingleTaskSet adds the task to the input tasks collection.

resourceOfferSingleTaskSet records the task id and TaskSetManager in some registries.

resourceOfferSingleTaskSet decreases the input availableCpus (for the WorkerOffer) by spark.task.cpus.

resourceOfferSingleTaskSet returns whether a task was launched or not.

Note

resourceOfferSingleTaskSet asserts that the number of available CPU cores (in the input availableCpus per WorkerOffer) is at least 0.


If there is a TaskNotSerializableException, resourceOfferSingleTaskSet prints out the following ERROR in the logs:

Resource offer failed, task set [name] was not serializable
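
The per-offer loop described in this section can be sketched as follows. It is a simplified illustration: tasks are plain strings, pickTask is a hypothetical stand-in for asking the TaskSetManager for a task, cpusPerTask plays the role of spark.task.cpus, and locality levels, the task registries and serialization errors are left out.

```scala
import scala.collection.mutable.ArrayBuffer

object SingleTaskSetSketch {
  final case class WorkerOffer(executorId: String, host: String, cores: Int)

  // Visits every offer once and returns whether any task was launched.
  def resourceOfferSingleTaskSet(
      shuffledOffers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      tasks: IndexedSeq[ArrayBuffer[String]],
      cpusPerTask: Int)(pickTask: WorkerOffer => Option[String]): Boolean = {
    var launchedTask = false
    for (i <- shuffledOffers.indices) {
      // Only offers with enough free cores are passed on.
      if (availableCpus(i) >= cpusPerTask) {
        pickTask(shuffledOffers(i)).foreach { task =>
          tasks(i) += task
          availableCpus(i) -= cpusPerTask
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      }
    }
    launchedTask
  }
}
```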

resourceOfferSingleTaskSet is used when:

Task Locality Preference

TaskLocality represents a task locality preference and can be one of the following (from the most localized to the widest):

  1. PROCESS_LOCAL
  2. NODE_LOCAL
  3. NO_PREF
  4. RACK_LOCAL
  5. ANY
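
The ordering of the levels above can be modelled with a Scala Enumeration, whose values are ordered by declaration. This is a sketch in a wrapper object defined for the example; the isAllowed helper expresses "a task with locality condition may run under the constraint".

```scala
object TaskLocalitySketch {
  object TaskLocality extends Enumeration {
    // Declared from the most localized to the widest.
    val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

    // A condition is allowed when it is at least as local as the constraint.
    def isAllowed(constraint: Value, condition: Value): Boolean =
      condition <= constraint
  }
}
```

For example, with a NODE_LOCAL constraint a PROCESS_LOCAL placement is allowed while a RACK_LOCAL one is not.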

WorkerOffer — Free CPU Cores on Executor

WorkerOffer(
  executorId: String,
  host: String,
  cores: Int)

WorkerOffer represents a resource offer with free CPU cores available on an executorId executor on a host.

workerRemoved

workerRemoved(
  workerId: String,
  host: String,
  message: String): Unit

workerRemoved is part of the TaskScheduler abstraction.

workerRemoved prints out the following INFO message to the logs:

Handle removed worker [workerId]: [message]

In the end, workerRemoved requests the DAGScheduler to workerRemoved.

calculateAvailableSlots

calculateAvailableSlots(
  scheduler: TaskSchedulerImpl,
  conf: SparkConf,
  rpId: Int,
  availableRPIds: Array[Int],
  availableCpus: Array[Int],
  availableResources: Array[Map[String, Int]]): Int

calculateAvailableSlots...FIXME


calculateAvailableSlots is used when:

Logging

Enable ALL logging level for org.apache.spark.scheduler.TaskSchedulerImpl logger to see what happens inside.

Add the following line to conf/log4j2.properties:

logger.TaskSchedulerImpl.name = org.apache.spark.scheduler.TaskSchedulerImpl
logger.TaskSchedulerImpl.level = all

Refer to Logging.