# TaskRunner

TaskRunner is a thread of execution to run a task.

Note (Internal Class): TaskRunner is an internal class of Executor with full access to internal registries.

TaskRunner is a java.lang.Runnable, so once a TaskRunner has completed execution it must not be restarted.
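The one-shot Runnable contract is easy to see in isolation. Below is a minimal sketch (not Spark's code; TaskRunnerLike is a hypothetical stand-in) of handing such a Runnable to a thread pool, much like an Executor hands a TaskRunner to its task launch worker pool:

```scala
import java.util.concurrent.Executors

// Hypothetical stand-in for a TaskRunner: a one-shot Runnable.
class TaskRunnerLike extends Runnable {
  override def run(): Unit = println("running the task exactly once")
}

val pool = Executors.newCachedThreadPool()
pool.execute(new TaskRunnerLike) // run() executes once on a pool thread
pool.shutdown()
```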
## Creating Instance

TaskRunner takes the following to be created:

- ExecutorBackend (that manages the parent Executor)
- TaskDescription
- PluginContainer

TaskRunner is created when Executor is requested to launch a task.
## PluginContainer

TaskRunner may be given a PluginContainer when created.

The PluginContainer is used when TaskRunner is requested to run (for the Task to run).
## Demo

```text
./bin/spark-shell --conf spark.driver.maxResultSize=1m

scala> println(sc.version)
3.0.1
```

```scala
val maxResultSize = sc.getConf.get("spark.driver.maxResultSize")
assert(maxResultSize == "1m")

val rddOver1m = sc.range(0, 1024 * 1024 + 10, 1)
```

```text
scala> rddOver1m.collect
ERROR TaskSetManager: Total size of serialized results of 2 tasks (1030.8 KiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
ERROR TaskSetManager: Total size of serialized results of 3 tasks (1546.2 KiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
ERROR TaskSetManager: Total size of serialized results of 4 tasks (2.0 MiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
WARN TaskSetManager: Lost task 7.0 in stage 0.0 (TID 7, 192.168.68.105, executor driver): TaskKilled (Tasks result size has exceeded maxResultSize)
WARN TaskSetManager: Lost task 5.0 in stage 0.0 (TID 5, 192.168.68.105, executor driver): TaskKilled (Tasks result size has exceeded maxResultSize)
WARN TaskSetManager: Lost task 12.0 in stage 0.0 (TID 12, 192.168.68.105, executor driver): TaskKilled (Tasks result size has exceeded maxResultSize)
ERROR TaskSetManager: Total size of serialized results of 5 tasks (2.5 MiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
WARN TaskSetManager: Lost task 8.0 in stage 0.0 (TID 8, 192.168.68.105, executor driver): TaskKilled (Tasks result size has exceeded maxResultSize)
...
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 2 tasks (1030.8 KiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
  ...
```
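The collect fails because the serialized results the tasks send back to the driver (roughly 8 bytes per Long, before serialization overhead) add up to more than the 1m cap. Assuming the same code is run in a shell relaunched with a larger limit, the action goes through:

```scala
// Relaunch the shell with a bigger cap (1g is the default), e.g.:
//   ./bin/spark-shell --conf spark.driver.maxResultSize=1g
val rddOver1m = sc.range(0, 1024 * 1024 + 10, 1)
assert(rddOver1m.collect().length == 1024 * 1024 + 10)
```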
## Thread Name

TaskRunner uses the following thread name (with the taskId of the TaskDescription):

```text
Executor task launch worker for task [taskId]
```
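This naming makes TaskRunner threads easy to spot in a thread dump. A small illustration (assuming a running Spark application on Scala 2.12):

```scala
import scala.collection.JavaConverters._

// List live threads whose names match the TaskRunner naming scheme.
Thread.getAllStackTraces.keySet.asScala
  .map(_.getName)
  .filter(_.startsWith("Executor task launch worker"))
  .foreach(println)
```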
## Running Task

```scala
run(): Unit
```

run is part of the java.lang.Runnable abstraction.
### Initialization

run initializes the threadId internal registry to the current thread identifier (using Thread.getId).

run sets the name of the current thread of execution to the threadName.

run creates a TaskMemoryManager (for the current MemoryManager and taskId). run uses SparkEnv to access the current MemoryManager.

run starts tracking the time to deserialize a task and sets the current thread's context classloader.

run creates a closure Serializer. run uses SparkEnv to access the closure Serializer.

run prints out the following INFO message to the logs (with the taskName and taskId):

```text
Running [taskName] (TID [taskId])
```
run notifies the ExecutorBackend that the status of the task has changed to RUNNING (for the taskId).

run computes the total amount of time this JVM process has spent in garbage collection.
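The JVM's total GC time can be computed from the garbage collector MXBeans (a sketch of what the Executor's computeTotalGcTime does):

```scala
import java.lang.management.ManagementFactory
import scala.collection.JavaConverters._

// Sum the collection time (in ms) across all garbage collectors of this JVM.
val totalGcTimeMs = ManagementFactory.getGarbageCollectorMXBeans.asScala
  .map(_.getCollectionTime)
  .sum
println(s"Total GC time so far: $totalGcTimeMs ms")
```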
run uses the addedFiles and addedJars (of the given TaskDescription) to update dependencies.

run takes the serializedTask of the given TaskDescription and requests the closure Serializer to deserialize the task. run sets the task internal reference to hold the deserialized task.

For non-local environments, run prints out the following DEBUG message to the logs before requesting the MapOutputTrackerWorker to update the epoch (using the epoch of the Task to be executed). run uses SparkEnv to access the MapOutputTrackerWorker.

```text
Task [taskId]'s epoch is [epoch]
```
run requests the metricsPoller...FIXME

run records the current time as the task's start time (taskStartTimeNs).

run requests the Task to run (with taskAttemptId as the taskId, attemptNumber from the TaskDescription, and metricsSystem as the current MetricsSystem).

Note: run uses SparkEnv to access the MetricsSystem.

Note: The task runs inside a "monitored" block (a try-finally block) to detect any memory and lock leaks after the task's run finishes, regardless of the final outcome - the computed value or an exception thrown.
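The shape of such a monitored block can be sketched as follows (hypothetical helper names; the real checks cover leaked managed memory and block locks):

```scala
// Run a task body and always perform leak checks afterwards, whether
// the body returned a value or threw an exception.
def runMonitored[T](body: => T)(leakChecks: () => Unit): T =
  try {
    body
  } finally {
    leakChecks() // e.g. was all memory freed? were all block locks released?
  }
```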
run creates a Serializer and requests it to serialize the task result (valueBytes).

Note: run uses SparkEnv to access the Serializer.

run updates the metrics of the Task executed.

run updates the metric counters in the ExecutorSource.

run requests the Task executed for accumulator updates and the ExecutorMetricsPoller for metric peaks.
### Serialized Task Result

run creates a DirectTaskResult (with the serialized task result, the accumulator updates and the metric peaks) and requests the closure Serializer to serialize it.

Note: The serialized DirectTaskResult is a java.nio.ByteBuffer.

run selects between the DirectTaskResult and an IndirectTaskResult based on the size of the serialized task result (the limit of the serializedDirectResult byte buffer):

1. With the size above spark.driver.maxResultSize, run prints out the following WARN message to the logs and serializes an IndirectTaskResult with a TaskResultBlockId:

        Finished [taskName] (TID [taskId]). Result is larger than maxResultSize ([resultSize] > [maxResultSize]), dropping it.

2. With the size above maxDirectResultSize, run creates a TaskResultBlockId and requests the BlockManager to store the task result locally (with MEMORY_AND_DISK_SER). run prints out the following INFO message to the logs and serializes an IndirectTaskResult with the TaskResultBlockId:

        Finished [taskName] (TID [taskId]). [resultSize] bytes result sent via BlockManager)

3. Otherwise, run prints out the following INFO message to the logs and uses the DirectTaskResult created earlier:

        Finished [taskName] (TID [taskId]). [resultSize] bytes result sent to driver

Note: serializedResult is either an IndirectTaskResult (possibly with the block stored in BlockManager) or a DirectTaskResult.
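The three-way selection can be sketched as follows (simplified; the result variants are reduced to descriptions, and maxResultSize and maxDirectResultSize stand for the limits discussed above):

```scala
import java.nio.ByteBuffer

// Decide how a serialized task result travels back to the driver.
def routeResult(
    serializedDirectResult: ByteBuffer,
    maxResultSize: Long,        // spark.driver.maxResultSize
    maxDirectResultSize: Long   // capped by spark.task.maxDirectResultSize
): String = {
  val resultSize = serializedDirectResult.limit()
  if (maxResultSize > 0 && resultSize > maxResultSize) {
    "IndirectTaskResult: result dropped as larger than maxResultSize"
  } else if (resultSize > maxDirectResultSize) {
    "IndirectTaskResult: result stored in BlockManager (MEMORY_AND_DISK_SER)"
  } else {
    "DirectTaskResult: result sent inline to the driver"
  }
}

// e.g. routeResult(ByteBuffer.allocate(128), 1024 * 1024, 128 * 1024)
```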
### Incrementing succeededTasks Counter

run requests the ExecutorSource to increment the succeededTasks counter.
### Marking Task Finished

run setTaskFinishedAndClearInterruptStatus (marks the task as finished and clears the interrupt status of the current thread).
### Notifying ExecutorBackend that Task Finished

run notifies the ExecutorBackend that the status of the taskId has changed to FINISHED.

Note: The ExecutorBackend is given when the TaskRunner is created.
### Wrapping Up

In the end, regardless of the task's execution status (successful or failed), run removes the taskId from the runningTasks registry.

In case an onTaskStart notification was sent out, run requests the ExecutorMetricsPoller to onTaskCompletion.
### Exceptions

run handles certain exceptions.

| Exception Type | TaskState | Serialized ByteBuffer |
|---|---|---|
| FetchFailedException | FAILED | TaskFailedReason |
| TaskKilledException | KILLED | TaskKilled |
| InterruptedException | KILLED | TaskKilled |
| CommitDeniedException | FAILED | TaskFailedReason |
| Throwable | FAILED | ExceptionFailure |
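The dispatch can be sketched with stand-in types (hypothetical and self-contained; not Spark's actual classes), one catch case per row of the table:

```scala
sealed trait TaskState
case object FAILED extends TaskState
case object KILLED extends TaskState

// Stand-ins for the exceptions run distinguishes.
class FetchFailedException  extends Exception
class TaskKilledException   extends Exception
class CommitDeniedException extends Exception

def handleTask(runTask: () => Unit)(notify: (TaskState, String) => Unit): Unit =
  try runTask()
  catch {
    case _: FetchFailedException  => notify(FAILED, "TaskFailedReason")
    case _: TaskKilledException   => notify(KILLED, "TaskKilled")
    case _: InterruptedException  => notify(KILLED, "TaskKilled")
    case _: CommitDeniedException => notify(FAILED, "TaskFailedReason")
    case _: Throwable             => notify(FAILED, "ExceptionFailure")
  }
```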
#### FetchFailedException

When a FetchFailedException is reported while running a task, run setTaskFinishedAndClearInterruptStatus.

run requests the FetchFailedException for the TaskFailedReason (toTaskFailedReason), serializes it and notifies the ExecutorBackend that the task has failed (with the taskId, TaskState.FAILED, and the serialized reason).

Note: The ExecutorBackend was specified when the TaskRunner was created.

Note: run uses a closure Serializer to serialize the failure reason. The Serializer was created before run ran the task.
#### TaskKilledException

When a TaskKilledException is reported while running a task, you should see the following INFO message in the logs:

```text
Executor killed [taskName] (TID [taskId]), reason: [reason]
```

run then notifies the ExecutorBackend that the task has been killed (with the taskId, TaskState.KILLED, and a serialized TaskKilled object).
#### InterruptedException (with Task Killed)

When an InterruptedException is reported while running a task, and the task has been killed, you should see the following INFO message in the logs:

```text
Executor interrupted and killed [taskName] (TID [taskId]), reason: [killReason]
```

run then notifies the ExecutorBackend that the task has been killed (with the taskId, TaskState.KILLED, and a serialized TaskKilled object).

Note: The difference between this InterruptedException and TaskKilledException is the INFO message in the logs.
#### CommitDeniedException

When a CommitDeniedException is reported while running a task, run notifies the ExecutorBackend that the task has failed (with the taskId, TaskState.FAILED, and a serialized reason, i.e. the TaskFailedReason of the CommitDeniedException).

Note: The difference between this CommitDeniedException and FetchFailedException is just the reason being sent to the ExecutorBackend.
#### Throwable

When run catches a Throwable, you should see the following ERROR message in the logs (followed by the exception):

```text
Exception in [taskName] (TID [taskId])
```

run then records the following task metrics (only when the Task is available):

- executorRunTime
- jvmGCTime

run then collects the latest values of internal and external accumulators (with the taskFailed flag enabled to inform that the collection is for a failed task). Otherwise, when the Task is not available, the accumulator collection is empty.

run converts the task accumulators to a collection of AccumulableInfo, creates an ExceptionFailure (with the accumulators), and serializes them.

Note: run uses a closure Serializer to serialize the ExceptionFailure.

CAUTION: FIXME Why does run create new ExceptionFailure(t, accUpdates).withAccums(accums), i.e. why do the accumulators occur twice in the object?

run notifies the ExecutorBackend that the task has failed (with the taskId, TaskState.FAILED, and the serialized ExceptionFailure).

run may also trigger SparkUncaughtExceptionHandler.uncaughtException(t) if this is a fatal error.

Note: The difference between this Throwable case and the other FAILED cases (FetchFailedException and CommitDeniedException) is the serialized ExceptionFailure vs a reason being sent to the ExecutorBackend, respectively.
## collectAccumulatorsAndResetStatusOnFailure

```scala
collectAccumulatorsAndResetStatusOnFailure(
  taskStartTimeNs: Long)
```

collectAccumulatorsAndResetStatusOnFailure...FIXME
## Killing Task

```scala
kill(
  interruptThread: Boolean,
  reason: String): Unit
```

kill marks the TaskRunner as killed (by setting the reasonIfKilled internal registry) and requests the Task to be killed (if available and not finished already).

Note: kill passes the input interruptThread on to the task itself while killing it.

When executed, you should see the following INFO message in the logs:

```text
Executor is trying to kill [taskName] (TID [taskId]), reason: [reason]
```

Note: kill is used when Executor is requested to kill a task or all running tasks.
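A minimal sketch of the kill flow, assuming a task handle with a kill method (KillableTask is a hypothetical trait, not Spark's scheduler Task):

```scala
trait KillableTask {
  def kill(interruptThread: Boolean, reason: String): Unit
}

class TaskRunnerSketch(taskName: String, taskId: Long) {
  @volatile private var reasonIfKilled: Option[String] = None
  @volatile private var task: KillableTask = null

  def kill(interruptThread: Boolean, reason: String): Unit = {
    println(s"Executor is trying to kill $taskName (TID $taskId), reason: $reason")
    reasonIfKilled = Some(reason)          // mark the TaskRunner as killed
    if (task != null) {
      task.kill(interruptThread, reason)   // pass interruptThread on to the task
    }
  }
}
```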
## Logging

Enable ALL logging level for the org.apache.spark.executor.Executor logger to see what happens inside.

Add the following line to conf/log4j.properties:

```text
log4j.logger.org.apache.spark.executor.Executor=ALL
```

Refer to Logging.
## Internal Properties

### finished Flag

finished flag says whether the task has finished (true) or not (false).

Default: false

Enabled (true) after TaskRunner has been requested to setTaskFinishedAndClearInterruptStatus.

Used when TaskRunner is requested to kill the task.
### reasonIfKilled

Reason to kill the task (and avoid executing it).

Default: (empty) (None)
### startGCTime Timestamp

Timestamp (which is really the total amount of time this Executor JVM process has already spent in garbage collection) that is used to mark the GC "zero" time (when run starts running).

Used when:

- TaskRunner is requested to run a task and to collectAccumulatorsAndResetStatusOnFailure
- Executor is requested to reportHeartBeat
### Task

Deserialized Task to execute

Used when:

- TaskRunner is requested to kill the task, run the task, collectAccumulatorsAndResetStatusOnFailure, and setTaskFinishedAndClearInterruptStatus
- Executor is requested to reportHeartBeat
### Task Name

The name of the task (of the TaskDescription) that is used exclusively for logging purposes.
### Thread Id

Current thread ID

Default: -1

Set immediately when TaskRunner is requested to run and used when TaskReaper is requested for the thread info of the current thread (aka thread dump).