Executor¶
Creating Instance¶
Executor takes the following to be created:

- Executor ID
- Host name
- SparkEnv
- User-defined jars
- isLocal flag
- UncaughtExceptionHandler (default: SparkUncaughtExceptionHandler)
- Resources (Map[String, ResourceInformation])
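The list maps onto the constructor roughly as in this hedged Scala sketch (exact parameter types and defaults may differ across Spark versions):

```scala
import java.lang.Thread.UncaughtExceptionHandler
import java.net.URL
import org.apache.spark.SparkEnv
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.util.SparkUncaughtExceptionHandler

// A sketch of the constructor described by the list above.
class Executor(
    executorId: String,
    executorHostname: String,
    env: SparkEnv,
    userClassPath: Seq[URL] = Nil,
    isLocal: Boolean = false,
    uncaughtExceptionHandler: UncaughtExceptionHandler = new SparkUncaughtExceptionHandler,
    resources: Map[String, ResourceInformation])
```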
Executor is created when:

- CoarseGrainedExecutorBackend is requested to handle a RegisteredExecutor message (after having registered with the driver)
- LocalEndpoint is created
When Created¶
When created, Executor prints out the following INFO message to the logs:

```text
Starting executor ID [executorId] on host [executorHostname]
```
(only for non-local modes) Executor sets SparkUncaughtExceptionHandler as the default handler invoked when a thread abruptly terminates due to an uncaught exception.
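Under the hood this is the standard JVM hook; a minimal sketch:

```scala
// Install the handler for every thread that terminates abruptly
// due to an uncaught exception (standard JVM API).
Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler)
```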
(only for non-local modes) Executor requests the BlockManager to initialize (with the Spark application id of the SparkConf).
(only for non-local modes) Executor requests the MetricsSystem to register the following metric sources:

- ExecutorSource
- JVMCPUSource
- ExecutorMetricsSource
- ShuffleMetricsSource (of the BlockManager)
Executor uses the SparkEnv to access the MetricsSystem and BlockManager.

Executor creates a task class loader (optionally with REPL support) and requests the system Serializer to use it as the default classloader (for deserializing tasks).

Executor starts sending heartbeats with the metrics of active tasks.
PluginContainer¶
Executor creates a PluginContainer (with the SparkEnv and the resources).

The PluginContainer is used to create a TaskRunner for launching a task.

The PluginContainer is requested to shutdown in stop.
ExecutorSource¶
When created, Executor creates an ExecutorSource (with the threadPool, the executorId and the schemes).

The ExecutorSource is then registered with the application's MetricsSystem (in local and non-local modes) to report metrics.
The metrics are updated right after a TaskRunner has finished executing a task.
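Registration boils down to a single call on the MetricsSystem; a hedged sketch using the SparkEnv accessor mentioned above:

```scala
// Register the executor's metric source so the MetricsSystem starts
// reporting its gauges (a sketch, not Spark's exact code).
env.metricsSystem.registerSource(executorSource)
```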
ExecutorMetricsSource¶
Executor creates an ExecutorMetricsSource when created with the spark.metrics.executorMetricsSource.enabled configuration property enabled.

Executor uses the ExecutorMetricsSource to create the ExecutorMetricsPoller.

Executor requests the ExecutorMetricsSource to register immediately when created with the isLocal flag disabled.
ExecutorMetricsPoller¶
Executor creates an ExecutorMetricsPoller when created (with the MemoryManager of the SparkEnv, the spark.executor.metrics.pollingInterval, and the ExecutorMetricsSource).

Executor requests the ExecutorMetricsPoller to start immediately when created and to stop when requested to stop.

TaskRunner requests the ExecutorMetricsPoller to onTaskStart and onTaskCompletion at the beginning and the end of run, respectively.

When requested to reportHeartBeat with pollOnHeartbeat enabled, Executor requests the ExecutorMetricsPoller to poll.
Fetching File and Jar Dependencies¶
```scala
updateDependencies(
  newFiles: Map[String, Long],
  newJars: Map[String, Long]): Unit
```
updateDependencies fetches missing or outdated extra files (in the given newFiles). For every name-timestamp pair that...FIXME..., updateDependencies prints out the following INFO message to the logs:

```text
Fetching [name] with timestamp [timestamp]
```

updateDependencies fetches missing or outdated extra jars (in the given newJars). For every name-timestamp pair that...FIXME..., updateDependencies prints out the following INFO message to the logs:

```text
Fetching [name] with timestamp [timestamp]
```

updateDependencies fetches the file to the SparkFiles root directory.

updateDependencies ...FIXME
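The "missing or outdated" check compares the driver-provided timestamp against the one tracked locally. A minimal sketch, assuming a local name-to-timestamp registry (the text above does not name it):

```scala
import scala.collection.mutable

// Hypothetical registry of last-seen timestamp per file name.
val currentFiles = mutable.Map.empty[String, Long]

// For every (name, timestamp) pair, fetch only what is missing or outdated.
def fetchMissingOrOutdated(newFiles: Map[String, Long]): Unit =
  newFiles.foreach { case (name, timestamp) =>
    if (currentFiles.getOrElse(name, -1L) < timestamp) {
      println(s"Fetching $name with timestamp $timestamp") // INFO log in Spark
      // ...fetch the file to the SparkFiles root directory...
      currentFiles(name) = timestamp
    }
  }
```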
updateDependencies is used when:

- TaskRunner is requested to start (and run a task)
spark.driver.maxResultSize¶
Executor uses the spark.driver.maxResultSize for TaskRunner when requested to run a task (and decide on a serialized task result).
Maximum Size of Direct Results¶
Executor uses the minimum of spark.task.maxDirectResultSize and spark.rpc.message.maxSize when TaskRunner is requested to run a task (and decide on the type of a serialized task result).
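Together with spark.driver.maxResultSize above, the two thresholds split serialized results into three cases. A hedged sketch of the decision (variable names are illustrative, not Spark's exact code):

```scala
// Illustrative thresholds, both described above.
val maxResultSize: Long = ???       // spark.driver.maxResultSize
val maxDirectResultSize: Long = ??? // min(spark.task.maxDirectResultSize, spark.rpc.message.maxSize)

def resultKind(resultSize: Long): String =
  if (maxResultSize > 0 && resultSize > maxResultSize)
    "dropped: over spark.driver.maxResultSize"
  else if (resultSize > maxDirectResultSize)
    "indirect: stored in the BlockManager, only a block reference is sent"
  else
    "direct: the serialized result is sent to the driver as-is"
```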
isLocal Flag¶
Executor is given the isLocal flag when created to indicate whether it runs in local or non-local mode (i.e. whether the Spark application uses a local or cluster-specific master URL).

isLocal is disabled (false) by default and is explicitly disabled when CoarseGrainedExecutorBackend is requested to handle a RegisteredExecutor message.

isLocal is enabled (true) when LocalEndpoint is created.
spark.executor.userClassPathFirst¶
Executor reads the value of the spark.executor.userClassPathFirst configuration property when created.

When enabled, Executor uses ChildFirstURLClassLoader (not MutableURLClassLoader) when requested to createClassLoader (and addReplClassLoaderIfNeeded).
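A hedged sketch of the selection (both classloaders live in org.apache.spark.util; constructor arguments assumed):

```scala
import java.net.URL
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader}

// Pick the task classloader based on spark.executor.userClassPathFirst
// (a sketch of the selection described above).
def chooseClassLoader(
    userClassPathFirst: Boolean,
    urls: Array[URL],
    parent: ClassLoader): ClassLoader =
  if (userClassPathFirst) new ChildFirstURLClassLoader(urls, parent) // user jars shadow Spark's classes
  else new MutableURLClassLoader(urls, parent)                       // standard parent-first delegation
```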
User-Defined Jars¶
Executor is given user-defined jars when created. No jars are assumed by default.

The jars are specified using the spark.executor.extraClassPath configuration property (via the --user-class-path command-line option of CoarseGrainedExecutorBackend).
Running Tasks Registry¶
```scala
runningTasks: Map[Long, TaskRunner]
```
Executor tracks TaskRunners by task IDs.
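The registry has to be safe for concurrent access from the task-launch threads; a hedged sketch (Spark's actual field may differ):

```scala
import java.util.concurrent.ConcurrentHashMap

// Task ID -> TaskRunner; a concurrent map so launchTask (adding entries)
// and finishing tasks (removing them) can update it from different threads.
val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
```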
HeartbeatReceiver RPC Endpoint Reference¶
When created, Executor creates an RPC endpoint reference to HeartbeatReceiver (running on the driver).

Executor uses the RPC endpoint reference when requested to reportHeartBeat.
Launching Task¶
```scala
launchTask(
  context: ExecutorBackend,
  taskDescription: TaskDescription): Unit
```
launchTask creates a TaskRunner (with the given ExecutorBackend, the TaskDescription and the PluginContainer) and adds it to the runningTasks internal registry.

launchTask requests the "Executor task launch worker" thread pool to execute the TaskRunner (sometime in the future). The steps are summarized in the sketch below.

In case the decommissioned flag is enabled, launchTask prints out the following ERROR message to the logs:

```text
Launching a task while in decommissioned state.
```
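Put together, the steps above amount to just a few lines; a hedged sketch (not Spark's exact code):

```scala
// Create a TaskRunner, register it, and hand it to the thread pool.
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val taskRunner = new TaskRunner(context, taskDescription, plugins)
  runningTasks.put(taskDescription.taskId, taskRunner)
  threadPool.execute(taskRunner) // runs sometime in the future
}
```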
launchTask is used when:

- CoarseGrainedExecutorBackend is requested to handle a LaunchTask message
- LocalEndpoint RPC endpoint (of LocalSchedulerBackend) is requested to reviveOffers
Sending Heartbeats and Active Tasks Metrics¶
Executors keep sending metrics for active tasks to the driver every spark.executor.heartbeatInterval (defaults to 10s with some random initial delay so the heartbeats from different executors do not pile up on the driver).
An executor sends heartbeats using the Heartbeat Sender Thread.
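A hedged sketch of how such a periodic task with a random initial delay can be scheduled on the heartbeater (the interval value is illustrative):

```scala
import java.util.concurrent.TimeUnit
import scala.util.Random

// Schedule reportHeartBeat at a fixed rate with a random initial delay so
// executors started together do not heartbeat in lockstep (a sketch).
val intervalMs: Long = 10000L // spark.executor.heartbeatInterval (10s default)
val initialDelayMs = intervalMs + (Random.nextDouble() * intervalMs).toLong
val heartbeatTask: Runnable = () => reportHeartBeat()
heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS)
```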
For each TaskRunner (in the runningTasks internal registry), the task's metrics are computed and become part of the heartbeat (with accumulators).

A blocking Heartbeat message that holds the executor id, all accumulator updates (per task id), and the BlockManagerId is sent to the HeartbeatReceiver RPC endpoint.
If the response requests to re-register the BlockManager, Executor prints out the following INFO message to the logs:

```text
Told to re-register on heartbeat
```

The BlockManager is then requested to reregister. The internal heartbeatFailures counter is reset.
If there are any issues with communicating with the driver, Executor prints out the following WARN message to the logs:

```text
Issue communicating with driver in heartbeater
```

The internal heartbeatFailures counter is incremented and checked against spark.executor.heartbeat.maxFailures. If the number of failures is greater, the following ERROR message is printed out to the logs:

```text
Exit as unable to send heartbeats to driver more than [HEARTBEAT_MAX_FAILURES] times
```

The executor exits (using System.exit and exit code 56).
Heartbeat Sender Thread¶
heartbeater is a ScheduledThreadPoolExecutor (Java) with a single thread.

The name of the thread pool is driver-heartbeater.
Executor task launch worker Thread Pool¶
When created, Executor creates the threadPool daemon cached thread pool with the name Executor task launch worker-[ID] (with ID being the task id).

The threadPool thread pool is used for launching tasks.
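Spark ships a helper for exactly this kind of pool; a hedged sketch (ThreadUtils is in org.apache.spark.util):

```scala
import org.apache.spark.util.ThreadUtils

// A daemon cached thread pool whose threads carry the given name prefix
// (a sketch of how the pool described above can be built).
val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
```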
Executor Memory¶
The amount of memory per executor is configured using spark.executor.memory configuration property. It sets the available memory equally for all executors per application.
You can find the value displayed as Memory per Node in the web UI of the standalone Master.
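For example, a hedged snippet setting the property programmatically (the same can be done with --conf on spark-submit):

```scala
import org.apache.spark.SparkConf

// Give every executor of this application 4 GiB of memory.
val conf = new SparkConf().set("spark.executor.memory", "4g")
```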
Heartbeating With Partial Metrics For Active Tasks To Driver¶
```scala
reportHeartBeat(): Unit
```
reportHeartBeat collects the TaskRunners of currently running tasks (active tasks) with their tasks deserialized (i.e. either ready for execution or already started).

TaskRunner has the task deserialized when it runs the task.

For every running task, reportHeartBeat takes the TaskMetrics and requests them to merge the shuffle-read metrics and set the JVM GC time.
reportHeartBeat then records the latest values of internal and external accumulators for every task.
Note
Internal accumulators are a task's metrics while external accumulators are a Spark application's accumulators that a user has created.
reportHeartBeat sends a blocking Heartbeat message to the HeartbeatReceiver (on the driver). reportHeartBeat uses the value of the spark.executor.heartbeatInterval configuration property for the RPC timeout.
Note
A Heartbeat message contains the executor identifier, the accumulator updates, and the identifier of the BlockManager.
If the response (from HeartbeatReceiver) is to re-register the BlockManager, reportHeartBeat prints out the following INFO message to the logs and requests the BlockManager to re-register (which will register the blocks the BlockManager manages with the driver):

```text
Told to re-register on heartbeat
```
HeartbeatResponse requests the BlockManager to re-register when either the TaskScheduler or the HeartbeatReceiver knows nothing about the executor.
When posting the Heartbeat is successful, reportHeartBeat resets the heartbeatFailures internal counter.
In case of a non-fatal exception, you should see the following WARN message in the logs (followed by the stack trace):

```text
Issue communicating with driver in heartbeater
```
Every failure increments the heartbeatFailures counter, which is checked against the spark.executor.heartbeat.maxFailures configuration property. When the number of failures reaches the maximum, reportHeartBeat prints out the following ERROR message to the logs and the executor terminates with exit code 56:

```text
Exit as unable to send heartbeats to driver more than [HEARTBEAT_MAX_FAILURES] times
```
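The whole exchange fits in a few lines; a hedged sketch (names follow the text above; the real code differs in details such as the accumulator-updates payload):

```scala
import scala.util.control.NonFatal

// A sketch of the heartbeat exchange described above (not Spark's exact code).
try {
  val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
    Heartbeat(executorId, accumUpdates, env.blockManager.blockManagerId),
    rpcTimeout) // from spark.executor.heartbeatInterval
  if (response.reregisterBlockManager) {
    logInfo("Told to re-register on heartbeat")
    env.blockManager.reregister()
  }
  heartbeatFailures = 0
} catch {
  case NonFatal(e) =>
    logWarning("Issue communicating with driver in heartbeater", e)
    heartbeatFailures += 1
    if (heartbeatFailures >= heartbeatMaxFailures) {
      logError(s"Exit as unable to send heartbeats to driver " +
        s"more than $heartbeatMaxFailures times")
      System.exit(56)
    }
}
```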
reportHeartBeat is used when:

- Executor is requested to schedule reporting heartbeat and partial metrics for active tasks to the driver (that happens every spark.executor.heartbeatInterval)
spark.executor.heartbeat.maxFailures¶
Executor uses the spark.executor.heartbeat.maxFailures configuration property in reportHeartBeat.
Logging¶
Enable ALL logging level for the org.apache.spark.executor.Executor logger to see what happens inside.

Add the following line to conf/log4j.properties:

```text
log4j.logger.org.apache.spark.executor.Executor=ALL
```

Refer to Logging.