When a Spark application starts (and so an instance of SparkContext.md#creating-instance[SparkContext] is created), TaskSchedulerImpl is created with a scheduler:SchedulerBackend.md[SchedulerBackend] and scheduler:DAGScheduler.md[DAGScheduler], and all three are soon started.
TaskSchedulerImpl can <<getRackForHost, track racks per host and port>> (which however is only used with the Hadoop YARN cluster manager).
Use the configuration-properties.md#spark.scheduler.mode[spark.scheduler.mode] configuration property to select the scheduler:spark-scheduler-SchedulingMode.md[scheduling policy].
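For example, to select the FAIR scheduling policy (FIFO is the default), set the property on the SparkConf before creating the SparkContext (the application name below is illustrative):

[source,scala]
----
import org.apache.spark.SparkConf

// Select the FAIR scheduling policy (FIFO is the default).
val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")
  .set("spark.scheduler.mode", "FAIR")
----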
[[CPUS_PER_TASK]] TaskSchedulerImpl uses the configuration-properties.md#spark.task.cpus[spark.task.cpus] configuration property (the number of CPU cores per task) when <<resourceOfferSingleTaskSet, finding tasks to schedule on executors>>.
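spark.task.cpus caps how many tasks run concurrently on an executor. A quick worked example (the property names are real; the numbers are illustrative):

[source,scala]
----
import org.apache.spark.SparkConf

// With 8 cores per executor and 2 CPUs per task, an executor runs
// at most 8 / 2 = 4 tasks concurrently.
val conf = new SparkConf()
  .set("spark.executor.cores", "8")
  .set("spark.task.cpus", "2")
----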
TaskSchedulerImpl takes the following to be created:
- [[sc]] SparkContext.md[SparkContext]
- [[isLocal]] isLocal flag to differentiate between local and cluster run modes (default: false)
TaskSchedulerImpl initializes the <<internal-properties, internal properties>> when created.
TaskSchedulerImpl sets scheduler:TaskScheduler.md#schedulingMode[schedulingMode] to the value of configuration-properties.md#spark.scheduler.mode[spark.scheduler.mode] configuration property.
schedulingMode is part of scheduler:TaskScheduler.md#schedulingMode[TaskScheduler] contract.
Failure to set schedulingMode (due to an unrecognized value of configuration-properties.md#spark.scheduler.mode[spark.scheduler.mode]) results in a SparkException:

Unrecognized spark.scheduler.mode: [schedulingModeConf]
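The check itself is a simple enumeration lookup. A minimal, self-contained sketch (the SchedulingMode object below is a stand-in for org.apache.spark.scheduler.SchedulingMode, and a plain IllegalArgumentException stands in for Spark's exception type):

[source,scala]
----
// Stand-in for org.apache.spark.scheduler.SchedulingMode.
object SchedulingMode extends Enumeration {
  val FAIR, FIFO, NONE = Value
}

def parseSchedulingMode(schedulingModeConf: String): SchedulingMode.Value =
  try SchedulingMode.withName(schedulingModeConf.toUpperCase)
  catch {
    case _: NoSuchElementException =>
      throw new IllegalArgumentException(
        s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
  }

parseSchedulingMode("FAIR")  // SchedulingMode.FAIR
----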
Ultimately, TaskSchedulerImpl creates a scheduler:TaskResultGetter.md[TaskResultGetter].
== [[backend]] SchedulerBackend
TaskSchedulerImpl is assigned a scheduler:SchedulerBackend.md[SchedulerBackend] when requested to <<initialize, initialize>>.
The lifecycle of the SchedulerBackend is tightly coupled to the lifecycle of the owning TaskSchedulerImpl:
- When TaskSchedulerImpl <<start, starts>>, so does the scheduler:SchedulerBackend.md#start[SchedulerBackend]
- When TaskSchedulerImpl <<stop, stops>>, so does the scheduler:SchedulerBackend.md#stop[SchedulerBackend]

TaskSchedulerImpl uses the SchedulerBackend for the following:

- scheduler:SchedulerBackend.md#reviveOffers[Reviving resource offers] when requested to <<submitTasks, submitTasks>>, <<statusUpdate, statusUpdate>>, <<handleFailedTask, handleFailedTask>>, <<checkSpeculatableTasks, checkSpeculatableTasks>>, and executorLost
- scheduler:SchedulerBackend.md#killTask[Killing tasks] when requested to killTaskAttempt and <<cancelTasks, cancelTasks>>
- scheduler:SchedulerBackend.md#defaultParallelism[Default parallelism], scheduler:SchedulerBackend.md#applicationId[applicationId] and scheduler:SchedulerBackend.md#applicationAttemptId[applicationAttemptId] when requested for the <<defaultParallelism, default level of parallelism>>, <<applicationId, applicationId>> and applicationAttemptId, respectively
== [[applicationId]] Unique Identifier of Spark Application
applicationId is part of scheduler:TaskScheduler.md#applicationId[TaskScheduler] contract.
applicationId simply requests the <<backend, SchedulerBackend>> for the scheduler:SchedulerBackend.md#applicationId[applicationId].
== [[executorHeartbeatReceived]] executorHeartbeatReceived Method
executorHeartbeatReceived(execId: String, accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean
executorHeartbeatReceived is part of the scheduler:TaskScheduler.md#executorHeartbeatReceived[TaskScheduler] contract.
== [[cancelTasks]] Cancelling All Tasks of Stage -- cancelTasks Method
cancelTasks(stageId: Int, interruptThread: Boolean): Unit
cancelTasks is part of scheduler:TaskScheduler.md#contract[TaskScheduler contract].
cancelTasks cancels all tasks submitted for execution in a stage (given stageId).
cancelTasks is used exclusively when DAGScheduler scheduler:DAGScheduler.md#failJobAndIndependentStages[cancels a stage].
== [[handleSuccessfulTask]] Notifying TaskSetManager that Task Succeeded -- handleSuccessfulTask Method

handleSuccessfulTask(taskSetManager: TaskSetManager, tid: Long, taskResult: DirectTaskResult[_]): Unit
handleSuccessfulTask simply scheduler:TaskSetManager.md#handleSuccessfulTask[forwards the call to the given TaskSetManager] (passing on tid and taskResult).
handleSuccessfulTask is called when scheduler:TaskResultGetter.md#enqueueSuccessfulTask[TaskResultGetter has managed to deserialize the task result of a task that finished successfully].
== [[handleTaskGettingResult]] handleTaskGettingResult Method

handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit
handleTaskGettingResult simply scheduler:TaskSetManager.md#handleTaskGettingResult[forwards the call to the TaskSetManager].
handleTaskGettingResult is used to inform that scheduler:TaskResultGetter.md#enqueueSuccessfulTask[TaskResultGetter enqueues a successful task with an IndirectTaskResult task result] (and so is about to fetch a remote block from a storage:BlockManager.md[BlockManager]).
== [[getRackForHost]] Tracking Racks per Hosts and Ports -- getRackForHost Method
getRackForHost(value: String): Option[String]
getRackForHost is a method to know about the racks per hosts and ports. By default, it assumes that racks are unknown (i.e. the method returns None).
NOTE: getRackForHost is overridden by the YARN-specific TaskScheduler spark-on-yarn:spark-yarn-yarnscheduler.md[YarnScheduler].
getRackForHost is currently used in two places:
- <<resourceOffers, TaskSchedulerImpl.resourceOffers>> to track hosts per rack (using the <<hostsByRack, hostsByRack>> internal registry) while processing resource offers
- scheduler:TaskSetManager.md#addPendingTask[TaskSetManager.addPendingTask], scheduler:TaskSetManager.md#dequeueTask[TaskSetManager.dequeueTask], and scheduler:TaskSetManager.md#dequeueSpeculativeTask[TaskSetManager.dequeueSpeculativeTask]
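To illustrate the default behaviour, here is a minimal sketch (the classes and the rack value are hypothetical; only YarnScheduler actually overrides the method):

[source,scala]
----
// Racks are unknown by default...
class RackResolutionSketch {
  def getRackForHost(value: String): Option[String] = None
}

// ...unless a cluster-manager-specific scheduler overrides the lookup.
class YarnLikeRackResolution extends RackResolutionSketch {
  override def getRackForHost(hostPort: String): Option[String] =
    Some("/default-rack")  // hypothetical topology answer
}
----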
== [[initialize]] Initializing -- initialize Method
initialize(backend: SchedulerBackend): Unit
initialize initializes TaskSchedulerImpl.
.TaskSchedulerImpl initialization
image::TaskSchedulerImpl-initialize.png[align="center"]
initialize saves the input <<backend, SchedulerBackend>>.
initialize then sets the <<rootPool, rootPool>> to be an empty-named spark-scheduler-Pool.md[Pool] (with the <<schedulingMode, schedulingMode>>, and initMinShare and initWeight both 0).
initialize sets the <<schedulableBuilder, SchedulableBuilder>> (based on the <<schedulingMode, schedulingMode>>):

- spark-scheduler-FIFOSchedulableBuilder.md[FIFOSchedulableBuilder] for FIFO scheduling mode
- spark-scheduler-FairSchedulableBuilder.md[FairSchedulableBuilder] for FAIR scheduling mode

initialize then requests the spark-scheduler-SchedulableBuilder.md#buildPools[SchedulableBuilder to build pools].
CAUTION: FIXME Why is the schedulableBuilder created only now? What does it need that is not available when TaskSchedulerImpl is created?
initialize is called while SparkContext.md#createTaskScheduler[SparkContext is created (and creates the SchedulerBackend and the TaskScheduler)].
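A minimal, self-contained sketch of the builder selection (the types below are simplified stand-ins for the Spark classes):

[source,scala]
----
object SchedulingMode extends Enumeration { val FAIR, FIFO, NONE = Value }

trait SchedulableBuilder { def buildPools(): Unit }
class FIFOSchedulableBuilder extends SchedulableBuilder {
  override def buildPools(): Unit = ()  // FIFO needs no extra pools
}
class FairSchedulableBuilder extends SchedulableBuilder {
  override def buildPools(): Unit = ()  // would read the fair scheduler allocation file
}

def chooseBuilder(mode: SchedulingMode.Value): SchedulableBuilder = mode match {
  case SchedulingMode.FIFO => new FIFOSchedulableBuilder
  case SchedulingMode.FAIR => new FairSchedulableBuilder
  case other => throw new IllegalArgumentException(s"Unsupported scheduling mode: $other")
}

chooseBuilder(SchedulingMode.FIFO).buildPools()  // FIFO is the default
----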
== [[start]] Starting TaskSchedulerImpl
start starts the scheduler:SchedulerBackend.md[scheduler backend].
.Starting TaskSchedulerImpl in Spark Standalone
image::taskschedulerimpl-start-standalone.png[align="center"]
start also starts the <<task-scheduler-speculation, task-scheduler-speculation executor service>>.
== [[statusUpdate]] Handling Task Status Update -- statusUpdate Method
statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer): Unit
statusUpdate finds the TaskSetManager for the input tid task (in the <<taskIdToTaskSetManager, taskIdToTaskSetManager>> registry).
TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode.
If state is one of the scheduler:Task.md#states[finished states] (i.e. FINISHED, FAILED, KILLED or LOST), statusUpdate requests the scheduler:TaskSetManager.md#removeRunningTask[TaskSetManager to unregister tid from running tasks].
statusUpdate then requests the <<taskResultGetter, TaskResultGetter>> to scheduler:TaskResultGetter.md#enqueueSuccessfulTask[schedule an asynchronous task to deserialize the task result (and notify TaskSchedulerImpl back)] for tid in FINISHED state, and to scheduler:TaskResultGetter.md#enqueueFailedTask[schedule an asynchronous task to deserialize TaskFailedReason (and notify TaskSchedulerImpl back)] for tid in the other finished states (i.e. FAILED, KILLED, LOST).
If a task is in LOST state, statusUpdate scheduler:DAGScheduler.md#executorLost[notifies DAGScheduler that the executor was lost] (with SlaveLost and the reason Task [tid] was lost, so marking the executor as lost as well.) and scheduler:SchedulerBackend.md#reviveOffers[requests SchedulerBackend to revive offers].
In case the TaskSetManager for tid could not be found (in the <<taskIdToTaskSetManager, taskIdToTaskSetManager>> registry), you should see the following ERROR message in the logs:

Ignoring update with state [state] for TID [tid] because its task set is gone (this is likely the result of receiving duplicate task finished status updates)
Any exception is caught and reported as ERROR message in the logs:
Exception in statusUpdate
CAUTION: FIXME Add an image with scheduler backends calling TaskSchedulerImpl.statusUpdate.
statusUpdate is used when:
- DriverEndpoint (RPC endpoint) is requested to handle a StatusUpdate message
- LocalEndpoint is requested to handle a StatusUpdate message
== [[speculationScheduler]][[task-scheduler-speculation]] task-scheduler-speculation Scheduled Executor Service -- speculationScheduler Internal Attribute
speculationScheduler is a http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html[java.util.concurrent.ScheduledExecutorService] with the name task-scheduler-speculation for speculative-execution-of-tasks.md[Speculative Execution of Tasks].
speculationScheduler is used to schedule <<checkSpeculatableTasks, checkSpeculatableTasks>> when TaskSchedulerImpl is <<start, started>> (with speculative execution of tasks enabled).
speculationScheduler is shut down when TaskSchedulerImpl is <<stop, stopped>>.
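A simplified sketch of such a service (the intervals are illustrative; Spark derives them from configuration such as spark.speculation.interval):

[source,scala]
----
import java.util.concurrent.{Executors, TimeUnit}

// Single-threaded scheduled executor that periodically triggers
// a speculation check, like task-scheduler-speculation does.
val speculationScheduler = Executors.newSingleThreadScheduledExecutor()
speculationScheduler.scheduleWithFixedDelay(
  new Runnable {
    override def run(): Unit = println("checkSpeculatableTasks()")  // stand-in
  },
  1000L, 100L, TimeUnit.MILLISECONDS)

// On TaskSchedulerImpl.stop, the service gets shut down.
speculationScheduler.shutdown()
----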
== [[checkSpeculatableTasks]] Checking for Speculatable Tasks
checkSpeculatableTasks requests the <<rootPool, rootPool>> to check for speculatable tasks (i.e. tasks that ran for more than 100 ms) and, if there are any, requests the scheduler:SchedulerBackend.md#reviveOffers[SchedulerBackend to revive offers].
checkSpeculatableTasks is executed periodically as part of speculative-execution-of-tasks.md[Speculative Execution of Tasks].
== [[maxTaskFailures]] Acceptable Number of Task Failures
TaskSchedulerImpl can be given the acceptable number of task failures when created or defaults to configuration-properties.md#spark.task.maxFailures[spark.task.maxFailures] configuration property.
The number of task failures is used when <<submitTasks, submitting tasks>> (to <<createTaskSetManager, create a TaskSetManager>>).
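For example (spark.task.maxFailures is the real property; 4 is its default):

[source,scala]
----
import org.apache.spark.SparkConf

// Tolerate up to 8 failures of any particular task before giving up on the job.
val conf = new SparkConf()
  .set("spark.task.maxFailures", "8")
----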
== [[removeExecutor]] Cleaning up After Removing Executor -- removeExecutor Internal Method
removeExecutor(executorId: String, reason: ExecutorLossReason): Unit
removeExecutor removes the executorId executor from the following <<internal-properties, internal registries>>: <<executorIdToRunningTaskIds, executorIdToRunningTaskIds>>, <<executorsByHost, executorsByHost>>, and <<hostsByRack, hostsByRack>>. If the affected hosts and racks are the last entries in executorsByHost or hostsByRack, appropriately, they are removed from the registries.
Unless the reason is LossReasonPending, the executor is removed from the <<executorIdToHost, executorIdToHost>> registry and spark-scheduler-Schedulable.md#executorLost[TaskSetManagers get notified].
NOTE: The internal removeExecutor is called as part of <<statusUpdate, statusUpdate>> and scheduler:TaskScheduler.md#executorLost[executorLost].
== [[postStartHook]] Handling Nearly-Completed SparkContext Initialization -- postStartHook Callback
postStartHook is part of the scheduler:TaskScheduler.md#postStartHook[TaskScheduler Contract] to notify a scheduler:TaskScheduler.md[task scheduler] that the SparkContext (and hence the Spark application itself) is about to finish initialization.
postStartHook simply <<waitBackendReady, waits until the SchedulerBackend is ready>>.
== [[stop]] Stopping TaskSchedulerImpl -- stop Method
stop() stops all the internal services, i.e. the <<task-scheduler-speculation, task-scheduler-speculation executor service>>, the <<backend, SchedulerBackend>>, the scheduler:TaskResultGetter.md[TaskResultGetter], and the <<submitTasks-starvationTimer, starvationTimer>> timer.
== [[defaultParallelism]] Finding Default Level of Parallelism -- defaultParallelism Method
defaultParallelism is part of scheduler:TaskScheduler.md#defaultParallelism[TaskScheduler contract] as a hint for sizing jobs.
defaultParallelism simply requests the <<backend, SchedulerBackend>> for the scheduler:SchedulerBackend.md#defaultParallelism[default level of parallelism].
NOTE: Default level of parallelism is a hint for sizing jobs that SparkContext SparkContext.md#defaultParallelism[uses to create RDDs with the right number of partitions when not specified explicitly].
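For example, assuming an active SparkContext sc:

[source,scala]
----
// Without an explicit number of partitions, SparkContext falls back
// on the default level of parallelism.
val byDefault = sc.parallelize(1 to 100)     // sc.defaultParallelism partitions
val explicit  = sc.parallelize(1 to 100, 8)  // 8 partitions
----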
== [[submitTasks]] Submitting Tasks (of TaskSet) for Execution -- submitTasks Method
submitTasks(taskSet: TaskSet): Unit
submitTasks is part of the scheduler:TaskScheduler.md#submitTasks[TaskScheduler Contract] to submit the tasks (of the given scheduler:TaskSet.md[TaskSet]) for execution.
submitTasks registers a new scheduler:TaskSetManager.md[TaskSetManager] (for the given scheduler:TaskSet.md[TaskSet]) and requests the <<backend, SchedulerBackend>> to scheduler:SchedulerBackend.md#reviveOffers[handle resource allocation offers (from the scheduling system)].
submitTasks first prints out the following INFO message to the logs:
Adding task set [id] with [length] tasks
submitTasks then <<createTaskSetManager, creates a TaskSetManager>> (for the given TaskSet and the <<maxTaskFailures, acceptable number of task failures>>).
submitTasks registers (adds) the TaskSetManager per scheduler:TaskSet.md#stageId[stage] and scheduler:TaskSet.md#stageAttemptId[stage attempt] IDs (of the scheduler:TaskSet.md[TaskSet]) in the <<taskSetsByStageIdAndAttempt, taskSetsByStageIdAndAttempt>> internal registry.

NOTE: taskSetsByStageIdAndAttempt can hold multiple TaskSetManagers for a single stage, each representing a unique stage attempt.
NOTE: Not only could a task be retried (cf. <<handleFailedTask, handleFailedTask>>), but also a single stage.
submitTasks makes sure that there is exactly one active TaskSetManager (with a different TaskSet) across all the managers (for the stage), as the sketch after the following note shows. Otherwise, submitTasks throws an IllegalStateException:

more than one active taskSet for stage [stage]: [TaskSet ids]

NOTE: A TaskSetManager is considered active when it is not a zombie.
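A minimal, self-contained sketch of that invariant (ManagerSketch is a hypothetical stand-in for TaskSetManager):

[source,scala]
----
case class ManagerSketch(taskSetId: String, isZombie: Boolean)

def assertSingleActive(stage: Int, managers: Seq[ManagerSketch]): Unit = {
  val active = managers.filter(!_.isZombie)  // active = not a zombie
  if (active.size > 1)
    throw new IllegalStateException(
      s"more than one active taskSet for stage $stage: " +
        active.map(_.taskSetId).mkString(","))
}

assertSingleActive(0, Seq(ManagerSketch("0.0", isZombie = true),
                          ManagerSketch("0.1", isZombie = false)))  // OK
----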
submitTasks requests the <<schedulableBuilder, SchedulableBuilder>> to spark-scheduler-SchedulableBuilder.md#addTaskSetManager[add the TaskSetManager to the schedulable pool].

NOTE: The scheduler:TaskScheduler.md#rootPool[schedulable pool] can be a single flat linked queue (in spark-scheduler-FIFOSchedulableBuilder.md[FIFO scheduling mode]) or a hierarchy of pools of Schedulables (in spark-scheduler-FairSchedulableBuilder.md[FAIR scheduling mode]).
NOTE: The very first time (when the <<hasLaunchedTask, hasLaunchedTask>> flag is false), in cluster mode only (i.e. when isLocal of the TaskSchedulerImpl is false), starvationTimer is scheduled to execute after configuration-properties.md#spark.starvation.timeout[spark.starvation.timeout] to ensure that the requested resources, i.e. CPUs and memory, were assigned by a cluster manager.
NOTE: After the first configuration-properties.md#spark.starvation.timeout[spark.starvation.timeout] passes, the <<hasLaunchedTask, hasLaunchedTask>> internal flag is checked (see <<submitTasks-starvationTimer, Scheduling Starvation Task>>).
In the end, submitTasks requests the <<backend, SchedulerBackend>> to scheduler:SchedulerBackend.md#reviveOffers[revive offers].

TIP: Use dag-scheduler-event-loop thread to step through the code in a debugger.
=== [[submitTasks-starvationTimer]] Scheduling Starvation Task
Every time the starvation timer thread is executed and the hasLaunchedTask flag is false, the following WARN message is printed out to the logs:

WARN Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
Otherwise, when the hasLaunchedTask flag is true, the timer thread cancels itself.
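A self-contained sketch of such a timer (the 15-second period stands in for spark.starvation.timeout):

[source,scala]
----
import java.util.{Timer, TimerTask}

@volatile var hasLaunchedTask = false

val starvationTimer = new Timer("task-starvation-timer", true)
starvationTimer.scheduleAtFixedRate(new TimerTask {
  override def run(): Unit =
    if (!hasLaunchedTask)
      println("WARN Initial job has not accepted any resources; " +
        "check your cluster UI to ensure that workers are registered " +
        "and have sufficient resources")
    else cancel()  // a launched task ends the starvation checks
}, 15000L, 15000L)
----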
== [[createTaskSetManager]] Creating TaskSetManager -- createTaskSetManager Internal Method
createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager
createTaskSetManager scheduler:TaskSetManager.md#creating-instance[creates a TaskSetManager] (passing on the reference to TaskSchedulerImpl, the given TaskSet, the input maxTaskFailures, and the optional BlacklistTracker).
createTaskSetManager uses the optional BlacklistTracker that is given when TaskSchedulerImpl is created.
createTaskSetManager is used exclusively when TaskSchedulerImpl <<submitTasks, submits tasks>>.
== [[handleFailedTask]] Notifying TaskSetManager that Task Failed -- handleFailedTask Method
handleFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState, reason: TaskFailedReason): Unit
handleFailedTask scheduler:TaskSetManager.md#handleFailedTask[notifies taskSetManager that the tid task has failed] and, only when scheduler:TaskSetManager.md#zombie-state[taskSetManager is not in zombie state] and tid is not in KILLED state, scheduler:SchedulerBackend.md#reviveOffers[requests SchedulerBackend to revive offers].
handleFailedTask is called when scheduler:TaskResultGetter.md#enqueueFailedTask[TaskResultGetter deserializes a TaskFailedReason] for a failed task.
== [[taskSetFinished]] taskSetFinished Method

taskSetFinished(manager: TaskSetManager): Unit
taskSetFinished looks all scheduler:TaskSet.md[TaskSet]s up by the stage id (in the <<taskSetsByStageIdAndAttempt, taskSetsByStageIdAndAttempt>> registry) and removes the given stage attempt, possibly removing the stage from the taskSetsByStageIdAndAttempt registry completely (if there are no other attempts registered).
.TaskSchedulerImpl.taskSetFinished is called when all tasks are finished
image::taskschedulerimpl-tasksetmanager-tasksetfinished.png[align="center"]
NOTE: A TaskSetManager manages a TaskSet for a stage.
taskSetFinished then spark-scheduler-Pool.md#removeSchedulable[removes the manager from the parent's schedulable pool].
You should see the following INFO message in the logs:
Removed TaskSet [id], whose tasks have all completed, from pool [name]
taskSetFinished is called when scheduler:TaskSetManager.md#maybeFinishTaskSet[TaskSetManager has received the results of all the tasks in a TaskSet].
== [[executorAdded]] Notifying DAGScheduler About New Executor -- executorAdded Method
executorAdded(execId: String, host: String)
executorAdded just scheduler:DAGScheduler.md#executorAdded[notifies DAGScheduler that an executor was added].
CAUTION: FIXME Image with a call from TaskSchedulerImpl to DAGScheduler, please.
executorAdded uses the <<dagScheduler, DAGScheduler>> internal reference.
== [[waitBackendReady]] Waiting Until SchedulerBackend is Ready -- waitBackendReady Internal Method
waitBackendReady waits until the <<backend, SchedulerBackend>> is scheduler:SchedulerBackend.md#isReady[ready]. If it is, waitBackendReady returns immediately. Otherwise, waitBackendReady keeps checking every 100 milliseconds (hardcoded) until the SchedulerBackend is ready or the <<sc, SparkContext>> is stopped.
NOTE: A SchedulerBackend is scheduler:SchedulerBackend.md#isReady[ready] by default.
If the SparkContext happens to be stopped while waiting, waitBackendReady throws an IllegalStateException:

Spark context stopped while waiting for backend
waitBackendReady is used exclusively when TaskSchedulerImpl is requested to <<postStartHook, handle a nearly-completed SparkContext initialization>>.
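A simplified sketch of the polling loop (isReady and stopped are stand-ins for the SchedulerBackend and SparkContext flags):

[source,scala]
----
def waitBackendReady(isReady: () => Boolean, stopped: () => Boolean): Unit =
  while (!isReady()) {
    if (stopped())
      throw new IllegalStateException("Spark context stopped while waiting for backend")
    Thread.sleep(100)  // hardcoded 100 ms between the checks
  }
----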
== [[resourceOffers]] Creating TaskDescriptions For Available Executor Resource Offers
resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]]
resourceOffers takes the resource offers (as <<WorkerOffer, WorkerOffers>>) and generates a collection of tasks (as scheduler:TaskDescription.md[TaskDescriptions]) to launch (given the resources available).
.Processing Executor Resource Offers
image::taskscheduler-resourceOffers.png[align="center"]
resourceOffers first updates the <<hostToExecutors, hostToExecutors>> and <<executorIdToHost, executorIdToHost>> lookup tables to record new hosts and executors (given the input offers).
For new executors (not in <<executorIdToRunningTaskIds, executorIdToRunningTaskIds>>), resourceOffers <<executorAdded, notifies DAGScheduler that an executor was added>>.
NOTE: TaskSchedulerImpl uses resourceOffers to track active executors.
CAUTION: FIXME A picture with an executorAdded call from TaskSchedulerImpl to DAGScheduler.
resourceOffers then requests the BlacklistTracker to applyBlacklistTimeout and filters out offers on blacklisted nodes and executors.
NOTE: resourceOffers uses the optional BlacklistTracker that is given when TaskSchedulerImpl is created.
CAUTION: FIXME Expand on blacklisting
resourceOffers then randomly shuffles offers (to evenly distribute tasks across executors and avoid over-utilizing some executors) and initializes the local data structures tasks and availableCpus (as shown in the figure below).
.Internal Structures of resourceOffers with 5 WorkerOffers (with 4, 2, 0, 3, 2 free cores)
image::TaskSchedulerImpl-resourceOffers-internal-structures.png[align="center"]
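A minimal sketch of those local structures, using the five offers from the figure (WorkerOffer and TaskDescription are simplified here):

[source,scala]
----
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

case class WorkerOffer(executorId: String, host: String, cores: Int)
case class TaskDescription(taskId: Long)

val offers = Seq(
  WorkerOffer("0", "host0", 4), WorkerOffer("1", "host1", 2),
  WorkerOffer("2", "host2", 0), WorkerOffer("3", "host3", 3),
  WorkerOffer("4", "host4", 2))

// Shuffle to avoid always placing tasks on the same executors first.
val shuffledOffers = Random.shuffle(offers)
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(_.cores).toArray
----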
resourceOffers then requests the scheduler:TaskScheduler.md#rootPool[top-level Schedulable Pool] for the spark-scheduler-Pool.md#getSortedTaskSetQueue[sorted TaskSetManagers (TaskSets in scheduling order)].
.TaskSchedulerImpl Requesting TaskSets (as TaskSetManagers) from Root Pool
image::TaskSchedulerImpl-resourceOffers-rootPool-getSortedTaskSetQueue.png[align="center"]
NOTE: rootPool is configured when TaskSchedulerImpl is <<initialize, initialized>>.
rootPool is part of the scheduler:TaskScheduler.md#rootPool[TaskScheduler Contract] and exclusively managed by scheduler:spark-scheduler-SchedulableBuilder.md[SchedulableBuilders], i.e. scheduler:spark-scheduler-FIFOSchedulableBuilder.md[FIFOSchedulableBuilder] and scheduler:spark-scheduler-FairSchedulableBuilder.md[FairSchedulableBuilder] (that scheduler:spark-scheduler-SchedulableBuilder.md#addTaskSetManager[manage registering TaskSetManagers with the root pool]).
NOTE: A scheduler:TaskSetManager.md[TaskSetManager] manages execution of the tasks in a single scheduler:TaskSet.md[TaskSet] that represents a single scheduler:Stage.md[Stage].
For every TaskSetManager (in scheduling order), you should see the following DEBUG message in the logs:

parentName: [name], name: [name], runningTasks: [count]
Only if a new executor was added, resourceOffers scheduler:TaskSetManager.md#executorAdded[notifies every TaskSetManager about the change] (to recompute locality preferences).
resourceOffers then takes every TaskSetManager (in scheduling order) and offers them each node in increasing order of locality levels (per scheduler:TaskSetManager.md#computeValidLocalityLevels[TaskSetManager's valid locality levels]).
NOTE: A TaskSetManager scheduler:TaskSetManager.md#computeValidLocalityLevels[computes locality levels of the tasks] it manages.
For every TaskSetManager and the TaskSetManager's valid locality level, resourceOffers tries to <<resourceOfferSingleTaskSet, find tasks to schedule (on executors)>> as long as the TaskSetManager manages to launch a task (given the locality level).
If resourceOffers did not manage to offer resources to a TaskSetManager so it could launch any task, resourceOffers scheduler:TaskSetManager.md#abortIfCompletelyBlacklisted[requests the TaskSetManager to abort the TaskSet if completely blacklisted].
When resourceOffers managed to launch a task, the internal <<hasLaunchedTask, hasLaunchedTask>> flag gets enabled (and the <<submitTasks-starvationTimer, starvation timer>> eventually cancels itself).
resourceOffers is used when:
- DriverEndpoint (RPC endpoint) is requested to make executor resource offers
- LocalEndpoint is requested to revive resource offers
== [[resourceOfferSingleTaskSet]] Finding Tasks from TaskSetManager to Schedule on Executors -- resourceOfferSingleTaskSet Internal Method
resourceOfferSingleTaskSet(taskSet: TaskSetManager, maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], tasks: Seq[ArrayBuffer[TaskDescription]]): Boolean
resourceOfferSingleTaskSet takes every WorkerOffer (from the input shuffledOffers) and (only if the number of available CPU cores (in the input availableCpus) is at least configuration-properties.md#spark.task.cpus[spark.task.cpus]) scheduler:TaskSetManager.md#resourceOffer[requests the TaskSetManager (the input taskSet) to find a Task to execute (given the resource offer)] (with the executor, the host, and the input maxLocality).
If so, resourceOfferSingleTaskSet adds the task to the input tasks collection.
resourceOfferSingleTaskSet records the task id and the TaskSetManager in the following registries:

- <<taskIdToTaskSetManager, taskIdToTaskSetManager>>
- <<taskIdToExecutorId, taskIdToExecutorId>>
- <<executorIdToRunningTaskIds, executorIdToRunningTaskIds>>
resourceOfferSingleTaskSet decreases configuration-properties.md#spark.task.cpus[spark.task.cpus] from the input availableCpus (for the WorkerOffer).
NOTE: resourceOfferSingleTaskSet makes sure that the number of available CPU cores (in the input WorkerOffer) is at least 0.
If there is a TaskNotSerializableException, you should see the following ERROR message in the logs:

Resource offer failed, task set [name] was not serializable
resourceOfferSingleTaskSet returns whether a task was launched or not.
resourceOfferSingleTaskSet is used exclusively when TaskSchedulerImpl <<resourceOffers, creates TaskDescriptions for available executor resource offers>>.
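A self-contained sketch of the per-TaskSet loop (offerTask stands in for TaskSetManager.resourceOffer and returns an optional task id):

[source,scala]
----
case class WorkerOffer(executorId: String, host: String, cores: Int)

val CPUS_PER_TASK = 1  // spark.task.cpus

def resourceOfferSingleTaskSetSketch(
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    offerTask: (String, String) => Option[Long]): Boolean = {
  var launchedTask = false
  for (i <- shuffledOffers.indices if availableCpus(i) >= CPUS_PER_TASK) {
    offerTask(shuffledOffers(i).executorId, shuffledOffers(i).host).foreach { _ =>
      availableCpus(i) -= CPUS_PER_TASK  // registry bookkeeping elided
      launchedTask = true
      assert(availableCpus(i) >= 0)
    }
  }
  launchedTask
}
----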
== [[TaskLocality]] TaskLocality -- Task Locality Preference
TaskLocality represents a task locality preference and can be one of the following (from the most localized to the widest):

- PROCESS_LOCAL
- NODE_LOCAL
- NO_PREF
- RACK_LOCAL
- ANY
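The order matters: Spark compares locality levels by enumeration order. A stand-in mirroring org.apache.spark.scheduler.TaskLocality:

[source,scala]
----
object TaskLocality extends Enumeration {
  // From the most localized to the widest.
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  // A locality condition is allowed by a constraint when it is
  // at most as wide as the constraint.
  def isAllowed(constraint: Value, condition: Value): Boolean =
    condition <= constraint
}

assert(TaskLocality.isAllowed(TaskLocality.ANY, TaskLocality.PROCESS_LOCAL))
----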
== [[WorkerOffer]] WorkerOffer -- Free CPU Cores on Executor
WorkerOffer(executorId: String, host: String, cores: Int)
WorkerOffer represents a resource offer with free CPU cores available on an executorId executor on a host.
== [[workerRemoved]] workerRemoved Method
workerRemoved(workerId: String, host: String, message: String): Unit
workerRemoved prints out the following INFO message to the logs:
Handle removed worker [workerId]: [message]
workerRemoved then requests the <<dagScheduler, DAGScheduler>> to handle the removed worker (passing on the workerId, host, and message).
workerRemoved is part of the scheduler:TaskScheduler.md#workerRemoved[TaskScheduler] abstraction.
== [[maybeInitBarrierCoordinator]] maybeInitBarrierCoordinator Method
maybeInitBarrierCoordinator creates a BarrierCoordinator and registers it as an RPC endpoint, unless done already.

maybeInitBarrierCoordinator is used when TaskSchedulerImpl is requested to <<resourceOffers, create TaskDescriptions for available executor resource offers>> (for barrier stages).
== [[logging]] Logging
Enable ALL logging level for org.apache.spark.scheduler.TaskSchedulerImpl logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.TaskSchedulerImpl=ALL

Refer to spark-logging.md[Logging].
== [[internal-properties]] Internal Properties
[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description
| dagScheduler a| [[dagScheduler]] scheduler:DAGScheduler.md[DAGScheduler]
| executorIdToHost a| [[executorIdToHost]] Lookup table of hosts per executor.
| executorIdToRunningTaskIds a| [[executorIdToRunningTaskIds]] Lookup table of running tasks per executor.
| executorIdToTaskCount a| [[executorIdToTaskCount]] Lookup table of the number of running tasks by executor:Executor.md[executor].
| executorsByHost a| [[executorsByHost]] Collection of executor:Executor.md[executors] per host
| hasLaunchedTask a| [[hasLaunchedTask]] Flag...FIXME
| hostToExecutors a| [[hostToExecutors]] Lookup table of executors per hosts in a cluster.
| hostsByRack a| [[hostsByRack]] Lookup table of hosts per rack.
| nextTaskId a| [[nextTaskId]] The next scheduler:Task.md[task] id counting from 0.
Used when TaskSchedulerImpl...
| rootPool a| [[rootPool]] spark-scheduler-Pool.md[Schedulable pool]
Used when TaskSchedulerImpl...
| schedulableBuilder a| [[schedulableBuilder]] spark-scheduler-SchedulableBuilder.md[SchedulableBuilder]
Created when TaskSchedulerImpl is requested to <<initialize, initialize>>.
spark-scheduler-FIFOSchedulableBuilder.md[FIFOSchedulableBuilder] when scheduling policy is FIFO (which is the default scheduling policy).
spark-scheduler-FairSchedulableBuilder.md[FairSchedulableBuilder] for FAIR scheduling policy.
NOTE: Use configuration-properties.md#spark.scheduler.mode[spark.scheduler.mode] configuration property to select the scheduling policy.
| schedulingMode a| [[schedulingMode]] spark-scheduler-SchedulingMode.md[SchedulingMode]
Used when TaskSchedulerImpl...
| taskSetsByStageIdAndAttempt a| [[taskSetsByStageIdAndAttempt]] Lookup table of scheduler:TaskSet.md[TaskSet] by stage and attempt ids.
| taskIdToExecutorId a| [[taskIdToExecutorId]] Lookup table of executor:Executor.md[executors] by task id.
| taskIdToTaskSetManager a| [[taskIdToTaskSetManager]] Registry of active scheduler:TaskSetManager.md[TaskSetManagers] per task id.
|===