
= Task

Task is the abstraction of the smallest individual unit of execution that is launched to compute an RDD partition.

[[contract]]
.Task Contract
[cols="1m,3",options="header",width="100%"]
|===
| Method | Description

| runTask
a| [[runTask]]

[source, scala]
----
runTask(context: TaskContext): T
----

Runs the task

Used exclusively when Task is requested to <<run, run>>
|===


Task is created when DAGScheduler is requested to submit missing tasks of a stage.

NOTE: Task is a Scala abstract class and cannot be created directly. It is created indirectly for the concrete <<implementations, implementations>>.
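The contract can be sketched in plain Scala. This is a deliberately simplified model of the abstraction described above, not Spark's actual code: the `DemoResultTask` class and the fields of the `TaskContext` case class here are illustrative assumptions.

```scala
// Hypothetical, minimal model of the Task contract (not Spark's real classes).
case class TaskContext(stageId: Int, partitionId: Int)

abstract class Task[T](val stageId: Int, val partitionId: Int) {
  // The single method of the contract: computes this task's result of type T
  def runTask(context: TaskContext): T
}

// A ResultTask-like implementation: computes a value over a partition's records
class DemoResultTask(stageId: Int, partitionId: Int, records: Seq[Int])
    extends Task[Int](stageId, partitionId) {
  def runTask(context: TaskContext): Int = records.sum
}

val task = new DemoResultTask(stageId = 0, partitionId = 0, records = Seq(1, 2, 3))
val result = task.runTask(TaskContext(stageId = 0, partitionId = 0))
```

The type parameter `T` is what the later NOTE about `Task[T]` refers to: here it is `Int`, the type of the computed value.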

.Tasks Are Runtime Representation of RDD Partitions
image::spark-rdd-partitions-job-stage-tasks.png[align="center"]

[[creating-instance]] Task is described by the following:

  • [[stageId]] Stage ID
  • [[stageAttemptId]] Stage (execution) attempt ID
  • [[partitionId]] Partition ID
  • [[localProperties]] Local properties
  • [[serializedTaskMetrics]] Serialized TaskMetrics (Array[Byte])
  • [[jobId]] Optional ID of the ActiveJob (default: None)
  • [[appId]] Optional ID of the Spark application (default: None)
  • [[appAttemptId]] Optional attempt ID of the Spark application (default: None)
  • [[isBarrier]] isBarrier flag that says whether the task belongs to a barrier stage (default: false)

Task can be <<kill, killed>> at any time (possibly while it is running).

Tasks are launched on executors and run when TaskRunner starts.

In other words, a task is a computation on the records in an RDD partition in a stage of an RDD in a Spark job.

NOTE: In Scala Task is actually Task[T] in which T is the type of the result of a task (i.e. the type of the value computed).

[[implementations]]
.Tasks
[cols="1,3",options="header",width="100%"]
|===
| Task | Description

| ResultTask
| [[ResultTask]] Computes a ResultStage and gives the result back to the driver

| ShuffleMapTask
| [[ShuffleMapTask]] Computes a ShuffleMapStage
|===

In most cases, the last stage of a Spark job consists of one or more ResultTasks, while earlier stages consist of ShuffleMapTasks.

NOTE: It is possible to have one or more ShuffleMapTasks as part of the last stage.

A task can only belong to one stage and operate on a single partition. All tasks in a stage must be completed before the stages that follow can start.

Tasks are spawned one by one for each stage and partition.
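The one-task-per-partition relationship can be sketched as follows. `DemoStage` and `taskIdsFor` are hypothetical names used only for this illustration; the sketch assumes the simple model described above (one task per partition of a stage).

```scala
// Sketch: a stage with N partitions yields N tasks, one per partition.
case class DemoStage(id: Int, numPartitions: Int)

// Produces (stageId, partitionId) pairs, one per task to spawn
def taskIdsFor(stage: DemoStage): Seq[(Int, Int)] =
  (0 until stage.numPartitions).map(p => (stage.id, p))

val stage = DemoStage(id = 1, numPartitions = 4)
val tasks = taskIdsFor(stage)
```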

== [[preferredLocations]] preferredLocations Method

[source, scala]
----
preferredLocations: Seq[TaskLocation] = Nil
----

preferredLocations returns the TaskLocations that represent the preferred locations (executors) to execute the task on.

It is empty by default, i.e. no task location preferences are defined, which says the task can be launched on any executor.

NOTE: preferredLocations is defined by the concrete <<implementations, implementations>>, i.e. ShuffleMapTask and ResultTask.

NOTE: preferredLocations is used exclusively when TaskSetManager is requested to register a task as pending execution and to dequeueSpeculativeTask.
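The default-vs-override behaviour above can be sketched in plain Scala. `LocatableTask`, `AnywhereTask` and `PinnedTask` are hypothetical names, and `TaskLocation` is modelled here as a plain host name string, which is a simplifying assumption.

```scala
// Sketch of the preferredLocations contract: Nil by default (no preference),
// optionally overridden by a concrete task.
type TaskLocation = String

abstract class LocatableTask {
  // Default: no location preference, so the task may run on any executor
  def preferredLocations: Seq[TaskLocation] = Nil
}

class AnywhereTask extends LocatableTask // keeps the empty default

class PinnedTask(hosts: Seq[TaskLocation]) extends LocatableTask {
  // Override with the hosts that hold this task's input data
  override def preferredLocations: Seq[TaskLocation] = hosts
}

val anywhere = (new AnywhereTask).preferredLocations
val pinned = new PinnedTask(Seq("host1", "host2")).preferredLocations
```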

== [[run]] Running Task Thread -- run Final Method

[source, scala]
----
run(
  taskAttemptId: Long,
  attemptNumber: Int,
  metricsSystem: MetricsSystem): T
----

run registers the task (identified as taskAttemptId) with the local BlockManager.

NOTE: run uses SparkEnv to access the current BlockManager.

run creates a TaskContextImpl that in turn becomes the task's TaskContext.

NOTE: run is a final method and so must not be overridden.

run checks the <<_killed, _killed>> flag and, if enabled, kills the task (with the interruptThread flag disabled).

run creates a Hadoop CallerContext and sets it.

run <<runTask, runs the task>>.

NOTE: This is the moment when the custom Task's <<runTask, runTask>> is executed.

In the end, run notifies TaskContextImpl that the task has completed (regardless of the final outcome -- a success or a failure).

In case of any exceptions, run notifies TaskContextImpl that the task has failed. run then requests MemoryStore to release unroll memory for this task (for both ON_HEAP and OFF_HEAP memory modes).

NOTE: run uses SparkEnv to access the current BlockManager that it uses to access MemoryStore.

run requests MemoryManager to notify any tasks waiting for execution memory to be freed so they wake up and try to acquire memory again.

run unsets the task's TaskContext.

NOTE: run uses SparkEnv to access the current MemoryManager.

NOTE: run is used exclusively when TaskRunner is requested to run (when Executor is requested to launch a task on the "Executor task launch worker" thread pool sometime in the future).

NOTE: At this point the Task instance has just been deserialized from the taskBytes that were sent over the wire to an executor. localProperties and the TaskMemoryManager are already assigned.
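The control flow of run described above (register, create a context, execute runTask, report failure if any, and always clean up) can be sketched as a try/catch/finally skeleton. This is a simplified model under stated assumptions: `runLifecycle` and the log strings are hypothetical stand-ins for the real BlockManager, TaskContextImpl and memory-management calls.

```scala
// Sketch of run's control flow: the finally block mirrors the cleanup steps
// that happen regardless of success or failure.
import scala.collection.mutable.ListBuffer

def runLifecycle(body: () => Int, log: ListBuffer[String]): Option[Int] = {
  log += "register with BlockManager"   // stand-in for registering taskAttemptId
  log += "create TaskContextImpl"       // becomes the task's TaskContext
  try {
    Some(body())                        // the custom runTask executes here
  } catch {
    case _: Exception =>
      log += "markTaskFailed"           // notify TaskContextImpl of the failure
      None
  } finally {
    log += "markTaskCompleted"          // always notified, success or failure
    log += "release unroll memory"      // MemoryStore cleanup (ON_HEAP and OFF_HEAP)
    log += "unset TaskContext"
  }
}

val okLog = ListBuffer.empty[String]
val ok = runLifecycle(() => 42, okLog)

val failLog = ListBuffer.empty[String]
val failed = runLifecycle(() => throw new RuntimeException("boom"), failLog)
```

Note how `markTaskCompleted` appears in both logs while `markTaskFailed` appears only in the failing one, matching the description above.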

== [[states]][[TaskState]] Task States

A task can be in one of the following states (as described by TaskState enumeration):

  • RUNNING when the task is being started.
  • FINISHED when the task finishes with the serialized result.
  • FAILED when the task fails, e.g. when a FetchFailedException, a CommitDeniedException or any other Throwable occurs.
  • KILLED when an executor kills a task.
  • LOST

States are the values of org.apache.spark.TaskState.

NOTE: Task status updates are sent from executors to the driver through ExecutorBackend.

Task is finished when it is in one of FINISHED, FAILED, KILLED, LOST.

LOST and FAILED states are considered failures.

TIP: Task states correspond to org.apache.mesos.Protos.TaskState.
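The state semantics above can be sketched as a Scala Enumeration. `DemoTaskState` is a hypothetical stand-in for org.apache.spark.TaskState; the `isFinished` and `isFailure` predicates encode the two rules just stated (a task is finished in FINISHED, FAILED, KILLED or LOST, and FAILED and LOST count as failures).

```scala
// Sketch of the TaskState semantics (the real enumeration is
// org.apache.spark.TaskState).
object DemoTaskState extends Enumeration {
  val RUNNING, FINISHED, FAILED, KILLED, LOST = Value

  private val finishedStates = Set(FINISHED, FAILED, KILLED, LOST)
  private val failureStates = Set(FAILED, LOST)

  // Task is finished when it is in one of FINISHED, FAILED, KILLED, LOST
  def isFinished(state: Value): Boolean = finishedStates(state)

  // LOST and FAILED states are considered failures
  def isFailure(state: Value): Boolean = failureStates(state)
}
```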

== [[collectAccumulatorUpdates]] Collect Latest Values of (Internal and External) Accumulators -- collectAccumulatorUpdates Method

[source, scala]
----
collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulableInfo]
----

collectAccumulatorUpdates collects the latest values of internal and external accumulators from a task (and returns the values as a collection of AccumulableInfo).

Internally, collectAccumulatorUpdates takes the TaskMetrics.

NOTE: collectAccumulatorUpdates uses <<context, context>> to access the task's TaskMetrics.

collectAccumulatorUpdates collects the latest values of:

  • internal accumulators whose current value is not the zero value, plus the RESULT_SIZE accumulator (regardless of whether its value is zero or not).

  • external accumulators when taskFailed is disabled (false), or those that should be included on failures.

collectAccumulatorUpdates returns an empty collection when <<context, context>> is not initialized.

NOTE: collectAccumulatorUpdates is used when TaskRunner runs a task (and sends the task's final results back to the driver).
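The two filtering rules above can be sketched with a simplified accumulator model. `DemoAccum` and its fields are hypothetical; the accumulator's name stands in for the RESULT_SIZE check, and `countFailedValues` models the "should be included on failures" flag.

```scala
// Sketch of collectAccumulatorUpdates' filtering rules (simplified model).
case class DemoAccum(
    name: String,
    value: Long,
    internal: Boolean,
    countFailedValues: Boolean = false)

def collectAccumulatorUpdates(
    accums: Seq[DemoAccum],
    taskFailed: Boolean = false): Seq[DemoAccum] =
  accums.filter { a =>
    if (a.internal)
      a.value != 0 || a.name == "RESULT_SIZE" // RESULT_SIZE kept even when zero
    else
      !taskFailed || a.countFailedValues      // externals skipped on failure unless flagged
  }

val accums = Seq(
  DemoAccum("RESULT_SIZE", 0, internal = true),
  DemoAccum("internal.zero", 0, internal = true),
  DemoAccum("user.counter", 7, internal = false),
  DemoAccum("user.onFailure", 1, internal = false, countFailedValues = true))

val onSuccess = collectAccumulatorUpdates(accums)
val onFailure = collectAccumulatorUpdates(accums, taskFailed = true)
```

On success all non-zero and flagged accumulators survive; on failure only RESULT_SIZE and the failure-counting external one do.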

== [[kill]] Killing Task -- kill Method

[source, scala]
----
kill(interruptThread: Boolean)
----

kill marks the task to be killed, i.e. it sets the internal _killed flag to true.

kill calls TaskContextImpl.markInterrupted when the context is set.

If interruptThread is enabled and the internal taskThread is available, kill interrupts it.

CAUTION: FIXME When could context and interruptThread not be set?
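The kill sequence above can be sketched as follows. `KillableTask` and the `actions` log are hypothetical; thread interruption is only recorded here rather than performed on a real task thread, which is a deliberate simplification.

```scala
// Sketch of the kill sequence: set _killed, mark the context interrupted,
// and interrupt the task thread only when requested.
import scala.collection.mutable.ListBuffer

class KillableTask {
  @volatile private var _killed = false
  def killed: Boolean = _killed

  val actions = ListBuffer.empty[String]

  def kill(interruptThread: Boolean): Unit = {
    _killed = true                        // mark the task to be killed
    actions += "context.markInterrupted"  // done when the context is set
    if (interruptThread)
      actions += "taskThread.interrupt"   // only when requested (and the thread exists)
  }
}

val t = new KillableTask
t.kill(interruptThread = true)
```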

== [[internal-registries]] Internal Properties

.Task's Internal Properties (e.g. Registries, Counters and Flags)
[cols="1m,3",options="header",width="100%"]
|===
| Name | Description

| _executorDeserializeCpuTime
| [[_executorDeserializeCpuTime]]

| _executorDeserializeTime
| [[_executorDeserializeTime]]

| _reasonIfKilled
| [[_reasonIfKilled]]

| _killed
| [[_killed]]

| context
| [[context]] The task's TaskContext

Set to be a BarrierTaskContext or a TaskContextImpl when the <<isBarrier, isBarrier>> flag is enabled or not, respectively, when Task is requested to <<run, run>>

| epoch
| [[epoch]] Task epoch

Starts as -1

Set when TaskSetManager is created (to be the epoch of the MapOutputTrackerMaster)

| metrics
| [[metrics]] TaskMetrics

Created lazily when Task is <<creating-instance, created>> from <<serializedTaskMetrics, serializedTaskMetrics>>

| taskMemoryManager
| [[taskMemoryManager]] TaskMemoryManager that manages the memory allocated by the task

| taskThread
| [[taskThread]]
|===

Last update: 2020-10-06