TaskScheduler

TaskScheduler is an abstraction of Spark schedulers that can submit tasks for execution in a Spark application (per scheduling policy).

Figure 1. TaskScheduler and SparkContext
TaskScheduler works closely with DAGScheduler, which submits sets of tasks for execution (for every stage in a Spark job).

TaskScheduler can track the executors available in a Spark application using executorHeartbeatReceived and executorLost interceptors (that inform about active and lost executors, respectively).
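
The contract is summarized in the condensed sketch below, assembled from the method signatures documented on this page (the actual trait, org.apache.spark.scheduler.TaskScheduler, is private[spark] and declares a few more members and default implementations).

trait TaskScheduler {

  // Lifecycle
  def start(): Unit
  def postStartHook(): Unit
  def stop(): Unit

  // Submitting and killing tasks
  def submitTasks(taskSet: TaskSet): Unit
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
  def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean
  def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit

  // Tracking executors and workers
  def executorHeartbeatReceived(
      execId: String,
      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
      blockManagerId: BlockManagerId): Boolean
  def executorLost(executorId: String, reason: ExecutorLossReason): Unit
  def workerRemoved(workerId: String, host: String, message: String): Unit

  // Scheduling
  def rootPool: Pool
  def schedulingMode: SchedulingMode
  def defaultParallelism(): Int
  def setDAGScheduler(dagScheduler: DAGScheduler): Unit

  // Application identity
  def applicationId(): String
  def applicationAttemptId(): Option[String]
}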

Submitting Tasks for Execution

submitTasks(
  taskSet: TaskSet): Unit

Submits the tasks (of the given TaskSet) for execution.

Used when DAGScheduler is requested to submit missing tasks (of a stage).
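
A simplified sketch of that call, as made from DAGScheduler when it submits the missing tasks of a stage (the TaskSet constructor arguments vary across Spark versions, so treat the details as illustrative):

// DAGScheduler wraps the missing tasks of a stage in a TaskSet and hands it over
// to the TaskScheduler (simplified).
val taskSet = new TaskSet(
  tasks.toArray,                    // the missing tasks of the stage
  stage.id,                         // stage ID
  stage.latestInfo.attemptNumber,   // stage (execution) attempt ID
  jobId,                            // priority (the job ID under FIFO scheduling)
  properties)                       // local properties of the submitting thread
taskScheduler.submitTasks(taskSet)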

Handling Executor Heartbeat

executorHeartbeatReceived(
  execId: String,
  accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
  blockManagerId: BlockManagerId): Boolean

Handles a heartbeat from an executor

Returns true when the execId executor is managed by the TaskScheduler. false indicates that the block manager (on the executor) should re-register.

Used when HeartbeatReceiver RPC endpoint is requested to handle a Heartbeat (with task metrics) from an executor
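
A simplified sketch of how HeartbeatReceiver uses the return value (the actual handler runs asynchronously, and the Heartbeat message carries a few more fields in recent Spark versions):

// Inside HeartbeatReceiver's handler for a Heartbeat(executorId, accumUpdates,
// blockManagerId) message (simplified): ask the TaskScheduler whether it knows
// the executor; if not, the executor's BlockManager is told to re-register.
val executorKnown = scheduler.executorHeartbeatReceived(
  executorId, accumUpdates, blockManagerId)
context.reply(HeartbeatResponse(reregisterBlockManager = !executorKnown))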

Killing Task

killTaskAttempt(
  taskId: Long,
  interruptThread: Boolean,
  reason: String): Boolean

Kills a task (attempt)

Used when DAGScheduler is requested to kill a task
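
From user code, killing a task attempt is typically initiated via SparkContext.killTaskAttempt, which goes through DAGScheduler down to this method:

// Kill a (possibly hung) task attempt from the driver. The task ID can be found
// in the web UI or in SparkListener events. Returns true if the task attempt
// could be killed.
val killed: Boolean = sc.killTaskAttempt(
  taskId = 42,
  interruptThread = true,
  reason = "Task 42 is taking too long")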

workerRemoved Notification

workerRemoved(
  workerId: String,
  host: String,
  message: String): Unit

Handles the removal of a worker

Used when DriverEndpoint is requested to handle a RemoveWorker event

Contract


applicationAttemptId

applicationAttemptId(): Option[String]

Unique identifier of an (execution) attempt of the Spark application

Used when SparkContext is created

cancelTasks

cancelTasks(
  stageId: Int,
  interruptThread: Boolean): Unit

Cancels all the tasks of a given stage

Used when DAGScheduler is requested to failJobAndIndependentStages

defaultParallelism

defaultParallelism(): Int

Default level of parallelism

Used when SparkContext is requested for the default level of parallelism
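
For example, SparkContext.defaultParallelism delegates to the TaskScheduler and is used as the default number of partitions:

// defaultParallelism is backend-specific, e.g. the number of cores in local mode
// or the total number of executor cores in a cluster.
val numSlices = sc.defaultParallelism
val rdd = sc.parallelize(1 to 100)   // uses defaultParallelism partitions by default
assert(rdd.getNumPartitions == numSlices)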

executorLost

executorLost(
  executorId: String,
  reason: ExecutorLossReason): Unit

Handles an executor lost event

Used when:

  • HeartbeatReceiver RPC endpoint is requested to expireDeadHosts

  • DriverEndpoint RPC endpoint is requested to remove (forget) and disable a malfunctioning executor (i.e. either lost or blacklisted for some reason)

  • Spark on Mesos' MesosFineGrainedSchedulerBackend is requested to recordSlaveLost

killAllTaskAttempts

killAllTaskAttempts(
  stageId: Int,
  interruptThread: Boolean,
  reason: String): Unit

Kills all the task attempts of a given stage

Used when:

  • DAGScheduler is requested to handle a task completion

  • TaskSchedulerImpl is requested to cancel all the tasks of a stage (cancelTasks)

rootPool

rootPool: Pool

Top-level (root) schedulable pool

Used when:

  • SparkContext is requested for the schedulable pools (getAllPools and getPoolForName)

schedulingMode

schedulingMode: SchedulingMode

Scheduling mode of the TaskScheduler (FAIR, FIFO or NONE)

Used when:

  • SparkContext is requested for the scheduling mode (getSchedulingMode)
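
The scheduling mode is configured with the spark.scheduler.mode configuration property (FIFO by default), e.g.:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("fair-scheduling-demo")      // illustrative application name
  .set("spark.scheduler.mode", "FAIR")     // FIFO is the default
val sc = new SparkContext(conf)

// SparkContext.getSchedulingMode delegates to TaskScheduler.schedulingMode
println(sc.getSchedulingMode)   // FAIR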

setDAGScheduler

setDAGScheduler(dagScheduler: DAGScheduler): Unit

Associates a DAGScheduler

Used when DAGScheduler is created

start

start(): Unit

Starts the TaskScheduler

Used when SparkContext is created

stop

stop(): Unit

Stops the TaskScheduler

Used when DAGScheduler is requested to stop

TaskSchedulers


TaskSchedulerImpl

Default Spark scheduler

YarnScheduler

TaskScheduler for client deploy mode in Spark on YARN

YarnClusterScheduler

TaskScheduler for cluster deploy mode in Spark on YARN

Lifecycle

A TaskScheduler is created while SparkContext is being created (by calling SparkContext.createTaskScheduler for a given master URL and deploy mode).

Figure 2. TaskScheduler uses SchedulerBackend to support different clusters

At this point in SparkContext’s lifecycle, the internal _taskScheduler points at the TaskScheduler (and it is "announced" by sending a blocking TaskSchedulerIsSet message to the HeartbeatReceiver RPC endpoint).

The TaskScheduler is started right after the blocking TaskSchedulerIsSet message receives a response.

The application ID and the application’s attempt ID are set at this point (and SparkContext uses the application ID to set the spark.app.id Spark property, and to configure the SparkUI and the BlockManager).

FIXME The application id is described as "associated with the job." in TaskScheduler, but I think it is "associated with the application" and you can have many jobs per application.

Right before SparkContext is fully initialized, TaskScheduler.postStartHook is called.
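
The sequence can be condensed into the following sketch (simplified from SparkContext’s initialization code; not the actual implementation):

// Create the TaskScheduler (and SchedulerBackend) for the master URL and deploy mode.
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts

// "Announce" the TaskScheduler to the HeartbeatReceiver RPC endpoint (blocking).
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// Start the TaskScheduler once the announcement has been acknowledged.
_taskScheduler.start()

// Request the application and application attempt IDs.
_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = _taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
// ...the SparkUI and BlockManager are then configured with the application ID...

// Right before SparkContext is considered fully initialized:
_taskScheduler.postStartHook()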

The internal _taskScheduler is cleared (i.e. set to null) while SparkContext is being stopped.

FIXME If it is SparkContext to start a TaskScheduler, shouldn’t SparkContext stop it too? Why is this the way it is now?

Post-Start Initialization

postStartHook(): Unit

postStartHook does nothing by default, but allows custom implementations to perform some additional post-start initialization.

postStartHook is used when:

  • SparkContext is created (right before it is considered fully initialized)

  • Spark on YARN’s YarnClusterScheduler is requested to postStartHook
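
A hedged sketch of what an implementation may do in postStartHook (the extra notification step below is purely illustrative; YarnClusterScheduler, for instance, uses the hook to inform the YARN ApplicationMaster that the SparkContext is up before delegating to the parent implementation):

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.TaskSchedulerImpl

// Illustrative custom scheduler (TaskSchedulerImpl is private[spark], so this is
// only a sketch of the extension point, not user-facing code).
class MyClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  override def postStartHook(): Unit = {
    notifyClusterManager()    // hypothetical extra post-start step
    super.postStartHook()
  }

  private def notifyClusterManager(): Unit = ()   // placeholder
}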

Unique Identifier of Spark Application

applicationId(): String

applicationId is the unique identifier of the Spark application and defaults to spark-application-[currentTimeMillis].

applicationId is used when SparkContext is created.
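
The application ID is available on SparkContext once initialization completes (cluster managers typically provide their own identifier format, e.g. local-[timestamp] in local mode or application_* on YARN):

// Delegates to TaskScheduler.applicationId under the covers.
println(sc.applicationId)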