
CoarseGrainedSchedulerBackend

CoarseGrainedSchedulerBackend is a base SchedulerBackend for coarse-grained schedulers.

CoarseGrainedSchedulerBackend is an ExecutorAllocationClient.

CoarseGrainedSchedulerBackend is responsible for requesting resources from a cluster manager for executors that it in turn uses to launch tasks (on CoarseGrainedExecutorBackend).

CoarseGrainedSchedulerBackend holds executors for the duration of the Spark job rather than relinquishing executors whenever a task is done and asking the scheduler to launch a new executor for each new task.

CoarseGrainedSchedulerBackend registers CoarseGrainedScheduler RPC Endpoint that executors use for RPC communication.

Note

Active executors are executors that are not pending to be removed or lost.

Implementations

  • KubernetesClusterSchedulerBackend (Spark on Kubernetes)
  • MesosCoarseGrainedSchedulerBackend (Spark on Mesos)
  • StandaloneSchedulerBackend (Spark Standalone)
  • YarnSchedulerBackend (Spark on YARN)

Creating Instance

CoarseGrainedSchedulerBackend takes the following to be created:

CoarseGrainedScheduler RPC Endpoint

driverEndpoint: RpcEndpointRef

CoarseGrainedSchedulerBackend registers a DriverEndpoint RPC endpoint known as CoarseGrainedScheduler when created.

Creating DriverEndpoint

createDriverEndpoint(): DriverEndpoint

createDriverEndpoint creates a new DriverEndpoint.

Note

The purpose of createDriverEndpoint is to let CoarseGrainedSchedulerBackends provide their own custom implementations.

createDriverEndpoint is used when CoarseGrainedSchedulerBackend is requested to createDriverEndpointRef.

Maximum Number of Concurrent Tasks

maxNumConcurrentTasks(
  rp: ResourceProfile): Int

maxNumConcurrentTasks is part of the SchedulerBackend abstraction.

maxNumConcurrentTasks uses the Available Executors registry to find out about available ResourceProfiles, total number of CPU cores and ExecutorResourceInfos of every active executor.

In the end, maxNumConcurrentTasks calculates the available (parallel) slots for the given ResourceProfile (and given the available executor resources).
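The slot calculation can be sketched as follows. This is a simplified sketch, not Spark's implementation. ExecutorSlotsSketch and both functions are hypothetical stand-ins for the executor data and ExecutorResourceInfos. The sketch assumes an executor offers as many slots as its most limiting resource allows, whether that is CPU cores or a custom resource such as GPUs.

```scala
// Hypothetical, simplified view of an active executor (not Spark's ExecutorData)
final case class ExecutorSlotsSketch(
    resourceProfileId: Int,
    totalCores: Int,
    resources: Map[String, Int]) // custom resource name -> amount, e.g. "gpu" -> 2

// Tasks one executor can run in parallel: the most limiting of its CPU cores
// and every custom resource a task requires
def slotsPerExecutor(
    exec: ExecutorSlotsSketch,
    cpusPerTask: Int,
    taskResources: Map[String, Int]): Int = {
  val cpuSlots = exec.totalCores / cpusPerTask
  val resourceSlots = taskResources.toSeq.map { case (name, perTask) =>
    exec.resources.getOrElse(name, 0) / perTask
  }
  (cpuSlots +: resourceSlots).min
}

// Sum of slots over the active executors that use the given resource profile
def maxNumConcurrentTasksSketch(
    executors: Seq[ExecutorSlotsSketch],
    rpId: Int,
    cpusPerTask: Int,
    taskResources: Map[String, Int]): Int =
  executors
    .filter(_.resourceProfileId == rpId)
    .map(e => slotsPerExecutor(e, cpusPerTask, taskResources))
    .sum
```

For example, take an executor with 8 cores and 2 GPUs, and tasks that each need 2 CPUs and 1 GPU. That executor offers only 2 slots, because GPUs are the limiting resource.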

totalRegisteredExecutors Registry

totalRegisteredExecutors: AtomicInteger

totalRegisteredExecutors is an internal registry of the number of registered executors (a Java AtomicInteger).

totalRegisteredExecutors starts from 0.

totalRegisteredExecutors is incremented when:

totalRegisteredExecutors is decremented when:

Sufficient Resources Registered

sufficientResourcesRegistered(): Boolean

sufficientResourcesRegistered is always true (and is supposed to be overridden by custom CoarseGrainedSchedulerBackends).

Minimum Resources Available Ratio

minRegisteredRatio: Double

minRegisteredRatio is the minimum ratio of registered resources to the total expected resources that the CoarseGrainedSchedulerBackend requires before it is ready to schedule tasks for execution.

minRegisteredRatio is the value of the spark.scheduler.minRegisteredResourcesRatio configuration property, if defined, and defaults to 0.0 otherwise.

minRegisteredRatio can be between 0.0 and 1.0 (inclusive).
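A backend-specific sufficientResourcesRegistered can compare what has registered so far against what it expects. The helper below is a hypothetical sketch of such a comparison, not a Spark API. What counts as "registered" and "expected" (cores, executors) differs per backend. The range check only enforces the 0.0 to 1.0 bounds stated above.

```scala
// Hypothetical helper (not a Spark API): has the registered amount reached
// minRegisteredRatio of the expected amount?
def reachedMinRegisteredRatio(
    registered: Int,
    expected: Int,
    minRegisteredRatio: Double): Boolean = {
  // minRegisteredRatio is only meaningful between 0.0 and 1.0 (inclusive)
  require(minRegisteredRatio >= 0.0 && minRegisteredRatio <= 1.0,
    s"minRegisteredRatio must be between 0.0 and 1.0, got $minRegisteredRatio")
  registered >= expected * minRegisteredRatio
}
```

With the default of 0.0, the comparison holds from the very beginning, so in this sketch a backend counts as having sufficient resources right away.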

minRegisteredRatio is used when:

  • CoarseGrainedSchedulerBackend is requested to isReady
  • StandaloneSchedulerBackend is requested to sufficientResourcesRegistered
  • KubernetesClusterSchedulerBackend is requested to sufficientResourcesRegistered
  • MesosCoarseGrainedSchedulerBackend is requested to sufficientResourcesRegistered
  • YarnSchedulerBackend is requested to sufficientResourcesRegistered

Available Executors Registry

executorDataMap: HashMap[String, ExecutorData]

CoarseGrainedSchedulerBackend tracks available executors using executorDataMap registry (of ExecutorDatas by executor id).

A new entry is added when DriverEndpoint is requested to handle a RegisterExecutor message.

An entry is removed when DriverEndpoint is requested to handle a RemoveExecutor message or when a remote host (with one or many executors) disconnects.
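The registry's life cycle can be sketched with a mutable HashMap keyed by executor id. ExecutorDataSketch and the three handler functions are hypothetical, simplified stand-ins for ExecutorData and the DriverEndpoint message handling. The sketch also assumes disconnects are tracked per host.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's ExecutorData
final case class ExecutorDataSketch(host: String, totalCores: Int, freeCores: Int)

// Registry of executors by executor id (cf. executorDataMap)
val executorDataMap = mutable.HashMap.empty[String, ExecutorDataSketch]

// RegisterExecutor: a new entry is added
def onRegisterExecutor(id: String, host: String, cores: Int): Unit =
  executorDataMap.synchronized {
    executorDataMap(id) = ExecutorDataSketch(host, cores, cores)
  }

// RemoveExecutor: the entry (if any) is removed
def onRemoveExecutor(id: String): Option[ExecutorDataSketch] =
  executorDataMap.synchronized {
    executorDataMap.remove(id)
  }

// A remote host disconnects: every executor on that host is removed
def onHostDisconnected(host: String): Seq[String] =
  executorDataMap.synchronized {
    val lost = executorDataMap.iterator
      .collect { case (id, data) if data.host == host => id }
      .toList
    lost.foreach(id => executorDataMap.remove(id))
    lost
  }
```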

Revive Messages Scheduler Service

reviveThread: ScheduledExecutorService

CoarseGrainedSchedulerBackend creates a Java ScheduledExecutorService when created.

The ScheduledExecutorService is used by DriverEndpoint RPC Endpoint to post ReviveOffers messages regularly.
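Posting ReviveOffers regularly amounts to a fixed-rate task on a single-thread ScheduledExecutorService. The sketch below uses only java.util.concurrent. postReviveOffers is a hypothetical stand-in for sending ReviveOffers to the DriverEndpoint. The 100 ms period is arbitrary; Spark takes the interval from spark.scheduler.revive.interval, which defaults to 1s.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Single-thread, daemon scheduler (thread named driver-revive-thread)
val reviveThread: ScheduledExecutorService =
  Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "driver-revive-thread")
      t.setDaemon(true)
      t
    }
  })

// Hypothetical stand-in for posting ReviveOffers to the DriverEndpoint
val revivesPosted = new AtomicInteger(0)
def postReviveOffers(): Unit = revivesPosted.incrementAndGet()

// Post ReviveOffers right away and then every 100 ms (an arbitrary period)
val reviveTask = reviveThread.scheduleAtFixedRate(
  new Runnable { override def run(): Unit = postReviveOffers() },
  0L, 100L, TimeUnit.MILLISECONDS)
```

In this sketch, stopping means cancelling the task (reviveTask.cancel(false)) and then shutting the service down (reviveThread.shutdown()).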

Maximum Size of RPC Message

maxRpcMessageSize is the value of spark.rpc.message.maxSize configuration property.

Making Fake Resource Offers on Executors

makeOffers(): Unit
makeOffers(
  executorId: String): Unit

makeOffers takes the active executors (out of the executorDataMap internal registry) and creates WorkerOffer resource offers for each (one per executor with the executor's id, host and free cores).

CAUTION: Only free cores are considered in making offers. Memory is not! Why?!

It then requests TaskSchedulerImpl to process the resource offers (resourceOffers). The result is a collection of TaskDescription collections that makeOffers in turn uses to launch tasks.
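The offer-building step can be sketched as a filter followed by a map over the executors. OfferableExecutor and WorkerOfferSketch are hypothetical simplifications of Spark's ExecutorData and WorkerOffer. Sorting by executor id is there only to make the output deterministic.

```scala
// Hypothetical simplifications of Spark's ExecutorData and WorkerOffer
final case class OfferableExecutor(host: String, freeCores: Int)
final case class WorkerOfferSketch(executorId: String, host: String, cores: Int)

// One offer per active executor: executors pending to be removed or lost are
// skipped, and only free cores (no memory) go into an offer
def makeOffersSketch(
    executors: Map[String, OfferableExecutor],
    pendingToRemove: Set[String],
    lost: Set[String]): Seq[WorkerOfferSketch] =
  executors.toSeq
    .filter { case (id, _) => !pendingToRemove(id) && !lost(id) }
    .map { case (id, exec) => WorkerOfferSketch(id, exec.host, exec.freeCores) }
    .sortBy(_.executorId) // deterministic order for the example only
```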

Getting Executor Ids

When called, getExecutorIds simply returns the executor ids from the internal executorDataMap registry.

NOTE: It is called when SparkContext calculates executor ids (getExecutorIds).

Requesting Executors

requestExecutors(
  numAdditionalExecutors: Int): Boolean

requestExecutors is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false by default).

requestExecutors method is part of the ExecutorAllocationClient abstraction.

When called, you should see the following INFO message followed by a DEBUG message in the logs:

Requesting [numAdditionalExecutors] additional executor(s) from the cluster manager
Number of pending executors is now [numPendingExecutors]

The number of pending executors (numPendingExecutors) is increased by the input numAdditionalExecutors.

requestExecutors then requests the new executor total from the cluster manager, so that it reflects the current computation needs. The "new executor total" is the number of existing executors plus numPendingExecutors, minus the number of executors pending to be removed.

If numAdditionalExecutors is negative, requestExecutors throws an IllegalArgumentException:

Attempted to request a negative number of additional executor(s) [numAdditionalExecutors] from the cluster manager. Please specify a positive number!

NOTE: It is a final method, so other scheduler backends cannot customize it further.

NOTE: The method uses a synchronized block so that multiple concurrent requests are handled serially, i.e. one by one.
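The steps above can be sketched as follows. RequestExecutorsSketch, its two constructor functions and the 10-second wait are hypothetical. The sketch assumes the new total is the number of registered executors plus pending executors minus executors pending to be removed. The default doRequestTotalExecutors answers false, as in the base class. Logging is left out.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical sketch; the two functions supply the number of registered
// executors and of executors pending to be removed
class RequestExecutorsSketch(
    registeredExecutors: () => Int,
    executorsPendingToRemove: () => Int) {

  private var numPendingExecutors = 0

  def pendingExecutors: Int = synchronized(numPendingExecutors)

  // Cluster-specific hook; the default answers false (not acknowledged)
  protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
    Future.successful(false)

  // final: subclasses cannot override it
  final def requestExecutors(numAdditionalExecutors: Int): Boolean = {
    if (numAdditionalExecutors < 0) {
      throw new IllegalArgumentException(
        "Attempted to request a negative number of additional executor(s) " +
          s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!")
    }
    // synchronized: concurrent requests are handled one by one
    val response = synchronized {
      numPendingExecutors += numAdditionalExecutors
      val newTotal = registeredExecutors() + numPendingExecutors - executorsPendingToRemove()
      doRequestTotalExecutors(newTotal)
    }
    Await.result(response, 10.seconds)
  }
}
```

A cluster-specific backend would override doRequestTotalExecutors to actually talk to its cluster manager.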

Requesting Exact Number of Executors

requestTotalExecutors(
  numExecutors: Int,
  localityAwareTasks: Int,
  hostToLocalTaskCount: Map[String, Int]): Boolean

requestTotalExecutors is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false by default).

requestTotalExecutors is part of the ExecutorAllocationClient abstraction.

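requestTotalExecutors sets the internal localityAwareTasks and hostToLocalTaskCount registries to the input values. It then calculates the exact number of executors from the input numExecutors, the executors already registered and the executors pending to be removed.

If numExecutors is negative, requestTotalExecutors throws an IllegalArgumentException: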

Attempted to request a negative number of executor(s) [numExecutors] from the cluster manager. Please specify a positive number!

NOTE: It is a final method, so other scheduler backends cannot customize it further.

NOTE: The method uses a synchronized block so that multiple concurrent requests are handled serially, i.e. one by one.
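Here is a minimal sketch of requestTotalExecutors, assuming a hypothetical class whose default doRequestTotalExecutors answers false. It shows the argument check, recording the locality hints under the lock, and delegating to doRequestTotalExecutors. It does not calculate the exact number of executors; passing numExecutors straight through is a simplification made only for this sketch.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical sketch (not Spark's class)
class RequestTotalExecutorsSketch {
  private var localityAwareTasks = 0
  private var hostToLocalTaskCount: Map[String, Int] = Map.empty

  // Cluster-specific hook; the default answers false (not acknowledged)
  protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
    Future.successful(false)

  final def requestTotalExecutors(
      numExecutors: Int,
      localityAwareTasks: Int,
      hostToLocalTaskCount: Map[String, Int]): Boolean = {
    if (numExecutors < 0) {
      throw new IllegalArgumentException(
        "Attempted to request a negative number of executor(s) " +
          s"$numExecutors from the cluster manager. Please specify a positive number!")
    }
    // Record the locality hints and delegate, one request at a time
    val response = synchronized {
      this.localityAwareTasks = localityAwareTasks
      this.hostToLocalTaskCount = hostToLocalTaskCount
      doRequestTotalExecutors(numExecutors)
    }
    Await.result(response, 10.seconds)
  }

  def localityHints: (Int, Map[String, Int]) =
    synchronized((localityAwareTasks, hostToLocalTaskCount))
}
```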

Finding Default Level of Parallelism

defaultParallelism(): Int

defaultParallelism is part of the SchedulerBackend abstraction.

defaultParallelism is the value of the spark.default.parallelism configuration property, if defined.

Otherwise, defaultParallelism is the maximum of totalCoreCount and 2.
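The rule comes down to a single expression. To keep the example self-contained, this sketch uses a plain Map in place of SparkConf. With a real SparkConf, the same lookup could use conf.getInt and pass the fallback as the default value.

```scala
// Sketch of the rule above; a plain Map stands in for SparkConf
def defaultParallelismSketch(conf: Map[String, String], totalCoreCount: Int): Int =
  conf
    .get("spark.default.parallelism")
    .map(_.toInt)
    .getOrElse(math.max(totalCoreCount, 2)) // at least 2 when not configured
```

A single-core cluster without spark.default.parallelism therefore still gets a default parallelism of 2.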

Killing Task

killTask(
  taskId: Long,
  executorId: String,
  interruptThread: Boolean): Unit

killTask is part of the SchedulerBackend abstraction.

killTask simply sends a KillTask message to the driverEndpoint (the CoarseGrainedScheduler RPC endpoint).

Stopping All Executors

stopExecutors sends a blocking message to the driverEndpoint (if already initialized) to stop all executors.

NOTE: It is called exclusively while CoarseGrainedSchedulerBackend is being stopped.

You should see the following INFO message in the logs:

Shutting down all executors

Reset State

reset resets the internal state:

  1. Sets <> to 0
  2. Clears executorsPendingToRemove
  3. Removes every executor (in the internal executorDataMap) by sending a blocking message to driverEndpoint, informing it about SlaveLost with the message:
    Stale executor after cluster manager re-registered.
    

reset is defined in CoarseGrainedSchedulerBackend, but used and overridden exclusively by YarnSchedulerBackend.

Remove Executor

removeExecutor(executorId: String, reason: ExecutorLossReason)

removeExecutor sends a blocking RemoveExecutor message to driverEndpoint.

NOTE: It is called by subclasses SparkDeploySchedulerBackend, CoarseMesosSchedulerBackend, and YarnSchedulerBackend.

CoarseGrainedScheduler RPC Endpoint

CoarseGrainedSchedulerBackend registers the CoarseGrainedScheduler RPC endpoint to be the driver's communication endpoint.

driverEndpoint is a DriverEndpoint.

Note

CoarseGrainedSchedulerBackend is created while SparkContext is being created, and SparkContext in turn lives inside a Spark driver. That explains the name driverEndpoint (at least partially).

Internally, it is called the standalone scheduler's driver endpoint.

It tracks:

It uses driver-revive-thread daemon single-thread thread pool for ...FIXME

CAUTION: FIXME A potential issue with driverEndpoint.asInstanceOf[NettyRpcEndpointRef].toURI - doubles spark:// prefix.

Starting CoarseGrainedSchedulerBackend

start(): Unit

start is part of the SchedulerBackend abstraction.

start takes all spark.-prefixed properties and registers the CoarseGrainedScheduler RPC endpoint (backed by a DriverEndpoint ThreadSafeRpcEndpoint).

Figure: CoarseGrainedScheduler Endpoint

NOTE: start uses the current SparkContext and, in turn, SparkConf.

NOTE: start uses the RPC environment that was given when CoarseGrainedSchedulerBackend was created.

Checking If Sufficient Compute Resources Available Or Waiting Time Passed

isReady(): Boolean

isReady is part of the SchedulerBackend abstraction.

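isReady allows delaying task launching until sufficient resources are available or the maximum waiting time passes.

Internally, isReady checks whether sufficient resources are registered (sufficientResourcesRegistered).

NOTE: sufficientResourcesRegistered by default responds that sufficient resources are available.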

If sufficient resources are registered, you should see the following INFO message in the logs and isReady is positive (true).

SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: [minRegisteredRatio]

If sufficient resources are not registered yet, isReady checks whether maxRegisteredWaitingTimeMs has elapsed, so that tasks can be launched even when minRegisteredRatio has not been reached yet.

In that case, you should see the following INFO message in the logs and isReady is positive (true).

SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: [maxRegisteredWaitingTimeMs](ms)

Otherwise, when sufficient resources are not registered and maxRegisteredWaitingTimeMs has not elapsed, isReady is negative (false).
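The decision can be condensed into a pure function. The parameter names are hypothetical. In particular, the sketch assumes the waiting time is measured from the moment the backend was created (createTimeMs).

```scala
// Minimal sketch of the isReady decision (hypothetical names)
def isReadySketch(
    sufficientResourcesRegistered: Boolean,
    createTimeMs: Long,
    nowMs: Long,
    maxRegisteredWaitingTimeMs: Long): Boolean =
  if (sufficientResourcesRegistered) {
    true // enough resources registered (minRegisteredRatio reached)
  } else {
    // stop waiting once maxRegisteredWaitingTimeMs has passed since createTimeMs
    nowMs - createTimeMs >= maxRegisteredWaitingTimeMs
  }
```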

Reviving Resource Offers

reviveOffers(): Unit

reviveOffers is part of the SchedulerBackend abstraction.

reviveOffers simply sends a ReviveOffers message to the CoarseGrainedScheduler RPC endpoint.

Figure: CoarseGrainedExecutorBackend Revives Offers

Stopping SchedulerBackend

stop(): Unit

stop is part of the SchedulerBackend abstraction.

stop stops all executors (stopExecutors) and the CoarseGrainedScheduler RPC endpoint (by sending a blocking StopDriver message).

In case of any Exception, stop throws a SparkException with the message:

Error stopping standalone scheduler's driver endpoint
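Here is a minimal sketch of this error handling. It assumes hypothetical stopExecutors and stopDriverEndpoint callbacks. A local SparkExceptionSketch class stands in for org.apache.spark.SparkException, so the example needs no Spark dependency.

```scala
// Hypothetical stand-in for org.apache.spark.SparkException
final class SparkExceptionSketch(message: String, cause: Throwable)
  extends Exception(message, cause)

// Stops the executors and then the driver endpoint; any Exception is wrapped
def stopSketch(stopExecutors: () => Unit, stopDriverEndpoint: () => Unit): Unit =
  try {
    stopExecutors()
    stopDriverEndpoint() // e.g. a blocking StopDriver request
  } catch {
    case e: Exception =>
      throw new SparkExceptionSketch(
        "Error stopping standalone scheduler's driver endpoint", e)
  }
```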

createDriverEndpointRef

createDriverEndpointRef(
  properties: ArrayBuffer[(String, String)]): RpcEndpointRef

createDriverEndpointRef creates a DriverEndpoint and registers it as CoarseGrainedScheduler.

createDriverEndpointRef is used when CoarseGrainedSchedulerBackend is requested to start.

Checking Whether Executor is Active

isExecutorActive(
  id: String): Boolean

isExecutorActive is part of the ExecutorAllocationClient abstraction.

isExecutorActive...FIXME

Requesting Executors from Cluster Manager

doRequestTotalExecutors(
  requestedTotal: Int): Future[Boolean]

doRequestTotalExecutors returns a completed Future with false value.

doRequestTotalExecutors is used when:

Logging

Enable ALL logging level for org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend=ALL

Refer to Logging.