
CoarseGrainedSchedulerBackend

CoarseGrainedSchedulerBackend is a SchedulerBackend.

CoarseGrainedSchedulerBackend is an ExecutorAllocationClient.

CoarseGrainedSchedulerBackend is responsible for requesting resources from a cluster manager for executors that it in turn uses to CoarseGrainedSchedulerBackend-DriverEndpoint.md#launchTasks[launch tasks] (on executor:CoarseGrainedExecutorBackend.md[]).

CoarseGrainedSchedulerBackend holds executors for the duration of the Spark job rather than relinquishing executors whenever a task is done and asking the scheduler to launch a new executor for each new task.

CoarseGrainedSchedulerBackend registers the <<CoarseGrainedScheduler, CoarseGrainedScheduler RPC endpoint>> that executors use for RPC communication.

NOTE: Active executors are executors that are not <<executorsPendingToRemove, pending to be removed>> or CoarseGrainedSchedulerBackend-DriverEndpoint.md#executorsPendingLossReason[lost].

[[builtin-implementations]]
.Built-In CoarseGrainedSchedulerBackends per Cluster Environment
[cols="1,2",options="header",width="100%"]
|===
| Cluster Environment
| CoarseGrainedSchedulerBackend

| Spark Standalone
| spark-standalone-StandaloneSchedulerBackend.md[StandaloneSchedulerBackend]

| Spark on YARN
| yarn/spark-yarn-yarnschedulerbackend.md[YarnSchedulerBackend]

| Spark on Mesos
| spark-mesos/spark-mesos-MesosCoarseGrainedSchedulerBackend.md[MesosCoarseGrainedSchedulerBackend]
|===

NOTE: CoarseGrainedSchedulerBackend is only created indirectly through one of the <<builtin-implementations, built-in implementations>>.

[[internal-properties]]
.CoarseGrainedSchedulerBackend's Internal Properties
[cols="1,1,2",options="header",width="100%"]
|===
| Name
| Initial Value
| Description

| [[currentExecutorIdCounter]] currentExecutorIdCounter
|
a| The last (highest) identifier of all <<executorDataMap, known executors>>.

Used exclusively in yarn/spark-yarn-cluster-YarnSchedulerEndpoint.md#RetrieveLastAllocatedExecutorId[YarnSchedulerEndpoint to respond to RetrieveLastAllocatedExecutorId message].

| [[createTime]] createTime | Current time | The time when <<creating-instance, CoarseGrainedSchedulerBackend was created>>.

| [[defaultAskTimeout]] defaultAskTimeout | rpc:index.md#spark.rpc.askTimeout[spark.rpc.askTimeout] or rpc:index.md#spark.network.timeout[spark.network.timeout] or 120s | Default timeout for blocking RPC messages (aka ask messages).

| [[driverEndpoint]] driverEndpoint | (uninitialized) a| rpc:RpcEndpointRef.md[RPC endpoint reference] to CoarseGrainedScheduler RPC endpoint (with scheduler:CoarseGrainedSchedulerBackend-DriverEndpoint.md[DriverEndpoint] as the message handler).

Initialized when CoarseGrainedSchedulerBackend <<start, starts>>.

Used when CoarseGrainedSchedulerBackend executes the following (asynchronously, i.e. on a separate thread):

  • <>
  • <>
  • <>
  • <>
  • <>
  • <>

| [[executorDataMap]] executorDataMap | empty a| Registry of ExecutorData by executor id.

NOTE: ExecutorData holds an executor's endpoint reference, address, host, the number of free and total CPU cores, and the URL of execution logs.

Element added when scheduler:CoarseGrainedSchedulerBackend-DriverEndpoint.md#RegisterExecutor[DriverEndpoint receives RegisterExecutor message] and removed when scheduler:CoarseGrainedSchedulerBackend-DriverEndpoint.md#RemoveExecutor[DriverEndpoint receives RemoveExecutor message] or scheduler:CoarseGrainedSchedulerBackend-DriverEndpoint.md#onDisconnected[a remote host (with one or many executors) disconnects].

| [[executorsPendingToRemove]] executorsPendingToRemove | empty | Executors marked as removed but the confirmation from a cluster manager has not arrived yet.

| [[hostToLocalTaskCount]] hostToLocalTaskCount | empty | Registry of hostnames and the possible number of tasks running on them.

| [[localityAwareTasks]] localityAwareTasks | 0 | Number of pending tasks...FIXME

| [[maxRegisteredWaitingTimeMs]] maxRegisteredWaitingTimeMs | spark.scheduler.maxRegisteredResourcesWaitingTime |

| [[maxRpcMessageSize]] maxRpcMessageSize | spark.rpc.message.maxSize but not greater than 2047 a| Maximum RPC message size in MB.

When above 2047 MB you should see the following IllegalArgumentException:

spark.rpc.message.maxSize should not be greater than 2047 MB

| [[_minRegisteredRatio]] _minRegisteredRatio | spark.scheduler.minRegisteredResourcesRatio |

| [[numPendingExecutors]] numPendingExecutors | 0 |

| [[totalCoreCount]] totalCoreCount | 0 | Total number of CPU cores, i.e. the sum of all the cores on all executors.

| [[totalRegisteredExecutors]] totalRegisteredExecutors | 0 | Total number of registered executors
|===
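
For reference, the configurable values above are backed by standard Spark properties. A hedged example of setting them on a SparkConf (e.g. in spark-shell or before creating a SparkContext); the values here are arbitrary:

[source, scala]
----
import org.apache.spark.SparkConf

// Example values only; the property names are standard Spark settings that feed
// the internal properties described above.
val conf = new SparkConf()
  .set("spark.rpc.message.maxSize", "128")                         // maxRpcMessageSize (MB)
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")       // _minRegisteredRatio
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s") // maxRegisteredWaitingTimeMs
  .set("spark.rpc.askTimeout", "120s")                             // defaultAskTimeout
----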

[TIP]
====
Enable INFO or DEBUG logging level for org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend logger to see what happens inside.

Add the following line to conf/log4j.properties:

----
log4j.logger.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend=DEBUG
----

Refer to spark-logging.md[Logging].
====

== [[killExecutorsOnHost]] Killing All Executors on Node -- killExecutorsOnHost Method

CAUTION: FIXME

== [[makeOffers]] Making Fake Resource Offers on Executors -- makeOffers Internal Methods

[source, scala]
----
makeOffers(): Unit
makeOffers(executorId: String): Unit
----


makeOffers takes the active executors (out of the <<executorDataMap, executorDataMap>> internal registry) and creates WorkerOffer resource offers for each (one per executor with the executor's id, host and free cores).

CAUTION: Only free cores are considered in making offers. Memory is not! Why?!

It then requests scheduler:TaskSchedulerImpl.md#resourceOffers[TaskSchedulerImpl to process the resource offers] to create a collection of spark-scheduler-TaskDescription.md[TaskDescription] collections that it in turn uses to scheduler:CoarseGrainedSchedulerBackend-DriverEndpoint.md#launchTasks[launch tasks].
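
The control flow can be sketched with simplified stand-ins for Spark's internal types. In the sketch below, ExecutorData, WorkerOffer, TaskDescription and the resourceOffers/launchTasks collaborators are all modelled locally, so this is illustrative rather than Spark's actual implementation:

[source, scala]
----
import scala.collection.mutable

// Simplified stand-ins for Spark's internal ExecutorData, WorkerOffer and TaskDescription.
case class ExecutorData(host: String, freeCores: Int, totalCores: Int)
case class WorkerOffer(executorId: String, host: String, cores: Int)
case class TaskDescription(taskId: Long, executorId: String)

class MakeOffersSketch(
    executorDataMap: mutable.Map[String, ExecutorData],
    executorsPendingToRemove: mutable.Set[String],
    resourceOffers: IndexedSeq[WorkerOffer] => Seq[Seq[TaskDescription]], // TaskSchedulerImpl.resourceOffers stand-in
    launchTasks: Seq[Seq[TaskDescription]] => Unit) {                     // DriverEndpoint.launchTasks stand-in

  private def executorIsAlive(executorId: String): Boolean =
    !executorsPendingToRemove.contains(executorId)

  // One WorkerOffer per active executor: only the executor's free cores are offered.
  def makeOffers(): Unit = {
    val activeExecutors = executorDataMap.filter { case (id, _) => executorIsAlive(id) }
    val workOffers = activeExecutors.map { case (id, data) =>
      WorkerOffer(id, data.host, data.freeCores)
    }.toIndexedSeq
    launchTasks(resourceOffers(workOffers))
  }
}
----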

== [[creating-instance]] Creating CoarseGrainedSchedulerBackend Instance

CoarseGrainedSchedulerBackend takes the following when created:

. [[scheduler]] scheduler:TaskSchedulerImpl.md[TaskSchedulerImpl]
. [[rpcEnv]] rpc:index.md[RpcEnv]

CoarseGrainedSchedulerBackend initializes the <<internal-properties, internal properties>>.

== [[getExecutorIds]] Getting Executor Ids -- getExecutorIds Method

When called, getExecutorIds simply returns the executor ids from the internal <<executorDataMap, executorDataMap>> registry.

NOTE: It is called when ROOT:SparkContext.md#getExecutorIds[SparkContext calculates executor ids].

== [[contract]] CoarseGrainedSchedulerBackend Contract

[source, scala]
----
class CoarseGrainedSchedulerBackend {
  def minRegisteredRatio: Double
  def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint
  def reset(): Unit
  def sufficientResourcesRegistered(): Boolean
  def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean]
  def doKillExecutors(executorIds: Seq[String]): Future[Boolean]
}
----


NOTE: CoarseGrainedSchedulerBackend is a private[spark] contract.

.CoarseGrainedSchedulerBackend Contract
[cols="1,2",options="header",width="100%"]
|===
| Method
| Description

| [[minRegisteredRatio]] minRegisteredRatio | Ratio between 0 and 1 (inclusive).

Controlled by spark.scheduler.minRegisteredResourcesRatio.

| <> | FIXME

| [[doRequestTotalExecutors]] doRequestTotalExecutors | FIXME

| [[doKillExecutors]] doKillExecutors | FIXME

| [[sufficientResourcesRegistered]] sufficientResourcesRegistered a| Always positive, i.e. true, by default, meaning that sufficient resources are available.

Used when CoarseGrainedSchedulerBackend checks <<isReady, if sufficient compute resources are available>>.
|===


== [[numExistingExecutors]] numExistingExecutors Method

CAUTION: FIXME

== [[killExecutors]] killExecutors Methods

CAUTION: FIXME

== [[getDriverLogUrls]] getDriverLogUrls Method

CAUTION: FIXME

== [[applicationAttemptId]] applicationAttemptId Method

CAUTION: FIXME

== [[requestExecutors]] Requesting Additional Executors -- requestExecutors Method

[source, scala]

requestExecutors(numAdditionalExecutors: Int): Boolean

requestExecutors is a "decorator" method that ultimately calls the cluster-specific <<doRequestTotalExecutors, doRequestTotalExecutors>> method and returns whether the request was acknowledged or not (false by default).

NOTE: requestExecutors method is part of spark-service-ExecutorAllocationClient.md[ExecutorAllocationClient Contract] that ROOT:SparkContext.md#requestExecutors[SparkContext uses for requesting additional executors] (as a part of a developer API for dynamic allocation of executors).

When called, you should see the following INFO message followed by a DEBUG message in the logs:

INFO Requesting [numAdditionalExecutors] additional executor(s) from the cluster manager
DEBUG Number of pending executors is now [numPendingExecutors]

<<numPendingExecutors, numPendingExecutors>> is increased by the input numAdditionalExecutors.

requestExecutors then requests executors from the cluster manager with a new executor total (that reflects the current computation needs). The "new executor total" is a sum of the internal <<numExistingExecutors, numExistingExecutors>> and <<numPendingExecutors, numPendingExecutors>> decreased by the number of <<executorsPendingToRemove, executors pending to be removed>>.

If numAdditionalExecutors is negative, an IllegalArgumentException is thrown:

Attempted to request a negative number of additional executor(s) [numAdditionalExecutors] from the cluster manager. Please specify a positive number!

NOTE: It is a final method that no scheduler backend can customize further.

NOTE: The method is a synchronized block, so multiple concurrent requests are handled serially, i.e. one by one.
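
A minimal sketch of the bookkeeping described above, assuming simplified state fields and a doRequestTotalExecutors function injected as a stand-in for the cluster-specific implementation; the blocking wait models the ask timeout (illustrative only, not Spark's actual code):

[source, scala]
----
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

class RequestExecutorsSketch(doRequestTotalExecutors: Int => Future[Boolean]) {

  private val defaultAskTimeout = 120.seconds                // arbitrary value here
  private var numPendingExecutors = 0
  private var numExistingExecutors = 0                       // registered executors (executorDataMap.size)
  private val executorsPendingToRemove = mutable.Set.empty[String]

  final def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
    require(numAdditionalExecutors >= 0,
      s"Attempted to request a negative number of additional executor(s) $numAdditionalExecutors " +
        "from the cluster manager. Please specify a positive number!")
    // These prints model the INFO/DEBUG messages quoted above.
    println(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
    numPendingExecutors += numAdditionalExecutors
    println(s"Number of pending executors is now $numPendingExecutors")
    // The "new executor total": existing + pending - pending to be removed.
    val newTotal = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
    Await.result(doRequestTotalExecutors(newTotal), defaultAskTimeout)
  }
}
----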

== [[requestTotalExecutors]] Requesting Exact Number of Executors -- requestTotalExecutors Method

[source, scala]
----
requestTotalExecutors(
  numExecutors: Int,
  localityAwareTasks: Int,
  hostToLocalTaskCount: Map[String, Int]): Boolean
----


requestTotalExecutors is a "decorator" method that ultimately calls the cluster-specific <<doRequestTotalExecutors, doRequestTotalExecutors>> method and returns whether the request was acknowledged or not (false by default).

NOTE: requestTotalExecutors is part of spark-service-ExecutorAllocationClient.md[ExecutorAllocationClient Contract] that ROOT:SparkContext.md#requestTotalExecutors[SparkContext uses for requesting the exact number of executors].

It sets the internal <<localityAwareTasks, localityAwareTasks>> and <<hostToLocalTaskCount, hostToLocalTaskCount>> registries. It then recalculates the number of pending executors, which is the input numExecutors and the number of <<executorsPendingToRemove, executors pending to be removed>> decreased by the number of <<numExistingExecutors, existing executors>>.

If numExecutors is negative, an IllegalArgumentException is thrown:

Attempted to request a negative number of executor(s) [numExecutors] from the cluster manager. Please specify a positive number!

NOTE: It is a final method that no scheduler backend can customize further.

NOTE: The method is a synchronized block, so multiple concurrent requests are handled serially, i.e. one by one.
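
A similar sketch for requestTotalExecutors, under the same assumptions (simplified state, injected doRequestTotalExecutors). The pending-executors arithmetic follows the description above and may differ slightly between Spark versions:

[source, scala]
----
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

class RequestTotalExecutorsSketch(doRequestTotalExecutors: Int => Future[Boolean]) {

  private val defaultAskTimeout = 120.seconds                // arbitrary value here
  private var localityAwareTasks = 0
  private var hostToLocalTaskCount = Map.empty[String, Int]
  private var numPendingExecutors = 0
  private var numExistingExecutors = 0                       // registered executors
  private val executorsPendingToRemove = mutable.Set.empty[String]

  final def requestTotalExecutors(
      numExecutors: Int,
      localityAwareTasks: Int,
      hostToLocalTaskCount: Map[String, Int]): Boolean = synchronized {
    require(numExecutors >= 0,
      s"Attempted to request a negative number of executor(s) $numExecutors " +
        "from the cluster manager. Please specify a positive number!")
    // Record the locality hints for future resource offers.
    this.localityAwareTasks = localityAwareTasks
    this.hostToLocalTaskCount = hostToLocalTaskCount
    // Pending executors: requested total plus executors pending removal, minus existing ones.
    numPendingExecutors =
      math.max(numExecutors + executorsPendingToRemove.size - numExistingExecutors, 0)
    Await.result(doRequestTotalExecutors(numExecutors), defaultAskTimeout)
  }
}
----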

== [[defaultParallelism]] Finding Default Level of Parallelism -- defaultParallelism Method

[source, scala]

defaultParallelism(): Int

NOTE: defaultParallelism is part of the scheduler:SchedulerBackend.md#contract[SchedulerBackend Contract].

defaultParallelism is the value of the ROOT:configuration-properties.md#spark.default.parallelism[spark.default.parallelism] configuration property, if defined.

Otherwise, defaultParallelism is the maximum of <<totalCoreCount, totalCoreCount>> and 2.
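
A minimal sketch of that fallback logic, assuming a SparkConf and an AtomicInteger standing in for the <<totalCoreCount, totalCoreCount>> counter described above:

[source, scala]
----
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkConf

object DefaultParallelismSketch {
  // spark.default.parallelism wins if set; otherwise max(total cores across executors, 2).
  def defaultParallelism(conf: SparkConf, totalCoreCount: AtomicInteger): Int =
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}
----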

== [[killTask]] Killing Task -- killTask Method

[source, scala]

killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit

NOTE: killTask is part of the scheduler:SchedulerBackend.md#killTask[SchedulerBackend contract].

killTask simply sends a scheduler:CoarseGrainedSchedulerBackend-DriverEndpoint.md#KillTask[KillTask] message to the <<driverEndpoint, driverEndpoint>>.

CAUTION: FIXME Image
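
A minimal sketch of that fire-and-forget send, with KillTask and the endpoint reference modelled as simple local types (illustrative only):

[source, scala]
----
// Simplified stand-ins for the KillTask message and an RPC endpoint reference.
case class KillTask(taskId: Long, executorId: String, interruptThread: Boolean)

trait EndpointRef {
  def send(message: Any): Unit
}

class KillTaskSketch(driverEndpoint: EndpointRef) {
  // killTask is fire-and-forget: it only posts KillTask to the driver endpoint.
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
    driverEndpoint.send(KillTask(taskId, executorId, interruptThread))
}
----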

== [[stopExecutors]] Stopping All Executors -- stopExecutors Method

stopExecutors sends a blocking scheduler:CoarseGrainedSchedulerBackend-DriverEndpoint.md#StopExecutors[StopExecutors] message to the <<driverEndpoint, driverEndpoint>> (if already initialized).

NOTE: It is called exclusively while CoarseGrainedSchedulerBackend is being <<stop, stopped>>.

You should see the following INFO message in the logs:

INFO CoarseGrainedSchedulerBackend: Shutting down all executors
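
A minimal sketch, assuming the driver endpoint is optional until <<start, start>> and that askSync stands in for a blocking ask of the StopExecutors message (illustrative only):

[source, scala]
----
// StopExecutors modelled as a local message object; askSync stands in for a blocking ask.
case object StopExecutors

class StopExecutorsSketch(driverEndpoint: Option[Any => Unit]) {
  def stopExecutors(): Unit = {
    driverEndpoint.foreach { askSync =>
      println("Shutting down all executors")
      askSync(StopExecutors) // blocking ask; the reply is ignored here
    }
  }
}
----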

== [[reset]] Reset State -- reset Method

reset resets the internal state (see the sketch below):

. Sets <<numPendingExecutors, numPendingExecutors>> to 0
. Clears <<executorsPendingToRemove, executorsPendingToRemove>>
. Sends a blocking scheduler:CoarseGrainedSchedulerBackend-DriverEndpoint.md#RemoveExecutor[RemoveExecutor] message to the <<driverEndpoint, driverEndpoint>> for every executor (in the internal executorDataMap) to inform it about SlaveLost with the message: +
  Stale executor after cluster manager re-registered.
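
A minimal sketch of those three steps, with RemoveExecutor and SlaveLost modelled as simple local types and askDriverEndpoint standing in for a blocking ask to the driver endpoint (illustrative only):

[source, scala]
----
import scala.collection.mutable

// Simplified stand-ins for Spark's CoarseGrainedClusterMessages and ExecutorLossReason.
case class SlaveLost(message: String)
case class RemoveExecutor(executorId: String, reason: SlaveLost)

class ResetSketch(
    executorDataMap: mutable.Map[String, AnyRef],
    askDriverEndpoint: Any => Unit /* blocking ask to driverEndpoint */) {

  private var numPendingExecutors = 0
  private val executorsPendingToRemove = mutable.Set.empty[String]

  def reset(): Unit = synchronized {
    numPendingExecutors = 0
    executorsPendingToRemove.clear()
    // Tell the driver endpoint that every currently-known executor is stale.
    executorDataMap.keys.foreach { executorId =>
      askDriverEndpoint(RemoveExecutor(executorId,
        SlaveLost("Stale executor after cluster manager re-registered.")))
    }
  }
}
----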
    

reset is a method defined in CoarseGrainedSchedulerBackend, but used and overridden exclusively by yarn/spark-yarn-yarnschedulerbackend.md[YarnSchedulerBackend].

== [[removeExecutor]] Remove Executor -- removeExecutor Method

[source, scala]

removeExecutor(executorId: String, reason: ExecutorLossReason)

removeExecutor sends a blocking scheduler:CoarseGrainedSchedulerBackend-DriverEndpoint.md#RemoveExecutor[RemoveExecutor] message to the <<driverEndpoint, driverEndpoint>>.

NOTE: It is called by subclasses spark-standalone.md#SparkDeploySchedulerBackend[SparkDeploySchedulerBackend], spark-mesos/spark-mesos.md#CoarseMesosSchedulerBackend[CoarseMesosSchedulerBackend], and yarn/spark-yarn-yarnschedulerbackend.md[YarnSchedulerBackend].

== [[CoarseGrainedScheduler]] CoarseGrainedScheduler RPC Endpoint -- driverEndpoint

When CoarseGrainedSchedulerBackend <<start, starts>>, it registers the CoarseGrainedScheduler RPC endpoint to be the driver's communication endpoint.

driverEndpoint is a scheduler:CoarseGrainedSchedulerBackend-DriverEndpoint.md[DriverEndpoint].

NOTE: CoarseGrainedSchedulerBackend is created while ROOT:SparkContext.md#createTaskScheduler[SparkContext is being created] that in turn lives inside a spark-driver.md[Spark driver]. That explains the name driverEndpoint (at least partially).

Internally, it is referred to as the standalone scheduler's driver endpoint.

It tracks:

It uses driver-revive-thread daemon single-thread thread pool for ...FIXME

CAUTION: FIXME A potential issue with driverEndpoint.asInstanceOf[NettyRpcEndpointRef].toURI - doubles spark:// prefix.

== [[start]] Starting CoarseGrainedSchedulerBackend (and Registering CoarseGrainedScheduler RPC Endpoint) -- start Method

[source, scala]

start(): Unit

NOTE: start is part of the scheduler:SchedulerBackend.md#contract[SchedulerBackend contract].

start takes all spark.-prefixed properties and registers the <<CoarseGrainedScheduler, CoarseGrainedScheduler RPC endpoint>> (backed by scheduler:CoarseGrainedSchedulerBackend-DriverEndpoint.md[DriverEndpoint ThreadSafeRpcEndpoint]).

.CoarseGrainedScheduler Endpoint
image::CoarseGrainedScheduler-rpc-endpoint.png[align="center"]

NOTE: start uses the <<scheduler, TaskSchedulerImpl>> to access the current ROOT:SparkContext.md[SparkContext] and in turn ROOT:SparkConf.md[SparkConf].

NOTE: start uses the <<rpcEnv, RpcEnv>> that was given when <<creating-instance, CoarseGrainedSchedulerBackend was created>>.

== [[isReady]] Checking If Sufficient Compute Resources Available Or Waiting Time Passed -- isReady Method

[source, scala]

isReady(): Boolean

NOTE: isReady is part of the scheduler:SchedulerBackend.md#contract[SchedulerBackend contract].

isReady allows delaying task launching until <<sufficientResourcesRegistered, sufficient resources are available>> or <<maxRegisteredWaitingTimeMs, maxRegisteredWaitingTimeMs>> passes.

Internally, isReady first checks whether <<sufficientResourcesRegistered, sufficient resources are registered>>.

NOTE: <> by default responds that sufficient resources are available.

If sufficient resources are available, you should see the following INFO message in the logs and isReady is positive.

[options="wrap"]

INFO SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: [minRegisteredRatio]

NOTE: <<minRegisteredRatio, minRegisteredRatio>> is in the range 0 to 1 (uses <<_minRegisteredRatio, _minRegisteredRatio>>) to denote the minimum ratio of registered resources to total expected resources before submitting tasks.

If sufficient resources are not yet available (the above requirement does not hold), isReady checks whether the time since <<createTime, startup>> has exceeded <<maxRegisteredWaitingTimeMs, maxRegisteredWaitingTimeMs>>, to give a way to launch tasks (even though <<minRegisteredRatio, minRegisteredRatio>> has not been reached yet).

You should see the following INFO message in the logs and isReady is positive.

[options="wrap"]

INFO SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: maxRegisteredWaitingTimeMs

Otherwise, when sufficient resources are not available and <<maxRegisteredWaitingTimeMs, maxRegisteredWaitingTimeMs>> has not elapsed, isReady is negative.
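
The decision can be sketched as a single condition over the fields described above; the collaborators are injected so the sketch stays self-contained (illustrative only):

[source, scala]
----
class IsReadySketch(
    sufficientResourcesRegistered: () => Boolean,
    createTime: Long,
    maxRegisteredWaitingTimeMs: Long) {

  def isReady(): Boolean =
    if (sufficientResourcesRegistered()) {
      true // "... after reached minRegisteredResourcesRatio"
    } else if (System.currentTimeMillis() - createTime >= maxRegisteredWaitingTimeMs) {
      true // "... after waiting maxRegisteredResourcesWaitingTime"
    } else {
      false
    }
}
----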

== [[reviveOffers]] Reviving Resource Offers (by Posting ReviveOffers to CoarseGrainedScheduler RPC Endpoint) -- reviveOffers Method

[source, scala]

reviveOffers(): Unit

NOTE: reviveOffers is part of the scheduler:SchedulerBackend.md#reviveOffers[SchedulerBackend contract].

reviveOffers simply sends a scheduler:CoarseGrainedSchedulerBackend-DriverEndpoint.md#ReviveOffers[ReviveOffers] message to the <<CoarseGrainedScheduler, CoarseGrainedScheduler RPC endpoint>>.

.CoarseGrainedExecutorBackend Revives Offers
image::CoarseGrainedExecutorBackend-reviveOffers.png[align="center"]

== [[stop]] Stopping CoarseGrainedSchedulerBackend (and Stopping Executors) -- stop Method

[source, scala]

stop(): Unit

NOTE: stop is part of the scheduler:SchedulerBackend.md#contract[SchedulerBackend contract].

stop <<stopExecutors, stops all executors>> and stops the <<CoarseGrainedScheduler, CoarseGrainedScheduler RPC endpoint>> (by sending it a blocking scheduler:CoarseGrainedSchedulerBackend-DriverEndpoint.md#StopDriver[StopDriver] message).

In case of any Exception, stop reports a SparkException with the message:

Error stopping standalone scheduler's driver endpoint
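
The two-step shutdown and the error wrapping can be sketched as follows; the collaborators are injected, and in Spark the second step is a blocking StopDriver ask (illustrative only):

[source, scala]
----
import org.apache.spark.SparkException

class StopSketch(
    stopExecutors: () => Unit,
    stopDriverEndpoint: () => Unit) {

  def stop(): Unit = {
    stopExecutors()
    try {
      stopDriverEndpoint() // a blocking StopDriver ask in Spark
    } catch {
      case e: Exception =>
        throw new SparkException("Error stopping standalone scheduler's driver endpoint", e)
    }
  }
}
----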

== [[createDriverEndpointRef]] createDriverEndpointRef Method

[source, scala]
----
createDriverEndpointRef(
  properties: ArrayBuffer[(String, String)]): RpcEndpointRef
----


createDriverEndpointRef <<createDriverEndpoint, creates a DriverEndpoint>> and rpc:index.md#setupEndpoint[registers it] as CoarseGrainedScheduler.

createDriverEndpointRef is used when CoarseGrainedSchedulerBackend is requested to <<start, start>>.

== [[createDriverEndpoint]] Creating DriverEndpoint -- createDriverEndpoint Method

[source, scala]

createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint

createDriverEndpoint simply creates a scheduler:CoarseGrainedSchedulerBackend-DriverEndpoint.md#creating-instance[DriverEndpoint].

NOTE: scheduler:CoarseGrainedSchedulerBackend-DriverEndpoint.md[DriverEndpoint] is the <<CoarseGrainedScheduler, CoarseGrainedScheduler RPC endpoint>> of CoarseGrainedSchedulerBackend.

NOTE: The purpose of createDriverEndpoint is to allow YARN to use the custom YarnDriverEndpoint.

NOTE: createDriverEndpoint is used when CoarseGrainedSchedulerBackend <<createDriverEndpointRef, creates the CoarseGrainedScheduler RPC endpoint>>.

