
LocalEndpoint

LocalEndpoint is the ThreadSafeRpcEndpoint for LocalSchedulerBackend and is registered under the LocalSchedulerBackendEndpoint name.


LocalEndpoint is created exclusively when LocalSchedulerBackend is requested to start.

Put simply, LocalEndpoint is the communication channel between TaskSchedulerImpl and LocalSchedulerBackend. LocalEndpoint is a (thread-safe) rpc:RpcEndpoint.md[RpcEndpoint] that hosts a single Executor (with the driver executor ID and the localhost hostname) for Spark local mode.

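[[messages]]
.LocalEndpoint's RPC Messages
[cols="1,3",options="header",width="100%"]
|===
| Message | Description

| KillTask | Requests the Executor to executor:Executor.md#killTask[kill a given task]

| ReviveOffers | Calls reviveOffers

| StatusUpdate | Requests the TaskSchedulerImpl to scheduler:TaskSchedulerImpl.md#statusUpdate[handle a task status update] and, for a task in a finished state, returns its CPU cores to freeCores and calls reviveOffers

| StopExecutor | Requests the Executor to executor:Executor.md#stop[stop]

|===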

When LocalEndpoint starts up (as part of Spark local mode's initialization), the Executor it creates prints out the following INFO messages to the logs:

INFO Executor: Starting executor ID driver on host localhost
INFO Executor: Using REPL class URI: http://192.168.1.4:56131

[[executor]] LocalEndpoint creates a single executor:Executor.md[] with the following properties:

  • [[localExecutorId]] driver as the executor:Executor.md#executorId[executor ID]

  • [[localExecutorHostname]] localhost as the executor:Executor.md#executorHostname[hostname]

  • The given userClassPath as the executor:Executor.md#userClassPath[user-defined CLASSPATH]

  • executor:Executor.md#isLocal[isLocal] flag enabled

The Executor is then used when LocalEndpoint is requested to handle KillTask and StopExecutor RPC messages, and to revive offers (reviveOffers).

[[internal-registries]]
.LocalEndpoint's Internal Properties (e.g. Registries, Counters and Flags)
[cols="1m,3",options="header",width="100%"]
|===
| Name | Description

| freeCores
a| [[freeCores]] The number of CPU cores that are free to use (to schedule tasks)

Default: Initial totalCores

Increments when LocalEndpoint is requested to handle a StatusUpdate RPC message with a finished state

Decrements when LocalEndpoint is requested to revive offers (reviveOffers) and there were tasks to execute

NOTE: A single task to execute costs as many CPU cores as the scheduler:TaskSchedulerImpl.md#CPUS_PER_TASK[spark.task.cpus] configuration property specifies (default: 1).

Used when LocalEndpoint is requested to revive offers (reviveOffers)

|===

[[logging]]
[TIP]
====
Enable INFO logging level for the org.apache.spark.scheduler.local.LocalEndpoint logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.local.LocalEndpoint=INFO

Refer to <<../spark-logging.md#, Logging>>.
====

=== [[creating-instance]] Creating LocalEndpoint Instance

LocalEndpoint takes the following to be created:

  • [[rpcEnv]] <<../index.md#, RpcEnv>>
  • [[userClassPath]] User-defined class path (Seq[URL]) that is built from the spark.executor.extraClassPath configuration property and is used exclusively to create the Executor
  • [[scheduler]] scheduler:TaskSchedulerImpl.md[TaskSchedulerImpl]
  • [[executorBackend]] LocalSchedulerBackend
  • [[totalCores]] Number of CPU cores (aka totalCores)

LocalEndpoint initializes the internal properties.
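For illustration, the following Scala sketch shows one way a userClassPath of type Seq[URL] could be derived from the spark.executor.extraClassPath value. It assumes the property holds local file paths separated by File.pathSeparator; the UserClassPathSketch object and its toUserClassPath method are hypothetical and not part of Spark's API.

```scala
import java.io.File
import java.net.URL

// Hypothetical helper, not part of Spark's API: turns a File.pathSeparator-separated
// class path (e.g. the value of spark.executor.extraClassPath) into a Seq[URL].
object UserClassPathSketch {
  def toUserClassPath(extraClassPath: Option[String]): Seq[URL] =
    extraClassPath.toSeq
      .flatMap(_.split(File.pathSeparator).toSeq) // one entry per class path element
      .filter(_.nonEmpty)                         // ignore empty segments (e.g. from a doubled separator)
      .map(path => new File(path).toURI.toURL)    // local file path -> file: URL
}
```

An unset property (None) yields an empty class path. Dropping empty segments is a choice made for this sketch only.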

=== [[receive]] Processing Receive-Only RPC Messages -- receive Method

[source, scala]

receive: PartialFunction[Any, Unit]

NOTE: receive is part of the rpc:RpcEndpoint.md#receive[RpcEndpoint] abstraction.

receive handles (processes) ReviveOffers, StatusUpdate, and KillTask RPC messages.
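The dispatch can be pictured as a PartialFunction[Any, Unit] that pattern-matches on the three message types. The sketch below uses hypothetical stand-in case classes modelled on the message signatures on this page (Spark's own classes are internal) and a hypothetical ReceiveSketch class that only records which branch ran instead of calling TaskSchedulerImpl or the Executor.

```scala
import java.nio.ByteBuffer

// Stand-ins modelled on the message signatures on this page (hypothetical).
sealed trait TaskState
object TaskState {
  case object RUNNING extends TaskState
  case object FINISHED extends TaskState
}

final case class ReviveOffers()
final case class StatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
final case class KillTask(taskId: Long, interruptThread: Boolean, reason: String)

// Hypothetical endpoint that only records which branch of receive handled a message.
class ReceiveSketch {
  val handled = scala.collection.mutable.ArrayBuffer.empty[String]

  def receive: PartialFunction[Any, Unit] = {
    case ReviveOffers() =>
      handled += "reviveOffers" // per this page: calls reviveOffers
    case StatusUpdate(taskId, state, _) =>
      handled += s"statusUpdate($taskId, $state)" // per this page: TaskSchedulerImpl handles the update
    case KillTask(taskId, interruptThread, reason) =>
      handled += s"killTask($taskId, $interruptThread, $reason)" // per this page: Executor kills the task
  }
}
```

Messages outside these three cases are not defined for this partial function, which is why StopExecutor is left to receiveAndReply.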

==== [[ReviveOffers]] ReviveOffers RPC Message

[source, scala]

ReviveOffers()

When received, LocalEndpoint calls reviveOffers.

NOTE: ReviveOffers RPC message is sent out exclusively when LocalSchedulerBackend is requested to revive offers (reviveOffers).

==== [[StatusUpdate]] StatusUpdate RPC Message

[source, scala]

StatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)

When received, LocalEndpoint requests the TaskSchedulerImpl to scheduler:TaskSchedulerImpl.md#statusUpdate[handle a task status update] (given the taskId, the task state and the data).

If the given scheduler:Task.md#TaskState[TaskState] is a finished state (one of the FINISHED, FAILED, KILLED, or LOST states), LocalEndpoint adds scheduler:TaskSchedulerImpl.md#CPUS_PER_TASK[spark.task.cpus] configuration (default: 1) to the freeCores registry and then calls reviveOffers.
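A minimal Scala sketch of this bookkeeping, assuming a simplified TaskState stand-in that lists the states below and leaving out the forwarding to TaskSchedulerImpl. StatusUpdateSketch, onStatusUpdate and reviveCount are illustrative names only, not Spark's.

```scala
// Hypothetical stand-in for the scheduler's TaskState.
sealed trait TaskState
object TaskState {
  case object LAUNCHING extends TaskState
  case object RUNNING extends TaskState
  case object FINISHED extends TaskState
  case object FAILED extends TaskState
  case object KILLED extends TaskState
  case object LOST extends TaskState

  // The finished states named on this page.
  private val finishedStates: Set[TaskState] = Set(FINISHED, FAILED, KILLED, LOST)
  def isFinished(state: TaskState): Boolean = finishedStates.contains(state)
}

// Hypothetical model of the freeCores part of StatusUpdate handling.
class StatusUpdateSketch(var freeCores: Int, cpusPerTask: Int = 1) {
  var reviveCount: Int = 0 // counts reviveOffers calls, for illustration only

  def onStatusUpdate(state: TaskState): Unit =
    if (TaskState.isFinished(state)) {
      freeCores += cpusPerTask // the finished task's cores (spark.task.cpus) are free again
      reviveOffers()           // offer the freed cores to pending tasks
    }

  private def reviveOffers(): Unit = reviveCount += 1
}
```

Intermediate states such as RUNNING leave freeCores untouched; only a finished state releases cores and triggers another round of offers.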

NOTE: StatusUpdate RPC message is sent out exclusively when LocalSchedulerBackend is requested to handle a task status update (statusUpdate).

==== [[KillTask]] KillTask RPC Message

[source, scala]

KillTask(taskId: Long, interruptThread: Boolean, reason: String)

When received, LocalEndpoint requests the single Executor to executor:Executor.md#killTask[kill a task] (given the taskId, the interruptThread flag and the reason).

NOTE: KillTask RPC message is sent out exclusively when LocalSchedulerBackend is requested to kill a task (killTask).

=== [[reviveOffers]] Reviving Offers -- reviveOffers Method

[source, scala]

reviveOffers(): Unit

reviveOffers...FIXME

NOTE: reviveOffers is used when LocalEndpoint is requested to handle RPC messages (namely ReviveOffers and StatusUpdate).
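This page does not yet spell out what reviveOffers does step by step, so the following is only a simplified model under stated assumptions: reviveOffers offers the current freeCores to the scheduler once, and every task the scheduler returns costs cpusPerTask (spark.task.cpus) cores and is handed to the Executor. The scheduler and the executor are replaced by plain functions, and ReviveOffersSketch and TaskToLaunch are hypothetical names.

```scala
// Hypothetical stand-in for a task description returned by the scheduler.
final case class TaskToLaunch(taskId: Long)

// Hypothetical model of reviveOffers: offer the free cores once,
// then launch every task the scheduler hands back.
class ReviveOffersSketch(
    var freeCores: Int,
    cpusPerTask: Int,
    schedule: Int => Seq[TaskToLaunch], // assumed stand-in for the scheduler: free cores in, tasks out
    launch: TaskToLaunch => Unit) {     // assumed stand-in for the Executor launching a task

  def reviveOffers(): Unit =
    for (task <- schedule(freeCores)) { // the offer is computed once, before any launch
      freeCores -= cpusPerTask          // each launched task consumes spark.task.cpus cores
      launch(task)
    }
}
```

With 4 free cores, spark.task.cpus of 2 and a scheduler that fills the offer, two tasks are launched and freeCores drops to 0.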

=== [[receiveAndReply]] Processing Receive-Reply RPC Messages -- receiveAndReply Method

[source, scala]

receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]

NOTE: receiveAndReply is part of the rpc:RpcEndpoint.md#receiveAndReply[RpcEndpoint] abstraction.

receiveAndReply handles (processes) the StopExecutor RPC message exclusively.

==== [[StopExecutor]] StopExecutor RPC Message

[source, scala]

StopExecutor()

When received, LocalEndpoint requests the single Executor to executor:Executor.md#stop[stop] and requests the given RpcCallContext to reply with true (as the response).
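A minimal sketch of this reply contract, assuming hypothetical ReplyContext and FakeExecutor stand-ins for Spark's RpcCallContext and Executor. The StopExecutor case stops the executor first and then replies with true.

```scala
// Hypothetical stand-in for RpcCallContext: only the reply side is modelled.
trait ReplyContext { def reply(response: Any): Unit }

// Hypothetical stand-in for the local Executor.
class FakeExecutor {
  var stopped = false
  def stop(): Unit = stopped = true
}

final case class StopExecutor()

class ReceiveAndReplySketch(executor: FakeExecutor) {
  def receiveAndReply(context: ReplyContext): PartialFunction[Any, Unit] = {
    case StopExecutor() =>
      executor.stop()     // stop the single local executor
      context.reply(true) // then answer the sender with true
  }
}
```

Because the reply is sent only after stop returns, a sender waiting for the answer can treat true as confirmation that the stop request was carried out.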

NOTE: StopExecutor RPC message is sent out exclusively when LocalSchedulerBackend is requested to stop.