HeartbeatReceiver RPC Endpoint¶
HeartbeatReceiver is a ThreadSafeRpcEndpoint that is registered on the driver as HeartbeatReceiver.
HeartbeatReceiver receives Heartbeat messages from executors for accumulator updates (with task metrics and a Spark application's accumulators) and passes them along to TaskScheduler.
HeartbeatReceiver is registered immediately after a Spark application is started (i.e. when SparkContext is created).
HeartbeatReceiver is a SparkListener to get notified about new executors or executors that are no longer available.
Creating Instance¶
HeartbeatReceiver takes the following to be created:
- SparkContext
- Clock (default: SystemClock)
HeartbeatReceiver is created when SparkContext is created.
TaskScheduler¶
HeartbeatReceiver manages a reference to TaskScheduler.
RPC Messages¶
ExecutorRemoved¶
Attributes:
- Executor ID
Posted when HeartbeatReceiver is notified that an executor is no longer available.
When received, HeartbeatReceiver removes the executor (from executorLastSeen internal registry).
ExecutorRegistered¶
Attributes:
- Executor ID
Posted when HeartbeatReceiver is notified that a new executor has been registered.
When received, HeartbeatReceiver registers the executor and the current time (in executorLastSeen internal registry).
ExpireDeadHosts¶
No attributes
When received, HeartbeatReceiver prints out the following TRACE message to the logs:
Checking for hosts with no recent heartbeats in HeartbeatReceiver.
Each executor (in executorLastSeen internal registry) is checked whether the time since it was last seen exceeds spark.network.timeout.
For any such executor, HeartbeatReceiver prints out the following WARN message to the logs:
Removing executor [executorId] with no recent heartbeats: [time] ms exceeds timeout [timeout] ms
HeartbeatReceiver calls TaskScheduler.executorLost (with SlaveLost("Executor heartbeat timed out after [timeout] ms")).
SparkContext.killAndReplaceExecutor is asynchronously called for the executor (i.e. on killExecutorThread).
The executor is removed from the executorLastSeen internal registry.
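The bookkeeping behind ExecutorRegistered, ExecutorRemoved and ExpireDeadHosts can be sketched roughly as follows. This is a simplified, hypothetical model and not Spark's code: `LastSeenRegistry`, `isKnown`, and the `now` and `timeoutMs` parameters are names made up for illustration, with `now` standing in for the Clock and `timeoutMs` for spark.network.timeout. Logging, the TaskScheduler.executorLost call and the kill request are left out.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for HeartbeatReceiver's bookkeeping.
// `now` plays the role of the Clock, `timeoutMs` the role of spark.network.timeout.
final class LastSeenRegistry(timeoutMs: Long, now: () => Long) {
  // executor ID -> time (ms) the executor was last seen
  private val executorLastSeen = mutable.HashMap.empty[String, Long]

  // ExecutorRegistered: record the executor with the current time
  def register(executorId: String): Unit = {
    executorLastSeen.update(executorId, now())
  }

  // ExecutorRemoved: forget the executor
  def remove(executorId: String): Unit = {
    executorLastSeen.remove(executorId)
  }

  // ExpireDeadHosts: remove and return the executors whose last-seen time
  // is older than the timeout
  def expireDeadHosts(): Seq[String] = {
    val current = now()
    val expired = executorLastSeen.collect {
      case (id, lastSeen) if current - lastSeen > timeoutMs => id
    }.toSeq
    expired.foreach(id => executorLastSeen.remove(id))
    expired
  }

  def isKnown(executorId: String): Boolean = executorLastSeen.contains(executorId)
}
```

Passing the clock in as a function makes the expiry rule easy to exercise without waiting for real time to pass.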
Heartbeat¶
Attributes:
- Executor ID
- AccumulatorV2 updates (by task ID)
- BlockManagerId
- ExecutorMetrics peaks (by stage and stage attempt IDs)
Posted when Executor informs that it is alive and reports task metrics.
When received, HeartbeatReceiver looks up the executor by its executor ID (in executorLastSeen internal registry).
When the executor is found, HeartbeatReceiver updates the time the heartbeat was received (in executorLastSeen internal registry). HeartbeatReceiver uses the Clock to know the current time.
HeartbeatReceiver then submits an asynchronous task to notify TaskScheduler that the heartbeat was received from the executor (using TaskScheduler internal reference). HeartbeatReceiver posts a HeartbeatResponse back to the executor (with the response from TaskScheduler indicating whether the executor is already registered, so the executor knows whether it needs to re-register).
If, however, the executor was not found (in executorLastSeen internal registry), i.e. the executor was not registered before, you should see the following DEBUG message in the logs, and the response tells the executor to re-register.
Received heartbeat from unknown executor [executorId]
In a very rare case, when TaskScheduler is not yet assigned to HeartbeatReceiver, you should see the following WARN message in the logs, and the response tells the executor to re-register.
Dropping [heartbeat] because TaskScheduler is not ready yet
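The three outcomes of handling a Heartbeat described above can be summarised in a small decision function. This is only a sketch under stated assumptions: `HeartbeatDecision`, `Response`, `handle` and all parameter names are hypothetical and do not mirror Spark's actual signatures. The asynchronous hand-off to TaskScheduler is modelled as a plain function call.

```scala
// Hypothetical, simplified model of the heartbeat handling outcomes.
object HeartbeatDecision {
  // reregister = true tells the executor to register again
  final case class Response(reregister: Boolean)

  def handle(
      executorId: String,
      schedulerReady: Boolean,                  // has TaskSchedulerIsSet been received?
      knownExecutors: Set[String],              // keys of the executorLastSeen registry
      schedulerKnowsExecutor: String => Boolean // stand-in for the TaskScheduler's answer
  ): Response =
    if (!schedulerReady) {
      // "Dropping [heartbeat] because TaskScheduler is not ready yet"
      Response(reregister = true)
    } else if (!knownExecutors.contains(executorId)) {
      // "Received heartbeat from unknown executor [executorId]"
      Response(reregister = true)
    } else {
      // Known executor: the TaskScheduler's answer decides
      Response(reregister = !schedulerKnowsExecutor(executorId))
    }
}
```

In this model an executor is told to re-register in every case except when both HeartbeatReceiver and TaskScheduler already know it.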
TaskSchedulerIsSet¶
No attributes
Posted when SparkContext informs that TaskScheduler is available.
When received, HeartbeatReceiver sets the internal reference to TaskScheduler.
onExecutorAdded¶
onExecutorAdded(
executorAdded: SparkListenerExecutorAdded): Unit
onExecutorAdded sends an ExecutorRegistered message to itself.
onExecutorAdded is part of the SparkListener abstraction.
addExecutor¶
addExecutor(
executorId: String): Option[Future[Boolean]]
addExecutor
...FIXME
onExecutorRemoved¶
onExecutorRemoved(
executorRemoved: SparkListenerExecutorRemoved): Unit
onExecutorRemoved removes the executor.
onExecutorRemoved is part of the SparkListener abstraction.
removeExecutor¶
removeExecutor(
executorId: String): Option[Future[Boolean]]
removeExecutor
...FIXME
Starting HeartbeatReceiver¶
onStart(): Unit
onStart sends a blocking ExpireDeadHosts every spark.network.timeoutInterval on eventLoopThread.
onStart is part of the RpcEndpoint abstraction.
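The periodic check can be pictured as a fixed-rate task on a single scheduler thread. The sketch below uses only the JDK's `java.util.concurrent` classes. `PeriodicExpiry`, `runPeriodically` and its parameters are hypothetical names, and `check` stands in for sending ExpireDeadHosts. It stops after a fixed number of runs so that it terminates, which the real endpoint does not do.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object PeriodicExpiry {
  // Runs `check` every `intervalMs` on a single scheduler thread, waits until
  // it has run at least `times` times (or `maxWaitMs` has elapsed), then shuts
  // the scheduler down. Returns true if the runs completed within `maxWaitMs`.
  def runPeriodically(intervalMs: Long, times: Int, maxWaitMs: Long)(check: () => Unit): Boolean = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val remaining = new CountDownLatch(times)
    val task = new Runnable {
      override def run(): Unit = {
        check()
        remaining.countDown()
      }
    }
    // First run immediately, then once per interval
    scheduler.scheduleAtFixedRate(task, 0L, intervalMs, TimeUnit.MILLISECONDS)
    try remaining.await(maxWaitMs, TimeUnit.MILLISECONDS)
    finally scheduler.shutdownNow()
  }
}
```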
Stopping HeartbeatReceiver¶
onStop(): Unit
onStop shuts down the eventLoopThread and killExecutorThread thread pools.
onStop is part of the RpcEndpoint abstraction.
Handling Two-Way Messages¶
receiveAndReply(
context: RpcCallContext): PartialFunction[Any, Unit]
receiveAndReply
...FIXME
receiveAndReply
is part of the RpcEndpoint abstraction.
Thread Pools¶
kill-executor-thread¶
killExecutorThread is a daemon ScheduledThreadPoolExecutor with a single thread.
The name of the thread pool is kill-executor-thread.
heartbeat-receiver-event-loop-thread¶
eventLoopThread is a daemon ScheduledThreadPoolExecutor with a single thread.
The name of the thread pool is heartbeat-receiver-event-loop-thread.
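For illustration, a single-threaded daemon ScheduledThreadPoolExecutor with a fixed thread name can be built with the plain JDK as sketched below. `DaemonPools` and `singleDaemonScheduler` are hypothetical names, and this is not necessarily how Spark constructs its pools.

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory}

object DaemonPools {
  // Builds a ScheduledThreadPoolExecutor with one core thread; the thread is
  // a daemon and carries the given name (plain JDK, not Spark's own helper).
  def singleDaemonScheduler(threadName: String): ScheduledThreadPoolExecutor = {
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, threadName)
        t.setDaemon(true) // daemon threads do not keep the JVM alive
        t
      }
    }
    new ScheduledThreadPoolExecutor(1, factory)
  }
}
```

A pool created this way, e.g. `DaemonPools.singleDaemonScheduler("kill-executor-thread")`, should still be shut down explicitly when no longer needed, as onStop does for both pools.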
Expiring Dead Hosts¶
expireDeadHosts(): Unit
expireDeadHosts
...FIXME
expireDeadHosts is used when HeartbeatReceiver receives an ExpireDeadHosts message.
Logging¶
Enable ALL logging level for org.apache.spark.HeartbeatReceiver logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.HeartbeatReceiver=ALL
Refer to Logging.