HeartbeatReceiver RPC Endpoint
HeartbeatReceiver is a rpc:RpcEndpoint.md#ThreadSafeRpcEndpoint[ThreadSafeRpcEndpoint] registered on the driver under the name HeartbeatReceiver.
HeartbeatReceiver receives <<Heartbeat, Heartbeat>> messages from executors that inform the driver that the executors are alive and report the metrics of their active tasks.
HeartbeatReceiver is registered immediately after a Spark application is started, i.e. when SparkContext is created.
HeartbeatReceiver is a ROOT:SparkListener.md[SparkListener] to get notified when executors are added to or removed from a Spark application.
HeartbeatReceiver tracks executors (in the <<internal-registries, executorLastSeen>> internal registry) by the time their last heartbeat was received.
[[messages]]
.HeartbeatReceiver RPC Endpoint's Messages (in alphabetical order)
[width="100%",cols="1,2",options="header"]
|===
| Message
| Description

| <<ExecutorRegistered, ExecutorRegistered>>
| Posted when HeartbeatReceiver is notified that a new executor was registered.

| <<ExecutorRemoved, ExecutorRemoved>>
| Posted when HeartbeatReceiver is notified that an executor is no longer available.

| <<ExpireDeadHosts, ExpireDeadHosts>>
| Posted periodically to expire executors with no recent heartbeats.

| <<Heartbeat, Heartbeat>>
| Executor executor:Executor.md#reportHeartBeat[informs that it is alive and reports task metrics].

| <<TaskSchedulerIsSet, TaskSchedulerIsSet>>
| SparkContext informs that scheduler:TaskScheduler.md[TaskScheduler] is available.
|===
[[internal-registries]]
.HeartbeatReceiver's Internal Registries and Counters
[cols="1,2",options="header",width="100%"]
|===
| Name
| Description

| executorLastSeen
| Executor ids and the timestamps of when the last heartbeat was received.

| scheduler
| scheduler:TaskScheduler.md[TaskScheduler]
|===
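Conceptually, the executorLastSeen registry is just a mutable map from executor id to the timestamp of the last received heartbeat. A minimal Scala sketch (names are illustrative, not Spark's actual code):

```scala
import scala.collection.mutable

// Sketch of the executorLastSeen registry: executor id -> timestamp (ms)
// of the last heartbeat. Illustrative only, not Spark's implementation.
val executorLastSeen = mutable.HashMap.empty[String, Long]

def recordHeartbeat(executorId: String, now: Long): Unit =
  executorLastSeen(executorId) = now

recordHeartbeat("exec-1", 1000L)
recordHeartbeat("exec-1", 2000L) // a later heartbeat overwrites the timestamp
println(executorLastSeen("exec-1")) // 2000
```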
TIP: Enable TRACE logging level for `org.apache.spark.HeartbeatReceiver` logger to see what happens inside. Add the following line to `conf/log4j.properties`: `log4j.logger.org.apache.spark.HeartbeatReceiver=TRACE`. Refer to spark-logging.md[Logging].
=== [[creating-instance]] Creating HeartbeatReceiver Instance
HeartbeatReceiver takes the following when created:
- [[sc]] ROOT:SparkContext.md[SparkContext]
HeartbeatReceiver ROOT:SparkContext.md#addSparkListener[registers itself as a SparkListener].
HeartbeatReceiver initializes the <<internal-registries, internal registries and counters>>.
=== [[onStart]] Starting HeartbeatReceiver RPC Endpoint -- onStart Method
onStart is part of the rpc:RpcEndpoint.md[RpcEndpoint Contract].

When called, HeartbeatReceiver schedules a checking task (on eventLoopThread) that sends a blocking <<ExpireDeadHosts, ExpireDeadHosts>> message to itself at a fixed interval of <<settings, spark.network.timeoutInterval>>.
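The scheduling can be sketched as a fixed-rate task on a single-threaded scheduled executor that messages the endpoint itself. A sketch under assumed names; Spark sends a blocking RPC ask, which `println` stands in for here:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Sketch: schedule a periodic "ExpireDeadHosts" self-message, the way
// onStart schedules the checking task. Names and interval are illustrative.
val eventLoopThread = Executors.newSingleThreadScheduledExecutor()
val timeoutIntervalMs = 60000L // spark.network.timeoutInterval

val checkTask = eventLoopThread.scheduleAtFixedRate(
  new Runnable {
    def run(): Unit = println("ExpireDeadHosts") // stands in for a blocking ask
  },
  0L, timeoutIntervalMs, TimeUnit.MILLISECONDS)

// onStop-like cleanup: cancel the task and shut the pool down
checkTask.cancel(true)
eventLoopThread.shutdownNow()
```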
=== [[ExecutorRegistered]] ExecutorRegistered
When received, HeartbeatReceiver registers the executorId executor and the current time (in the <<internal-registries, executorLastSeen>> internal registry).

HeartbeatReceiver uses the internal Clock to know the current time.
=== [[ExecutorRemoved]] ExecutorRemoved

When received, HeartbeatReceiver removes the executorId executor from the <<internal-registries, executorLastSeen>> internal registry.
=== [[ExpireDeadHosts]] ExpireDeadHosts
When ExpireDeadHosts arrives, the following TRACE message is printed out to the logs:

```
TRACE HeartbeatReceiver: Checking for hosts with no recent heartbeats in HeartbeatReceiver.
```
Each executor (in the <<internal-registries, executorLastSeen>> registry) is checked whether the time of its last heartbeat is older than <<settings, spark.network.timeout>>.
For any such executor, the following WARN message is printed out to the logs:
```
WARN HeartbeatReceiver: Removing executor [executorId] with no recent heartbeats: [time] ms exceeds timeout [timeout] ms
```
scheduler:TaskScheduler.md#executorLost[TaskScheduler.executorLost] is called with SlaveLost("Executor heartbeat timed out after [timeout] ms").
SparkContext.killAndReplaceExecutor is asynchronously called for the executor (i.e. on the separate killExecutorThread).
The executor is removed from the <<internal-registries, executorLastSeen>> internal registry.
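The expiry check above can be sketched in a few lines of Scala: collect every executor whose last heartbeat is older than the timeout, then drop it from the registry (names and values are illustrative, not Spark's actual code):

```scala
import scala.collection.mutable

// Sketch of the ExpireDeadHosts check: executors whose last heartbeat is
// older than the timeout are expired. Illustrative only.
val executorLastSeen = mutable.HashMap("exec-1" -> 1000L, "exec-2" -> 90000L)
val timeoutMs = 120000L // spark.network.timeout
val now = 150000L       // current time from the internal clock

val expired = executorLastSeen.collect {
  case (id, lastSeen) if now - lastSeen > timeoutMs => id
}.toSet

expired.foreach { id =>
  // in Spark: TaskScheduler.executorLost + SparkContext.killAndReplaceExecutor
  executorLastSeen -= id
}

println(expired)                  // Set(exec-1): 149000 ms exceeds the timeout
println(executorLastSeen.keySet)  // Set(exec-2): last seen 60000 ms ago, kept
```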
=== [[Heartbeat]] Heartbeat
```scala
Heartbeat(
  executorId: String,
  accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
  blockManagerId: BlockManagerId)
```
When received, HeartbeatReceiver finds the executorId executor (in the <<internal-registries, executorLastSeen>> registry).

When the executor is found, HeartbeatReceiver updates the time the heartbeat was received (in <<internal-registries, executorLastSeen>>). HeartbeatReceiver uses the internal Clock to know the current time.
HeartbeatReceiver then submits an asynchronous task (on eventLoopThread) to notify TaskScheduler that the scheduler:TaskScheduler.md#executorHeartbeatReceived[heartbeat was received from the executor]. HeartbeatReceiver posts a HeartbeatResponse back to the executor (with the response from TaskScheduler on whether the executor has been registered already or not, so it may eventually need to re-register).
If however the executor was not found (in the <<internal-registries, executorLastSeen>> registry), the following DEBUG message is printed out to the logs and the response is to notify the executor to re-register:

```
DEBUG Received heartbeat from unknown executor [executorId]
```
In a very rare case, when the <<internal-registries, scheduler>> internal registry has not been assigned a TaskScheduler yet, you should see the following WARN message in the logs and the response is to notify the executor to re-register:

```
WARN Dropping [heartbeat] because TaskScheduler is not ready yet
```
Heartbeat messages are the mechanism of executor:Executor.md#heartbeats-and-active-task-metrics[executors to inform the Spark application that they are alive and update about the state of active tasks].
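The known-vs-unknown-executor decision above can be sketched as follows. HeartbeatResponse carries a reregisterBlockManager flag; everything else (the map, the handler) is an illustrative stand-in for the real RPC handling:

```scala
import scala.collection.mutable

// Sketch of the Heartbeat handling decision: a known executor gets its
// timestamp refreshed; an unknown one is told to re-register.
final case class HeartbeatResponse(reregisterBlockManager: Boolean)

val executorLastSeen = mutable.HashMap("exec-1" -> 1000L)

def handleHeartbeat(executorId: String, now: Long): HeartbeatResponse =
  if (executorLastSeen.contains(executorId)) {
    executorLastSeen(executorId) = now
    HeartbeatResponse(reregisterBlockManager = false)
  } else {
    // the "Received heartbeat from unknown executor" case
    HeartbeatResponse(reregisterBlockManager = true)
  }

println(handleHeartbeat("exec-1", 2000L)) // known: no re-registration needed
println(handleHeartbeat("exec-9", 2000L)) // unknown: asked to re-register
```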
=== [[TaskSchedulerIsSet]] TaskSchedulerIsSet
When received, HeartbeatReceiver sets the internal reference to scheduler:TaskScheduler.md[TaskScheduler] (in the <<internal-registries, scheduler>> internal registry).

HeartbeatReceiver uses the <<sc, SparkContext>> to access the current TaskScheduler.
=== [[onExecutorAdded]] onExecutorAdded Method

```scala
onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit
```
onExecutorAdded simply <<addExecutor, sends an ExecutorRegistered message to itself>>.
onExecutorAdded is part of ROOT:SparkListener.md#onExecutorAdded[SparkListener contract] to announce that a new executor was registered with a Spark application.
=== [[addExecutor]] Sending ExecutorRegistered Message to Itself -- addExecutor Internal Method

```scala
addExecutor(executorId: String): Option[Future[Boolean]]
```
addExecutor sends an <<ExecutorRegistered, ExecutorRegistered>> message to the HeartbeatReceiver RPC endpoint itself.

addExecutor is used when HeartbeatReceiver is notified that a new executor was added to a Spark application.
=== [[onExecutorRemoved]] onExecutorRemoved Method

```scala
onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit
```
onExecutorRemoved simply passes the call on to <<removeExecutor, removeExecutor>>.
onExecutorRemoved is part of ROOT:SparkListener.md#onExecutorRemoved[SparkListener contract] to announce that an executor is no longer available for a Spark application.
=== [[removeExecutor]] Sending ExecutorRemoved Message to Itself -- removeExecutor Internal Method

```scala
removeExecutor(executorId: String): Option[Future[Boolean]]
```
removeExecutor sends an <<ExecutorRemoved, ExecutorRemoved>> message to the HeartbeatReceiver RPC endpoint itself.

removeExecutor is used when HeartbeatReceiver is notified that an executor is no longer available to a Spark application.
=== [[onStop]] Stopping HeartbeatReceiver RPC Endpoint -- onStop Method

onStop is part of the rpc:index.md#RpcEndpoint[RpcEndpoint Contract].

When called, HeartbeatReceiver cancels the checking task (that sends a blocking <<ExpireDeadHosts, ExpireDeadHosts>> message to itself) and shuts down the eventLoopThread and killExecutorThread thread pools.
=== [[killExecutorThread]] killExecutorThread -- Kill Executor Thread
killExecutorThread is a daemon https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html[ScheduledThreadPoolExecutor] with a single thread.
The name of the thread pool is kill-executor-thread.
NOTE: It is used to request SparkContext to kill the executor.
=== [[eventLoopThread]] eventLoopThread -- Heartbeat Receiver Event Loop Thread
eventLoopThread is a daemon https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html[ScheduledThreadPoolExecutor] with a single thread.
The name of the thread pool is heartbeat-receiver-event-loop-thread.
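A named, daemon, single-thread ScheduledThreadPoolExecutor like the two pools above can be built with a custom ThreadFactory. A sketch similar in spirit to Spark's thread utilities, not its actual code:

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory}

// Sketch: a single-thread scheduled pool whose one thread is a daemon
// with a fixed name. Helper name is illustrative.
def daemonSingleThreadScheduledExecutor(name: String): ScheduledThreadPoolExecutor = {
  val factory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, name)
      t.setDaemon(true) // daemon threads do not keep the JVM alive
      t
    }
  }
  new ScheduledThreadPoolExecutor(1, factory)
}

val pool = daemonSingleThreadScheduledExecutor("heartbeat-receiver-event-loop-thread")
println(pool.getCorePoolSize) // 1
pool.shutdownNow()
```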
=== [[expireDeadHosts]] expireDeadHosts Internal Method

expireDeadHosts is used when HeartbeatReceiver receives an <<ExpireDeadHosts, ExpireDeadHosts>> message.
=== [[settings]] Settings
.Spark Properties
[cols="1,1,2",options="header",width="100%"]
|===
| Spark Property
| Default Value
| Description

| spark.network.timeout
| 120s
| Timeout for executor heartbeats. An executor whose last heartbeat is older than this is considered dead (see <<ExpireDeadHosts, ExpireDeadHosts>>).

| spark.network.timeoutInterval
| 60s
| How often to check for executors with no recent heartbeats (i.e. how often <<ExpireDeadHosts, ExpireDeadHosts>> is sent).
|===
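For example, both properties can be set on submit. The values, application class, and jar name below are purely illustrative:

```
spark-submit \
  --conf spark.network.timeout=240s \
  --conf spark.network.timeoutInterval=60s \
  --class org.example.MyApp myapp.jar
```

Raising spark.network.timeout gives slow executors more headroom before they are expired, at the cost of detecting genuinely dead executors later.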