DriverEndpoint¶
DriverEndpoint
is a ThreadSafeRpcEndpoint that is a message handler for CoarseGrainedSchedulerBackend to communicate with CoarseGrainedExecutorBackend.
DriverEndpoint
is registered under the name CoarseGrainedScheduler by CoarseGrainedSchedulerBackend.
DriverEndpoint uses the executorDataMap internal registry to track all the executors that registered with the driver. An executor sends a RegisterExecutor message to request registration.
Creating Instance¶
DriverEndpoint
takes no arguments to be created.
DriverEndpoint
is created when:
CoarseGrainedSchedulerBackend
is created (and registers a CoarseGrainedScheduler RPC endpoint)
ExecutorLogUrlHandler¶
logUrlHandler: ExecutorLogUrlHandler
DriverEndpoint
creates an ExecutorLogUrlHandler (based on spark.ui.custom.executor.log.url configuration property) when created.
DriverEndpoint
uses the ExecutorLogUrlHandler
to create an ExecutorData when requested to handle a RegisterExecutor message.
Starting DriverEndpoint¶
onStart
requests the Revive Messages Scheduler Service to schedule a periodic action that sends ReviveOffers messages every revive interval (based on spark.scheduler.revive.interval configuration property).
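The periodic scheduling described above can be sketched with a plain JDK scheduled executor. This is a minimal illustration, not Spark's code: `ReviveSchedulerSketch`, its `start` method and the `send` callback are hypothetical names, and the interval is passed in directly rather than read from spark.scheduler.revive.interval.

```scala
import java.util.concurrent.{Executors, TimeUnit}

object ReviveSchedulerSketch {
  // Hypothetical stand-in for the ReviveOffers message
  case object ReviveOffers

  /** Calls `send(ReviveOffers)` every `intervalMs` milliseconds.
    * Returns a function that stops the scheduler. */
  def start(intervalMs: Long)(send: Any => Unit): () => Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(
      new Runnable { def run(): Unit = send(ReviveOffers) },
      0L, intervalMs, TimeUnit.MILLISECONDS)
    () => { scheduler.shutdownNow(); () }
  }
}
```

The returned function lets the caller shut the scheduler down, which a long-running endpoint would typically do when it stops.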
Launching Tasks¶
There are two makeOffers methods to launch tasks. They differ by the number of active executors (from the executorDataMap registry) they work with:
On All Active Executors¶
makeOffers(): Unit
makeOffers
builds WorkerOffers for every active executor (in the executorDataMap registry) and requests the TaskSchedulerImpl to generate tasks for the available worker offers (that creates TaskDescriptions).
When there are tasks (TaskDescriptions) to be launched, makeOffers launches them.
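The offer-building step can be sketched as follows. The `ExecutorData` and `WorkerOffer` case classes below are simplified, hypothetical stand-ins for Spark's internal types (the real ones carry more fields), and `buildOffers` is an illustrative name, not a Spark method.

```scala
object MakeOffersSketch {
  // Simplified, hypothetical stand-ins for Spark's internal types
  final case class ExecutorData(host: String, freeCores: Int, active: Boolean)
  final case class WorkerOffer(executorId: String, host: String, cores: Int)

  /** Builds one offer per active executor in the registry. */
  def buildOffers(executorDataMap: Map[String, ExecutorData]): Seq[WorkerOffer] =
    executorDataMap.toSeq.collect {
      case (id, data) if data.active => WorkerOffer(id, data.host, data.freeCores)
    }.sortBy(_.executorId)

  /** Single-executor variant: the same logic restricted to one executor ID. */
  def buildOffers(executorDataMap: Map[String, ExecutorData], executorId: String): Seq[WorkerOffer] =
    buildOffers(executorDataMap.filter { case (id, _) => id == executorId })
}
```

In this sketch the offers would then be handed to the task scheduler, which decides what tasks to run on them.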
makeOffers
is used when:
DriverEndpoint
handles ReviveOffers messages
On Single Executor¶
makeOffers(
executorId: String): Unit
Note
makeOffers with a single executor works like makeOffers for all active executors, but is limited to the given executor.
makeOffers
is used when:
DriverEndpoint
handles StatusUpdate and LaunchedExecutor messages
Launching Tasks¶
launchTasks(
tasks: Seq[Seq[TaskDescription]]): Unit
Note
The input tasks
collection contains one or more TaskDescriptions per executor (and the "task partitioning" per executor is of no use in launchTasks
so it simply flattens the input data structure).
For every TaskDescription (in the given tasks
collection), launchTasks
encodes it and makes sure that the encoded task size is below the allowed message size.
launchTasks looks up the ExecutorData of the executor that has been assigned to execute the task (in the executorDataMap internal registry) and decreases the executor's free cores by the number of CPUs per task (spark.task.cpus configuration property).
Note
Scheduling in Spark relies on cores only (not memory), i.e. the number of tasks Spark can run on an executor is limited by the number of cores available only. When submitting a Spark application for execution both executor resources -- memory and cores -- can however be specified explicitly. It is the job of a cluster manager to monitor the memory and take action when its use exceeds what was assigned.
launchTasks
prints out the following DEBUG message to the logs:
Launching task [taskId] on executor id: [executorId] hostname: [executorHost].
In the end, launchTasks sends the (serialized) task to the executor (by sending a LaunchTask message to the executor's RPC endpoint with the serialized task inside a SerializableBuffer).
Note
This is the moment in a task's lifecycle when the driver sends the serialized task to an assigned executor.
Task Exceeds Allowed Size¶
In case the size of a serialized TaskDescription
equals or exceeds the maximum allowed RPC message size, launchTasks
looks up the TaskSetManager for the TaskDescription
(in taskIdToTaskSetManager registry) and aborts it with the following message:
Serialized task [id]:[index] was [limit] bytes, which exceeds max allowed: spark.rpc.message.maxSize ([maxRpcMessageSize] bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
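The launchTasks flow (flatten, encode, size check, then either abort or reserve cores and send) can be sketched as below. All types here are hypothetical simplifications: the payload byte array stands in for the encoded task, `Aborted` stands in for aborting the TaskSetManager, and `Launched` stands in for sending a LaunchTask message.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

object LaunchTasksSketch {
  // Hypothetical, simplified stand-ins for Spark's internal types
  final case class TaskDescription(taskId: Long, executorId: String, payload: Array[Byte])
  final class ExecutorData(var freeCores: Int)

  sealed trait Outcome
  final case class Launched(taskId: Long, executorId: String) extends Outcome
  final case class Aborted(taskId: Long, size: Int) extends Outcome

  def launchTasks(
      tasks: Seq[Seq[TaskDescription]],
      executorDataMap: mutable.Map[String, ExecutorData],
      maxRpcMessageSize: Int,
      cpusPerTask: Int): Seq[Outcome] =
    tasks.flatten.map[Outcome] { task =>
      val serialized = ByteBuffer.wrap(task.payload) // stand-in for encoding the task
      if (serialized.limit() >= maxRpcMessageSize) {
        // Too large for one RPC message: abort instead of sending
        Aborted(task.taskId, serialized.limit())
      } else {
        // Reserve the cores on the assigned executor before "sending" the task
        val data = executorDataMap(task.executorId)
        data.freeCores -= cpusPerTask
        Launched(task.taskId, task.executorId)
      }
    }
}
```

Note that an oversized task leaves the executor's free cores untouched in this sketch, since the size check comes before the core accounting.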
Messages¶
KillExecutorsOnHost¶
Posted when CoarseGrainedSchedulerBackend is requested to kill all executors on a node.
KillTask¶
CoarseGrainedSchedulerBackend
is requested to kill a task.
KillTask(
taskId: Long,
executor: String,
interruptThread: Boolean)
KillTask
is sent when CoarseGrainedSchedulerBackend
kills a task.
When KillTask is received, DriverEndpoint looks up the executor (in the executorDataMap registry).
If found, DriverEndpoint passes the message on to the executor (using its registered RPC endpoint for CoarseGrainedExecutorBackend).
Otherwise, you should see the following WARN message in the logs:
Attempted to kill task [taskId] for unknown executor [executor].
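The lookup-then-forward logic can be sketched as a small function. `KillTaskSketch.handle` is a hypothetical helper: it returns the message to forward when the executor is known, or the warning text otherwise, instead of actually sending or logging anything.

```scala
object KillTaskSketch {
  final case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)

  /** Right(message to forward) if the executor is registered, Left(warning text) otherwise. */
  def handle(msg: KillTask, registeredExecutors: Set[String]): Either[String, KillTask] =
    if (registeredExecutors.contains(msg.executor)) Right(msg)
    else Left(s"Attempted to kill task ${msg.taskId} for unknown executor ${msg.executor}.")
}
```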
LaunchedExecutor¶
RegisterExecutor¶
CoarseGrainedExecutorBackend
registers with the driver
RegisterExecutor(
executorId: String,
executorRef: RpcEndpointRef,
hostname: String,
cores: Int,
logUrls: Map[String, String])
RegisterExecutor
is sent when CoarseGrainedExecutorBackend
RPC Endpoint is requested to start.
When received, DriverEndpoint
makes sure that no other executors were registered under the input executorId
and that the input hostname
is not blacklisted.
If the requirements hold, you should see the following INFO message in the logs:
Registered executor [executorRef] ([address]) with ID [executorId]
DriverEndpoint does the bookkeeping:
- Registers executorId (in addressToExecutorId)
- Adds cores (in totalCoreCount)
- Increments totalRegisteredExecutors
- Creates and registers ExecutorData for executorId (in executorDataMap)
- Updates currentExecutorIdCounter if the input executorId is greater than the current value
If numPendingExecutors is greater than 0, DriverEndpoint decrements numPendingExecutors and prints out the following DEBUG message to the logs:
Decremented number of pending executors ([numPendingExecutors] left)
DriverEndpoint
sends RegisteredExecutor message back (that is to confirm that the executor was registered successfully).
DriverEndpoint
replies true
(to acknowledge the message).
DriverEndpoint
then announces the new executor by posting SparkListenerExecutorAdded to LiveListenerBus.
In the end, DriverEndpoint
makes executor resource offers (for launching tasks).
If however there was already another executor registered under the input executorId, DriverEndpoint sends a RegisterExecutorFailed message back with the reason:
Duplicate executor ID: [executorId]
If however the input hostname
is blacklisted, you should see the following INFO message in the logs:
Rejecting [executorId] as it has been blacklisted.
DriverEndpoint
sends RegisterExecutorFailed message back with the reason:
Executor is blacklisted: [executorId]
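The registration checks and part of the bookkeeping can be sketched as follows. The `Registry` class is a hypothetical simplification: a map from executor ID to cores stands in for executorDataMap, and the listener event, the reply to the executor and the resource offers are left out.

```scala
import scala.collection.mutable

object RegisterExecutorSketch {
  sealed trait Reply
  case object RegisteredExecutor extends Reply
  final case class RegisterExecutorFailed(reason: String) extends Reply

  final class Registry(blacklistedHosts: Set[String]) {
    val executorCores = mutable.Map.empty[String, Int] // stand-in for executorDataMap
    var totalCoreCount = 0
    var totalRegisteredExecutors = 0

    def register(executorId: String, hostname: String, cores: Int): Reply =
      if (executorCores.contains(executorId)) {
        RegisterExecutorFailed(s"Duplicate executor ID: $executorId")
      } else if (blacklistedHosts.contains(hostname)) {
        RegisterExecutorFailed(s"Executor is blacklisted: $executorId")
      } else {
        // Bookkeeping for a successfully registered executor
        executorCores(executorId) = cores
        totalCoreCount += cores
        totalRegisteredExecutors += 1
        RegisteredExecutor
      }
  }
}
```

A rejected registration changes no counters in this sketch, so a duplicate or blacklisted executor cannot inflate totalCoreCount.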
RemoveExecutor¶
RemoveWorker¶
RetrieveSparkAppConfig¶
RetrieveSparkAppConfig(
resourceProfileId: Int)
Posted when:
CoarseGrainedExecutorBackend
standalone application is started
When received, DriverEndpoint
replies with a SparkAppConfig
message with the following:
- spark-prefixed configuration properties
- IO Encryption Key
- Delegation tokens
- Default profile
ReviveOffers¶
Posted when:
- Periodically (every spark.scheduler.revive.interval) right after DriverEndpoint is requested to start
- CoarseGrainedSchedulerBackend is requested to revive resource offers
When received, DriverEndpoint
makes executor resource offers.
StatusUpdate¶
CoarseGrainedExecutorBackend
sends task status updates to the driver
StatusUpdate(
executorId: String,
taskId: Long,
state: TaskState,
data: SerializableBuffer)
StatusUpdate
is sent when CoarseGrainedExecutorBackend
sends task status updates to the driver.
When StatusUpdate
is received, DriverEndpoint requests the TaskSchedulerImpl to handle the task status update.
If the task has finished, DriverEndpoint
updates the number of cores available for work on the corresponding executor (registered in executorDataMap).
DriverEndpoint makes an executor resource offer on the single executor.
If DriverEndpoint finds no executor (in executorDataMap), you should see the following WARN message in the logs:
Ignored task status update ([taskId] state [state]) from unknown executor with ID [executorId]
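The core accounting on a status update can be sketched as below. The `TaskState` hierarchy is a hypothetical subset of task states, the `freeCores` map stands in for executorDataMap, and `cpusPerTask` is assumed to be the value of spark.task.cpus. The call to the task scheduler and the follow-up resource offer are omitted.

```scala
import scala.collection.mutable

object StatusUpdateSketch {
  // Hypothetical subset of task states
  sealed trait TaskState { def isFinished: Boolean }
  case object Running extends TaskState { val isFinished = false }
  case object Finished extends TaskState { val isFinished = true }
  case object Failed extends TaskState { val isFinished = true }

  /** Returns the executor's free cores after the update, or None for an unknown executor. */
  def handle(
      executorId: String,
      state: TaskState,
      freeCores: mutable.Map[String, Int],
      cpusPerTask: Int): Option[Int] =
    freeCores.get(executorId) match {
      case Some(cores) if state.isFinished =>
        // A finished task gives its cores back to the executor
        freeCores(executorId) = cores + cpusPerTask
        Some(cores + cpusPerTask)
      case other => other
    }
}
```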
StopDriver¶
StopExecutors¶
StopExecutors
message is receive-reply and blocking. When received, the following INFO message appears in the logs:
Asking each executor to shut down
It then sends a StopExecutor message to every registered executor (from executorDataMap
).
UpdateDelegationTokens¶
Removing Executor¶
removeExecutor(
executorId: String,
reason: ExecutorLossReason): Unit
When removeExecutor
is executed, you should see the following DEBUG message in the logs:
Asked to remove executor [executorId] with reason [reason]
removeExecutor
then tries to find the executorId
executor (in executorDataMap internal registry).
If the executorId
executor was found, removeExecutor
removes the executor from the following registries:
removeExecutor
decrements:
- totalCoreCount by the executor's
totalCores
- totalRegisteredExecutors
removeExecutor then notifies TaskSchedulerImpl that the executor was lost.
In the end, removeExecutor posts SparkListenerExecutorRemoved to LiveListenerBus (with the executorId executor).
If however the executorId
executor could not be found, removeExecutor
requests BlockManagerMaster
to remove the executor asynchronously.
Note
removeExecutor
uses SparkEnv
to access the current BlockManager
and then BlockManagerMaster.
You should see the following INFO message in the logs:
Asked to remove non-existent executor [executorId]
removeExecutor is used when DriverEndpoint handles a RemoveExecutor message or gets disassociated from a remote RPC endpoint of an executor.
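The counter updates in removeExecutor can be sketched as follows. This hypothetical `Registry` only models executorDataMap and the two counters; the other registries, the scheduler notification, the listener event and the BlockManagerMaster request are deliberately left out.

```scala
import scala.collection.mutable

object RemoveExecutorSketch {
  // Hypothetical, simplified stand-in for Spark's ExecutorData
  final case class ExecutorData(totalCores: Int)

  final class Registry {
    val executorDataMap = mutable.Map.empty[String, ExecutorData]
    var totalCoreCount = 0
    var totalRegisteredExecutors = 0

    /** Returns true if the executor was registered and has been removed. */
    def removeExecutor(executorId: String): Boolean =
      executorDataMap.remove(executorId) match {
        case Some(data) =>
          totalCoreCount -= data.totalCores
          totalRegisteredExecutors -= 1
          true
        case None =>
          false // corresponds to the "non-existent executor" branch
      }
  }
}
```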
Removing Worker¶
removeWorker(
workerId: String,
host: String,
message: String): Unit
removeWorker
prints out the following DEBUG message to the logs:
Asked to remove worker [workerId] with reason [message]
In the end, removeWorker simply requests the TaskSchedulerImpl to handle the worker removal (workerRemoved).
removeWorker
is used when DriverEndpoint
is requested to handle a RemoveWorker event.
Processing One-Way Messages¶
receive: PartialFunction[Any, Unit]
receive
is part of the RpcEndpoint abstraction.
receive
...FIXME
Processing Two-Way Messages¶
receiveAndReply(
context: RpcCallContext): PartialFunction[Any, Unit]
receiveAndReply
is part of the RpcEndpoint abstraction.
receiveAndReply
...FIXME
onDisconnected Callback¶
onDisconnected
removes the worker from the internal addressToExecutorId registry (that effectively removes the worker from a cluster).
onDisconnected
removes the executor with the reason being SlaveLost
and message:
Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Executors by RpcAddress Registry¶
addressToExecutorId: Map[RpcAddress, String]
Executor addresses (host and port) for executors.
Set when an executor connects to register itself.
Disabling Executor¶
disableExecutor(
executorId: String): Boolean
disableExecutor checks whether the executor is active:
- If so, disableExecutor adds the executor to the executorsPendingLossReason registry
- Otherwise, disableExecutor checks whether the executor was added to the executorsPendingToRemove registry
disableExecutor
determines whether the executor should really be disabled (as active or registered in executorsPendingToRemove registry).
If the executor should be disabled, disableExecutor
prints out the following INFO message to the logs and notifies the TaskSchedulerImpl that the executor is lost.
Disabling executor [executorId].
disableExecutor returns a flag that indicates whether the executor should have been disabled or not.
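The decision logic of disableExecutor can be sketched as below. The `State` class is hypothetical: plain sets stand in for the active-executor check and the executorsPendingToRemove registry, and the `lostExecutors` buffer records what would be "executor lost" notifications to the TaskSchedulerImpl.

```scala
import scala.collection.mutable

object DisableExecutorSketch {
  final class State(activeExecutors: Set[String], executorsPendingToRemove: Set[String]) {
    val executorsPendingLossReason = mutable.Set.empty[String]
    val lostExecutors = mutable.Buffer.empty[String] // stand-in for scheduler notifications

    def disableExecutor(executorId: String): Boolean = {
      val shouldDisable =
        if (activeExecutors.contains(executorId)) {
          executorsPendingLossReason += executorId
          true
        } else {
          executorsPendingToRemove.contains(executorId)
        }
      if (shouldDisable) {
        lostExecutors += executorId
      }
      shouldDisable
    }
  }
}
```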
disableExecutor
is used when:
- KubernetesDriverEndpoint is requested to handle onDisconnected event
- YarnDriverEndpoint is requested to handle onDisconnected event
Logging¶
Enable ALL logging level for org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint=ALL
Refer to Logging.