
TaskSetManager

TaskSetManager is a Schedulable that manages scheduling the tasks of a TaskSet.


Creating Instance

TaskSetManager takes the following to be created:

TaskSetManager is created when:


While being created, TaskSetManager requests the current epoch from MapOutputTracker and sets it on all tasks in the taskset.

TaskSetManager prints out the following DEBUG to the logs:

Epoch for [taskSet]: [epoch]

TaskSetManager adds the tasks as pending execution (in reverse order from the highest partition to the lowest).
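The reverse order matters because the pending-task lists behave like stacks (tasks are dequeued from the end), so adding partitions from the highest to the lowest makes tasks with low indices get launched first. A minimal standalone sketch (not the actual Spark code) of that effect:

import scala.collection.mutable.ArrayBuffer

// Pending tasks behave like a stack: appended in reverse partition order,
// dequeued from the end, so partition 0 comes out first.
val numTasks = 5
val pending = ArrayBuffer.empty[Int]
(0 until numTasks).reverse.foreach(pending += _)           // 4, 3, 2, 1, 0

def dequeue(): Option[Int] =
  if (pending.isEmpty) None else Some(pending.remove(pending.size - 1))

println((1 to numTasks).flatMap(_ => dequeue()))           // 0, 1, 2, 3, 4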

Number of Task Failures

TaskSetManager is given a maxTaskFailures value that says how many times a single task can fail before the whole TaskSet is aborted.

Master URL          | Number of Task Failures
--------------------|------------------------
local               | 1
local-with-retries  | maxFailures
local-cluster       | spark.task.maxFailures
Cluster Manager     | spark.task.maxFailures
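For example (a hedged sketch; the master URL format and property name are the ones documented above), the limit comes either from a local-with-retries master URL or from spark.task.maxFailures:

import org.apache.spark.{SparkConf, SparkContext}

// local[2, 4] is a local-with-retries master URL: 2 threads and up to 4
// failures per task before the whole TaskSet is aborted.
val localConf = new SparkConf()
  .setAppName("task-failures-demo")
  .setMaster("local[2, 4]")

// On local-cluster and cluster managers the limit comes from spark.task.maxFailures.
val clusterConf = new SparkConf()
  .setAppName("task-failures-demo")
  .set("spark.task.maxFailures", "4")

val sc = new SparkContext(localConf)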

isBarrier

isBarrier: Boolean

isBarrier is enabled (true) when this TaskSetManager is created for a TaskSet with barrier tasks.
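A quick way to get a TaskSet with barrier tasks is barrier execution mode (a minimal example, assuming a SparkContext sc with enough task slots, e.g. local[4]):

// Every task of this stage is a barrier task, so the TaskSetManager created
// for the stage's TaskSet has isBarrier enabled.
val rdd = sc.parallelize(1 to 8, numSlices = 4)
rdd.barrier().mapPartitions { it =>
  it.map(_ * 2)    // all 4 tasks are launched together (barrier execution mode)
}.count()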


isBarrier is used when:

resourceOffer

resourceOffer(
  execId: String,
  host: String,
  maxLocality: TaskLocality.TaskLocality,
  taskCpus: Int = sched.CPUS_PER_TASK,
  taskResourceAssignments: Map[String, ResourceInformation] = Map.empty): (Option[TaskDescription], Boolean, Int)

resourceOffer determines the allowed locality level for the given TaskLocality unless it is NO_PREF.

resourceOffer dequeues a task (dequeueTask) for the given execId and host and the allowed locality level. This may or may not give a TaskDescription.

In the end, resourceOffer returns the TaskDescription, hasScheduleDelayReject, and the index of the dequeued task (if any).


resourceOffer returns a (None, false, -1) tuple when this TaskSetManager is a zombie (isZombie) or the offer (by the given host or execId) should be ignored (excluded).
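A simplified, standalone sketch of how a caller could interpret that tuple (the TaskDescription below is a stand-in case class for illustration, not org.apache.spark.scheduler.TaskDescription):

case class TaskDescription(taskId: Long, executorId: String, name: String)

def handleOffer(offer: (Option[TaskDescription], Boolean, Int)): Unit = offer match {
  case (Some(task), _, index) =>
    println(s"Launching ${task.name} (task index $index) on executor ${task.executorId}")
  case (None, true, _) =>
    println("Nothing launched: the offer was rejected due to scheduling delay")
  case (None, false, _) =>
    println("Nothing launched: zombie TaskSetManager, excluded offer, or no matching task")
}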


resourceOffer is used when:

Locality Wait

getLocalityWait(
  level: TaskLocality.TaskLocality): Long

getLocalityWait is 0 when both the legacyLocalityWaitReset and isBarrier flags are enabled.

getLocalityWait determines the value of locality wait based on the given TaskLocality.TaskLocality.

TaskLocality   | Configuration Property
---------------|----------------------------
PROCESS_LOCAL  | spark.locality.wait.process
NODE_LOCAL     | spark.locality.wait.node
RACK_LOCAL     | spark.locality.wait.rack

If no value can be determined, getLocalityWait defaults to 0.

Note

NO_PREF and ANY task localities have no locality wait.
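A standalone sketch of the lookup (an assumption about its shape, using the property names from the table above and plain millisecond values instead of time strings):

// Map a locality level to its wait: a level-specific property first, then
// spark.locality.wait, then 0. NO_PREF and ANY never wait.
def localityWaitMs(level: String, conf: Map[String, String]): Long = {
  val levelSpecificKey = level match {
    case "PROCESS_LOCAL" => Some("spark.locality.wait.process")
    case "NODE_LOCAL"    => Some("spark.locality.wait.node")
    case "RACK_LOCAL"    => Some("spark.locality.wait.rack")
    case _               => None
  }
  levelSpecificKey
    .flatMap(key => conf.get(key).orElse(conf.get("spark.locality.wait")))
    .map(_.toLong)
    .getOrElse(0L)
}

localityWaitMs("NODE_LOCAL", Map("spark.locality.wait" -> "3000"))   // 3000
localityWaitMs("ANY", Map("spark.locality.wait" -> "3000"))          // 0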


getLocalityWait is used when:

spark.driver.maxResultSize

TaskSetManager uses spark.driver.maxResultSize configuration property to check available memory for more task results.
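A hypothetical way to see this limit in action (the sizes, app name and master URL are illustrative only):

import org.apache.spark.{SparkConf, SparkContext}

// With a tiny spark.driver.maxResultSize, collecting a few ~1 MB task results
// exceeds the limit and the job is aborted.
val conf = new SparkConf()
  .setAppName("max-result-size-demo")
  .setMaster("local[2]")
  .set("spark.driver.maxResultSize", "1m")
val sc = new SparkContext(conf)

sc.parallelize(1 to 4, 4)
  .map(_ => Array.fill(1024 * 1024)(0.toByte))   // ~1 MB per partition
  .collect()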

Recomputing Task Locality Preferences

recomputeLocality(): Unit

If zombie, recomputeLocality does nothing.

recomputeLocality recomputes myLocalityLevels, localityWaits and currentLocalityIndex internal registries.

recomputeLocality computes locality levels (for scheduled tasks) and saves the result in myLocalityLevels internal registry.

recomputeLocality computes localityWaits by determining the locality wait for every locality level in myLocalityLevels.

recomputeLocality computes currentLocalityIndex using getLocalityIndex with the previous locality level. If the current locality index is higher than the previous one, recomputeLocality recalculates currentLocalityIndex.
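A simplified, standalone model (an assumption, not Spark's code) of the three registries that recomputeLocality refreshes together:

// Per-level waits; NO_PREF and ANY never wait.
val waits = Map("PROCESS_LOCAL" -> 3000L, "NODE_LOCAL" -> 3000L, "RACK_LOCAL" -> 3000L)
  .withDefaultValue(0L)

val previousLevels = Array("NO_PREF", "ANY")
val previousIndex  = 1                                   // scheduling was at ANY

// An executor change triggers a recomputation of the valid levels...
val myLocalityLevels = Array("PROCESS_LOCAL", "NODE_LOCAL", "ANY")
// ...of the per-level waits...
val localityWaits = myLocalityLevels.map(waits)
// ...and of the current index, derived from the previously current level.
val previousLevel = previousLevels(previousIndex)
val currentLocalityIndex = math.max(myLocalityLevels.indexOf(previousLevel), 0)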


recomputeLocality is used when:

Zombie

A TaskSetManager is a zombie when all tasks in a taskset have completed successfully (regardless of the number of task attempts), or if the taskset has been aborted.

While in zombie state, a TaskSetManager can launch no new tasks and responds with no TaskDescriptions to resourceOffers.

A TaskSetManager remains in the zombie state until all tasks have finished running, so it can continue to track and account for the running tasks.

Computing Locality Levels (for Scheduled Tasks)

computeValidLocalityLevels(): Array[TaskLocality.TaskLocality]

computeValidLocalityLevels computes the valid locality levels for the tasks, based on the per-locality-level registries of pending tasks.

Note

TaskLocality is a locality preference of a task and ranges from the most localized PROCESS_LOCAL through NODE_LOCAL, NO_PREF and RACK_LOCAL to ANY.

For every pending task (in the pendingTasks registry), computeValidLocalityLevels requests the TaskSchedulerImpl for acceptable TaskLocalities:

computeValidLocalityLevels always registers ANY task locality level.

In the end, computeValidLocalityLevels prints out the following DEBUG message to the logs:

Valid locality levels for [taskSet]: [comma-separated levels]
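A standalone sketch (an assumption about the overall shape, not the private method itself) of how the levels could be accumulated in order, with ANY always added last:

import scala.collection.mutable.ArrayBuffer

def validLevels(
    hasExecutorLocalTasks: Boolean,
    hasNodeLocalTasks: Boolean,
    hasNoPrefTasks: Boolean,
    hasRackLocalTasks: Boolean): Seq[String] = {
  val levels = ArrayBuffer.empty[String]
  if (hasExecutorLocalTasks) levels += "PROCESS_LOCAL"
  if (hasNodeLocalTasks)     levels += "NODE_LOCAL"
  if (hasNoPrefTasks)        levels += "NO_PREF"
  if (hasRackLocalTasks)     levels += "RACK_LOCAL"
  levels += "ANY"                                        // ANY is always valid
  levels.toSeq
}

validLevels(hasExecutorLocalTasks = false, hasNodeLocalTasks = true,
  hasNoPrefTasks = false, hasRackLocalTasks = true)
// NODE_LOCAL, RACK_LOCAL, ANY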

computeValidLocalityLevels is used when:

executorAdded

executorAdded(): Unit

executorAdded recomputes the locality preferences (recomputeLocality).


executorAdded is used when:

prepareLaunchingTask

prepareLaunchingTask(
  execId: String,
  host: String,
  index: Int,
  taskLocality: TaskLocality.Value,
  speculative: Boolean,
  taskCpus: Int,
  taskResourceAssignments: Map[String, ResourceInformation],
  launchTime: Long): TaskDescription
taskResourceAssignments

taskResourceAssignments are the resources that are passed in to resourceOffer.

prepareLaunchingTask...FIXME


prepareLaunchingTask is used when:

Serialized Task Size Threshold

The TaskSetManager object defines the TASK_SIZE_TO_WARN_KIB value as the threshold to warn a user if any stage contains a task with a serialized size greater than 1000 KiB.

DAGScheduler

DAGScheduler can print out the following WARN message to the logs when requested to submitMissingTasks:

Broadcasting large task binary with size [taskBinaryBytes] [siByteSuffix]
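A hypothetical way to provoke that warning is to capture a large object in a task closure (the sizes are illustrative; this assumes a spark-shell SparkContext sc):

// The captured array inflates the serialized task binary that DAGScheduler
// broadcasts, which triggers the WARN message above once it crosses the threshold.
val big = Array.fill(4 * 1024 * 1024)(0.toByte)   // ~4 MiB captured by the closure
sc.parallelize(1 to 2, 2).map(i => big.length + i).count()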

TaskSetManager

TaskSetManager can print out the following WARN message to the logs when requested to prepareLaunchingTask:

Stage [stageId] contains a task of very large size ([serializedTask] KiB).
The maximum recommended task size is 1000 KiB.

Demo

Enable DEBUG logging level for org.apache.spark.scheduler.TaskSchedulerImpl (or org.apache.spark.scheduler.cluster.YarnScheduler for YARN) and org.apache.spark.scheduler.TaskSetManager and execute the following two-stage job to see their low-level inner workings.

A cluster manager is recommended since it gives more task localization choices (with YARN additionally supporting rack localization).

$ ./bin/spark-shell \
    --master yarn \
    --conf spark.ui.showConsoleProgress=false

// Keep # partitions low to keep # messages low

scala> sc.parallelize(0 to 9, 3).groupBy(_ % 3).count
INFO YarnScheduler: Adding task set 0.0 with 3 tasks
DEBUG TaskSetManager: Epoch for TaskSet 0.0: 0
DEBUG TaskSetManager: Valid locality levels for TaskSet 0.0: NO_PREF, ANY
DEBUG YarnScheduler: parentName: , name: TaskSet_0.0, runningTasks: 0
INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.0.2.87, executor 1, partition 0, PROCESS_LOCAL, 7541 bytes)
INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 10.0.2.87, executor 2, partition 1, PROCESS_LOCAL, 7541 bytes)
DEBUG YarnScheduler: parentName: , name: TaskSet_0.0, runningTasks: 1
INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 10.0.2.87, executor 1, partition 2, PROCESS_LOCAL, 7598 bytes)
DEBUG YarnScheduler: parentName: , name: TaskSet_0.0, runningTasks: 1
DEBUG TaskSetManager: No tasks for locality level NO_PREF, so moving to locality level ANY
INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 518 ms on 10.0.2.87 (executor 1) (1/3)
INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 512 ms on 10.0.2.87 (executor 2) (2/3)
DEBUG YarnScheduler: parentName: , name: TaskSet_0.0, runningTasks: 0
INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 51 ms on 10.0.2.87 (executor 1) (3/3)
INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
INFO YarnScheduler: Adding task set 1.0 with 3 tasks
DEBUG TaskSetManager: Epoch for TaskSet 1.0: 1
DEBUG TaskSetManager: Valid locality levels for TaskSet 1.0: NODE_LOCAL, RACK_LOCAL, ANY
DEBUG YarnScheduler: parentName: , name: TaskSet_1.0, runningTasks: 0
INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 3, 10.0.2.87, executor 2, partition 0, NODE_LOCAL, 7348 bytes)
INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 4, 10.0.2.87, executor 1, partition 1, NODE_LOCAL, 7348 bytes)
DEBUG YarnScheduler: parentName: , name: TaskSet_1.0, runningTasks: 1
INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 5, 10.0.2.87, executor 1, partition 2, NODE_LOCAL, 7348 bytes)
INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 4) in 130 ms on 10.0.2.87 (executor 1) (1/3)
DEBUG YarnScheduler: parentName: , name: TaskSet_1.0, runningTasks: 1
DEBUG TaskSetManager: No tasks for locality level NODE_LOCAL, so moving to locality level RACK_LOCAL
DEBUG TaskSetManager: No tasks for locality level RACK_LOCAL, so moving to locality level ANY
INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 3) in 133 ms on 10.0.2.87 (executor 2) (2/3)
DEBUG YarnScheduler: parentName: , name: TaskSet_1.0, runningTasks: 0
INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 5) in 21 ms on 10.0.2.87 (executor 1) (3/3)
INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
res0: Long = 3

Logging

Enable ALL logging level for org.apache.spark.scheduler.TaskSetManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.TaskSetManager=ALL

Refer to Logging