# TaskSetManager

`TaskSetManager` is a Schedulable that manages scheduling the tasks of a TaskSet.
## Creating Instance

`TaskSetManager` takes the following to be created:

- TaskSchedulerImpl
- TaskSet
- Number of Task Failures
- HealthTracker
- Clock

`TaskSetManager` is created when:

- `TaskSchedulerImpl` is requested to create a TaskSetManager
While being created, `TaskSetManager` requests the current epoch from the MapOutputTracker and sets it on all tasks in the taskset.

!!! note
    `TaskSetManager` uses the TaskSchedulerImpl to access the current MapOutputTracker.

`TaskSetManager` prints out the following DEBUG message to the logs:

```text
Epoch for [taskSet]: [epoch]
```

`TaskSetManager` then registers all the tasks as pending execution (in reverse order, from the highest partition to the lowest).
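The following is a minimal, self-contained sketch of that construction-time behavior. The `Task` case class and the values are stand-ins for illustration only, not Spark's own types:

```scala
// Toy model: stamp the current shuffle epoch on every task and queue
// pending tasks from the highest partition down. `Task` here is a
// stand-in for org.apache.spark.scheduler.Task.
case class Task(partitionId: Int, var epoch: Long = -1L)

val currentEpoch = 7L  // would come from MapOutputTracker.getEpoch
val tasks = Array.tabulate(4)(Task(_))
tasks.foreach(_.epoch = currentEpoch)  // set the epoch on all tasks

// register pending tasks in reverse partition order so that
// lower-indexed partitions end up launched first
val pendingOrder = tasks.indices.reverse
println(pendingOrder.mkString(", "))  // 3, 2, 1, 0
```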
## Number of Task Failures

`TaskSetManager` is given a `maxTaskFailures` value that determines how many times a single task can fail before the whole TaskSet is aborted.

| Master URL | Number of Task Failures |
|------------|-------------------------|
| `local` | `1` |
| `local-with-retries` | `maxFailures` |
| `local-cluster` | spark.task.maxFailures |
| Cluster Manager | spark.task.maxFailures |
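For example, the limit can be raised with the spark.task.maxFailures configuration property (4 is the default; the value below is illustrative):

```scala
import org.apache.spark.SparkConf

// Allow a single task to fail up to 6 times before the whole TaskSet
// is aborted. Note that the plain `local` master ignores this and
// uses 1 (no retries).
val conf = new SparkConf()
  .setAppName("max-failures-demo")
  .set("spark.task.maxFailures", "6")
```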
## isBarrier

```scala
isBarrier: Boolean
```

`isBarrier` is enabled (`true`) when this `TaskSetManager` is created for a TaskSet with barrier tasks.

`isBarrier` is used when:

- `TaskSchedulerImpl` is requested to resourceOfferSingleTaskSet and resourceOffers
- `TaskSetManager` is requested to resourceOffer, checkSpeculatableTasks and getLocalityWait
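A TaskSet contains barrier tasks when the stage uses barrier execution mode, e.g. (assuming a `sc` SparkContext as in `spark-shell`):

```scala
import org.apache.spark.BarrierTaskContext

// Barrier execution mode: all tasks of this stage must be launched
// together, so the TaskSetManager is created with isBarrier enabled.
val rdd = sc
  .parallelize(0 until 4, numSlices = 4)
  .barrier()
  .mapPartitions { it =>
    val ctx = BarrierTaskContext.get()
    ctx.barrier()  // global barrier across all tasks in the stage
    it
  }
```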
## resourceOffer

```scala
resourceOffer(
  execId: String,
  host: String,
  maxLocality: TaskLocality.TaskLocality,
  taskCpus: Int = sched.CPUS_PER_TASK,
  taskResourceAssignments: Map[String, ResourceInformation] = Map.empty): (Option[TaskDescription], Boolean, Int)
```

`resourceOffer` determines the allowed locality level for the given `TaskLocality` (for any locality but `NO_PREF`).

`resourceOffer` dequeues a task (dequeueTask) for the given `execId` and `host` at the allowed locality level. This may or may not yield a TaskDescription.

In the end, `resourceOffer` returns the `TaskDescription`, a `hasScheduleDelayReject` flag, and the index of the dequeued task (if any).

`resourceOffer` returns a `(None, false, -1)` tuple when this `TaskSetManager` is a zombie or the offer (by the given `host` or `execId`) should be ignored (excluded).

`resourceOffer` is used when:

- `TaskSchedulerImpl` is requested to resourceOfferSingleTaskSet
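As a hedged, self-contained illustration of the three-element result (with a `String` standing in for the real `TaskDescription` type), a caller could pattern-match on it like this:

```scala
// `TaskDescription` is a stand-in type here; the real one is
// org.apache.spark.scheduler.TaskDescription.
type TaskDescription = String

def handleOffer(result: (Option[TaskDescription], Boolean, Int)): Unit =
  result match {
    case (Some(task), _, index) =>
      println(s"launch $task (dequeued task index: $index)")
    case (None, true, _) =>
      println("offer rejected due to delay scheduling (hasScheduleDelayReject)")
    case (None, false, -1) =>
      println("zombie TaskSetManager or excluded executor/host")
    case _ => ()
  }

handleOffer((Some("task 0.0"), false, 0))
handleOffer((None, false, -1))
```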
## Locality Wait

```scala
getLocalityWait(
  level: TaskLocality.TaskLocality): Long
```

`getLocalityWait` is `0` when the legacyLocalityWaitReset and isBarrier flags are both enabled.

`getLocalityWait` determines the locality wait based on the given `TaskLocality.TaskLocality`:

| TaskLocality | Configuration Property |
|--------------|------------------------|
| `PROCESS_LOCAL` | spark.locality.wait.process |
| `NODE_LOCAL` | spark.locality.wait.node |
| `RACK_LOCAL` | spark.locality.wait.rack |

Unless the value has been determined, `getLocalityWait` defaults to `0`.

!!! note
    `NO_PREF` and `ANY` task localities have no locality wait.

`getLocalityWait` is used when:

- `TaskSetManager` is created and recomputes locality preferences
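The per-level waits come from regular configuration properties, so they can be tuned as in this example (the property names are real; the values are illustrative):

```scala
import org.apache.spark.SparkConf

// spark.locality.wait is the fallback; the per-level properties below
// override it for their respective locality levels.
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // fallback for all levels
  .set("spark.locality.wait.process", "2s")  // PROCESS_LOCAL
  .set("spark.locality.wait.node", "3s")     // NODE_LOCAL
  .set("spark.locality.wait.rack", "0s")     // RACK_LOCAL: skip rack-locality waiting
```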
## spark.driver.maxResultSize

`TaskSetManager` uses the spark.driver.maxResultSize configuration property to check whether there is enough memory available for more task results.
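For instance (an illustrative value; the default is `1g` and `0` disables the limit):

```scala
import org.apache.spark.SparkConf

// Allow up to 2g of accumulated serialized task results on the driver;
// a job is aborted once its results exceed this limit.
val conf = new SparkConf().set("spark.driver.maxResultSize", "2g")
```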
## Recomputing Task Locality Preferences

```scala
recomputeLocality(): Unit
```

`recomputeLocality` does nothing when this `TaskSetManager` is a zombie.

`recomputeLocality` recomputes the myLocalityLevels, localityWaits and currentLocalityIndex internal registries.

`recomputeLocality` computes the locality levels (for scheduled tasks) and saves the result in the myLocalityLevels internal registry.

`recomputeLocality` computes the localityWaits by determining the locality wait for every locality level in myLocalityLevels.

`recomputeLocality` computes the currentLocalityIndex with getLocalityIndex for the previous locality level. If the new locality index is higher than the previous one, `recomputeLocality` recalculates currentLocalityIndex.

`recomputeLocality` is used when:

- `TaskSetManager` is notified about a status change of an executor (i.e., lost, decommissioned, added)
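A simplified, self-contained model of those steps could look as follows. All names are local stand-ins that mirror the description above, not Spark's internals:

```scala
object LocalityModel {
  type Level = String

  var myLocalityLevels: Array[Level] = Array("PROCESS_LOCAL", "NODE_LOCAL", "ANY")
  var localityWaits: Array[Long]     = Array(3000L, 3000L, 0L)
  var currentLocalityIndex: Int      = 1

  // pretend an executor was lost, so PROCESS_LOCAL is no longer valid
  def computeValidLocalityLevels(): Array[Level] = Array("NODE_LOCAL", "ANY")
  def getLocalityWait(level: Level): Long = if (level == "ANY") 0L else 3000L

  // index of the given level in the (new) myLocalityLevels, or the last
  // (least localized) level when the previous one is gone
  def getLocalityIndex(level: Level): Int = {
    val i = myLocalityLevels.indexOf(level)
    if (i >= 0) i else myLocalityLevels.length - 1
  }

  def recomputeLocality(): Unit = {
    val previousLevel = myLocalityLevels(currentLocalityIndex) // remember the current level
    myLocalityLevels = computeValidLocalityLevels()            // recompute valid levels
    localityWaits = myLocalityLevels.map(getLocalityWait)      // recompute per-level waits
    currentLocalityIndex = getLocalityIndex(previousLevel)     // re-resolve the index
  }
}

LocalityModel.recomputeLocality()
println(LocalityModel.myLocalityLevels.mkString(", ")) // NODE_LOCAL, ANY
```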
## Zombie

A `TaskSetManager` becomes a zombie when all tasks in the taskset have completed successfully (regardless of the number of task attempts) or when the taskset has been aborted.

While in zombie state, a `TaskSetManager` can launch no new tasks and responds with no `TaskDescription`s to resourceOffers.

A `TaskSetManager` remains in the zombie state until all running tasks have finished, i.e. to continue to track and account for them.
## Computing Locality Levels (for Scheduled Tasks)

```scala
computeValidLocalityLevels(): Array[TaskLocality.TaskLocality]
```

`computeValidLocalityLevels` computes the valid locality levels for the tasks that were registered in the corresponding registries per locality level.

!!! note
    TaskLocality is a locality preference of a task and ranges from the most localized `PROCESS_LOCAL` and `NODE_LOCAL` through `NO_PREF` and `RACK_LOCAL` to `ANY`.

For every pending task (in the pendingTasks registry), `computeValidLocalityLevels` requests the `TaskSchedulerImpl` for acceptable `TaskLocality`es:

- For every executor, `computeValidLocalityLevels` requests the `TaskSchedulerImpl` to isExecutorAlive and adds `PROCESS_LOCAL`
- For every host, `computeValidLocalityLevels` requests the `TaskSchedulerImpl` to hasExecutorsAliveOnHost and adds `NODE_LOCAL`
- For any pending tasks with no locality preference, `computeValidLocalityLevels` adds `NO_PREF`
- For every rack, `computeValidLocalityLevels` requests the `TaskSchedulerImpl` to hasHostAliveOnRack and adds `RACK_LOCAL`

`computeValidLocalityLevels` always registers the `ANY` task locality level.

In the end, `computeValidLocalityLevels` prints out the following DEBUG message to the logs:

```text
Valid locality levels for [taskSet]: [comma-separated levels]
```
`computeValidLocalityLevels` is used when:

- `TaskSetManager` is created and to recomputeLocality
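The per-level checks above can be sketched as a self-contained function. The parameter names are assumptions made for illustration; the real method consults `TaskSetManager`'s pending-task registries directly:

```scala
import scala.collection.mutable.ArrayBuffer

def computeValidLocalityLevels(
    executorsWithPendingTasks: Set[String],   // executors with PROCESS_LOCAL-pending tasks
    hostsWithPendingTasks: Set[String],       // hosts with NODE_LOCAL-pending tasks
    hasNoPrefTasks: Boolean,                  // pending tasks with no locality preference
    racksWithPendingTasks: Set[String],       // racks with RACK_LOCAL-pending tasks
    isExecutorAlive: String => Boolean,
    hasExecutorsAliveOnHost: String => Boolean,
    hasHostAliveOnRack: String => Boolean): Seq[String] = {
  val levels = ArrayBuffer.empty[String]
  if (executorsWithPendingTasks.exists(isExecutorAlive)) levels += "PROCESS_LOCAL"
  if (hostsWithPendingTasks.exists(hasExecutorsAliveOnHost)) levels += "NODE_LOCAL"
  if (hasNoPrefTasks) levels += "NO_PREF"
  if (racksWithPendingTasks.exists(hasHostAliveOnRack)) levels += "RACK_LOCAL"
  levels += "ANY"  // ANY is always a valid level
  levels.toSeq
}
```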
## executorAdded

```scala
executorAdded(): Unit
```

`executorAdded` simply recomputeLocality.

`executorAdded` is used when:

- `TaskSchedulerImpl` is requested to handle resource offers
## prepareLaunchingTask

```scala
prepareLaunchingTask(
  execId: String,
  host: String,
  index: Int,
  taskLocality: TaskLocality.Value,
  speculative: Boolean,
  taskCpus: Int,
  taskResourceAssignments: Map[String, ResourceInformation],
  launchTime: Long): TaskDescription
```

!!! note "taskResourceAssignments"
    `taskResourceAssignments` are the resources passed in to resourceOffer.

`prepareLaunchingTask`...FIXME

`prepareLaunchingTask` is used when:

- `TaskSchedulerImpl` is requested to resourceOffers
- `TaskSetManager` is requested to resourceOffer
## Serialized Task Size Threshold

The `TaskSetManager` object defines the `TASK_SIZE_TO_WARN_KIB` value as the threshold to warn a user if any stage contains a task with a serialized size greater than 1000 kB.

### DAGScheduler

`DAGScheduler` can print out the following WARN message to the logs when requested to submitMissingTasks:

```text
Broadcasting large task binary with size [taskBinaryBytes] [siByteSuffix]
```

### TaskSetManager

`TaskSetManager` can print out the following WARN message to the logs when requested to prepareLaunchingTask:

```text
Stage [stageId] contains a task of very large size ([serializedTask] KiB). The maximum recommended task size is 1000 KiB.
```
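The warning condition itself boils down to a simple size check; a hedged sketch (the byte count and stage id are made up for illustration):

```scala
// TASK_SIZE_TO_WARN_KIB is 1000 in the TaskSetManager object; the task
// size below is hypothetical.
val TASK_SIZE_TO_WARN_KIB = 1000
val serializedTaskBytes = 1500 * 1024  // hypothetical serialized task size

if (serializedTaskBytes > TASK_SIZE_TO_WARN_KIB * 1024) {
  println(s"Stage 0 contains a task of very large size " +
    s"(${serializedTaskBytes / 1024} KiB). " +
    s"The maximum recommended task size is $TASK_SIZE_TO_WARN_KIB KiB.")
}
```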
## Demo

Enable `DEBUG` logging level for `org.apache.spark.scheduler.TaskSchedulerImpl` (or `org.apache.spark.scheduler.cluster.YarnScheduler` for YARN) and `org.apache.spark.scheduler.TaskSetManager`, and execute the following two-stage job to see their low-level inner workings.

A cluster manager is recommended since it gives more task localization choices (with YARN additionally supporting rack localization).
```text
$ ./bin/spark-shell \
    --master yarn \
    --conf spark.ui.showConsoleProgress=false

// Keep # partitions low to keep # messages low
scala> sc.parallelize(0 to 9, 3).groupBy(_ % 3).count
INFO YarnScheduler: Adding task set 0.0 with 3 tasks
DEBUG TaskSetManager: Epoch for TaskSet 0.0: 0
DEBUG TaskSetManager: Valid locality levels for TaskSet 0.0: NO_PREF, ANY
DEBUG YarnScheduler: parentName: , name: TaskSet_0.0, runningTasks: 0
INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.0.2.87, executor 1, partition 0, PROCESS_LOCAL, 7541 bytes)
INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 10.0.2.87, executor 2, partition 1, PROCESS_LOCAL, 7541 bytes)
DEBUG YarnScheduler: parentName: , name: TaskSet_0.0, runningTasks: 1
INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 10.0.2.87, executor 1, partition 2, PROCESS_LOCAL, 7598 bytes)
DEBUG YarnScheduler: parentName: , name: TaskSet_0.0, runningTasks: 1
DEBUG TaskSetManager: No tasks for locality level NO_PREF, so moving to locality level ANY
INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 518 ms on 10.0.2.87 (executor 1) (1/3)
INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 512 ms on 10.0.2.87 (executor 2) (2/3)
DEBUG YarnScheduler: parentName: , name: TaskSet_0.0, runningTasks: 0
INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 51 ms on 10.0.2.87 (executor 1) (3/3)
INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
INFO YarnScheduler: Adding task set 1.0 with 3 tasks
DEBUG TaskSetManager: Epoch for TaskSet 1.0: 1
DEBUG TaskSetManager: Valid locality levels for TaskSet 1.0: NODE_LOCAL, RACK_LOCAL, ANY
DEBUG YarnScheduler: parentName: , name: TaskSet_1.0, runningTasks: 0
INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 3, 10.0.2.87, executor 2, partition 0, NODE_LOCAL, 7348 bytes)
INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 4, 10.0.2.87, executor 1, partition 1, NODE_LOCAL, 7348 bytes)
DEBUG YarnScheduler: parentName: , name: TaskSet_1.0, runningTasks: 1
INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 5, 10.0.2.87, executor 1, partition 2, NODE_LOCAL, 7348 bytes)
INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 4) in 130 ms on 10.0.2.87 (executor 1) (1/3)
DEBUG YarnScheduler: parentName: , name: TaskSet_1.0, runningTasks: 1
DEBUG TaskSetManager: No tasks for locality level NODE_LOCAL, so moving to locality level RACK_LOCAL
DEBUG TaskSetManager: No tasks for locality level RACK_LOCAL, so moving to locality level ANY
INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 3) in 133 ms on 10.0.2.87 (executor 2) (2/3)
DEBUG YarnScheduler: parentName: , name: TaskSet_1.0, runningTasks: 0
INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 5) in 21 ms on 10.0.2.87 (executor 1) (3/3)
INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
res0: Long = 3
```
## Logging

Enable `ALL` logging level for the `org.apache.spark.scheduler.TaskSetManager` logger to see what happens inside.

Add the following line to `conf/log4j.properties`:

```text
log4j.logger.org.apache.spark.scheduler.TaskSetManager=ALL
```

Refer to Logging.