Skip to content

HeartbeatReceiver RPC Endpoint

HeartbeatReceiver is a ThreadSafeRpcEndpoint that is registered on the driver as HeartbeatReceiver.

HeartbeatReceiver receives Heartbeat messages from executors for accumulator updates (with task metrics and a Spark application's accumulators) and pass them along to TaskScheduler.

HeartbeatReceiver RPC Endpoint and Heartbeats from Executors

HeartbeatReceiver is registered immediately after a Spark application is started (i.e. when SparkContext is created).

HeartbeatReceiver is a SparkListener to get notified about new executors or executors that are no longer available.

Creating Instance

HeartbeatReceiver takes the following to be created:

HeartbeatReceiver is created when SparkContext is created

TaskScheduler

HeartbeatReceiver manages a reference to TaskScheduler.

RPC Messages

ExecutorRemoved

Attributes:

  • Executor ID

Posted when HeartbeatReceiver is notified that an executor is no longer available

When received, HeartbeatReceiver removes the executor (from executorLastSeen internal registry).

ExecutorRegistered

Attributes:

  • Executor ID

Posted when HeartbeatReceiver is notified that a new executor has been registered

When received, HeartbeatReceiver registers the executor and the current time (in executorLastSeen internal registry).

ExpireDeadHosts

No attributes

When received, HeartbeatReceiver prints out the following TRACE message to the logs:

Checking for hosts with no recent heartbeats in HeartbeatReceiver.

Each executor (in executorLastSeen internal registry) is checked whether the time it was last seen is not past spark.network.timeout.

For any such executor, HeartbeatReceiver prints out the following WARN message to the logs:

Removing executor [executorId] with no recent heartbeats: [time] ms exceeds timeout [timeout] ms

HeartbeatReceiver TaskScheduler.executorLost (with SlaveLost("Executor heartbeat timed out after [timeout] ms").

SparkContext.killAndReplaceExecutor is asynchronously called for the executor (i.e. on killExecutorThread).

The executor is removed from the executorLastSeen internal registry.

Heartbeat

Attributes:

Posted when Executor informs that it is alive and reports task metrics.

When received, HeartbeatReceiver finds the executorId executor (in executorLastSeen internal registry).

When the executor is found, HeartbeatReceiver updates the time the heartbeat was received (in executorLastSeen internal registry).

HeartbeatReceiver uses the Clock to know the current time.

HeartbeatReceiver then submits an asynchronous task to notify TaskScheduler that the heartbeat was received from the executor (using TaskScheduler internal reference). HeartbeatReceiver posts a HeartbeatResponse back to the executor (with the response from TaskScheduler whether the executor has been registered already or not so it may eventually need to re-register).

If however the executor was not found (in executorLastSeen internal registry), i.e. the executor was not registered before, you should see the following DEBUG message in the logs and the response is to notify the executor to re-register.

Received heartbeat from unknown executor [executorId]

In a very rare case, when TaskScheduler is not yet assigned to HeartbeatReceiver, you should see the following WARN message in the logs and the response is to notify the executor to re-register.

Dropping [heartbeat] because TaskScheduler is not ready yet

TaskSchedulerIsSet

No attributes

Posted when SparkContext informs that TaskScheduler is available.

When received, HeartbeatReceiver sets the internal reference to TaskScheduler.

onExecutorAdded

onExecutorAdded(
  executorAdded: SparkListenerExecutorAdded): Unit

onExecutorAdded sends an ExecutorRegistered message to itself.

onExecutorAdded is part of the SparkListener abstraction.

addExecutor

addExecutor(
  executorId: String): Option[Future[Boolean]]

addExecutor...FIXME

onExecutorRemoved

onExecutorRemoved(
  executorRemoved: SparkListenerExecutorRemoved): Unit

onExecutorRemoved removes the executor.

onExecutorRemoved is part of the SparkListener abstraction.

removeExecutor

removeExecutor(
  executorId: String): Option[Future[Boolean]]

removeExecutor...FIXME

Starting HeartbeatReceiver

onStart(): Unit

onStart sends a blocking ExpireDeadHosts every spark.network.timeoutInterval on eventLoopThread.

onStart is part of the RpcEndpoint abstraction.

Stopping HeartbeatReceiver

onStop(): Unit

onStop shuts down the eventLoopThread and killExecutorThread thread pools.

onStop is part of the RpcEndpoint abstraction.

Handling Two-Way Messages

receiveAndReply(
  context: RpcCallContext): PartialFunction[Any, Unit]

receiveAndReply...FIXME

receiveAndReply is part of the RpcEndpoint abstraction.

Thread Pools

kill-executor-thread

killExecutorThread is a daemon ScheduledThreadPoolExecutor with a single thread.

The name of the thread pool is kill-executor-thread.

heartbeat-receiver-event-loop-thread

eventLoopThread is a daemon ScheduledThreadPoolExecutor with a single thread.

The name of the thread pool is heartbeat-receiver-event-loop-thread.

Expiring Dead Hosts

expireDeadHosts(): Unit

expireDeadHosts...FIXME

expireDeadHosts is used when HeartbeatReceiver is requested to receives an ExpireDeadHosts message.

Logging

Enable ALL logging level for org.apache.spark.HeartbeatReceiver logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.HeartbeatReceiver=ALL

Refer to Logging.


Last update: 2020-11-27