Executor

Executor is a distributed agent that is responsible for executing tasks.

Executor is created when:

Executor typically runs for the entire lifetime of a Spark application which is called static allocation of executors (but you could also opt in for dynamic allocation).

Executors are managed exclusively by executor backends.
spark HeartbeatReceiver Heartbeat
Figure 1. HeartbeatReceiver’s Heartbeat Message Handler

Executors provide in-memory storage for RDDs that are cached in Spark applications (via Block Manager).

When started, an executor first registers itself with the driver that establishes a communication channel directly to the driver to accept tasks for execution.

executor taskrunner executorbackend
Figure 2. Launching tasks on executor using TaskRunners

Executor offers are described by executor id and the host on which an executor runs (see Resource Offers in this document).

Executors can run multiple tasks over its lifetime, both in parallel and sequentially. They track running tasks (by their task ids in runningTasks internal registry). Consult Launching Tasks section.

Executors send metrics (and heartbeats) using the internal heartbeater - Heartbeat Sender Thread.

It is recommended to have as many executors as data nodes and as many cores as you can get from the cluster.

Executors are described by their id, hostname, environment (as SparkEnv), and classpath (and, less importantly, and more for internal optimization, whether they run in local or cluster mode).

Enable ALL logging level for org.apache.spark.executor.Executor logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.executor.Executor=ALL

Refer to Logging.

Creating Executor Instance

Executor takes the following to be created:

  • Executor ID

  • Host name

  • SparkEnv

  • User-defined jars (Seq[URL] / default: Nil)

  • Flag to control whether the executor runs in local or cluster mode (default: false)

  • Java’s UncaughtExceptionHandler (default: SparkUncaughtExceptionHandler)

User-defined JARs are defined using --user-class-path command-line option of CoarseGrainedExecutorBackend that can be set using spark.executor.extraClassPath property.
isLocal is enabled exclusively for LocalEndpoint (for Spark in local mode).

When created, you should see the following INFO messages in the logs:

INFO Executor: Starting executor ID [executorId] on host [executorHostname]

(only for non-local modes) Executor sets SparkUncaughtExceptionHandler as the default handler invoked when a thread abruptly terminates due to an uncaught exception.

(only for non-local modes) Executor requests the BlockManager to initialize (with the Spark application id of the SparkConf).

Spark application id corresponds to the value of spark.app.id Spark property.

(only for non-local modes) Executor requests the MetricsSystem to register the ExecutorSource and shuffleMetricsSource of the BlockManager.

Executor uses SparkEnv to access the local MetricsSystem and BlockManager.

Executor creates a task class loader (optionally with REPL support) that the current Serializer is requested to use (when deserializing task later).

Executor uses SparkEnv to access the local Serializer.

Executor initializes the internal properties.

updateDependencies Internal Method

updateDependencies(
  newFiles: Map[String, Long],
  newJars: Map[String, Long]): Unit

updateDependencies…​FIXME

updateDependencies is used exclusively when TaskRunner is requested to start (and run a task).

Launching Task — launchTask Method

launchTask(
  context: ExecutorBackend,
  taskDescription: TaskDescription): Unit

launchTask simply creates a TaskRunner (with the given ExecutorBackend and the TaskDescription) and adds it to the runningTasks internal registry.

In the end, launchTask requests the "Executor task launch worker" thread pool to execute the TaskRunner (sometime in the future).

executor taskrunner executorbackend
Figure 3. Launching tasks on executor using TaskRunners

launchTask is used when:

heartbeater — Heartbeat Sender Thread

heartbeater is a daemon ScheduledThreadPoolExecutor with a single thread.

The name of the thread pool is driver-heartbeater.

Coarse-Grained Executors

Coarse-grained executors are executors that use CoarseGrainedExecutorBackend for task scheduling.

Resource Offers

Read resourceOffers in TaskSchedulerImpl and resourceOffer in TaskSetManager.

"Executor task launch worker" Thread Pool — threadPool Property

Executor uses threadPool daemon cached thread pool with the name Executor task launch worker-[ID] (with ID being the task id) for launching tasks.

threadPool is created when Executor is created and shut down when it stops.

Executor Memory — spark.executor.memory or SPARK_EXECUTOR_MEMORY settings

You can control the amount of memory per executor using spark.executor.memory setting. It sets the available memory equally for all executors per application.

The amount of memory per executor is looked up when SparkContext is created.

You can change the assigned memory per executor per node in standalone cluster using SPARK_EXECUTOR_MEMORY environment variable.

You can find the value displayed as Memory per Node in web UI for standalone Master (as depicted in the figure below).

spark standalone webui memory per node
Figure 4. Memory per Node in Spark Standalone’s web UI

The above figure shows the result of running Spark shell with the amount of memory per executor defined explicitly (on command line), i.e.

./bin/spark-shell --master spark://localhost:7077 -c spark.executor.memory=2g

Metrics

Every executor registers its own ExecutorSource to report metrics.

Stopping Executor — stop Method

stop(): Unit

stop shuts driver-heartbeater thread down (and waits at most 10 seconds).

