TaskRunner

TaskRunner is a thread of execution to run a task.

Executor creates and runs TaskRunner

Internal Class

TaskRunner is an internal class of Executor with full access to internal registries.

TaskRunner is a java.lang.Runnable so once a TaskRunner has completed execution it must not be restarted.
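
As a rough illustration of what being a java.lang.Runnable implies, the sketch below hands a one-shot runnable to a thread pool and waits for it to complete. DemoTaskRunner, its finished flag and runOnce are hypothetical stand-ins for illustration only, not Spark's own classes or methods.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object RunnableSketch {
  // Hypothetical stand-in for TaskRunner: a one-shot unit of work.
  class DemoTaskRunner(val taskId: Long) extends Runnable {
    val finished = new AtomicBoolean(false)
    override def run(): Unit = {
      // ... the task body would execute here ...
      finished.set(true)
    }
  }

  // Runs a single DemoTaskRunner on a pool thread and waits for it to complete.
  def runOnce(taskId: Long): DemoTaskRunner = {
    val pool = Executors.newCachedThreadPool()
    val runner = new DemoTaskRunner(taskId)
    try {
      pool.submit(runner).get(10, TimeUnit.SECONDS) // block until run() returns
    } finally {
      pool.shutdown()
    }
    runner
  }
}
```

A new DemoTaskRunner is created for every call to runOnce, which matches the idea that a completed runner is never restarted.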

Creating Instance

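TaskRunner takes the following to be created:

  • ExecutorBackend
  • TaskDescription
  • PluginContainer (optional)

TaskRunner is created when Executor is requested to launch a task.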

PluginContainer

TaskRunner may be given a PluginContainer when created.

The PluginContainer is used when TaskRunner is requested to run (for the Task to run).

Demo

./bin/spark-shell --conf spark.driver.maxResultSize=1m
scala> println(sc.version)
3.0.1
val maxResultSize = sc.getConf.get("spark.driver.maxResultSize")
assert(maxResultSize == "1m")
val rddOver1m = sc.range(0, 1024 * 1024 + 10, 1)
scala> rddOver1m.collect
ERROR TaskSetManager: Total size of serialized results of 2 tasks (1030.8 KiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
ERROR TaskSetManager: Total size of serialized results of 3 tasks (1546.2 KiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
ERROR TaskSetManager: Total size of serialized results of 4 tasks (2.0 MiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
WARN TaskSetManager: Lost task 7.0 in stage 0.0 (TID 7, 192.168.68.105, executor driver): TaskKilled (Tasks result size has exceeded maxResultSize)
WARN TaskSetManager: Lost task 5.0 in stage 0.0 (TID 5, 192.168.68.105, executor driver): TaskKilled (Tasks result size has exceeded maxResultSize)
WARN TaskSetManager: Lost task 12.0 in stage 0.0 (TID 12, 192.168.68.105, executor driver): TaskKilled (Tasks result size has exceeded maxResultSize)
ERROR TaskSetManager: Total size of serialized results of 5 tasks (2.5 MiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
WARN TaskSetManager: Lost task 8.0 in stage 0.0 (TID 8, 192.168.68.105, executor driver): TaskKilled (Tasks result size has exceeded maxResultSize)
...
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 2 tasks (1030.8 KiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
  ...

Thread Name

TaskRunner uses the following thread name (with the taskId of the TaskDescription):

Executor task launch worker for task [taskId]

Running Task

run(): Unit

run is part of the java.lang.Runnable abstraction.

Initialization

run initializes the threadId internal registry as the current thread identifier (using Thread.getId).

run sets the name of the current thread of execution as the threadName.
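
The two steps above (recording the thread identifier and renaming the thread) can be sketched in plain Scala as follows. ThreadNameSketch and withTaskThreadName are hypothetical names, and restoring the original thread name afterwards is an assumption of this sketch rather than something the text above describes.

```scala
object ThreadNameSketch {
  // Builds the thread name in the format shown in the Thread Name section.
  def threadName(taskId: Long): String =
    s"Executor task launch worker for task $taskId"

  // Records the current thread's id and renames the thread, restoring
  // the original name afterwards so the sketch has no lasting side effect.
  def withTaskThreadName[T](taskId: Long)(body: (Long, String) => T): T = {
    val current = Thread.currentThread()
    val originalName = current.getName
    val threadId = current.getId
    current.setName(threadName(taskId))
    try body(threadId, current.getName)
    finally current.setName(originalName)
  }
}
```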

run creates a TaskMemoryManager (for the current MemoryManager and taskId). run uses SparkEnv to access the current MemoryManager.

run starts tracking the time to deserialize a task and sets the current thread's context classloader.

run creates a closure Serializer. run uses SparkEnv to access the closure Serializer.

run prints out the following INFO message to the logs (with the taskName and taskId):

Running [taskName] (TID [taskId])

run notifies the ExecutorBackend that the status of the task has changed to RUNNING (for the taskId).

run computes the total amount of time this JVM process has spent in garbage collection.
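
One way to compute such a total on the JVM is to sum the collection times reported by the garbage collector MXBeans. The sketch below assumes that approach; GcTimeSketch and totalGcTimeMs are hypothetical names.

```scala
import java.lang.management.ManagementFactory

object GcTimeSketch {
  // Sums the collection time (in milliseconds) over all garbage collectors of this JVM.
  def totalGcTimeMs(): Long = {
    val beans = ManagementFactory.getGarbageCollectorMXBeans.iterator()
    var total = 0L
    while (beans.hasNext) {
      val time = beans.next().getCollectionTime
      if (time > 0) total += time // getCollectionTime returns -1 when undefined
    }
    total
  }
}
```

Taking one reading before the task starts and another when it ends gives the GC time spent in between as the difference of the two values.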

run uses the addedFiles and addedJars (of the given TaskDescription) to update dependencies.

run takes the serializedTask of the given TaskDescription and requests the closure Serializer to deserialize the task. run sets the task internal reference to hold the deserialized task.
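
To make the deserialization step more concrete, here is a minimal round trip through plain Java serialization and a java.nio.ByteBuffer. It is only a sketch: DemoTask and the serialize and deserialize helpers are hypothetical, and using Java serialization as a stand-in for the closure Serializer is an assumption of this example.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

object ClosureSerializationSketch {
  // Hypothetical stand-in for a task; a real task carries far more state.
  case class DemoTask(stageId: Int, partitionId: Int)

  // Serializes any java.io.Serializable value to a ByteBuffer.
  def serialize(value: java.io.Serializable): ByteBuffer = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    ByteBuffer.wrap(bytes.toByteArray)
  }

  // Reads the remaining bytes of the buffer back into an object of type T.
  def deserialize[T](buffer: ByteBuffer): T = {
    val array = new Array[Byte](buffer.remaining())
    buffer.duplicate().get(array) // duplicate() leaves the caller's position untouched
    val in = new ObjectInputStream(new ByteArrayInputStream(array))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```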

For non-local environments, run prints out the following DEBUG message to the logs before requesting the MapOutputTrackerWorker to update the epoch (using the epoch of the Task to be executed). run uses SparkEnv to access the MapOutputTrackerWorker.

Task [taskId]'s epoch is [epoch]

run requests the metricsPoller...FIXME

run records the current time as the task's start time (taskStartTimeNs).

run requests the Task to run (with taskAttemptId as taskId, attemptNumber from TaskDescription, and metricsSystem as the current MetricsSystem).

Note

run uses SparkEnv to access the MetricsSystem.

Note

The task runs inside a "monitored" block (try-finally block) to detect any memory and lock leaks after the task's run finishes regardless of the final outcome - the computed value or an exception thrown.
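
The shape of such a "monitored" block can be sketched with a try-finally that checks for leftovers no matter how the body ends. ResourceTracker, runMonitored and the onLeak callback are hypothetical names used for illustration; the real leak detection covers memory and locks and is more involved.

```scala
object MonitoredBlockSketch {
  // Hypothetical resource tracker standing in for memory and lock bookkeeping.
  final class ResourceTracker {
    private var held = 0L
    def acquire(n: Long): Unit = held += n
    def release(n: Long): Unit = held -= n
    // Frees whatever is still held and reports how much that was.
    def cleanUpAll(): Long = { val leaked = held; held = 0L; leaked }
  }

  // Runs the body, then always checks for leftovers, whether the body
  // returned a value or threw an exception (which still propagates).
  def runMonitored[T](tracker: ResourceTracker)(body: ResourceTracker => T)(onLeak: Long => Unit): T =
    try body(tracker)
    finally {
      val leaked = tracker.cleanUpAll()
      if (leaked > 0) onLeak(leaked)
    }
}
```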

run creates a Serializer and requests it to serialize the task result (valueBytes).

Note

run uses SparkEnv to access the Serializer.

run updates the metrics of the Task executed.

run updates the metric counters in the ExecutorSource.

run requests the Task executed for accumulator updates and the ExecutorMetricsPoller for metric peaks.

Serialized Task Result

run creates a DirectTaskResult (with the serialized task result, the accumulator updates and the metric peaks) and requests the closure Serializer to serialize it.

Note

The serialized DirectTaskResult is a java.nio.ByteBuffer.

run selects between the DirectTaskResult and an IndirectTaskResult based on the size of the serialized task result (limit of this serializedDirectResult byte buffer):

  1. With the size above spark.driver.maxResultSize, run prints out the following WARN message to the logs and serializes an IndirectTaskResult with a TaskResultBlockId.

    Finished [taskName] (TID [taskId]). Result is larger than maxResultSize ([resultSize] > [maxResultSize]), dropping it.
    
  2. With the size above maxDirectResultSize, run creates a TaskResultBlockId and requests the BlockManager to store the task result locally (with MEMORY_AND_DISK_SER). run prints out the following INFO message to the logs and serializes an IndirectTaskResult with a TaskResultBlockId.

    Finished [taskName] (TID [taskId]). [resultSize] bytes result sent via BlockManager)
    
  3. Otherwise, run prints out the following INFO message to the logs and uses the DirectTaskResult created earlier.

    Finished [taskName] (TID [taskId]). [resultSize] bytes result sent to driver
    

Note

serializedResult is either an IndirectTaskResult (possibly with the block stored in BlockManager) or a DirectTaskResult.
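
The three-way choice described above boils down to two size comparisons. The following sketch captures just that decision; the Outcome type and select function are hypothetical, and treating a non-positive maxResultSize as "no limit" is an assumption of the sketch.

```scala
object ResultSelectionSketch {
  sealed trait Outcome
  case object Dropped extends Outcome         // IndirectTaskResult, result itself is dropped
  case object ViaBlockManager extends Outcome // IndirectTaskResult, result stored as a block
  case object Direct extends Outcome          // DirectTaskResult sent as is

  // Mirrors the three cases in order; all sizes are in bytes.
  // Assumption: maxResultSize <= 0 disables the first check.
  def select(resultSize: Long, maxResultSize: Long, maxDirectResultSize: Long): Outcome =
    if (maxResultSize > 0 && resultSize > maxResultSize) Dropped
    else if (resultSize > maxDirectResultSize) ViaBlockManager
    else Direct
}
```

The order of the checks matters: a result that exceeds both limits is dropped rather than stored in the BlockManager.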

Incrementing succeededTasks Counter

run requests the ExecutorSource to increment succeededTasks counter.

Marking Task Finished

run calls setTaskFinishedAndClearInterruptStatus.

Notifying ExecutorBackend that Task Finished

run notifies the ExecutorBackend that the status of the taskId has changed to FINISHED.

Note

ExecutorBackend is given when the TaskRunner is created.

Wrapping Up

In the end, regardless of the task's execution status (successful or failed), run removes the taskId from runningTasks registry.

In case an onTaskStart notification was sent out, run requests the ExecutorMetricsPoller to handle the task completion (onTaskCompletion).
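
The "regardless of the outcome" cleanup is the classic try-finally pattern around a registry. A minimal sketch, assuming a concurrent map keyed by task id (RunningTasksSketch and runTracked are hypothetical names, and the map here holds task names only for simplicity):

```scala
import java.util.concurrent.ConcurrentHashMap

object RunningTasksSketch {
  // Hypothetical registry of running tasks, keyed by task id.
  val runningTasks = new ConcurrentHashMap[Long, String]()

  // Registers the task, runs the body and always deregisters the task afterwards.
  def runTracked[T](taskId: Long, taskName: String)(body: => T): T = {
    runningTasks.put(taskId, taskName)
    try body
    finally runningTasks.remove(taskId)
  }
}
```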

Exceptions

run handles certain exceptions.

| Exception Type | TaskState | Serialized ByteBuffer |
|---|---|---|
| FetchFailedException | FAILED | TaskFailedReason |
| TaskKilledException | KILLED | TaskKilled |
| InterruptedException | KILLED | TaskKilled |
| CommitDeniedException | FAILED | TaskFailedReason |
| Throwable | FAILED | ExceptionFailure |
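
The table above reads naturally as a pattern match from the most specific exception to the most general one. The sketch below encodes only that mapping; the three exception classes are hypothetical stand-ins defined locally (not Spark's classes), and the states and reasons are plain strings for brevity.

```scala
object FailureMappingSketch {
  // Hypothetical stand-ins for the Spark exception types named in the table.
  class FetchFailedException(msg: String) extends RuntimeException(msg)
  class TaskKilledException(msg: String) extends RuntimeException(msg)
  class CommitDeniedException(msg: String) extends RuntimeException(msg)

  // Returns (task state, kind of serialized reason) as listed in the table.
  // The catch-all Throwable case must come last.
  def classify(t: Throwable): (String, String) = t match {
    case _: FetchFailedException  => ("FAILED", "TaskFailedReason")
    case _: TaskKilledException   => ("KILLED", "TaskKilled")
    case _: InterruptedException  => ("KILLED", "TaskKilled")
    case _: CommitDeniedException => ("FAILED", "TaskFailedReason")
    case _: Throwable             => ("FAILED", "ExceptionFailure")
  }
}
```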

FetchFailedException

When FetchFailedException is reported while running a task, run calls setTaskFinishedAndClearInterruptStatus.

run requests the FetchFailedException for the TaskFailedReason, serializes it and notifies the ExecutorBackend that the task has failed (with the taskId, TaskState.FAILED, and the serialized reason).

Note

The ExecutorBackend was specified when the TaskRunner was created.

Note

run uses a closure Serializer to serialize the failure reason. The Serializer was created before run ran the task.

TaskKilledException

When TaskKilledException is reported while running a task, you should see the following INFO message in the logs:

Executor killed [taskName] (TID [taskId]), reason: [reason]

run then calls setTaskFinishedAndClearInterruptStatus and notifies the ExecutorBackend that the task has been killed (with the taskId, TaskState.KILLED, and a serialized TaskKilled object).

InterruptedException (with Task Killed)

When InterruptedException is reported while running a task, and the task has been killed, you should see the following INFO message in the logs:

Executor interrupted and killed [taskName] (TID [taskId]), reason: [killReason]

run then calls setTaskFinishedAndClearInterruptStatus and notifies the ExecutorBackend that the task has been killed (with the taskId, TaskState.KILLED, and a serialized TaskKilled object).

Note

The difference between this InterruptedException and TaskKilledException is the INFO message in the logs.

CommitDeniedException

When CommitDeniedException is reported while running a task, run calls setTaskFinishedAndClearInterruptStatus and notifies the ExecutorBackend that the task has failed (with the taskId, TaskState.FAILED, and a serialized TaskFailedReason).

Note

The difference between this CommitDeniedException and FetchFailedException is just the reason being sent to ExecutorBackend.

Throwable

When run catches a Throwable, you should see the following ERROR message in the logs (followed by the exception).

Exception in [taskName] (TID [taskId])

run then records the following task metrics (only when the task is available):

  • executorRunTime
  • jvmGCTime

run then collects the latest values of internal and external accumulators (with the taskFailed flag enabled to inform that the collection is for a failed task).

Otherwise, when the task is not available, the accumulator collection is empty.

run converts the task accumulators to a collection of AccumulableInfo, creates an ExceptionFailure (with the accumulators), and serializes it.

Note

run uses a closure Serializer to serialize the ExceptionFailure.

CAUTION: FIXME Why does run create new ExceptionFailure(t, accUpdates).withAccums(accums), i.e. accumulators occur twice in the object.

run calls setTaskFinishedAndClearInterruptStatus and notifies the ExecutorBackend that the task has failed (with the taskId, TaskState.FAILED, and the serialized ExceptionFailure).

run may also trigger SparkUncaughtExceptionHandler.uncaughtException(t) if this is a fatal error.

Note

The difference between this most general Throwable case and the other FAILED cases (i.e. FetchFailedException and CommitDeniedException) is just the serialized ExceptionFailure vs a reason being sent to ExecutorBackend, respectively.

collectAccumulatorsAndResetStatusOnFailure

collectAccumulatorsAndResetStatusOnFailure(
  taskStartTimeNs: Long)

collectAccumulatorsAndResetStatusOnFailure...FIXME

Killing Task

kill(
  interruptThread: Boolean,
  reason: String): Unit

kill marks the TaskRunner as killed (by recording the reason as reasonIfKilled) and kills the task (if available and not finished already).

Note

kill passes the input interruptThread on to the task itself while killing it.

When executed, you should see the following INFO message in the logs:

Executor is trying to kill [taskName] (TID [taskId]), reason: [reason]

Note

The kill reason (reasonIfKilled) is checked periodically in run to stop executing the task. Once killed, the task will eventually stop.
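
The cooperative nature of killing ("the task will eventually stop") can be illustrated with a runnable that polls a volatile kill reason between small units of work. KillableRunner is a hypothetical class; thread interruption (the interruptThread argument) is deliberately left out of this sketch.

```scala
object KillFlagSketch {
  // Hypothetical runner with a cooperative kill flag.
  class KillableRunner extends Runnable {
    @volatile private var reasonIfKilled: Option[String] = None
    @volatile var finished = false

    // Records the kill reason; the running loop notices it on its next check.
    def kill(reason: String): Unit = reasonIfKilled = Some(reason)

    def killReason: Option[String] = reasonIfKilled

    override def run(): Unit = {
      // Check the flag between small units of work so a kill request is noticed promptly.
      while (reasonIfKilled.isEmpty) {
        Thread.sleep(1) // stands in for one unit of real work
      }
      finished = true
    }
  }
}
```

Because the flag is only polled, a long unit of work delays the reaction to kill, which is why interrupting the thread is offered as an option in the real kill signature.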

Logging

Enable ALL logging level for org.apache.spark.executor.Executor logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.executor.Executor=ALL

Refer to Logging.

Internal Properties

finished Flag

finished flag says whether the task has finished (true) or not (false)

Default: false

Enabled (true) after TaskRunner has been requested to setTaskFinishedAndClearInterruptStatus

Used when TaskRunner is requested to kill the task

reasonIfKilled

Reason to kill the task (and avoid executing it)

Default: (empty) (None)

startGCTime Timestamp

Timestamp (which is really the total amount of time this Executor JVM process has already spent in garbage collection) that is used to mark the GC "zero" time (when run starts executing the task) and then to compute the JVM GC time metric when:

  • TaskRunner is requested to run and collectAccumulatorsAndResetStatusOnFailure

  • Executor is requested to reportHeartBeat

Task

Deserialized task to execute

Used when:

  • TaskRunner is requested to run or kill the task, or to collectAccumulatorsAndResetStatusOnFailure (among other operations)

  • Executor is requested to reportHeartBeat

Task Name

The name of the task (of the TaskDescription) that is used exclusively for logging purposes when TaskRunner is requested to run and kill the task

Thread Id

Current thread ID

Default: -1

Set immediately when TaskRunner is requested to run and used exclusively when TaskReaper is requested for the thread info of the current thread (aka thread dump)