# TaskSchedulerImpl
TaskSchedulerImpl is a TaskScheduler that uses a SchedulerBackend to schedule tasks (for execution on a cluster manager).

When a Spark application starts (and so an instance of SparkContext is created), TaskSchedulerImpl is created (together with a SchedulerBackend and DAGScheduler) and soon started.

TaskSchedulerImpl generates tasks based on executor resource offers.

TaskSchedulerImpl can track racks per host and port (which, however, is only used with the Hadoop YARN cluster manager).

Using the spark.scheduler.mode configuration property you can select the scheduling policy.

TaskSchedulerImpl submits tasks using SchedulableBuilders.
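For example, to select the FAIR scheduling policy you could set the property before creating the SparkContext. A minimal sketch (the application name and master URL are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Ask TaskSchedulerImpl to use the FAIR scheduling policy (default: FIFO).
val conf = new SparkConf()
  .setAppName("scheduling-mode-demo") // hypothetical application name
  .setMaster("local[*]")
  .set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
```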
## Creating Instance

TaskSchedulerImpl takes the following to be created:

- SparkContext
- Maximum Number of Task Failures
- isLocal flag (default: false)
- Clock (default: SystemClock)
While being created, TaskSchedulerImpl sets schedulingMode to the value of the spark.scheduler.mode configuration property.

Note

schedulingMode is part of the TaskScheduler abstraction.

TaskSchedulerImpl throws a SparkException for an unrecognized scheduling mode:

```text
Unrecognized spark.scheduler.mode: [schedulingModeConf]
```

In the end, TaskSchedulerImpl creates a TaskResultGetter.
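The resolution of the property value can be sketched as follows (simplified from the Apache Spark sources; SchedulingMode is an internal enumeration, so this is illustrative rather than user-facing code):

```scala
import java.util.Locale
import org.apache.spark.SparkException

// Resolve spark.scheduler.mode into a SchedulingMode value, or fail fast
// with the SparkException shown above.
val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
val schedulingMode =
  try {
    SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
  } catch {
    case _: NoSuchElementException =>
      throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
  }
```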
TaskSchedulerImpl is created when:

- SparkContext is requested for a TaskScheduler (for local and spark master URLs)
- KubernetesClusterManager and MesosClusterManager are requested for a TaskScheduler
## Maximum Number of Task Failures

TaskSchedulerImpl can be given the maximum number of task failures when created, or defaults to the spark.task.maxFailures configuration property.

The maximum number of task failures is used when submitting tasks (to create a TaskSetManager).
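For example (a sketch; the value 8 is arbitrary):

```scala
import org.apache.spark.SparkConf

// Allow up to 8 failures per task (default: 4) before the job is failed.
val conf = new SparkConf().set("spark.task.maxFailures", "8")
```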
## spark.task.cpus

TaskSchedulerImpl uses the spark.task.cpus configuration property for...FIXME
## SchedulerBackend

```scala
backend: SchedulerBackend
```

TaskSchedulerImpl is given a SchedulerBackend when requested to initialize.

The lifecycle of the SchedulerBackend is tightly coupled to the lifecycle of the TaskSchedulerImpl: it is started and stopped along with it.

TaskSchedulerImpl waits until the SchedulerBackend is ready before requesting it for the following:

- Reviving resource offers when requested to submitTasks, statusUpdate, handleFailedTask, checkSpeculatableTasks, and executorLost
- Killing tasks when requested to killTaskAttempt and killAllTaskAttempts
- Default parallelism, applicationId and applicationAttemptId when requested for the defaultParallelism, applicationId and applicationAttemptId, respectively
## Unique Identifier of Spark Application

```scala
applicationId(): String
```

applicationId is part of the TaskScheduler abstraction.

applicationId simply requests the SchedulerBackend for the applicationId.
## Cancelling All Tasks of Stage

```scala
cancelTasks(
  stageId: Int,
  interruptThread: Boolean): Unit
```

cancelTasks is part of the TaskScheduler abstraction.

cancelTasks cancels all tasks submitted for execution in the stage stageId.

cancelTasks is used when:

- DAGScheduler is requested to failJobAndIndependentStages
## handleSuccessfulTask

```scala
handleSuccessfulTask(
  taskSetManager: TaskSetManager,
  tid: Long,
  taskResult: DirectTaskResult[_]): Unit
```

handleSuccessfulTask requests the given TaskSetManager to handleSuccessfulTask (with the given tid and taskResult).

handleSuccessfulTask is used when:

- TaskResultGetter is requested to enqueueSuccessfulTask
## handleTaskGettingResult

```scala
handleTaskGettingResult(
  taskSetManager: TaskSetManager,
  tid: Long): Unit
```

handleTaskGettingResult requests the given TaskSetManager to handleTaskGettingResult.

handleTaskGettingResult is used when:

- TaskResultGetter is requested to enqueueSuccessfulTask
## Initializing

```scala
initialize(
  backend: SchedulerBackend): Unit
```

initialize initializes the TaskSchedulerImpl with the given SchedulerBackend.

initialize saves the given SchedulerBackend.

initialize then creates the root Pool (an empty-named Pool with the configured schedulingMode, and initMinShare and initWeight as 0).

initialize sets the SchedulableBuilder (based on the schedulingMode):

- FIFOSchedulableBuilder for FIFO scheduling mode
- FairSchedulableBuilder for FAIR scheduling mode

initialize then requests the SchedulableBuilder to build pools.

CAUTION: FIXME Why are rootPool and schedulableBuilder created only now? What do they need that is not available when TaskSchedulerImpl is created?

NOTE: initialize is called while SparkContext is being created (and creates the SchedulerBackend and TaskScheduler).
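Put together, initialize can be sketched as follows (simplified from the Apache Spark 2.x sources; synchronization and logging are omitted):

```scala
// A simplified sketch of TaskSchedulerImpl.initialize.
def initialize(backend: SchedulerBackend): Unit = {
  this.backend = backend
  // an empty-named root pool with initMinShare and initWeight as 0
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = schedulingMode match {
    case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)
    case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf)
    case _ => throw new IllegalArgumentException(
      s"Unsupported scheduling mode: $schedulingMode")
  }
  schedulableBuilder.buildPools()
}
```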
## Starting TaskSchedulerImpl

```scala
start(): Unit
```

start starts the SchedulerBackend and the task-scheduler-speculation executor service.
## Handling Task Status Update

```scala
statusUpdate(
  tid: Long,
  state: TaskState,
  serializedData: ByteBuffer): Unit
```

statusUpdate finds the TaskSetManager for the input tid task (in the taskIdToTaskSetManager internal registry).

When state is LOST, statusUpdate...FIXME

NOTE: TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode.

When state is one of the finished states, i.e. FINISHED, FAILED, KILLED or LOST, statusUpdate cleans up after the tid task.

statusUpdate requests the TaskSetManager to unregister tid from running tasks.

statusUpdate requests the TaskResultGetter to schedule an asynchronous task to deserialize the task result (and notify TaskSchedulerImpl back) for tid in FINISHED state, and to schedule an asynchronous task to deserialize TaskFailedReason (and notify TaskSchedulerImpl back) for tid in the other finished states (i.e. FAILED, KILLED, LOST).

If a task is in LOST state, statusUpdate notifies DAGScheduler that the executor was lost (with SlaveLost and the reason Task [tid] was lost, so marking the executor as lost as well.) and requests SchedulerBackend to revive offers.

In case the TaskSetManager for tid could not be found (in the taskIdToTaskSetManager registry), you should see the following ERROR message in the logs:

```text
Ignoring update with state [state] for TID [tid] because its task set is gone (this is likely the result of receiving duplicate task finished status updates)
```

Any exception is caught and reported as an ERROR message in the logs:

```text
Exception in statusUpdate
```

CAUTION: FIXME image with scheduler backends calling TaskSchedulerImpl.statusUpdate.
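The finished-state handling can be sketched as follows (simplified from the Apache Spark sources; locking, executor-loss handling and error reporting are trimmed):

```scala
// A simplified sketch of the core of statusUpdate.
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer): Unit = {
  taskIdToTaskSetManager.get(tid) match {
    case Some(taskSet) =>
      if (TaskState.isFinished(state)) {
        cleanupTaskState(tid)          // drop tid from the internal registries
        taskSet.removeRunningTask(tid) // unregister tid from running tasks
        state match {
          case TaskState.FINISHED =>
            taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
          case TaskState.FAILED | TaskState.KILLED | TaskState.LOST =>
            taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
          case _ =>
        }
      }
    case None =>
      logError(s"Ignoring update with state $state for TID $tid ...")
  }
}
```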
statusUpdate is used when:

- DriverEndpoint (of CoarseGrainedSchedulerBackend) is requested to handle a StatusUpdate message
- LocalEndpoint is requested to handle a StatusUpdate message
## task-scheduler-speculation Scheduled Executor Service

speculationScheduler is a java.util.concurrent.ScheduledExecutorService with the name task-scheduler-speculation for Speculative Execution of Tasks.

When TaskSchedulerImpl is requested to start (in non-local run mode) with spark.speculation enabled, speculationScheduler is used to schedule checkSpeculatableTasks to execute periodically every spark.speculation.interval.
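The wiring can be sketched as follows (a sketch only: SPECULATION_INTERVAL_MS stands for spark.speculation.interval in milliseconds, and Spark builds the executor service via its ThreadUtils helper with the thread name task-scheduler-speculation):

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Schedule periodic speculation checks with a fixed delay between runs.
val speculationScheduler = Executors.newSingleThreadScheduledExecutor()

speculationScheduler.scheduleWithFixedDelay(
  new Runnable { override def run(): Unit = checkSpeculatableTasks() },
  SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
```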
speculationScheduler is shut down when TaskSchedulerImpl is requested to stop.
## Checking for Speculatable Tasks

```scala
checkSpeculatableTasks(): Unit
```

checkSpeculatableTasks requests the rootPool to check for speculatable tasks (if they ran for more than 100 ms) and, if there are any, requests the SchedulerBackend to revive offers.

NOTE: checkSpeculatableTasks is executed periodically as part of Speculative Execution of Tasks.
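A sketch of the logic (after the Apache Spark sources, where the 100 ms threshold is the MIN_TIME_TO_SPECULATION constant):

```scala
// Ask the root pool whether any running task qualifies for speculation and,
// if so, have the backend make fresh resource offers.
def checkSpeculatableTasks(): Unit = {
  val shouldRevive = synchronized {
    rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION) // 100 ms
  }
  if (shouldRevive) {
    backend.reviveOffers()
  }
}
```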
## Cleaning up After Removing Executor

```scala
removeExecutor(
  executorId: String,
  reason: ExecutorLossReason): Unit
```

removeExecutor removes the executorId executor from the following internal registries: executorIdToHost, executorsByHost, and hostsByRack. If the affected hosts and racks are the last entries in executorsByHost and hostsByRack, appropriately, they are removed from the registries.

Unless reason is LossReasonPending, the executor is removed from the executorIdToHost registry and the TaskSetManagers get notified.

NOTE: The internal removeExecutor is called as part of executorLost.
## Handling Nearly-Completed SparkContext Initialization

```scala
postStartHook(): Unit
```

postStartHook is part of the TaskScheduler abstraction.

postStartHook waits until a scheduler backend is ready.
## Waiting Until SchedulerBackend is Ready

```scala
waitBackendReady(): Unit
```

waitBackendReady waits until the SchedulerBackend is ready. If it is, waitBackendReady returns immediately. Otherwise, waitBackendReady keeps checking every 100 milliseconds (hardcoded) until the SchedulerBackend is ready or the SparkContext is stopped.

Note

A SchedulerBackend is ready by default.

If the SparkContext happens to be stopped while waiting, waitBackendReady throws an IllegalStateException:

```text
Spark context stopped while waiting for backend
```
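A sketch of the wait loop (simplified from the Apache Spark sources):

```scala
// Poll the backend every 100 ms until it is ready, bailing out if the
// SparkContext gets stopped in the meantime.
private def waitBackendReady(): Unit = {
  if (backend.isReady) {
    return
  }
  while (!backend.isReady) {
    if (sc.stopped.get) {
      throw new IllegalStateException("Spark context stopped while waiting for backend")
    }
    synchronized {
      this.wait(100)
    }
  }
}
```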
## Stopping TaskSchedulerImpl

```scala
stop(): Unit
```

stop stops all the internal services, i.e. the task-scheduler-speculation executor service, the SchedulerBackend, the TaskResultGetter, and the starvation timer.
## Default Level of Parallelism

```scala
defaultParallelism(): Int
```

defaultParallelism is part of the TaskScheduler abstraction.

defaultParallelism requests the SchedulerBackend for the default level of parallelism.

Note

Default level of parallelism is a hint for sizing jobs that SparkContext uses to create RDDs with the right number of partitions unless specified explicitly.
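For example, the hint surfaces through SparkContext whenever the number of partitions is not given explicitly:

```scala
// Without an explicit numSlices, parallelize sizes the RDD using the
// default level of parallelism reported by the scheduler backend.
val rdd = sc.parallelize(1 to 100)
println(rdd.getNumPartitions)
```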
## Submitting Tasks (of TaskSet) for Execution

```scala
submitTasks(
  taskSet: TaskSet): Unit
```

submitTasks is part of the TaskScheduler abstraction.

In essence, submitTasks registers a new TaskSetManager (for the given TaskSet) and requests the SchedulerBackend to handle resource allocation offers (from the scheduling system).

Internally, submitTasks prints out the following INFO message to the logs:

```text
Adding task set [id] with [length] tasks
```

submitTasks then creates a TaskSetManager (for the given TaskSet and the maximum number of task failures).

submitTasks registers (adds) the TaskSetManager per stage and stage attempt IDs (of the TaskSet) in the taskSetsByStageIdAndAttempt internal registry.

NOTE: taskSetsByStageIdAndAttempt can have many TaskSetManagers for a single stage, each representing a unique stage attempt.

NOTE: Not only could a task be retried (cf. the maximum number of task failures), but also a single stage.

submitTasks makes sure that there is exactly one active TaskSetManager (with a different TaskSet) across all the managers (for the stage). Otherwise, submitTasks throws an IllegalStateException:

```text
more than one active taskSet for stage [stage]: [TaskSet ids]
```

NOTE: A TaskSetManager is considered active when it is not a zombie.

submitTasks requests the SchedulableBuilder to add the TaskSetManager to the schedulable pool.

NOTE: The schedulable pool can be a single flat linked queue (in FIFO scheduling mode) or a hierarchy of pools of Schedulables (in FAIR scheduling mode).

submitTasks then schedules the starvation timer.

NOTE: The very first time (the hasReceivedTask flag is false) in cluster mode only (i.e. isLocal of the TaskSchedulerImpl is false), starvationTimer is scheduled to execute after spark.starvation.timeout to ensure that the requested resources, i.e. CPUs and memory, were assigned by a cluster manager.

NOTE: After the first spark.starvation.timeout passes, the hasReceivedTask flag becomes true.

In the end, submitTasks requests the SchedulerBackend to revive offers.

TIP: Use the dag-scheduler-event-loop thread to step through the code in a debugger.
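The whole flow can be sketched as follows (simplified from the Apache Spark sources; resource-profile handling and some bookkeeping are trimmed):

```scala
import java.util.TimerTask
import scala.collection.mutable.HashMap

// A simplified sketch of submitTasks.
override def submitTasks(taskSet: TaskSet): Unit = synchronized {
  val manager = createTaskSetManager(taskSet, maxTaskFailures)
  val stageTaskSets = taskSetsByStageIdAndAttempt
    .getOrElseUpdate(taskSet.stageId, new HashMap[Int, TaskSetManager])
  stageTaskSets(taskSet.stageAttemptId) = manager

  // at most one active (non-zombie) TaskSetManager per stage
  if (stageTaskSets.exists { case (_, ts) => ts.taskSet != taskSet && !ts.isZombie }) {
    throw new IllegalStateException(
      s"more than one active taskSet for stage ${taskSet.stageId}")
  }

  schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

  if (!isLocal && !hasReceivedTask) {
    // warn periodically until the first task is launched
    starvationTimer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit =
        if (!hasLaunchedTask) {
          logWarning("Initial job has not accepted any resources; " +
            "check your cluster UI to ensure that workers are registered " +
            "and have sufficient resources")
        } else {
          this.cancel()
        }
    }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
  }
  hasReceivedTask = true

  backend.reviveOffers()
}
```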
## Scheduling Starvation Task

Every time the starvation timer thread is executed and the hasLaunchedTask flag is false, the following WARN message is printed out to the logs:

```text
Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
```

Otherwise, when the hasLaunchedTask flag is true, the timer thread cancels itself.
## Creating TaskSetManager

```scala
createTaskSetManager(
  taskSet: TaskSet,
  maxTaskFailures: Int): TaskSetManager
```

createTaskSetManager creates a TaskSetManager (with this TaskSchedulerImpl, the given TaskSet and the maxTaskFailures).

createTaskSetManager is used when:

- TaskSchedulerImpl is requested to submit a TaskSet
## Notifying TaskSetManager that Task Failed

```scala
handleFailedTask(
  taskSetManager: TaskSetManager,
  tid: Long,
  taskState: TaskState,
  reason: TaskFailedReason): Unit
```

handleFailedTask notifies the given taskSetManager that the tid task has failed and, only when the taskSetManager is not in zombie state and the task is not in KILLED state, requests the SchedulerBackend to revive offers.

NOTE: handleFailedTask is called when TaskResultGetter deserializes a TaskFailedReason for a failed task.
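A sketch of the logic (after the Apache Spark sources):

```scala
// Notify the TaskSetManager and, unless it is a zombie or the task was
// killed, ask the backend for fresh offers (the failed task may be retried).
def handleFailedTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskState: TaskState,
    reason: TaskFailedReason): Unit = synchronized {
  taskSetManager.handleFailedTask(tid, taskState, reason)
  if (!taskSetManager.isZombie && taskState != TaskState.KILLED) {
    backend.reviveOffers()
  }
}
```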
## taskSetFinished

```scala
taskSetFinished(
  manager: TaskSetManager): Unit
```

taskSetFinished looks all TaskSets up by the stage ID (in the taskSetsByStageIdAndAttempt registry) and removes the stage from the registry completely (if there are no other attempts registered).

taskSetFinished then removes manager from the parent's schedulable pool.

You should see the following INFO message in the logs:

```text
Removed TaskSet [id], whose tasks have all completed, from pool [name]
```

taskSetFinished is used when:

- TaskSetManager is requested to maybeFinishTaskSet
## Notifying DAGScheduler About New Executor

```scala
executorAdded(
  execId: String,
  host: String)
```

executorAdded just notifies the DAGScheduler that an executor was added.

NOTE: executorAdded uses the internal reference to DAGScheduler.
## Creating TaskDescriptions For Available Executor Resource Offers

```scala
resourceOffers(
  offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]]
```

resourceOffers takes the resource offers and generates a collection of tasks (as TaskDescriptions) to launch (given the resources available).

Note

A WorkerOffer represents a resource offer with CPU cores free to use on an executor.

Internally, resourceOffers first updates the internal host and executor registries (with the given offers).

For new executors (not in the internal registries yet), resourceOffers notifies DAGScheduler that an executor was added.

NOTE: TaskSchedulerImpl uses resourceOffers to track active executors.

CAUTION: FIXME a picture with executorAdded call from TaskSchedulerImpl to DAGScheduler.

resourceOffers requests BlacklistTracker to applyBlacklistTimeout and filters out offers on blacklisted nodes and executors.

NOTE: resourceOffers uses the optional BlacklistTracker that was given when TaskSchedulerImpl was created.

CAUTION: FIXME Expand on blacklisting

resourceOffers then randomly shuffles offers (to evenly distribute tasks across executors and avoid over-utilizing some executors) and initializes the local data structures tasks and availableCpus.

resourceOffers takes TaskSets in scheduling order from the top-level Schedulable Pool (rootPool).

Note

rootPool is configured when TaskSchedulerImpl is initialized.

rootPool is part of the TaskScheduler contract and is exclusively managed by SchedulableBuilders, i.e. FIFOSchedulableBuilder and FairSchedulableBuilder (that manage registering TaskSetManagers with the root pool).

TaskSetManager manages execution of the tasks in a single TaskSet that represents a single Stage.

For every TaskSetManager (in scheduling order), you should see the following DEBUG message in the logs:

```text
parentName: [name], name: [name], runningTasks: [count]
```

Only if a new executor was added, resourceOffers notifies every TaskSetManager about the change (to recompute locality preferences).

resourceOffers then takes every TaskSetManager (in scheduling order) and offers them each node in increasing order of locality levels (per the TaskSetManager's valid locality levels).

NOTE: A TaskSetManager computes locality levels of the tasks it manages.

For every TaskSetManager and the TaskSetManager's valid locality level, resourceOffers tries to find tasks to launch (as long as the TaskSetManager manages to launch a task given the locality level).

If resourceOffers did not manage to offer resources to a TaskSetManager so it could launch any task, resourceOffers requests the TaskSetManager to abort the TaskSet if completely blacklisted.

When resourceOffers managed to launch a task, the internal hasLaunchedTask flag becomes true.
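The heart of resourceOffers is the nested loop over schedulable TaskSetManagers and locality levels. A sketch (after the Spark 2.x sources, where resourceOfferSingleTaskSet still returned a plain Boolean; shuffling, blacklisting and barrier handling are omitted):

```scala
// Offer resources to each TaskSetManager, widening locality step by step.
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
val availableCpus = shuffledOffers.map(_.cores).toArray

for (taskSet <- rootPool.getSortedTaskSetQueue) {
  var launchedAnyTask = false
  for (currentMaxLocality <- taskSet.myLocalityLevels) {
    var launchedTaskAtCurrentMaxLocality = false
    do {
      // keep offering at this locality level until no more tasks get launched
      launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
        taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
      launchedAnyTask |= launchedTaskAtCurrentMaxLocality
    } while (launchedTaskAtCurrentMaxLocality)
  }
  if (!launchedAnyTask) {
    taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
  }
}
if (tasks.exists(_.nonEmpty)) {
  hasLaunchedTask = true
}
```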
resourceOffers is used when:

- CoarseGrainedSchedulerBackend (via DriverEndpoint RPC endpoint) is requested to make executor resource offers
- LocalEndpoint is requested to revive resource offers
## maybeInitBarrierCoordinator

```scala
maybeInitBarrierCoordinator(): Unit
```

Unless a BarrierCoordinator has already been registered, maybeInitBarrierCoordinator creates a BarrierCoordinator and registers it (as an RPC endpoint) under the name barrierSync.

In the end, maybeInitBarrierCoordinator prints out the following INFO message to the logs:

```text
Registered BarrierCoordinator endpoint
```
## Finding Tasks from TaskSetManager to Schedule on Executors

```scala
resourceOfferSingleTaskSet(
  taskSet: TaskSetManager,
  maxLocality: TaskLocality,
  shuffledOffers: Seq[WorkerOffer],
  availableCpus: Array[Int],
  availableResources: Array[Map[String, Buffer[String]]],
  tasks: IndexedSeq[ArrayBuffer[TaskDescription]]): (Boolean, Option[TaskLocality])
```

resourceOfferSingleTaskSet takes every WorkerOffer (from the input shuffledOffers) and (only if the number of available CPU cores (in the input availableCpus) is at least spark.task.cpus) requests the TaskSetManager (the input taskSet) to find a Task to execute (given the resource offer, i.e. an executor, a host, and the input maxLocality).

resourceOfferSingleTaskSet adds the task to the input tasks collection.

resourceOfferSingleTaskSet records the task ID and the TaskSetManager in internal registries.

resourceOfferSingleTaskSet decreases spark.task.cpus from the input availableCpus (for the WorkerOffer).

resourceOfferSingleTaskSet returns whether a task was launched or not.

Note

resourceOfferSingleTaskSet asserts that the number of available CPU cores (in the input availableCpus per WorkerOffer) is at least 0.

If there is a TaskNotSerializableException, resourceOfferSingleTaskSet prints out the following ERROR message in the logs:

```text
Resource offer failed, task set [name] was not serializable
```
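A sketch of the per-offer loop (after the Spark 2.x sources, where the method returned a plain Boolean and resourceOffer returned an Option; resource profiles are omitted):

```scala
// Walk the (shuffled) offers and let the TaskSetManager pick a task for
// each executor that still has at least CPUS_PER_TASK cores available.
var launchedTask = false
for (i <- shuffledOffers.indices) {
  val execId = shuffledOffers(i).executorId
  val host = shuffledOffers(i).host
  if (availableCpus(i) >= CPUS_PER_TASK) {
    try {
      for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
        tasks(i) += task
        // record the task in the internal registries
        taskIdToTaskSetManager.put(task.taskId, taskSet)
        taskIdToExecutorId(task.taskId) = execId
        executorIdToRunningTaskIds(execId).add(task.taskId)
        availableCpus(i) -= CPUS_PER_TASK
        assert(availableCpus(i) >= 0)
        launchedTask = true
      }
    } catch {
      case _: TaskNotSerializableException =>
        logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
        return launchedTask
    }
  }
}
launchedTask
```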
resourceOfferSingleTaskSet is used when:

- TaskSchedulerImpl is requested to resourceOffers
## Task Locality Preference

TaskLocality represents a task locality preference and can be one of the following (from the most localized to the widest):

1. PROCESS_LOCAL
2. NODE_LOCAL
3. NO_PREF
4. RACK_LOCAL
5. ANY
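For reference, Spark models these levels as a Scala enumeration (org.apache.spark.scheduler.TaskLocality), where the declaration order encodes the preference order:

```scala
// Declaration order matters: PROCESS_LOCAL < NODE_LOCAL < ... < ANY,
// so "wider" levels compare greater.
object TaskLocality extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
}
```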
## WorkerOffer — Free CPU Cores on Executor

```scala
WorkerOffer(
  executorId: String,
  host: String,
  cores: Int)
```

WorkerOffer represents a resource offer with free CPU cores available on an executorId executor on a host.
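For example, a scheduler backend could hand resourceOffers a batch of offers like these (the executor and host names are hypothetical):

```scala
// Two idle executors advertising their free cores.
val offers = Seq(
  WorkerOffer("exec-1", "host-1", cores = 4),
  WorkerOffer("exec-2", "host-2", cores = 8))
val taskDescriptions = taskScheduler.resourceOffers(offers).flatten
```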
## workerRemoved

```scala
workerRemoved(
  workerId: String,
  host: String,
  message: String): Unit
```

workerRemoved is part of the TaskScheduler abstraction.

workerRemoved prints out the following INFO message to the logs:

```text
Handle removed worker [workerId]: [message]
```

In the end, workerRemoved requests the DAGScheduler to workerRemoved.
## calculateAvailableSlots

```scala
calculateAvailableSlots(
  scheduler: TaskSchedulerImpl,
  conf: SparkConf,
  rpId: Int,
  availableRPIds: Array[Int],
  availableCpus: Array[Int],
  availableResources: Array[Map[String, Int]]): Int
```

calculateAvailableSlots...FIXME

calculateAvailableSlots is used when:

- TaskSchedulerImpl is requested for TaskDescriptions for the given executor resource offers
- CoarseGrainedSchedulerBackend is requested for the maximum number of concurrent tasks
## Logging

Enable ALL logging level for the org.apache.spark.scheduler.TaskSchedulerImpl logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

```text
logger.TaskSchedulerImpl.name = org.apache.spark.scheduler.TaskSchedulerImpl
logger.TaskSchedulerImpl.level = all
```

Refer to Logging.