stop is used when CoarseGrainedExecutorBackend and LocalEndpoint are requested to stop their managed executors.

computeTotalGcTime Internal Method

computeTotalGcTime(): Long

computeTotalGcTime…​FIXME

computeTotalGcTime is used when:

createClassLoader Internal Method

createClassLoader(): MutableURLClassLoader

createClassLoader…​FIXME

createClassLoader is used when…​FIXME

addReplClassLoaderIfNeeded Internal Method

addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader

addReplClassLoaderIfNeeded…​FIXME

addReplClassLoaderIfNeeded is used when…​FIXME

Heartbeating With Partial Metrics For Active Tasks To Driver — reportHeartBeat Internal Method

reportHeartBeat(): Unit

reportHeartBeat collects TaskRunners for currently running tasks (aka active tasks) with their tasks deserialized (i.e. either ready for execution or already started).

TaskRunner has task deserialized when it runs the task.

For every running task, reportHeartBeat takes its TaskMetrics and:

reportHeartBeat then records the latest values of internal and external accumulators for every task.

Internal accumulators are a task’s metrics while external accumulators are a Spark application’s accumulators that a user has created.

reportHeartBeat sends a blocking Heartbeat message to HeartbeatReceiver endpoint (running on the driver). reportHeartBeat uses spark.executor.heartbeatInterval for the RPC timeout.

A Heartbeat message contains the executor identifier, the accumulator updates, and the identifier of the BlockManager.
reportHeartBeat uses SparkEnv to access the current BlockManager.

If the response (from HeartbeatReceiver endpoint) is to re-register the BlockManager, you should see the following INFO message in the logs and reportHeartBeat requests BlockManager to re-register (which will register the blocks the BlockManager manages with the driver).

INFO Told to re-register on heartbeat
HeartbeatResponse requests BlockManager to re-register when either TaskScheduler or HeartbeatReceiver know nothing about the executor.

When posting the Heartbeat was successful, reportHeartBeat resets heartbeatFailures internal counter.

In case of a non-fatal exception, you should see the following WARN message in the logs (followed by the stack trace).

WARN Issue communicating with driver in heartbeater

Every failure reportHeartBeat increments heartbeat failures up to spark.executor.heartbeat.maxFailures Spark property. When the heartbeat failures reaches the maximum, you should see the following ERROR message in the logs and the executor terminates with the error code: 56.

ERROR Exit as unable to send heartbeats to driver more than [HEARTBEAT_MAX_FAILURES] times
reportHeartBeat is used when Executor schedules reporting heartbeat and partial metrics for active tasks to the driver (that happens every spark.executor.heartbeatInterval Spark property).

Sending Heartbeats and Active Tasks Metrics — startDriverHeartbeater Internal Method

Executors keep sending metrics for active tasks to the driver every spark.executor.heartbeatInterval (defaults to 10s with some random initial delay so the heartbeats from different executors do not pile up on the driver).

executor heartbeatReceiver endpoint
Figure 5. Executors use HeartbeatReceiver endpoint to report task metrics

An executor sends heartbeats using the internal heartbeater — Heartbeat Sender Thread.

spark HeartbeatReceiver Heartbeat
Figure 6. HeartbeatReceiver’s Heartbeat Message Handler

For each task in TaskRunner (in runningTasks internal registry), the task’s metrics are computed (i.e. mergeShuffleReadMetrics and setJvmGCTime) that become part of the heartbeat (with accumulators).

FIXME How do mergeShuffleReadMetrics and setJvmGCTime influence accumulators?
Executors track the TaskRunner that run tasks. A task might not be assigned to a TaskRunner yet when the executor sends a heartbeat.

A blocking Heartbeat message that holds the executor id, all accumulator updates (per task id), and BlockManagerId is sent to HeartbeatReceiver RPC endpoint (with spark.executor.heartbeatInterval timeout).

FIXME When is heartbeatReceiverRef created?

If the response requests to reregister BlockManager, you should see the following INFO message in the logs:

INFO Executor: Told to re-register on heartbeat

The internal heartbeatFailures counter is reset (i.e. becomes 0).

If there are any issues with communicating with the driver, you should see the following WARN message in the logs:

WARN Executor: Issue communicating with driver in heartbeater

The internal heartbeatFailures is incremented and checked to be less than the acceptable number of failures (i.e. spark.executor.heartbeat.maxFailures Spark property). If the number is greater, the following ERROR is printed out to the logs:

ERROR Executor: Exit as unable to send heartbeats to driver more than [HEARTBEAT_MAX_FAILURES] times

The executor exits (using System.exit and exit code 56).

Read about TaskMetrics in TaskMetrics.

Internal Properties

Name Description

executorSource

heartbeatFailures

heartbeatReceiverRef

maxDirectResultSize

maxResultSize

Used exclusively when TaskRunner is requested to run (and creates a serialized ByteBuffer result that is a IndirectTaskResult)

runningTasks

TaskRunners per task ID (ConcurrentHashMap[Long, TaskRunner])