HeartbeatReceiver RPC Endpoint¶
HeartbeatReceiver is a ThreadSafeRpcEndpoint that is registered on the driver as HeartbeatReceiver.
HeartbeatReceiver receives Heartbeat messages from executors with accumulator updates (task metrics and the Spark application's accumulators) and passes them along to TaskScheduler.

HeartbeatReceiver is registered immediately after a Spark application is started (i.e. when SparkContext is created).
HeartbeatReceiver is also a SparkListener, so it gets notified about new executors and about executors that are no longer available.
Creating Instance¶
HeartbeatReceiver takes the following to be created:
- SparkContext
- Clock (default: SystemClock)
HeartbeatReceiver is created when SparkContext is created.
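The Clock parameter lets callers control time, which is useful in tests. The sketch below shows a minimal version of that idea. Clock, SystemClock and ManualClock here are simplified stand-ins modeled on the names in the text, not Spark's own classes. HeartbeatReceiverModel is a hypothetical name, and the SparkContext parameter is omitted so the sketch stays self-contained.

```scala
// Simplified stand-ins for illustration; not Spark's org.apache.spark.util classes.
trait Clock {
  def getTimeMillis(): Long
}

// Wall-clock time: the default when no clock is passed in.
class SystemClock extends Clock {
  def getTimeMillis(): Long = System.currentTimeMillis()
}

// Hypothetical test clock whose time only moves when advanced explicitly.
class ManualClock(private var now: Long = 0L) extends Clock {
  def getTimeMillis(): Long = synchronized(now)
  def advance(ms: Long): Unit = synchronized { now += ms }
}

// Hypothetical model of the constructor: the clock defaults to SystemClock.
class HeartbeatReceiverModel(val clock: Clock = new SystemClock)

val manual = new ManualClock(1000L)
val receiver = new HeartbeatReceiverModel(manual)
manual.advance(500L) // receiver now sees 1500 ms, with no real time passing
```

With an injectable clock, timeout logic can be exercised deterministically instead of waiting for real time to pass.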
TaskScheduler¶
HeartbeatReceiver manages a reference to TaskScheduler. The reference is set when HeartbeatReceiver receives a TaskSchedulerIsSet message.
RPC Messages¶
ExecutorRemoved¶
Attributes:
- Executor ID
Posted when HeartbeatReceiver is notified that an executor is no longer available.
When received, HeartbeatReceiver removes the executor from the executorLastSeen internal registry.
ExecutorRegistered¶
Attributes:
- Executor ID
Posted when HeartbeatReceiver is notified that a new executor has been registered.
When received, HeartbeatReceiver registers the executor with the current time in the executorLastSeen internal registry.
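The two messages above only add entries to, or remove entries from, the executorLastSeen registry. The sketch below models that registry as a mutable map from executor ID to last-seen time in milliseconds. The message case classes, LastSeenRegistry, and the `now` function (standing in for the Clock) are hypothetical. Modeling each reply as a returned Boolean is also an assumption of the sketch.

```scala
import scala.collection.mutable

// Hypothetical message types mirroring the two RPC messages above.
sealed trait RegistryMessage
final case class ExecutorRegistered(executorId: String) extends RegistryMessage
final case class ExecutorRemoved(executorId: String) extends RegistryMessage

// Minimal model of executorLastSeen: executor ID -> time last seen (ms).
class LastSeenRegistry(now: () => Long) {
  val executorLastSeen = mutable.HashMap.empty[String, Long]

  def handle(msg: RegistryMessage): Boolean = msg match {
    case ExecutorRegistered(id) =>
      executorLastSeen(id) = now() // register with the current time
      true
    case ExecutorRemoved(id) =>
      executorLastSeen.remove(id)  // forget the executor
      true
  }
}

var time = 100L
val registry = new LastSeenRegistry(() => time)
registry.handle(ExecutorRegistered("1"))
time = 250L
registry.handle(ExecutorRegistered("2"))
registry.handle(ExecutorRemoved("1"))
```

After these three messages, only executor "2" remains, registered at time 250.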
ExpireDeadHosts¶
No attributes
When received, HeartbeatReceiver prints out the following TRACE message to the logs:
Checking for hosts with no recent heartbeats in HeartbeatReceiver.
HeartbeatReceiver then checks every executor in the executorLastSeen internal registry to see whether the time since it was last seen exceeds spark.network.timeout.
For every such executor, HeartbeatReceiver prints out the following WARN message to the logs:
Removing executor [executorId] with no recent heartbeats: [time] ms exceeds timeout [timeout] ms
HeartbeatReceiver requests TaskScheduler to handle the lost executor (TaskScheduler.executorLost) with SlaveLost("Executor heartbeat timed out after [timeout] ms").
SparkContext.killAndReplaceExecutor is then called asynchronously for the executor (on killExecutorThread).
Finally, the executor is removed from the executorLastSeen internal registry.
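The timeout check compares each executor's last-seen time against the current time. The sketch below is a minimal model of that check. It assumes a plain mutable map for executorLastSeen and takes the timeout as a parameter rather than reading spark.network.timeout. The function name mirrors the text, but its signature is hypothetical, and println stands in for the WARN log. The TaskScheduler and SparkContext calls are only indicated in a comment.

```scala
import scala.collection.mutable

// Minimal model of the dead-host check: removes and returns the IDs of executors
// whose last heartbeat is older than timeoutMs (sorted for determinism).
def expireDeadHosts(
    executorLastSeen: mutable.Map[String, Long],
    now: Long,
    timeoutMs: Long): List[String] = {
  // Collect first (strictly), then remove, so the map is not modified mid-iteration.
  val expired = executorLastSeen.iterator.collect {
    case (executorId, lastSeenMs) if now - lastSeenMs > timeoutMs => executorId
  }.toList.sorted
  expired.foreach { executorId =>
    val elapsed = now - executorLastSeen(executorId)
    // Stand-in for the WARN log message.
    println(s"Removing executor $executorId with no recent heartbeats: " +
      s"$elapsed ms exceeds timeout $timeoutMs ms")
    // The real endpoint would also notify TaskScheduler here and ask SparkContext
    // (on killExecutorThread) to kill and replace the executor.
    executorLastSeen.remove(executorId)
  }
  expired
}

val lastSeen = mutable.HashMap("1" -> 0L, "2" -> 90000L, "3" -> 119000L)
val removed = expireDeadHosts(lastSeen, now = 125000L, timeoutMs = 30000L)
```

With a 30-second timeout at time 125000, executors "1" (125000 ms) and "2" (35000 ms) are expired. Executor "3" (6000 ms) stays.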
Heartbeat¶
Attributes:
- Executor ID
- AccumulatorV2 updates (by task ID)
- BlockManagerId
- ExecutorMetrics peaks (by stage and stage attempt IDs)
Posted when an Executor reports that it is alive, along with its task metrics.
When received, HeartbeatReceiver looks up the executor (by executorId) in the executorLastSeen internal registry.
If the executor is found, HeartbeatReceiver updates the time the heartbeat was received in the executorLastSeen internal registry, using the Clock for the current time.
HeartbeatReceiver then submits an asynchronous task (on eventLoopThread) that notifies TaskScheduler, through the internal TaskScheduler reference, that a heartbeat was received from the executor. The task replies to the executor with a HeartbeatResponse that carries TaskScheduler's answer: if TaskScheduler does not recognize the executor, the response tells the executor to re-register.
If, however, the executor is not found in the executorLastSeen internal registry (i.e. it was not registered before), HeartbeatReceiver prints out the following DEBUG message to the logs and replies with a response that tells the executor to re-register:
Received heartbeat from unknown executor [executorId]
In the rare case that TaskScheduler has not yet been assigned to HeartbeatReceiver, HeartbeatReceiver prints out the following WARN message to the logs and replies with a response that tells the executor to re-register:
Dropping [heartbeat] because TaskScheduler is not ready yet
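The three outcomes of a Heartbeat are: scheduler not set, known executor, and unknown executor. The sketch below models them as one decision function. Assumptions: HeartbeatResponse is modeled as a case class with a single re-register flag; SchedulerModel is a hypothetical, much-simplified stand-in for TaskScheduler that takes only the executor ID; println stands in for the log messages. The real endpoint performs the scheduler call asynchronously on eventLoopThread, whereas this sketch calls it inline.

```scala
import scala.collection.mutable

// Hypothetical reply type: tells the executor whether it should re-register.
final case class HeartbeatResponse(reregisterBlockManager: Boolean)

// Hypothetical, simplified scheduler: returns true if it knows the executor.
trait SchedulerModel {
  def executorHeartbeatReceived(executorId: String): Boolean
}

class HeartbeatHandler(now: () => Long) {
  val executorLastSeen = mutable.HashMap.empty[String, Long]
  // None until a TaskSchedulerIsSet-like message arrives.
  var scheduler: Option[SchedulerModel] = None

  def onHeartbeat(executorId: String): HeartbeatResponse = scheduler match {
    case None =>
      println(s"Dropping Heartbeat($executorId) because TaskScheduler is not ready yet")
      HeartbeatResponse(reregisterBlockManager = true)
    case Some(ts) if executorLastSeen.contains(executorId) =>
      executorLastSeen(executorId) = now() // record when the heartbeat arrived
      // Ask the executor to re-register only if the scheduler does not know it.
      HeartbeatResponse(reregisterBlockManager = !ts.executorHeartbeatReceived(executorId))
    case Some(_) =>
      println(s"Received heartbeat from unknown executor $executorId")
      HeartbeatResponse(reregisterBlockManager = true)
  }
}

val handler = new HeartbeatHandler(() => 42L)
val beforeScheduler = handler.onHeartbeat("1") // scheduler not set yet
handler.scheduler = Some(new SchedulerModel {
  def executorHeartbeatReceived(executorId: String): Boolean = true
})
handler.executorLastSeen("1") = 0L
val known = handler.onHeartbeat("1")   // registered executor
val unknown = handler.onHeartbeat("2") // never registered
```

Checking for a missing scheduler first matches the order described above. Without a TaskScheduler, a heartbeat cannot be processed at all.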
TaskSchedulerIsSet¶
No attributes
Posted when SparkContext informs that TaskScheduler is available.
When received, HeartbeatReceiver sets the internal reference to TaskScheduler.
onExecutorAdded¶
onExecutorAdded(
executorAdded: SparkListenerExecutorAdded): Unit
onExecutorAdded sends an ExecutorRegistered message to itself.
onExecutorAdded is part of the SparkListener abstraction.
addExecutor¶
addExecutor(
executorId: String): Option[Future[Boolean]]
addExecutor...FIXME
onExecutorRemoved¶
onExecutorRemoved(
executorRemoved: SparkListenerExecutorRemoved): Unit
onExecutorRemoved removes the executor (using removeExecutor).
onExecutorRemoved is part of the SparkListener abstraction.
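Both listener callbacks hand the work to HeartbeatReceiver as messages instead of updating executorLastSeen directly. One plausible reason, stated here as an assumption, is that a ThreadSafeRpcEndpoint processes one message at a time, so routing updates through messages keeps every registry change on that single path. The sketch below models the pattern with a plain single-thread executor acting as the mailbox. SelfMessagingReceiver and its methods are hypothetical names, not Spark APIs.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable

// Hypothetical model: callbacks (possibly on other threads) enqueue work for one
// mailbox thread, which is the only thread that touches the registry.
class SelfMessagingReceiver {
  private val mailbox = Executors.newSingleThreadExecutor()
  private val executorLastSeen = mutable.HashMap.empty[String, Long]

  private def send(handle: () => Unit): Unit = mailbox.execute(() => handle())

  // Counterparts of onExecutorAdded / onExecutorRemoved.
  def onExecutorAdded(executorId: String, time: Long): Unit =
    send(() => { executorLastSeen(executorId) = time })
  def onExecutorRemoved(executorId: String): Unit =
    send(() => { executorLastSeen.remove(executorId); () })

  // Drains pending messages, stops the mailbox, and returns a snapshot.
  def shutdownAndSnapshot(): Map[String, Long] = {
    mailbox.shutdown()
    mailbox.awaitTermination(5, TimeUnit.SECONDS)
    executorLastSeen.toMap
  }
}

val r = new SelfMessagingReceiver
r.onExecutorAdded("1", 10L)
r.onExecutorAdded("2", 20L)
r.onExecutorRemoved("1")
val snapshot = r.shutdownAndSnapshot()
```

Because the mailbox runs messages in FIFO order on a single thread, the removal of "1" is always applied after its registration.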
removeExecutor¶
removeExecutor(
executorId: String): Option[Future[Boolean]]
removeExecutor...FIXME
Starting HeartbeatReceiver¶
onStart(): Unit
onStart schedules a task on eventLoopThread that sends a blocking ExpireDeadHosts message to HeartbeatReceiver itself every spark.network.timeoutInterval.
onStart is part of the RpcEndpoint abstraction.
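The periodic check amounts to a fixed-rate task on a single-thread scheduled executor. The sketch below shows that mechanism with the JDK's scheduleAtFixedRate. Assumptions: the task only counts down a latch, standing in for sending ExpireDeadHosts to the endpoint; the 50 ms interval stands in for spark.network.timeoutInterval; timeoutCheckingTask is an illustrative name. The last two lines mirror what onStop does with the pool.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

val eventLoop = Executors.newSingleThreadScheduledExecutor()
val checkTimeoutIntervalMs = 50L // stand-in for spark.network.timeoutInterval
val checks = new CountDownLatch(3)

// First run immediately, then every checkTimeoutIntervalMs.
val timeoutCheckingTask = eventLoop.scheduleAtFixedRate(
  () => checks.countDown(), // placeholder for "send ExpireDeadHosts to self"
  0L, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)

val sawThreeChecks = checks.await(2, TimeUnit.SECONDS)

// Counterpart of onStop: cancel the periodic task and shut the pool down.
timeoutCheckingTask.cancel(true)
eventLoop.shutdownNow()
```

Running the check on a dedicated single thread keeps the expiry logic serialized, so periodic checks cannot overlap with each other.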
Stopping HeartbeatReceiver¶
onStop(): Unit
onStop shuts down the eventLoopThread and killExecutorThread thread pools.
onStop is part of the RpcEndpoint abstraction.
Handling Two-Way Messages¶
receiveAndReply(
context: RpcCallContext): PartialFunction[Any, Unit]
receiveAndReply...FIXME
receiveAndReply is part of the RpcEndpoint abstraction.
Thread Pools¶
kill-executor-thread¶
killExecutorThread is a daemon ScheduledThreadPoolExecutor with a single thread.
The name of the thread pool is kill-executor-thread.
heartbeat-receiver-event-loop-thread¶
eventLoopThread is a daemon ScheduledThreadPoolExecutor with a single thread.
The name of the thread pool is heartbeat-receiver-event-loop-thread.
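Both pools share one shape: a ScheduledThreadPoolExecutor with one daemon thread and a fixed thread name. Spark builds these with its own internal thread utilities. The helper below is a hypothetical plain-JDK stand-in, written only to show that shape.

```scala
import java.util.concurrent.{Callable, ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}

// Hypothetical helper (plain JDK): one daemon thread with the given name.
def newDaemonSingleThreadScheduledExecutor(name: String): ScheduledThreadPoolExecutor = {
  val factory: ThreadFactory = (r: Runnable) => {
    val t = new Thread(r, name)
    t.setDaemon(true) // daemon threads do not keep the JVM alive
    t
  }
  val executor = new ScheduledThreadPoolExecutor(1, factory)
  executor.setRemoveOnCancelPolicy(true) // drop cancelled tasks from the queue immediately
  executor
}

val killExecutorThread = newDaemonSingleThreadScheduledExecutor("kill-executor-thread")
val eventLoopThread = newDaemonSingleThreadScheduledExecutor("heartbeat-receiver-event-loop-thread")

// Observe which thread runs a task submitted to the event loop.
val task: Callable[(String, Boolean)] = () => {
  val t = Thread.currentThread()
  (t.getName, t.isDaemon)
}
val observed = eventLoopThread.submit(task).get(5, TimeUnit.SECONDS)

killExecutorThread.shutdownNow()
eventLoopThread.shutdownNow()
```

Daemon threads do not block JVM exit even if onStop is never called. The fixed names make the threads easy to spot in thread dumps.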
Expiring Dead Hosts¶
expireDeadHosts(): Unit
expireDeadHosts...FIXME
expireDeadHosts is used when HeartbeatReceiver receives an ExpireDeadHosts message.
Logging¶
Enable ALL logging level for org.apache.spark.HeartbeatReceiver logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.HeartbeatReceiver=ALL
Refer to Logging.