
Executor

Executor is a process that executes scheduler:Task.md[tasks].

Executor typically runs for the entire lifetime of a Spark application, which is called static allocation of executors (you can also opt in for ROOT:spark-dynamic-allocation.md[dynamic allocation]).

Executors are managed by executor:ExecutorBackend.md[executor backends].

Executors <> to the <> on the driver.

.HeartbeatReceiver's Heartbeat Message Handler
image::HeartbeatReceiver-Heartbeat.png[align="center"]

Executors provide in-memory storage for RDDs that are cached in Spark applications (via storage:BlockManager.md[]).

When started, an executor first registers itself with the driver, establishing a communication channel directly to the driver to accept tasks for execution.

.Launching tasks on executor using TaskRunners
image::executor-taskrunner-executorbackend.png[align="center"]

Executor offers are described by executor id and the host on which an executor runs (see <> in this document).

Executors can run multiple tasks over their lifetime, both in parallel and sequentially. They track executor:TaskRunner.md[running tasks] (by their task ids in the <> internal registry). Consult the <> section.

Executors use a <> for <>.

Executors send <> (and heartbeats) using the <>.

It is recommended to have as many executors as data nodes and as many cores as you can get from the cluster.

Executors are described by their id, hostname, environment (as SparkEnv), and classpath (and, less importantly and more for internal optimization, whether they run in spark-local:index.md[local] or ROOT:spark-cluster.md[cluster] mode).

== [[creating-instance]] Creating Instance

Executor takes the following to be created:

  • [[executorId]] Executor ID
  • [[executorHostname]] Host name
  • [[env]] core:SparkEnv.md[]
  • <>
  • <>
  • [[uncaughtExceptionHandler]] Java's UncaughtExceptionHandler (default: SparkUncaughtExceptionHandler)

When created, Executor prints out the following INFO messages to the logs:

Starting executor ID [executorId] on host [executorHostname]

(only for <>) Executor sets SparkUncaughtExceptionHandler as the default handler invoked when a thread abruptly terminates due to an uncaught exception.

(only for <>) Executor requests the core:SparkEnv.md#blockManager[BlockManager] to storage:BlockManager.md#initialize[initialize] (with the ROOT:SparkConf.md#getAppId[Spark application id] of the core:SparkEnv.md#conf[SparkConf]).

[[creating-instance-BlockManager-shuffleMetricsSource]] (only for <>) Executor requests the core:SparkEnv.md#metricsSystem[MetricsSystem] to metrics:spark-metrics-MetricsSystem.md#registerSource[register] the <> and storage:BlockManager.md#shuffleMetricsSource[shuffleMetricsSource] of the core:SparkEnv.md#blockManager[BlockManager].

Executor uses SparkEnv to access the core:SparkEnv.md#metricsSystem[MetricsSystem] and core:SparkEnv.md#blockManager[BlockManager].

Executor <> (optionally with <>) and requests the system Serializer to serializer:Serializer.md#setDefaultClassLoader[use as the default classloader] (for deserializing tasks).

Executor <>.

Executor is created when:

  • CoarseGrainedExecutorBackend executor:CoarseGrainedExecutorBackend.md#RegisteredExecutor[receives RegisteredExecutor message]

  • (Spark on Mesos) MesosExecutorBackend is requested to spark-on-mesos:spark-executor-backends-MesosExecutorBackend.md#registered[registered]

  • spark-local:spark-LocalEndpoint.md[LocalEndpoint] is created

== [[isLocal]] isLocal Flag

Executor is given an isLocal flag when created. This is how the executor knows whether it runs in local or cluster mode. The flag is disabled by default.

The flag is turned on for spark-local:index.md[Spark local] (via spark-local:spark-LocalEndpoint.md[LocalEndpoint]).

== [[userClassPath]] User-Defined Jars

Executor is given user-defined jars when created. There are no jars defined by default.

The jars are specified using ROOT:configuration-properties.md#spark.executor.extraClassPath[spark.executor.extraClassPath] configuration property (via executor:CoarseGrainedExecutorBackend.md#main[--user-class-path] command-line option of CoarseGrainedExecutorBackend).

== [[runningTasks]] Running Tasks

Executor tracks running tasks in a registry of executor:TaskRunner.md[TaskRunners] per task ID.

== [[heartbeatReceiverRef]] HeartbeatReceiver RPC Endpoint Reference

rpc:RpcEndpointRef.md[RPC endpoint reference] to ROOT:spark-HeartbeatReceiver.md[HeartbeatReceiver] on the ROOT:spark-driver.md[driver].

Set when Executor <>.

Used exclusively when Executor <> (that happens every <> interval).

== [[updateDependencies]] updateDependencies Method

[source, scala]
----
updateDependencies(
  newFiles: Map[String, Long],
  newJars: Map[String, Long]): Unit
----


updateDependencies...FIXME

updateDependencies is used when TaskRunner is requested to executor:TaskRunner.md#run[start] (and run a task).
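
Given the signature (maps of file and jar names to timestamps), a dependency refresh boils down to comparing the driver-side timestamps against what this executor has already fetched and downloading anything newer. A minimal, hedged sketch of that idea only; currentFiles and fetchFile are illustrative stand-ins, not Spark's actual internals:

[source, scala]
----
import scala.collection.mutable

// Illustrative only: refresh local copies of files whose driver-side
// timestamp is newer than what this executor has already fetched.
val currentFiles = mutable.Map.empty[String, Long] // name -> last fetched timestamp

def refresh(newFiles: Map[String, Long])(fetchFile: String => Unit): Unit = {
  for ((name, timestamp) <- newFiles
       if currentFiles.getOrElse(name, -1L) < timestamp) {
    fetchFile(name)                // download the newer copy (hypothetical helper)
    currentFiles(name) = timestamp // remember which version was fetched
  }
}
----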

== [[launchTask]] Launching Task

[source, scala]
----
launchTask(
  context: ExecutorBackend,
  taskDescription: TaskDescription): Unit
----


launchTask simply creates a executor:TaskRunner.md[] (with the given executor:ExecutorBackend.md[] and the scheduler:spark-scheduler-TaskDescription.md[TaskDescription]) and adds it to the <> internal registry.

In the end, launchTask requests the <> to execute the TaskRunner (sometime in the future).

.Launching tasks on executor using TaskRunners
image::executor-taskrunner-executorbackend.png[align="center"]
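
The pattern behind launchTask is small: wrap the task description in a runnable, register it by task id, and hand it over to the thread pool. A hedged sketch of that pattern with simplified stand-in types (not the actual Spark classes):

[source, scala]
----
import java.util.concurrent.{ConcurrentHashMap, ExecutorService}

// Simplified stand-ins for Spark's TaskDescription and TaskRunner
final case class TaskDescription(taskId: Long, name: String)
final class TaskRunner(desc: TaskDescription) extends Runnable {
  def run(): Unit = println(s"running task ${desc.taskId}")
}

class MiniExecutor(threadPool: ExecutorService) {
  // taskId -> TaskRunner, mirroring the runningTasks registry
  private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

  def launchTask(desc: TaskDescription): Unit = {
    val runner = new TaskRunner(desc)
    runningTasks.put(desc.taskId, runner) // track until the task completes
    threadPool.execute(runner)            // execute sometime in the future
  }
}
----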

launchTask is used when:

  • CoarseGrainedExecutorBackend is requested to executor:CoarseGrainedExecutorBackend.md#LaunchTask[handle a LaunchTask message]

  • LocalEndpoint RPC endpoint (of spark-local:spark-LocalSchedulerBackend.md#[LocalSchedulerBackend]) is requested to spark-local:spark-LocalEndpoint.md#reviveOffers[reviveOffers]

  • MesosExecutorBackend is requested to spark-on-mesos:spark-executor-backends-MesosExecutorBackend.md#launchTask[launchTask]

== [[heartbeater]] Heartbeat Sender Thread

heartbeater is a daemon {java-javadoc-url}/java/util/concurrent/ScheduledThreadPoolExecutor.html[ScheduledThreadPoolExecutor] with a single thread.

The name of the thread pool is driver-heartbeater.
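
As a rough illustration, such a scheduler can be built with plain Java concurrency utilities; the thread factory below is a stand-in for Spark's own thread utilities:

[source, scala]
----
import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory}

// Single-threaded scheduler whose only thread is a daemon named "driver-heartbeater"
val heartbeater = new ScheduledThreadPoolExecutor(1, new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "driver-heartbeater")
    t.setDaemon(true)
    t
  }
})
----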

== [[coarse-grained-executor]] Coarse-Grained Executors

Coarse-grained executors are executors that use executor:CoarseGrainedExecutorBackend.md[] for task scheduling.

== [[resource-offers]] Resource Offers

Read scheduler:TaskSchedulerImpl.md#resourceOffers[resourceOffers] in TaskSchedulerImpl and scheduler:TaskSetManager.md#resourceOffers[resourceOffer] in TaskSetManager.

== [[threadPool]] Executor task launch worker Thread Pool

Executor uses a daemon cached thread pool (threadPool) with threads named Executor task launch worker-[ID] (with ID being the task id) for <>.

threadPool is created when <> and shut down when <>.
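
A hedged sketch of creating such a pool with plain Java utilities (the naming scheme is approximate; Spark uses its own thread factory):

[source, scala]
----
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Executors, ThreadFactory}

// Cached pool of daemon threads named "Executor task launch worker-N"
val workerId = new AtomicInteger(0)
val taskLaunchPool = Executors.newCachedThreadPool(new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, s"Executor task launch worker-${workerId.getAndIncrement()}")
    t.setDaemon(true)
    t
  }
})
----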

== [[memory]] Executor Memory

You can control the amount of memory per executor using ROOT:configuration-properties.md#spark.executor.memory[spark.executor.memory] configuration property. It sets the available memory equally for all executors per application.
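
For example, the property can be set when building the SparkConf (the 2g value and the application name are only illustrative):

[source, scala]
----
import org.apache.spark.SparkConf

// Every executor of this application gets 2g of heap (illustrative value)
val conf = new SparkConf()
  .setAppName("executor-memory-demo")
  .set("spark.executor.memory", "2g")
----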

The amount of memory per executor is looked up when ROOT:SparkContext.md#creating-instance[SparkContext is created].

You can change the assigned memory per executor per node in spark-standalone:index.md[standalone cluster] using ROOT:SparkContext.md#environment-variables[SPARK_EXECUTOR_MEMORY] environment variable.

You can find the value displayed as Memory per Node in spark-standalone:spark-standalone-Master.md[web UI for standalone Master] (as depicted in the figure below).

.Memory per Node in Spark Standalone's web UI
image::spark-standalone-webui-memory-per-node.png[align="center"]

The above figure shows the result of running tools:spark-shell.md[Spark shell] with the amount of memory per executor defined explicitly (on command line), i.e.

./bin/spark-shell --master spark://localhost:7077 -c spark.executor.memory=2g

== [[metrics]] Metrics

Every executor registers its own executor:ExecutorSource.md[] to metrics:spark-metrics-MetricsSystem.md#report[report metrics].

== [[stop]] Stopping Executor

[source, scala]
----
stop(): Unit
----

stop requests core:SparkEnv.md#metricsSystem[MetricsSystem] for a metrics:spark-metrics-MetricsSystem.md#report[report].

stop shuts <> down (and waits at most 10 seconds).

stop shuts <> down.

(only when <>) stop core:SparkEnv.md#stop[requests SparkEnv to stop].

stop is used when executor:CoarseGrainedExecutorBackend.md#Shutdown[CoarseGrainedExecutorBackend] and spark-local:spark-LocalEndpoint.md#StopExecutor[LocalEndpoint] are requested to stop their managed executors.
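
A hedged sketch of the shutdown sequence described above (the helpers are illustrative; the 10-second wait applies to the heartbeater):

[source, scala]
----
import java.util.concurrent.{ExecutorService, ScheduledThreadPoolExecutor, TimeUnit}

// Illustrative shutdown sequence: report metrics first, then stop the thread pools
def stopExecutor(
    heartbeater: ScheduledThreadPoolExecutor,
    taskLaunchPool: ExecutorService,
    isLocal: Boolean)(reportMetrics: () => Unit, stopSparkEnv: () => Unit): Unit = {
  reportMetrics()                                    // MetricsSystem report
  heartbeater.shutdown()
  heartbeater.awaitTermination(10, TimeUnit.SECONDS) // wait at most 10 seconds
  taskLaunchPool.shutdown()
  if (!isLocal) stopSparkEnv()                       // assumption: SparkEnv stops only in cluster mode
}
----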

== [[computeTotalGcTime]] computeTotalGcTime Method

[source, scala]
----
computeTotalGcTime(): Long
----

computeTotalGcTime...FIXME

computeTotalGcTime is used when:

  • TaskRunner is requested to executor:TaskRunner.md#collectAccumulatorsAndResetStatusOnFailure[collectAccumulatorsAndResetStatusOnFailure] and executor:TaskRunner.md#run[run]

  • Executor is requested to <>

== [[createClassLoader]] createClassLoader Method

[source, scala]
----
createClassLoader(): MutableURLClassLoader
----

createClassLoader...FIXME

createClassLoader is used when...FIXME

== [[addReplClassLoaderIfNeeded]] addReplClassLoaderIfNeeded Method

[source, scala]
----
addReplClassLoaderIfNeeded(
  parent: ClassLoader): ClassLoader
----


addReplClassLoaderIfNeeded...FIXME

addReplClassLoaderIfNeeded is used when...FIXME

== [[reportHeartBeat]] Heartbeating With Partial Metrics For Active Tasks To Driver

[source, scala]
----
reportHeartBeat(): Unit
----

reportHeartBeat collects executor:TaskRunner.md[TaskRunners] for <> (aka active tasks) with their executor:TaskRunner.md#task[tasks] deserialized (i.e. either ready for execution or already started).

A executor:TaskRunner.md[TaskRunner] has the executor:TaskRunner.md#task[task] deserialized when it executor:TaskRunner.md#run[runs the task].

For every running task, reportHeartBeat takes its scheduler:Task.md#metrics[TaskMetrics] and:

  • Requests executor:TaskMetrics.md#mergeShuffleReadMetrics[ShuffleRead metrics to be merged]
  • executor:TaskMetrics.md#setJvmGCTime[Sets jvmGCTime metrics]

reportHeartBeat then records the latest values of executor:TaskMetrics.md#accumulators[internal and external accumulators] for every task.

NOTE: Internal accumulators are a task's metrics while external accumulators are a Spark application's accumulators that a user has created.

reportHeartBeat sends a blocking ROOT:spark-HeartbeatReceiver.md#Heartbeat[Heartbeat] message to the <<heartbeatReceiverRef, HeartbeatReceiver endpoint>> (running on the driver). reportHeartBeat uses the value of ROOT:configuration-properties.md#spark.executor.heartbeatInterval[spark.executor.heartbeatInterval] configuration property for the RPC timeout.

NOTE: A Heartbeat message contains the executor identifier, the accumulator updates, and the identifier of the storage:BlockManager.md[].

If the response (from the <<heartbeatReceiverRef, HeartbeatReceiver endpoint>>) is to re-register the BlockManager, you should see the following INFO message in the logs and reportHeartBeat requests the BlockManager to storage:BlockManager.md#reregister[re-register] (which will register the blocks the BlockManager manages with the driver).

[source,plaintext]
----
Told to re-register on heartbeat
----

HeartbeatResponse requests the BlockManager to re-register when either scheduler:TaskScheduler.md#executorHeartbeatReceived[TaskScheduler] or ROOT:spark-HeartbeatReceiver.md#Heartbeat[HeartbeatReceiver] knows nothing about the executor.

When the Heartbeat is posted successfully, reportHeartBeat resets the <> internal counter.

In case of a non-fatal exception, you should see the following WARN message in the logs (followed by the stack trace).

Issue communicating with driver in heartbeater

On every failure, reportHeartBeat increments the <> counter up to the value of ROOT:configuration-properties.md#spark.executor.heartbeat.maxFailures[spark.executor.heartbeat.maxFailures] configuration property. When the number of heartbeat failures reaches the maximum, you should see the following ERROR message in the logs and the executor terminates with error code 56.

Exit as unable to send heartbeats to driver more than [HEARTBEAT_MAX_FAILURES] times
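
A minimal sketch of that failure accounting (the counter, the limit, and exit code 56 mirror the description above; everything else, including the maximum of 60, is illustrative):

[source, scala]
----
import scala.util.control.NonFatal

// Illustrative failure accounting around a single heartbeat attempt
val HEARTBEAT_MAX_FAILURES = 60 // spark.executor.heartbeat.maxFailures (illustrative)
var heartbeatFailures = 0

def onHeartbeatAttempt(sendHeartbeat: () => Unit): Unit =
  try {
    sendHeartbeat()
    heartbeatFailures = 0 // a successful post resets the counter
  } catch {
    case NonFatal(_) =>
      // WARN "Issue communicating with driver in heartbeater"
      heartbeatFailures += 1
      if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
        // ERROR "Exit as unable to send heartbeats to driver ..."
        System.exit(56)
      }
  }
----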

reportHeartBeat is used when Executor is requested to <> (that happens every ROOT:configuration-properties.md#spark.executor.heartbeatInterval[spark.executor.heartbeatInterval]).

== [[startDriverHeartbeater]][[heartbeats-and-active-task-metrics]] Sending Heartbeats and Active Tasks Metrics

Executors keep sending <> to the driver every <> (defaults to 10s with some random initial delay so the heartbeats from different executors do not pile up on the driver).
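
A hedged sketch of that scheduling (the jitter computation is illustrative; reportHeartBeat stands in for the method described earlier):

[source, scala]
----
import java.util.concurrent.{Executors, TimeUnit}
import scala.util.Random

val heartbeatIntervalMs = 10000L // spark.executor.heartbeatInterval (10s default)
// Random initial delay so executors started together do not heartbeat in lockstep
val initialDelayMs = heartbeatIntervalMs + Random.nextInt(heartbeatIntervalMs.toInt)

def reportHeartBeat(): Unit = () // stand-in for the real heartbeat logic

val heartbeater = Executors.newSingleThreadScheduledExecutor()
heartbeater.scheduleAtFixedRate(
  new Runnable { def run(): Unit = reportHeartBeat() },
  initialDelayMs, heartbeatIntervalMs, TimeUnit.MILLISECONDS)
----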

.Executors use HeartbeatReceiver endpoint to report task metrics
image::executor-heartbeatReceiver-endpoint.png[align="center"]

An executor sends heartbeats using the <>.

.HeartbeatReceiver's Heartbeat Message Handler
image::HeartbeatReceiver-Heartbeat.png[align="center"]

For each scheduler:Task.md[task] run by a executor:TaskRunner.md[TaskRunner] (in the <> internal registry), the task's metrics are computed (i.e. mergeShuffleReadMetrics and setJvmGCTime) and become part of the heartbeat (together with accumulators).

NOTE: Executors track the executor:TaskRunner.md[TaskRunners] that run scheduler:Task.md[tasks]. A executor:TaskRunner.md#run[task might not be assigned to a TaskRunner yet] when the executor sends a heartbeat.

A blocking ROOT:spark-HeartbeatReceiver.md#Heartbeat[Heartbeat] message that holds the executor id, all accumulator updates (per task id), and storage:BlockManagerId.md[] is sent to ROOT:spark-HeartbeatReceiver.md[HeartbeatReceiver RPC endpoint] (with <> timeout).

If the response ROOT:spark-HeartbeatReceiver.md#Heartbeat[requests to reregister BlockManager], you should see the following INFO message in the logs:

Told to re-register on heartbeat

BlockManager is requested to storage:BlockManager.md#reregister[reregister].

The internal <> counter is reset (i.e. becomes 0).

If there are any issues with communicating with the driver, you should see the following WARN message in the logs:

[source,plaintext]
----
Issue communicating with driver in heartbeater
----

The internal <> counter is incremented and checked against the <> (i.e. the spark.executor.heartbeat.maxFailures configuration property). If the counter exceeds the maximum, the following ERROR message is printed out to the logs:

Exit as unable to send heartbeats to driver more than [HEARTBEAT_MAX_FAILURES] times

The executor exits (using System.exit and exit code 56).

== [[logging]] Logging

Enable ALL logging level for org.apache.spark.executor.Executor logger to see what happens inside.

Add the following line to conf/log4j.properties:

[source,plaintext]
----
log4j.logger.org.apache.spark.executor.Executor=ALL
----

Refer to ROOT:spark-logging.md[Logging].

== [[internal-properties]] Internal Properties

=== [[executorSource]] ExecutorSource

executor:ExecutorSource.md[]

=== [[heartbeatFailures]] heartbeatFailures

=== [[maxDirectResultSize]] maxDirectResultSize

=== [[maxResultSize]] maxResultSize

Used exclusively when TaskRunner is requested to executor:TaskRunner.md#run[run] (and creates a serialized ByteBuffer result that is an IndirectTaskResult)

