
== HeartbeatReceiver RPC Endpoint

HeartbeatReceiver is a rpc:RpcEndpoint.md#ThreadSafeRpcEndpoint[ThreadSafeRpcEndpoint] registered on the driver under the name HeartbeatReceiver.

HeartbeatReceiver receives <<Heartbeat, Heartbeat>> messages from executors, which Spark uses as the mechanism to receive accumulator updates (with task metrics and a Spark application's accumulators) and scheduler:TaskScheduler.md#executorHeartbeatReceived[pass them along to TaskScheduler].

.HeartbeatReceiver RPC Endpoint and Heartbeats from Executors

NOTE: HeartbeatReceiver is registered immediately after a Spark application is started, i.e. when SparkContext is created.

HeartbeatReceiver is a ROOT:SparkListener.md[] to get notified when <<onExecutorAdded, a new executor is added>> to or <<onExecutorRemoved, an executor is no longer available>> in a Spark application. HeartbeatReceiver tracks executors (in the <<executorLastSeen, executorLastSeen>> registry) to handle <<Heartbeat, Heartbeat>> and <<ExpireDeadHosts, ExpireDeadHosts>> messages from the executors that are assigned to the Spark application.

[[messages]]
.HeartbeatReceiver RPC Endpoint's Messages (in alphabetical order)
[width="100%",cols="1,2",options="header"]
|===
| Message | Description

| <<ExecutorRegistered, ExecutorRegistered>>
| Posted when HeartbeatReceiver <<addExecutor, is notified that a new executor has been registered>> (with a Spark application).

| <<ExecutorRemoved, ExecutorRemoved>>
| Posted when HeartbeatReceiver <<removeExecutor, is notified that an executor is no longer available>> (to a Spark application).

| <<ExpireDeadHosts, ExpireDeadHosts>>
| FIXME

| <<Heartbeat, Heartbeat>>
| Posted when Executor executor:Executor.md#reportHeartBeat[informs that it is alive and reports task metrics].

| <<TaskSchedulerIsSet, TaskSchedulerIsSet>>
| Posted when SparkContext informs that TaskScheduler is available.
|===
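The messages are plain Scala case classes and case objects. The following sketch mirrors the signatures listed in the sections below (Spark's actual declarations additionally carry `private[spark]`-style modifiers, which are omitted here):

[source, scala]
----
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AccumulatorV2

// Simplified shapes of the messages handled by HeartbeatReceiver
// (access modifiers and exact packaging omitted).
case class ExecutorRegistered(executorId: String)
case class ExecutorRemoved(executorId: String)
case object ExpireDeadHosts
case object TaskSchedulerIsSet
case class Heartbeat(
    executorId: String,
    accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
    blockManagerId: BlockManagerId)
----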

[[internal-registries]]
.HeartbeatReceiver's Internal Registries and Counters
[cols="1,2",options="header",width="100%"]
|===
| Name | Description

| [[executorLastSeen]] executorLastSeen
| Executor ids and the timestamps of when the last heartbeat was received.

| [[scheduler]] scheduler
| scheduler:TaskScheduler.md[TaskScheduler]
|===

[TIP]
====
Enable `DEBUG` or `TRACE` logging levels for `org.apache.spark.HeartbeatReceiver` logger to see what happens inside.

Add the following line to `conf/log4j.properties`:

----
log4j.logger.org.apache.spark.HeartbeatReceiver=TRACE
----

Refer to spark-logging.md[Logging].
====

=== [[creating-instance]] Creating HeartbeatReceiver Instance

HeartbeatReceiver takes the following when created:

* [[sc]] ROOT:SparkContext.md[]
* [[clock]] Clock

HeartbeatReceiver ROOT:SparkContext.md#addSparkListener[registers itself as a SparkListener].

HeartbeatReceiver initializes the <<internal-registries, internal registries and counters>>.

=== [[onStart]] Starting HeartbeatReceiver RPC Endpoint -- onStart Method

NOTE: onStart is part of the rpc:RpcEndpoint.md[RpcEndpoint Contract]

When called, HeartbeatReceiver sends a blocking <<ExpireDeadHosts, ExpireDeadHosts>> message to itself every <<spark.network.timeoutInterval, spark.network.timeoutInterval>> on the <<eventLoopThread, Heartbeat Receiver Event Loop Thread>>.
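In code, this boils down to scheduling a recurring task on a single-threaded scheduled executor that asks the endpoint itself for ExpireDeadHosts. A minimal sketch of the scheduling pattern with plain java.util.concurrent, where the interval value and the println are stand-ins for spark.network.timeoutInterval and the blocking self-ask:

[source, scala]
----
import java.util.concurrent.{Executors, TimeUnit}

// Sketch only: a single-threaded scheduler standing in for eventLoopThread.
val eventLoopThread = Executors.newSingleThreadScheduledExecutor()

val timeoutIntervalMs = 60 * 1000L  // stand-in for spark.network.timeoutInterval
val expireDeadHostsTask: Runnable = () =>
  println("ExpireDeadHosts")        // stands in for the blocking ask of ExpireDeadHosts

// Fire the check immediately and then every timeoutIntervalMs.
eventLoopThread.scheduleAtFixedRate(
  expireDeadHostsTask, 0, timeoutIntervalMs, TimeUnit.MILLISECONDS)
----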

=== [[ExecutorRegistered]] ExecutorRegistered

[source, scala]
----
ExecutorRegistered(executorId: String)
----

When received, HeartbeatReceiver registers the executorId executor and the current time (in the <<executorLastSeen, executorLastSeen>> internal registry).

NOTE: HeartbeatReceiver uses the internal <<clock, Clock>> to know the current time.

=== [[ExecutorRemoved]] ExecutorRemoved

[source, scala]
----
ExecutorRemoved(executorId: String)
----

When ExecutorRemoved arrives, HeartbeatReceiver removes executorId from the <<executorLastSeen, executorLastSeen>> internal registry.
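Taken together, the two messages amount to simple bookkeeping in executorLastSeen. A minimal sketch, assuming a mutable map keyed by executor id and a caller-supplied timestamp that stands in for the <<clock, Clock>>:

[source, scala]
----
import scala.collection.mutable

// executor id -> timestamp (ms) of the most recent registration or heartbeat
val executorLastSeen = mutable.Map.empty[String, Long]

// ExecutorRegistered: record the executor with the current time.
def onExecutorRegistered(executorId: String, nowMs: Long): Unit =
  executorLastSeen(executorId) = nowMs

// ExecutorRemoved: forget the executor.
def onExecutorRemoved(executorId: String): Unit =
  executorLastSeen -= executorId
----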

=== [[ExpireDeadHosts]] ExpireDeadHosts

[source, scala]
----
ExpireDeadHosts
----

When ExpireDeadHosts arrives, the following TRACE message is printed out to the logs:

----
TRACE HeartbeatReceiver: Checking for hosts with no recent heartbeats in HeartbeatReceiver.
----

Each executor (in the <<executorLastSeen, executorLastSeen>> registry) is checked to see whether the time it was last seen is not longer ago than <<spark.network.timeout, spark.network.timeout>>.

For any such executor, the following WARN message is printed out to the logs:

----
WARN HeartbeatReceiver: Removing executor [executorId] with no recent heartbeats: [time] ms exceeds timeout [timeout] ms
----

scheduler:TaskScheduler.md#executorLost[TaskScheduler.executorLost] is called (with SlaveLost("Executor heartbeat timed out after [timeout] ms")).

SparkContext.killAndReplaceExecutor is asynchronously called for the executor (i.e. on <<killExecutorThread, killExecutorThread>>).

The executor is removed from <<executorLastSeen, executorLastSeen>>.
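Put together, the expiry pass is a scan over <<executorLastSeen, executorLastSeen>> that evicts every executor whose last heartbeat is older than the timeout. A simplified sketch (the TaskScheduler and SparkContext calls are reduced to comments; the parameter names are illustrative):

[source, scala]
----
import scala.collection.mutable

// Evict executors whose last heartbeat is older than executorTimeoutMs.
def expireDeadHosts(
    executorLastSeen: mutable.Map[String, Long],
    executorTimeoutMs: Long,
    now: Long): Unit = {
  for ((executorId, lastSeenMs) <- executorLastSeen.toSeq) {
    if (now - lastSeenMs > executorTimeoutMs) {
      // 1. scheduler.executorLost(executorId, SlaveLost("Executor heartbeat timed out ..."))
      // 2. sc.killAndReplaceExecutor(executorId), submitted on killExecutorThread
      executorLastSeen -= executorId
    }
  }
}
----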

=== [[Heartbeat]] Heartbeat

[source, scala]
----
Heartbeat(
  executorId: String,
  accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
  blockManagerId: BlockManagerId)
----


When received, HeartbeatReceiver finds the executorId executor (in the <<executorLastSeen, executorLastSeen>> registry).

When the executor is found, HeartbeatReceiver updates the time the heartbeat was received (in <<executorLastSeen, executorLastSeen>>).

NOTE: HeartbeatReceiver uses the internal <<clock, Clock>> to know the current time.

HeartbeatReceiver then submits an asynchronous task (on <<eventLoopThread, eventLoopThread>>) to notify the <<scheduler, TaskScheduler>> that the scheduler:TaskScheduler.md#executorHeartbeatReceived[heartbeat was received from the executor]. HeartbeatReceiver posts a HeartbeatResponse back to the executor with TaskScheduler's answer, i.e. whether the executor is still known to the driver or may need to re-register.

If however the executor was not found (in the <<executorLastSeen, executorLastSeen>> registry), i.e. the executor was not registered before, you should see the following DEBUG message in the logs and the response is to notify the executor to re-register.

----
DEBUG Received heartbeat from unknown executor [executorId]
----

In a very rare case, when the <<scheduler, TaskScheduler>> is not yet assigned to HeartbeatReceiver, you should see the following WARN message in the logs and the response is to notify the executor to re-register.

----
WARN Dropping [heartbeat] because TaskScheduler is not ready yet
----

NOTE: <<scheduler, scheduler>> can be unassigned when no <<TaskSchedulerIsSet, TaskSchedulerIsSet>> message has been received yet.

NOTE: Heartbeat messages are the mechanism for executor:Executor.md#heartbeats-and-active-task-metrics[executors to inform the Spark application that they are alive and update it about the state of active tasks].
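The decision logic above can be condensed into a small function. The sketch below models the reply to the executor as a Boolean (re-register or not) and stands in for scheduler:TaskScheduler.md#executorHeartbeatReceived[TaskScheduler.executorHeartbeatReceived] with a caller-supplied function; in HeartbeatReceiver itself the scheduler notification additionally happens asynchronously on <<eventLoopThread, eventLoopThread>>:

[source, scala]
----
import scala.collection.mutable

// Condensed sketch of the Heartbeat branching. The Boolean result models the
// "please re-register" flag sent back to the executor. schedulerHeartbeatReceived
// stands in for TaskScheduler.executorHeartbeatReceived and is assumed to return
// true when the scheduler recognizes the executor.
def reregisterNeeded(
    executorId: String,
    schedulerReady: Boolean,
    executorLastSeen: mutable.Map[String, Long],
    nowMs: Long,
    schedulerHeartbeatReceived: String => Boolean): Boolean = {
  if (!schedulerReady) {
    true // "Dropping [heartbeat] because TaskScheduler is not ready yet"
  } else if (!executorLastSeen.contains(executorId)) {
    true // "Received heartbeat from unknown executor"
  } else {
    executorLastSeen(executorId) = nowMs
    !schedulerHeartbeatReceived(executorId) // re-register if the scheduler does not know it
  }
}
----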

=== [[TaskSchedulerIsSet]] TaskSchedulerIsSet

[source, scala]
----
TaskSchedulerIsSet
----

When received, HeartbeatReceiver sets the internal reference to <<scheduler, TaskScheduler>>.

NOTE: HeartbeatReceiver uses the <<sc, SparkContext>> (given when HeartbeatReceiver <<creating-instance, is created>>) to access the TaskScheduler.

=== [[onExecutorAdded]] onExecutorAdded Method

[source, scala]
----
onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit
----

onExecutorAdded simply <<addExecutor, sends an ExecutorRegistered message to itself>> (that in turn registers an executor).

NOTE: onExecutorAdded is part of ROOT:SparkListener.md#onExecutorAdded[SparkListener contract] to announce that a new executor was registered with a Spark application.

=== [[addExecutor]] Sending ExecutorRegistered Message to Itself -- addExecutor Internal Method

[source, scala]
----
addExecutor(executorId: String): Option[Future[Boolean]]
----

addExecutor sends an <<ExecutorRegistered, ExecutorRegistered>> message to itself (to register the executorId executor).

NOTE: addExecutor is used when HeartbeatReceiver <<onExecutorAdded, is notified that a new executor was added>> (to a Spark application).

=== [[onExecutorRemoved]] onExecutorRemoved Method

[source, scala]
----
onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit
----

onExecutorRemoved simply passes the call on to <<removeExecutor, removeExecutor>> (that in turn unregisters an executor).

NOTE: onExecutorRemoved is part of ROOT:SparkListener.md#onExecutorRemoved[SparkListener contract] to announce that an executor is no longer available for a Spark application.
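For reference, the SparkListener side of this wiring is a public (developer) API. A minimal sketch of a listener reacting to the same two events, where the println bodies are placeholders for what HeartbeatReceiver actually does (sending ExecutorRegistered and ExecutorRemoved to itself):

[source, scala]
----
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}

// Minimal SparkListener reacting to the two events HeartbeatReceiver listens for.
class ExecutorTrackingListener extends SparkListener {
  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit =
    println(s"Executor added: ${executorAdded.executorId}")     // HeartbeatReceiver calls addExecutor here

  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit =
    println(s"Executor removed: ${executorRemoved.executorId}") // HeartbeatReceiver calls removeExecutor here
}
----

Such a listener could be registered the same way HeartbeatReceiver registers itself, i.e. with SparkContext.addSparkListener.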

=== [[removeExecutor]] Sending ExecutorRemoved Message to Itself -- removeExecutor Method

[source, scala]
----
removeExecutor(executorId: String): Option[Future[Boolean]]
----

removeExecutor sends an <<ExecutorRemoved, ExecutorRemoved>> message to itself (passing in executorId).

NOTE: removeExecutor is used when HeartbeatReceiver <<onExecutorRemoved, is notified that an executor is no longer available>>.
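Both helpers return Option[Future[Boolean]] because the endpoint's own RPC reference is only available once the endpoint is registered with the RPC environment. A minimal sketch of that guard, with a hypothetical EndpointRef trait standing in for the real RPC reference and reusing the ExecutorRegistered shape from the message sketch above:

[source, scala]
----
import scala.concurrent.Future

// Hypothetical stand-in for an RPC endpoint reference that can be asked a message
// and eventually answers with a Boolean.
trait EndpointRef { def ask(message: Any): Future[Boolean] }

// `self` is null until the endpoint is registered, hence Option[Future[Boolean]]:
// None when there is nothing to ask yet.
def addExecutor(self: EndpointRef, executorId: String): Option[Future[Boolean]] =
  Option(self).map(_.ask(ExecutorRegistered(executorId)))
----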

=== [[onStop]] Stopping HeartbeatReceiver RPC Endpoint -- onStop Method

NOTE: onStop is part of the rpc:index.md#RpcEndpoint[RpcEndpoint Contract]

When called, HeartbeatReceiver cancels the checking task (that sends a blocking <<ExpireDeadHosts, ExpireDeadHosts>> message every <<spark.network.timeoutInterval, spark.network.timeoutInterval>> on the <<eventLoopThread, Heartbeat Receiver Event Loop Thread>> - see <<onStart, onStart>>) and shuts down the <<eventLoopThread, eventLoopThread>> and <<killExecutorThread, killExecutorThread>> thread pools.

=== [[killExecutorThread]][[kill-executor-thread]] killExecutorThread -- Kill Executor Thread

killExecutorThread is a daemon https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html[ScheduledThreadPoolExecutor] with a single thread.

The name of the thread pool is kill-executor-thread.

NOTE: killExecutorThread is used to request SparkContext to kill (and replace) an executor asynchronously (see <<ExpireDeadHosts, ExpireDeadHosts>>).

=== [[eventLoopThread]][[heartbeat-receiver-event-loop-thread]] eventLoopThread -- Heartbeat Receiver Event Loop Thread

eventLoopThread is a daemon https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html[ScheduledThreadPoolExecutor] with a single thread.

The name of the thread pool is heartbeat-receiver-event-loop-thread.
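Spark builds these pools with an internal thread-utility helper; a roughly equivalent sketch with plain java.util.concurrent, showing how the single thread gets its name and daemon flag:

[source, scala]
----
import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory}

// Single-threaded, daemon, named scheduled executor -- the shape of
// heartbeat-receiver-event-loop-thread and kill-executor-thread.
def namedDaemonScheduler(name: String): ScheduledThreadPoolExecutor =
  new ScheduledThreadPoolExecutor(1, new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val thread = new Thread(r, name)
      thread.setDaemon(true)
      thread
    }
  })

val eventLoopThread = namedDaemonScheduler("heartbeat-receiver-event-loop-thread")
----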

=== [[expireDeadHosts]] expireDeadHosts Internal Method

[source, scala]
----
expireDeadHosts(): Unit
----

CAUTION: FIXME

NOTE: expireDeadHosts is used when HeartbeatReceiver <<ExpireDeadHosts, receives an ExpireDeadHosts message>>.

=== [[settings]] Settings

.Spark Properties
[cols="1,1,2",options="header",width="100%"]
|===
| Spark Property | Default Value | Description

| [[spark.storage.blockManagerTimeoutIntervalMs]] spark.storage.blockManagerTimeoutIntervalMs
| 60s
|

| [[spark_storage_blockManagerSlaveTimeoutMs]] spark.storage.blockManagerSlaveTimeoutMs
| 120s
|

| [[spark.network.timeout]] spark.network.timeout
| <<spark_storage_blockManagerSlaveTimeoutMs, spark.storage.blockManagerSlaveTimeoutMs>>
| See rpc:index.md#spark.network.timeout[spark.network.timeout] in rpc:index.md[RPC Environment (RpcEnv)]

| [[spark.network.timeoutInterval]] spark.network.timeoutInterval
| <<spark.storage.blockManagerTimeoutIntervalMs, spark.storage.blockManagerTimeoutIntervalMs>>
|
|===
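The properties can be set like any other Spark property, e.g. on a SparkConf before the SparkContext (and hence HeartbeatReceiver) is created. The values below are illustrative only:

[source, scala]
----
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only -- tune to your environment.
val conf = new SparkConf()
  .setAppName("heartbeat-settings-demo")
  .setMaster("local[*]")
  .set("spark.network.timeout", "120s")         // executor considered dead after this
  .set("spark.network.timeoutInterval", "60s")  // how often dead hosts are checked

val sc = new SparkContext(conf)
----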

