CoarseGrainedSchedulerBackend is a SchedulerBackend.

CoarseGrainedSchedulerBackend is an ExecutorAllocationClient.

CoarseGrainedSchedulerBackend is responsible for requesting resources from a cluster manager for executors that it in turn uses to launch tasks (on coarse-grained executors).

CoarseGrainedSchedulerBackend holds executors for the duration of the Spark job rather than relinquishing executors whenever a task is done and asking the scheduler to launch a new executor for each new task.

FIXME Picture with dependencies

CoarseGrainedSchedulerBackend registers the CoarseGrainedScheduler RPC endpoint that executors use for RPC communication with the driver.

Active executors are executors that are not pending to be removed or lost.
Table 1. Built-In CoarseGrainedSchedulerBackends per Cluster Environment
Cluster Environment CoarseGrainedSchedulerBackend

Spark Standalone


Spark on YARN


Spark on Mesos


CoarseGrainedSchedulerBackend is only created indirectly through built-in implementations per cluster environment.
Table 2. CoarseGrainedSchedulerBackend’s Internal Properties
Name Initial Value Description


The last (highest) identifier of all allocated executors.

Used exclusively in YarnSchedulerEndpoint to respond to RetrieveLastAllocatedExecutorId message.


Current time

The time CoarseGrainedSchedulerBackend was created.


spark.rpc.askTimeout or or 120s

Default timeout for blocking RPC messages (aka ask messages).



RPC endpoint reference to CoarseGrainedScheduler RPC endpoint (with DriverEndpoint as the message handler).

Initialized when CoarseGrainedSchedulerBackend starts.

Used when CoarseGrainedSchedulerBackend executes the following (asynchronously, i.e. on a separate thread):



Registry of ExecutorData by executor id.

NOTE: ExecutorData holds an executor’s endpoint reference, address, host, the number of free and total CPU cores, and the URLs of its logs.

An element is added when DriverEndpoint receives a RegisterExecutor message, and removed when DriverEndpoint receives a RemoveExecutor message or a remote host (with one or more executors) disconnects.



Executors marked as removed but the confirmation from a cluster manager has not arrived yet.



Registry of hostnames and the possible number of tasks running on them.



Number of pending tasks…FIXME




spark.rpc.message.maxSize but not greater than 2047

Maximum RPC message size in MB.

When set above 2047 MB, you should see the following IllegalArgumentException:

spark.rpc.message.maxSize should not be greater than 2047 MB







Total number of CPU cores, i.e. the sum of all the cores on all executors.



Total number of registered executors
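The 2047 MB cap on the maximum RPC message size in Table 2 is presumably there because the size in bytes has to fit in an Int: Int.MaxValue / 1024 / 1024 is 2047 with integer division. Below is a minimal sketch of such a check, assuming the configured value has already been parsed into MB. RpcMessageSize and maxRpcMessageSizeBytes are hypothetical names, not Spark's API; only the limit and the exception message come from the table.

```scala
// Hypothetical helper, not Spark's API: validates a configured RPC message size.
object RpcMessageSize {
  // Largest size in MB whose byte count still fits in an Int (integer division gives 2047)
  val MaxMessageSizeInMb: Int = Int.MaxValue / 1024 / 1024

  /** Rejects sizes above the cap and returns the size converted to bytes. */
  def maxRpcMessageSizeBytes(configuredMb: Int): Int = {
    if (configuredMb > MaxMessageSizeInMb) {
      throw new IllegalArgumentException(
        s"spark.rpc.message.maxSize should not be greater than $MaxMessageSizeInMb MB")
    }
    configuredMb * 1024 * 1024
  }
}
```

With 2048 MB, the byte count would be 2^31, which is one more than Int.MaxValue.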

Enable INFO or DEBUG logging level for org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend logger to see what happens inside.

Add the following line to conf/

Refer to Logging.

Killing All Executors on Node — killExecutorsOnHost Method


Making Fake Resource Offers on Executors — makeOffers Internal Methods

makeOffers(): Unit
makeOffers(executorId: String): Unit

makeOffers takes the active executors (out of the executorDataMap internal registry) and creates WorkerOffer resource offers for each (one per executor with the executor’s id, host and free cores).

Only free cores are considered when making offers; memory is not. Why?

It then requests TaskSchedulerImpl to process the resource offers to create a collection of TaskDescription collections that it in turn uses to launch tasks.
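The offer-building step of makeOffers can be sketched as follows. This is a simplified model under stated assumptions: ExecutorInfo and Offer are hypothetical stand-ins for ExecutorData and WorkerOffer, the "lost" executors are modeled as a plain set, and handing the offers to TaskSchedulerImpl is left out.

```scala
// Hypothetical, simplified stand-ins for Spark's ExecutorData and WorkerOffer
final case class ExecutorInfo(host: String, freeCores: Int, totalCores: Int)
final case class Offer(executorId: String, host: String, cores: Int)

object OfferSketch {
  /** An executor is active when it is neither pending removal nor lost. */
  def isActive(id: String, pendingToRemove: Set[String], lost: Set[String]): Boolean =
    !pendingToRemove.contains(id) && !lost.contains(id)

  /** One offer per active executor, built from free cores only (memory is not offered). */
  def makeOffers(
      executorDataMap: Map[String, ExecutorInfo],
      pendingToRemove: Set[String],
      lost: Set[String]): Seq[Offer] =
    executorDataMap.toSeq
      .filter { case (id, _) => isActive(id, pendingToRemove, lost) }
      .sortBy(_._1) // deterministic order, for the example only
      .map { case (id, data) => Offer(id, data.host, data.freeCores) }
}
```

The resulting offers are what would then be passed to TaskSchedulerImpl to produce the TaskDescription collections.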

Creating CoarseGrainedSchedulerBackend Instance

CoarseGrainedSchedulerBackend takes the following when created:

CoarseGrainedSchedulerBackend initializes the internal registries and counters.

Getting Executor Ids — getExecutorIds Method

When called, getExecutorIds simply returns executor ids from the internal executorDataMap registry.

CoarseGrainedSchedulerBackend Contract

import scala.concurrent.Future

// Signatures only (method bodies omitted); DriverEndpoint is a Spark-internal type
class CoarseGrainedSchedulerBackend {
  def minRegisteredRatio: Double
  def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint
  def reset(): Unit
  def sufficientResourcesRegistered(): Boolean
  def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean]
  def doKillExecutors(executorIds: Seq[String]): Future[Boolean]
}

CoarseGrainedSchedulerBackend is a private[spark] contract.
Table 3. FIXME Contract
Method Description


Ratio between 0 and 1 (inclusive).

Controlled by spark.scheduler.minRegisteredResourcesRatio.








Always true by default, i.e. sufficient resources are available.

Used when CoarseGrainedSchedulerBackend checks if sufficient compute resources are available.

numExistingExecutors Method


killExecutors Methods


getDriverLogUrls Method


applicationAttemptId Method


Requesting Additional Executors — requestExecutors Method

requestExecutors(numAdditionalExecutors: Int): Boolean

requestExecutors is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false by default).

requestExecutors is part of the ExecutorAllocationClient contract that SparkContext uses to request additional executors (as part of a developer API for dynamic allocation of executors).

When called, you should see the following INFO message followed by DEBUG message in the logs:

INFO Requesting [numAdditionalExecutors] additional executor(s) from the cluster manager
DEBUG Number of pending executors is now [numPendingExecutors]

numPendingExecutors is increased by the input numAdditionalExecutors.

requestExecutors then requests executors from the cluster manager, asking for a total that reflects the current computation needs. The "new executor total" is the sum of the internal numExistingExecutors and numPendingExecutors, decreased by the number of executors pending to be removed.

If numAdditionalExecutors is negative, an IllegalArgumentException is thrown:

Attempted to request a negative number of additional executor(s) [numAdditionalExecutors] from the cluster manager. Please specify a positive number!
requestExecutors is a final method, so no other scheduler backend can customize it further.
The method body runs in a synchronized block, so multiple concurrent requests are handled serially, i.e. one by one.
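The bookkeeping of requestExecutors can be sketched as a small class. This is a simplified model, not Spark's code: AllocationSketch and requestedTotals are hypothetical, and the cluster-specific hook returns a plain Boolean (acknowledging nothing, i.e. false, by default) instead of a Future that Spark would wait on.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the bookkeeping behind requestExecutors
class AllocationSketch(var numExistingExecutors: Int) {
  var numPendingExecutors: Int = 0
  val executorsPendingToRemove: mutable.Set[String] = mutable.Set.empty
  // Records every total handed to the cluster-specific hook (for inspection only)
  val requestedTotals: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer.empty

  // Stand-in for doRequestTotalExecutors: by default the request is not acknowledged
  protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
    requestedTotals += requestedTotal
    false
  }

  final def requestExecutors(numAdditionalExecutors: Int): Boolean = {
    if (numAdditionalExecutors < 0) {
      throw new IllegalArgumentException(
        "Attempted to request a negative number of additional executor(s) " +
          s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!")
    }
    // Concurrent callers are serialized here, one request at a time
    synchronized {
      numPendingExecutors += numAdditionalExecutors
      // New executor total = existing + pending - pending removal
      doRequestTotalExecutors(
        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
    }
  }
}
```

For example, with 3 existing executors, 1 pending removal and a request for 2 more, the total handed to the hook is 3 + 2 - 1 = 4.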

Requesting Exact Number of Executors — requestTotalExecutors Method

requestTotalExecutors(
  numExecutors: Int,
  localityAwareTasks: Int,
  hostToLocalTaskCount: Map[String, Int]): Boolean

requestTotalExecutors is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false by default).

It sets the internal localityAwareTasks and hostToLocalTaskCount registries. It then calculates the number of pending executors (numPendingExecutors) as the input numExecutors plus the executors pending removal, minus the number of already-assigned executors (but never less than 0), and passes numExecutors to doRequestTotalExecutors.

If numExecutors is negative, an IllegalArgumentException is thrown:

Attempted to request a negative number of executor(s) [numExecutors] from the cluster manager. Please specify a positive number!
requestTotalExecutors is a final method, so no other scheduler backend can customize it further.
The method body runs in a synchronized block, so multiple concurrent requests are handled serially, i.e. one by one.
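The arithmetic requestTotalExecutors performs on its input can be sketched as a pure function: the requested total plus executors pending removal, minus executors already assigned. TotalRequestSketch and pendingAfterTotalRequest are hypothetical names, and clamping the result at zero is an assumption of this sketch.

```scala
// Hypothetical helper, not Spark's API
object TotalRequestSketch {
  /**
   * Executors still pending after asking for numExecutors in total:
   * requested total + pending removal - already assigned, clamped at zero (assumption).
   */
  def pendingAfterTotalRequest(
      numExecutors: Int,
      numExistingExecutors: Int,
      numPendingToRemove: Int): Int = {
    if (numExecutors < 0) {
      throw new IllegalArgumentException(
        "Attempted to request a negative number of executor(s) " +
          s"$numExecutors from the cluster manager. Please specify a positive number!")
    }
    math.max(numExecutors - numExistingExecutors + numPendingToRemove, 0)
  }
}
```

Asking for 10 executors in total while 6 are assigned and 1 is pending removal leaves 10 - 6 + 1 = 5 pending.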

Finding Default Level of Parallelism — defaultParallelism Method

defaultParallelism(): Int
defaultParallelism is part of the SchedulerBackend Contract.

defaultParallelism is the value of the spark.default.parallelism configuration property if defined.

Otherwise, defaultParallelism is the maximum of totalCoreCount and 2.
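The rule above fits in one expression. In this sketch a Map[String, String] stands in for SparkConf (an assumption for self-containment), and ParallelismSketch is a hypothetical name.

```scala
// Hypothetical helper; a plain Map stands in for SparkConf
object ParallelismSketch {
  /** spark.default.parallelism if set, otherwise max(totalCoreCount, 2). */
  def defaultParallelism(conf: Map[String, String], totalCoreCount: Int): Int =
    conf.get("spark.default.parallelism").map(_.toInt).getOrElse(math.max(totalCoreCount, 2))
}
```

So a cluster with a single registered core still gets a default parallelism of 2 unless the property is set.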

Killing Task — killTask Method

killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit
killTask is part of the SchedulerBackend contract.

killTask simply sends a KillTask message to driverEndpoint.


Stopping All Executors — stopExecutors Method

stopExecutors sends a blocking StopExecutors message to driverEndpoint (if already initialized).

It is called exclusively while CoarseGrainedSchedulerBackend is being stopped.

You should see the following INFO message in the logs:

INFO CoarseGrainedSchedulerBackend: Shutting down all executors

Reset State — reset Method

reset resets the internal state:

  1. Sets numPendingExecutors to 0

  2. Clears executorsPendingToRemove

  3. Sends a blocking RemoveExecutor message to driverEndpoint for every executor (in the internal executorDataMap) to inform it about SlaveLost with the message:

    Stale executor after cluster manager re-registered.

reset is defined in CoarseGrainedSchedulerBackend, but used and overridden exclusively by YarnSchedulerBackend.
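The three reset steps can be modeled as follows. This is a simplified sketch: ResetSketch is hypothetical, the loss reason is a plain String rather than SlaveLost, and removeExecutor only records its calls instead of messaging driverEndpoint. Taking a snapshot of the executor ids under the lock and removing them afterwards is a design choice of the sketch.

```scala
import scala.collection.mutable

// Hypothetical, simplified state; removeExecutor records calls instead of sending messages
class ResetSketch {
  var numPendingExecutors: Int = 0
  val executorsPendingToRemove: mutable.Set[String] = mutable.Set.empty
  val executorDataMap: mutable.Map[String, String] = mutable.Map.empty // executor id -> host
  val removed: mutable.ArrayBuffer[(String, String)] = mutable.ArrayBuffer.empty

  def removeExecutor(executorId: String, reason: String): Unit =
    removed += ((executorId, reason))

  def reset(): Unit = {
    // Steps 1 and 2 under the lock; also snapshot the known executor ids
    val executors = synchronized {
      numPendingExecutors = 0
      executorsPendingToRemove.clear()
      executorDataMap.keySet.toSet
    }
    // Step 3: report every known executor as lost
    executors.foreach { id =>
      removeExecutor(id, "Stale executor after cluster manager re-registered.")
    }
  }
}
```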

Remove Executor — removeExecutor Method

removeExecutor(executorId: String, reason: ExecutorLossReason)

removeExecutor sends a blocking RemoveExecutor message to driverEndpoint.

CoarseGrainedScheduler RPC Endpoint — driverEndpoint

When CoarseGrainedSchedulerBackend starts, it registers the CoarseGrainedScheduler RPC endpoint as the driver’s communication endpoint.

driverEndpoint is a DriverEndpoint.

CoarseGrainedSchedulerBackend is created while SparkContext is being created, and SparkContext in turn lives inside a Spark driver. That explains the name driverEndpoint (at least partially).

Internally, it is called the standalone scheduler’s driver endpoint.

It tracks:

It uses the driver-revive-thread daemon single-thread thread pool for …FIXME

FIXME A potential issue with driverEndpoint.asInstanceOf[NettyRpcEndpointRef].toURI - doubles spark:// prefix.

Starting CoarseGrainedSchedulerBackend (and Registering CoarseGrainedScheduler RPC Endpoint) — start Method

start(): Unit
start is part of the SchedulerBackend contract.

start takes all spark.-prefixed properties and registers the CoarseGrainedScheduler RPC endpoint (backed by DriverEndpoint ThreadSafeRpcEndpoint).

Figure 1. CoarseGrainedScheduler Endpoint
start uses TaskSchedulerImpl to access the current SparkContext and in turn SparkConf.
start uses RpcEnv that was given when CoarseGrainedSchedulerBackend was created.
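The property selection that start performs (taking all spark.-prefixed properties before registering the endpoint) amounts to a prefix filter. In this sketch the configuration is assumed to be a sequence of key-value pairs, as in the createDriverEndpoint signature; StartSketch is a hypothetical name.

```scala
// Hypothetical helper; conf entries are modeled as (key, value) pairs
object StartSketch {
  /** Keeps only the spark.-prefixed properties, preserving their order. */
  def sparkProperties(conf: Seq[(String, String)]): Seq[(String, String)] =
    conf.filter { case (key, _) => key.startsWith("spark.") }
}
```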

Checking If Sufficient Compute Resources Available Or Waiting Time Passed — isReady Method

isReady(): Boolean
isReady is part of the SchedulerBackend contract.

isReady allows delaying task launching until sufficient resources are available or spark.scheduler.maxRegisteredResourcesWaitingTime passes.

isReady first checks sufficientResourcesRegistered, which by default reports that sufficient resources are available.

If the resources are available, you should see the following INFO message in the logs and isReady returns true.

INFO SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: [minRegisteredRatio]
minRegisteredRatio (controlled by spark.scheduler.minRegisteredResourcesRatio) is in the range 0 to 1 and denotes the minimum ratio of registered resources to total expected resources before submitting tasks.

If sufficient resources are not available yet (the above requirement does not hold), isReady checks whether the time since startup has passed spark.scheduler.maxRegisteredResourcesWaitingTime, which allows tasks to be launched even though minRegisteredRatio has not been reached yet.

In that case, you should see the following INFO message in the logs and isReady returns true.

INFO SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: [maxRegisteredWaitingTimeMs](ms)

Otherwise, when sufficient resources are not available and spark.scheduler.maxRegisteredResourcesWaitingTime has not elapsed, isReady returns false.
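The decision isReady makes can be reduced to a pure function of the resource check and the elapsed time since creation. ReadinessSketch is hypothetical, timestamps are passed in explicitly so the logic is testable, and treating the waiting time as an inclusive bound (>=) is an assumption of this sketch.

```scala
// Hypothetical model of the isReady decision; times are in milliseconds
object ReadinessSketch {
  /** Ready when resources suffice, or once the maximum waiting time since creation has elapsed. */
  def isReady(
      sufficientResourcesRegistered: Boolean,
      createTimeMs: Long,
      nowMs: Long,
      maxRegisteredWaitingTimeMs: Long): Boolean =
    sufficientResourcesRegistered || (nowMs - createTimeMs) >= maxRegisteredWaitingTimeMs
}
```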

Reviving Resource Offers (by Posting ReviveOffers to CoarseGrainedSchedulerBackend RPC Endpoint) — reviveOffers Method

reviveOffers(): Unit
reviveOffers is part of the SchedulerBackend contract.

reviveOffers simply sends a ReviveOffers message to the CoarseGrainedScheduler RPC endpoint (driverEndpoint).

Figure 2. CoarseGrainedExecutorBackend Revives Offers

Stopping CoarseGrainedSchedulerBackend (and Stopping Executors) — stop Method

stop(): Unit
stop is part of the SchedulerBackend contract.

stop stops all executors and the CoarseGrainedScheduler RPC endpoint (the latter by sending a blocking StopDriver message).

In case of any Exception, stop throws a SparkException with the message:

Error stopping standalone scheduler's driver endpoint

createDriverEndpointRef Method

createDriverEndpointRef(properties: ArrayBuffer[(String, String)]): RpcEndpointRef

createDriverEndpointRef creates DriverEndpoint and registers it as CoarseGrainedScheduler.

createDriverEndpointRef is used when CoarseGrainedSchedulerBackend starts.

Creating DriverEndpoint — createDriverEndpoint Method

createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint

createDriverEndpoint simply creates a DriverEndpoint.

The purpose of createDriverEndpoint is to allow YARN to use the custom YarnDriverEndpoint.
createDriverEndpoint is used when CoarseGrainedSchedulerBackend creates the driver endpoint reference (createDriverEndpointRef).


Table 4. Spark Properties
Property Default Value Description



Time (in milliseconds) between resource offer revives.



Maximum message size to allow in RPC communication. In MB when the unit is not given.

Generally only applies to map output size (serialized) information sent between executors and the driver.

Increase this if you are running jobs with many thousands of map and reduce tasks and see messages about the RPC message size.



Double number between 0 and 1 (inclusive) that controls the minimum ratio of registered resources to total expected resources before submitting tasks.

See isReady in this document.



Time to wait for sufficient resources available.

See isReady in this document.