DriverEndpoint

DriverEndpoint is a ThreadSafeRpcEndpoint that is a message handler for CoarseGrainedSchedulerBackend to communicate with CoarseGrainedExecutorBackend.

CoarseGrainedSchedulerBackend uses DriverEndpoint for communication with CoarseGrainedExecutorBackend

DriverEndpoint uses the executorDataMap internal registry to track all the executors that have registered with the driver. An executor sends a RegisterExecutor message to request registration.

Executor registration (RegisterExecutor RPC message flow)

Creating Instance

DriverEndpoint takes no arguments to be created.

DriverEndpoint is created when CoarseGrainedSchedulerBackend is requested for one.

ExecutorLogUrlHandler

logUrlHandler: ExecutorLogUrlHandler

DriverEndpoint creates an ExecutorLogUrlHandler (based on spark.ui.custom.executor.log.url configuration property) when created.

DriverEndpoint uses the ExecutorLogUrlHandler to create an ExecutorData when requested to handle a RegisterExecutor message.

Starting DriverEndpoint

onStart(): Unit

onStart is part of the RpcEndpoint abstraction.

onStart requests the Revive Messages Scheduler Service to schedule a periodic action that sends ReviveOffers messages every revive interval (based on spark.scheduler.revive.interval configuration property).
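The periodic scheduling described above can be sketched with a plain JDK scheduler. This is a minimal illustration, not Spark's code: ReviveTimerSketch, runFor and the sendReviveOffers callback are hypothetical names, and the callback stands in for sending a ReviveOffers message.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Hypothetical stand-in for the driver's periodic "revive" timer.
object ReviveTimerSketch {
  /** Calls `sendReviveOffers` every `intervalMs` milliseconds.
    * Returns true if `ticks` calls were observed within 5 seconds. */
  def runFor(ticks: Int, intervalMs: Long)(sendReviveOffers: () => Unit): Boolean = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val latch = new CountDownLatch(ticks)
    val action = new Runnable {
      override def run(): Unit = { sendReviveOffers(); latch.countDown() }
    }
    try {
      // The interval plays the role of spark.scheduler.revive.interval.
      scheduler.scheduleAtFixedRate(action, 0L, intervalMs, TimeUnit.MILLISECONDS)
      latch.await(5, TimeUnit.SECONDS)
    } finally scheduler.shutdownNow()
  }
}
```

A fixed-rate schedule keeps offers flowing even when no executor or task event triggers them.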

Messages

KillExecutorsOnHost

CoarseGrainedSchedulerBackend is requested to kill all executors on a node

KillTask

CoarseGrainedSchedulerBackend is requested to kill a task.

KillTask(
  taskId: Long,
  executor: String,
  interruptThread: Boolean)

KillTask is sent when CoarseGrainedSchedulerBackend kills a task.

When KillTask is received, DriverEndpoint looks up the executor (in the executorDataMap registry).

If found, DriverEndpoint passes the message on to the executor (using its registered RPC endpoint for CoarseGrainedExecutorBackend).

Otherwise, you should see the following WARN in the logs:

Attempted to kill task [taskId] for unknown executor [executor].
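The lookup-then-forward behaviour can be sketched as follows. The types here are simplified stand-ins (not Spark's classes), and the executors map of send functions is an assumption that replaces the registered RPC endpoint references.

```scala
// Simplified stand-in for the message (hypothetical; not Spark's actual class).
final case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)

object KillTaskSketch {
  /** `executors` maps an executor ID to a function that delivers a message to it.
    * Returns Right(()) when the message was forwarded, or Left with the WARN text. */
  def handle(msg: KillTask, executors: Map[String, KillTask => Unit]): Either[String, Unit] =
    executors.get(msg.executor) match {
      case Some(send) => Right(send(msg))
      case None =>
        Left(s"Attempted to kill task ${msg.taskId} for unknown executor ${msg.executor}.")
    }
}
```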

LaunchedExecutor

RegisterExecutor

CoarseGrainedExecutorBackend registers with the driver

RegisterExecutor(
  executorId: String,
  executorRef: RpcEndpointRef,
  hostname: String,
  cores: Int,
  logUrls: Map[String, String])

RegisterExecutor is sent when CoarseGrainedExecutorBackend RPC Endpoint is requested to start.

When received, DriverEndpoint makes sure that no other executors were registered under the input executorId and that the input hostname is not blacklisted.

If the requirements hold, you should see the following INFO message in the logs:

Registered executor [executorRef] ([address]) with ID [executorId]

DriverEndpoint does the bookkeeping:

If numPendingExecutors is greater than 0, DriverEndpoint decrements numPendingExecutors and you should see the following DEBUG message in the logs:

Decremented number of pending executors ([numPendingExecutors] left)

DriverEndpoint sends RegisteredExecutor message back (that is to confirm that the executor was registered successfully).

DriverEndpoint replies true (to acknowledge the message).

DriverEndpoint then announces the new executor by posting SparkListenerExecutorAdded to LiveListenerBus.

In the end, DriverEndpoint makes executor resource offers (for launching tasks).

If, however, another executor is already registered under the input executorId, DriverEndpoint sends a RegisterExecutorFailed message back with the reason:

Duplicate executor ID: [executorId]

If, however, the input hostname is blacklisted, you should see the following INFO message in the logs:

Rejecting [executorId] as it has been blacklisted.

DriverEndpoint sends RegisterExecutorFailed message back with the reason:

Executor is blacklisted: [executorId]
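The two registration checks and their replies can be sketched as a pure function. This is an illustration only: the registered and blacklistedHosts parameters are hypothetical inputs standing in for driver state, and the reply types are simplified stand-ins for the real messages.

```scala
object RegisterExecutorSketch {
  sealed trait Reply
  case object RegisteredExecutor extends Reply
  final case class RegisterExecutorFailed(reason: String) extends Reply

  /** Decides the reply for a registration request.
    * The duplicate-ID check runs first, then the blacklist check. */
  def validate(
      executorId: String,
      hostname: String,
      registered: Set[String],
      blacklistedHosts: Set[String]): Reply =
    if (registered.contains(executorId))
      RegisterExecutorFailed(s"Duplicate executor ID: $executorId")
    else if (blacklistedHosts.contains(hostname))
      RegisterExecutorFailed(s"Executor is blacklisted: $executorId")
    else RegisteredExecutor
}
```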

RemoveExecutor

RemoveWorker

RetrieveSparkAppConfig

RetrieveSparkAppConfig(
  resourceProfileId: Int)

Posted when:

  • CoarseGrainedExecutorBackend standalone application is started

When received, DriverEndpoint replies with a SparkAppConfig message with the following:

  1. spark-prefixed configuration properties
  2. IO Encryption Key
  3. Delegation tokens
  4. Default profile
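As an illustration of the first item, selecting the spark-prefixed properties amounts to a key filter. The sketch assumes the prefix is the literal string "spark." and uses a plain Map in place of the application's configuration; SparkPropsSketch is a hypothetical name.

```scala
object SparkPropsSketch {
  /** Keeps only the properties whose key starts with "spark." (sorted by key). */
  def sparkPrefixed(props: Map[String, String]): Seq[(String, String)] =
    props.filter { case (key, _) => key.startsWith("spark.") }.toSeq.sortBy(_._1)
}
```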

ReviveOffers

Posted when:

  • The periodic action scheduled by onStart fires (every revive interval)

When received, DriverEndpoint makes executor resource offers.

StatusUpdate

CoarseGrainedExecutorBackend sends task status updates to the driver

StatusUpdate(
  executorId: String,
  taskId: Long,
  state: TaskState,
  data: SerializableBuffer)

StatusUpdate is sent when CoarseGrainedExecutorBackend sends task status updates to the driver.

When StatusUpdate is received, DriverEndpoint requests the TaskSchedulerImpl to handle the task status update.

If the task has finished, DriverEndpoint updates the number of cores available for work on the corresponding executor (registered in executorDataMap).

DriverEndpoint makes an executor resource offer on the single executor.

If DriverEndpoint finds no executor (in executorDataMap), you should see the following WARN message in the logs:

Ignored task status update ([taskId] state [state]) from unknown executor with ID [executorId]
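The core bookkeeping on a status update can be sketched as below. It is a simplified model under stated assumptions: freeCores is a plain map in place of executorDataMap, finished is passed in as a flag, and returning cpusPerTask cores to the executor is an assumption that mirrors how launchTasks reserves cores.

```scala
object StatusUpdateSketch {
  /** Returns the updated free-core map, or the WARN text for an unknown executor.
    * `finished` and `cpusPerTask` are hypothetical inputs for this sketch. */
  def handle(
      freeCores: Map[String, Int],
      executorId: String,
      taskId: Long,
      state: String,
      finished: Boolean,
      cpusPerTask: Int): Either[String, Map[String, Int]] =
    freeCores.get(executorId) match {
      // A finished task gives its cores back to the executor.
      case Some(cores) if finished =>
        Right(freeCores.updated(executorId, cores + cpusPerTask))
      case Some(_) => Right(freeCores)
      case None =>
        Left(s"Ignored task status update ($taskId state $state) " +
          s"from unknown executor with ID $executorId")
    }
}
```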

StopDriver

StopExecutors

StopExecutors is a blocking receive-reply (two-way) message. When it is received, DriverEndpoint prints out the following INFO message to the logs:

Asking each executor to shut down

It then sends a StopExecutor message to every registered executor (from executorDataMap).

UpdateDelegationTokens

Making Executor Resource Offers (for Launching Tasks)

makeOffers(): Unit

makeOffers creates WorkerOffers for all active executors.

makeOffers requests TaskSchedulerImpl to generate tasks for the available worker offers.

When TaskSchedulerImpl returns tasks to be launched, makeOffers launches them.

makeOffers is used when DriverEndpoint handles ReviveOffers or RegisterExecutor messages.
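Building offers for all active executors can be sketched as a filter-and-map over the registry. ExecutorInfo, Offer and the isActive predicate are hypothetical simplifications of the executor data, WorkerOffer and the liveness check.

```scala
object MakeOffersSketch {
  final case class ExecutorInfo(host: String, freeCores: Int)
  final case class Offer(executorId: String, host: String, cores: Int)

  /** Creates one offer per active executor (sorted by ID for stable output). */
  def offers(executors: Map[String, ExecutorInfo], isActive: String => Boolean): Seq[Offer] =
    executors.collect {
      case (id, info) if isActive(id) => Offer(id, info.host, info.freeCores)
    }.toSeq.sortBy(_.executorId)
}
```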

Making Executor Resource Offer on Single Executor (for Launching Tasks)

makeOffers(
  executorId: String): Unit

makeOffers makes sure that the input executorId is alive.

Note

makeOffers does nothing when the input executorId is registered as pending to be removed or got lost.

makeOffers finds the executor data (in executorDataMap registry) and creates a WorkerOffer.

Note

WorkerOffer represents a resource offer with CPU cores available on an executor.

makeOffers then requests TaskSchedulerImpl to generate tasks for the WorkerOffer followed by launching the tasks (on the executor).

makeOffers is used when CoarseGrainedSchedulerBackend RPC endpoint (DriverEndpoint) handles a StatusUpdate message.
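The single-executor variant differs mainly in the liveness check that precedes the offer. In this sketch the pendingToRemove and lost sets, the (host, free cores) tuple and the Offer type are hypothetical stand-ins for the driver's registries and WorkerOffer.

```scala
object SingleOfferSketch {
  final case class Offer(executorId: String, host: String, cores: Int)

  /** Returns an offer for the executor, or None when it is not alive or unknown. */
  def offerFor(
      executorId: String,
      executors: Map[String, (String, Int)], // executor ID -> (host, free cores)
      pendingToRemove: Set[String],
      lost: Set[String]): Option[Offer] =
    if (pendingToRemove.contains(executorId) || lost.contains(executorId)) None
    else executors.get(executorId).map { case (host, cores) => Offer(executorId, host, cores) }
}
```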

Launching Tasks

launchTasks(
  tasks: Seq[Seq[TaskDescription]]): Unit

Note

The input tasks collection contains one or more TaskDescriptions per executor (and the "task partitioning" per executor is of no use in launchTasks so it simply flattens the input data structure).

For every TaskDescription (in the given tasks collection), launchTasks encodes it and makes sure that the encoded task size is below the allowed message size.

launchTasks looks up the ExecutorData of the executor that has been assigned to execute the task (in executorDataMap internal registry) and decreases the executor's free cores (based on spark.task.cpus configuration property).

Note

Scheduling in Spark relies on cores only (not memory), i.e. the number of tasks Spark can run on an executor is limited by the number of cores available only. When submitting a Spark application for execution both executor resources -- memory and cores -- can however be specified explicitly. It is the job of a cluster manager to monitor the memory and take action when its use exceeds what was assigned.

launchTasks prints out the following DEBUG message to the logs:

Launching task [taskId] on executor id: [executorId] hostname: [executorHost].

In the end, launchTasks sends the (serialized) task to the executor (by sending a LaunchTask message to the executor's RPC endpoint with the serialized task inside a SerializableBuffer).

Note

This is the moment in a task's lifecycle when the driver sends the serialized task to an assigned executor.

launchTasks is used when CoarseGrainedSchedulerBackend is requested to make resource offers on single or all executors.

Task Exceeds Allowed Size

In case the size of a serialized TaskDescription equals or exceeds the maximum allowed RPC message size, launchTasks looks up the TaskSetManager for the TaskDescription (in taskIdToTaskSetManager registry) and aborts it with the following message:

Serialized task [id]:[index] was [limit] bytes, which exceeds max allowed: spark.rpc.message.maxSize ([maxRpcMessageSize] bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
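The launch loop, including the size check, can be sketched as a fold over the flattened tasks. This is a simplified model, not Spark's implementation: Task, Launched and Aborted are hypothetical types, payload stands in for the encoded TaskDescription, and maxMessageSize and cpusPerTask play the roles of spark.rpc.message.maxSize (in bytes) and spark.task.cpus.

```scala
object LaunchTasksSketch {
  final case class Task(taskId: Long, executorId: String, payload: Array[Byte])

  sealed trait Outcome
  final case class Launched(taskId: Long, executorId: String) extends Outcome
  final case class Aborted(taskId: Long, size: Int) extends Outcome

  /** Returns what happened to each task plus the remaining free cores per executor. */
  def launch(
      tasks: Seq[Seq[Task]],
      freeCores: Map[String, Int],
      maxMessageSize: Int,
      cpusPerTask: Int): (Seq[Outcome], Map[String, Int]) =
    tasks.flatten.foldLeft((Vector.empty[Outcome], freeCores)) {
      case ((done, cores), task) =>
        if (task.payload.length >= maxMessageSize) {
          // Too large to send: abort and leave the executor's cores untouched.
          (done :+ Aborted(task.taskId, task.payload.length), cores)
        } else {
          // Reserve cores on the assigned executor, then "send" the task.
          val left = cores.getOrElse(task.executorId, 0) - cpusPerTask
          (done :+ Launched(task.taskId, task.executorId), cores.updated(task.executorId, left))
        }
    }
}
```

Note that the comparison is inclusive, so a task whose size equals the limit is aborted as well.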

Removing Executor

removeExecutor(
  executorId: String,
  reason: ExecutorLossReason): Unit

When removeExecutor is executed, you should see the following DEBUG message in the logs:

Asked to remove executor [executorId] with reason [reason]

removeExecutor then tries to find the executorId executor (in executorDataMap internal registry).

If the executorId executor was found, removeExecutor removes the executor from the following registries:

removeExecutor decrements:

removeExecutor then notifies TaskSchedulerImpl that the executor was lost and posts SparkListenerExecutorRemoved to LiveListenerBus (with the executorId executor).

If however the executorId executor could not be found, removeExecutor requests BlockManagerMaster to remove the executor asynchronously.

Note

removeExecutor uses SparkEnv to access the current BlockManager and then BlockManagerMaster.

You should see the following INFO message in the logs:

Asked to remove non-existent executor [executorId]
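The two branches of removeExecutor can be sketched as follows. The registry is a plain map and the returned strings are hypothetical labels for the notifications and log line; they only illustrate which path is taken, not the real calls.

```scala
object RemoveExecutorSketch {
  /** Returns the registry after removal and labels for what would be emitted. */
  def remove(
      registry: Map[String, String], // executor ID -> host (simplified executor data)
      executorId: String,
      reason: String): (Map[String, String], Seq[String]) =
    if (registry.contains(executorId))
      (registry - executorId,
        Seq(s"executorLost($executorId, $reason)",
          s"SparkListenerExecutorRemoved($executorId)"))
    else
      (registry,
        Seq(s"removeExecutorAsync($executorId)",
          s"Asked to remove non-existent executor $executorId"))
}
```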

removeExecutor is used when DriverEndpoint handles a RemoveExecutor message or gets disassociated from the remote RPC endpoint of an executor.

Removing Worker

removeWorker(
  workerId: String,
  host: String,
  message: String): Unit

removeWorker prints out the following DEBUG message to the logs:

Asked to remove worker [workerId] with reason [message]

In the end, removeWorker requests the TaskSchedulerImpl to handle the worker removal (workerRemoved).

removeWorker is used when DriverEndpoint is requested to handle a RemoveWorker event.

Processing One-Way Messages

receive: PartialFunction[Any, Unit]

receive is part of the RpcEndpoint abstraction.

receive...FIXME

Processing Two-Way Messages

receiveAndReply(
  context: RpcCallContext): PartialFunction[Any, Unit]

receiveAndReply is part of the RpcEndpoint abstraction.

receiveAndReply...FIXME

onDisconnected Callback

onDisconnected removes the worker from the internal addressToExecutorId registry (that effectively removes the worker from a cluster).

onDisconnected removes the executor with the reason being SlaveLost and message:

Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

Executors by RpcAddress Registry

addressToExecutorId: Map[RpcAddress, String]

Executor addresses (host and port) for executors.

Set when an executor connects to register itself.

Disabling Executor

disableExecutor(
  executorId: String): Boolean

disableExecutor determines whether the executor should be disabled, i.e. whether it is active or registered in the executorsPendingToRemove registry.

If the executor should be disabled, disableExecutor prints out the following INFO message to the logs and notifies the TaskSchedulerImpl that the executor is lost.

Disabling executor [executorId].

disableExecutor returns whether the executor should be disabled.

disableExecutor is used when:

  • KubernetesDriverEndpoint is requested to handle onDisconnected event
  • YarnDriverEndpoint is requested to handle onDisconnected event
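The decision made by disableExecutor can be sketched as a small function. The active and pendingToRemove sets are hypothetical inputs standing in for the driver's registries, and the optional string is the INFO message that would be logged.

```scala
object DisableExecutorSketch {
  /** Returns whether the executor should be disabled, plus the INFO message if so. */
  def disable(
      executorId: String,
      active: Set[String],
      pendingToRemove: Set[String]): (Boolean, Option[String]) = {
    val shouldDisable = active.contains(executorId) || pendingToRemove.contains(executorId)
    val log = if (shouldDisable) Some(s"Disabling executor $executorId.") else None
    (shouldDisable, log)
  }
}
```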

Logging

Enable ALL logging level for org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint=ALL

Refer to Logging.


Last update: 2021-01-05