RPC Environment

FIXME

RPC Environment (aka RpcEnv) is an environment for RpcEndpoints to process messages. An RPC Environment manages the entire lifecycle of RpcEndpoints:

  • registers (sets up) endpoints (by name or URI)

  • routes incoming messages to them

  • stops them

An RPC Environment is defined by its name, host, and port. It can also be controlled by a security manager.

You can create an RPC Environment using the RpcEnv.create factory methods.

The only implementation of RPC Environment is the Netty-based one, NettyRpcEnv.

An RpcEndpoint defines how to handle messages (i.e. which functions to execute for a given message). RpcEndpoints register with an RpcEnv (under a name or URI) to receive messages from RpcEndpointRefs.

Figure 1. RpcEnvironment with RpcEndpoints and RpcEndpointRefs

RpcEndpointRefs can be looked up by name or URI (because different RpcEnvs may have different naming schemes).
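The register, route, and look-up cycle can be made concrete with a minimal single-process sketch. ToyRpcEnv, ToyEndpoint, and ToyEndpointRef are hypothetical names for illustration only. Unlike a real RpcEnv, this toy delivers messages synchronously, does no networking, and looks endpoints up by name only.

```scala
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.ListBuffer

// Hypothetical endpoint: decides what to do with each incoming message.
trait ToyEndpoint {
  def receive: PartialFunction[Any, Unit]
}

// Hypothetical reference: the handle a sender uses to reach a named endpoint.
final class ToyEndpointRef(val name: String, env: ToyRpcEnv) {
  def send(message: Any): Unit = env.route(name, message)
}

// Hypothetical single-process stand-in for an RPC environment.
final class ToyRpcEnv {
  private val endpoints = TrieMap.empty[String, ToyEndpoint]

  // Register (set up) an endpoint under a unique name.
  def setupEndpoint(name: String, endpoint: ToyEndpoint): ToyEndpointRef = {
    require(endpoints.putIfAbsent(name, endpoint).isEmpty, s"Endpoint $name is already registered")
    new ToyEndpointRef(name, this)
  }

  // Look up a reference to a registered endpoint by name.
  def endpointRef(name: String): Option[ToyEndpointRef] =
    endpoints.get(name).map(_ => new ToyEndpointRef(name, this))

  // Route a message to the named endpoint (synchronously, unlike a real RpcEnv).
  def route(name: String, message: Any): Unit = endpoints.get(name) match {
    case Some(ep) if ep.receive.isDefinedAt(message) => ep.receive(message)
    case Some(_) => throw new IllegalArgumentException(s"Endpoint $name cannot handle $message")
    case None    => throw new NoSuchElementException(s"No endpoint named $name")
  }

  // Stop (unregister) an endpoint; later lookups by that name return None.
  def stop(name: String): Unit = endpoints.remove(name)
}

// Usage: register an endpoint that records String messages, then message it via a looked-up ref.
val received = ListBuffer.empty[String]
val env = new ToyRpcEnv
env.setupEndpoint("echo", new ToyEndpoint {
  def receive: PartialFunction[Any, Unit] = { case s: String => received += s }
})
env.endpointRef("echo").foreach(_.send("hello"))
```

The sender never touches the endpoint directly; it only holds a reference, and the environment does the routing, which is the same separation Figure 1 depicts.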

The org.apache.spark.rpc package contains the machinery for RPC communication in Spark.

Client Mode: Is This an Executor or the Driver?

When an RPC Environment is initialized as part of the initialization of the driver or executors (using RpcEnv.create), clientMode is false for the driver and true for executors.

RpcEnv.create(actorSystemName, hostname, port, conf, securityManager, clientMode = !isDriver)

Refer to Client Mode in Netty-based RpcEnv for the implementation-specific details.

Creating RpcEndpointRef For URI — asyncSetupEndpointRefByURI Method

FIXME

Creating RpcEndpointRef For URI — setupEndpointRefByURI Method

FIXME

shutdown Method

FIXME

Registering RPC Endpoint — setupEndpoint Method

FIXME

awaitTermination Method

FIXME

ThreadSafeRpcEndpoint

ThreadSafeRpcEndpoint is a marker RpcEndpoint that does nothing by itself but tells…

FIXME What is marker?
ThreadSafeRpcEndpoint is a private[spark] trait.

RpcAddress

RpcAddress is the logical address for an RPC Environment, with hostname and port.

RpcAddress is encoded as a Spark URL, i.e. spark://host:port.
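As a rough illustration of the spark://host:port encoding, the sketch below models an address as a host and port pair. SimpleRpcAddress, toSparkURL, and fromSparkURL are hypothetical names, and parsing with java.net.URI is an assumption about one way to do it, not a description of Spark's code.

```scala
import java.net.URI

// Hypothetical stand-in for RpcAddress: a host and a port.
final case class SimpleRpcAddress(host: String, port: Int) {
  // Encode as a Spark URL, e.g. spark://localhost:7077
  def toSparkURL: String = s"spark://$host:$port"
}

object SimpleRpcAddress {
  // Parse a spark://host:port URL back into an address (basic validation only).
  def fromSparkURL(url: String): SimpleRpcAddress = {
    val uri = new URI(url)
    require(uri.getScheme == "spark" && uri.getHost != null && uri.getPort >= 0,
      s"Invalid Spark URL: $url")
    SimpleRpcAddress(uri.getHost, uri.getPort)
  }
}
```

For example, SimpleRpcAddress("localhost", 7077).toSparkURL gives spark://localhost:7077, and fromSparkURL turns that string back into the same address.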

RpcEndpointAddress

RpcEndpointAddress is the logical address for an endpoint registered to an RPC Environment, with RpcAddress and name.

It is in the format of spark://[name]@[rpcAddress.host]:[rpcAddress.port].
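The endpoint address format can be sketched the same way: the endpoint name sits in the user-info part of the URL, in front of host:port. SimpleEndpointAddress and parse are hypothetical names; relying on java.net.URI to split out the name is an assumption for illustration.

```scala
import java.net.URI

// Hypothetical stand-in for RpcEndpointAddress: endpoint name plus host and port.
final case class SimpleEndpointAddress(name: String, host: String, port: Int) {
  // spark://[name]@[host]:[port]
  override def toString: String = s"spark://$name@$host:$port"
}

object SimpleEndpointAddress {
  // The name is parsed from the URL's user-info component (the part before '@').
  def parse(url: String): SimpleEndpointAddress = {
    val uri = new URI(url)
    val name = uri.getUserInfo
    require(uri.getScheme == "spark" && name != null && uri.getHost != null && uri.getPort >= 0,
      s"Invalid endpoint URL: $url")
    SimpleEndpointAddress(name, uri.getHost, uri.getPort)
  }
}
```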

Stopping RpcEndpointRef — stop Method

stop(endpoint: RpcEndpointRef): Unit
FIXME

Endpoint Lookup Timeout

When a remote endpoint is resolved, the local RPC environment connects to the remote one. This is called endpoint lookup. You can configure the time allowed for an endpoint lookup with the following settings.

They form a prioritized list of lookup timeout properties (the higher on the list, the more important):

  • spark.rpc.lookupTimeout

  • spark.network.timeout

Each value can be a bare number (interpreted as seconds) or a number with a time suffix, e.g. 50s, 100ms, or 250us. See Settings.
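Resolving a prioritized list of properties can be sketched as: take the first property that is set, otherwise fall back to a default, and parse the value as a duration. The object and method names are hypothetical. The property order (spark.rpc.lookupTimeout, then spark.network.timeout) and the 120s default come from the Settings table; the accepted suffixes (us, ms, s, m or min, h, d) are an assumption that goes beyond the three examples given above.

```scala
import scala.concurrent.duration._

object LookupTimeoutSketch {
  // Hypothetical priority list: the first property that is set wins.
  val LookupTimeoutProps: Seq[String] = Seq("spark.rpc.lookupTimeout", "spark.network.timeout")
  val DefaultLookupTimeout: String = "120s"

  // A number, optionally followed by a time suffix.
  private val TimeString = """(\d+)\s*(us|ms|s|m|min|h|d)?""".r

  // Parse "50s", "100ms", "250us", ...; a bare number means seconds.
  def parseTime(value: String): FiniteDuration = value.trim.toLowerCase match {
    case TimeString(n, suffix) =>
      val amount = n.toLong
      suffix match {
        case null | "s"  => amount.seconds
        case "ms"        => amount.millis
        case "us"        => amount.micros
        case "m" | "min" => amount.minutes
        case "h"         => amount.hours
        case _           => amount.days // only "d" is left
      }
    case _ => throw new NumberFormatException(s"Invalid time string: $value")
  }

  // Use the first property in `props` that is set in `conf`, else `default`.
  def resolve(conf: Map[String, String], props: Seq[String], default: String): FiniteDuration =
    parseTime(props.collectFirst { case p if conf.contains(p) => conf(p) }.getOrElse(default))
}
```

For example, with only spark.network.timeout set to 300s, resolve(conf, LookupTimeoutProps, DefaultLookupTimeout) yields 300 seconds; once spark.rpc.lookupTimeout is also set, its value takes precedence.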

Ask Operation Timeout

An ask operation is when an RPC client expects a response to a message. The client blocks while it waits for that response.

You can control how long to wait for a response using the following settings (in order of precedence):

  • spark.rpc.askTimeout

  • spark.network.timeout

Each value can be a bare number (interpreted as seconds) or a number with a time suffix, e.g. 50s, 100ms, or 250us. See Settings.
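Here is a minimal sketch of what waiting with a timeout looks like, assuming the reply arrives as a scala.concurrent.Future. AskTimeoutSketch and awaitReply are hypothetical names. The point is that the caller blocks in Await.result for at most the configured ask timeout, and a timeout error names the property to tune.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object AskTimeoutSketch {
  // Block the caller until `reply` completes or `timeout` elapses.
  // `timeoutProp` names the setting the timeout came from, so the error says what to tune.
  def awaitReply[T](reply: Future[T], timeout: FiniteDuration, timeoutProp: String): T =
    try Await.result(reply, timeout)
    catch {
      case _: TimeoutException =>
        throw new TimeoutException(s"No reply within $timeout (controlled by $timeoutProp)")
    }
}
```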

Exceptions

When RpcEnv catches an uncaught exception, it uses RpcCallContext.sendFailure to send the exception back to the sender, or logs it if there is no such sender or the exception is a NotSerializableException.

If any RpcEndpoint method other than onError throws an error, onError is invoked with the cause. If onError itself throws an error, RpcEnv ignores it.
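The error-handling rules above can be sketched as a small dispatcher. SketchEndpoint and SketchDispatcher are hypothetical simplifications of RpcEndpoint and of RpcEnv's message delivery. The sketch also assumes that a message the endpoint cannot handle counts as an error routed to onError, and it prints errors from onError to stderr instead of logging them.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

// Hypothetical endpoint with only the two callbacks relevant here.
trait SketchEndpoint {
  def receive: PartialFunction[Any, Unit]
  def onError(cause: Throwable): Unit
}

object SketchDispatcher {
  // Deliver one message. Errors from receive (including unhandled messages) go to onError;
  // errors thrown by onError itself are swallowed after printing.
  def deliver(endpoint: SketchEndpoint, message: Any): Unit =
    try endpoint.receive.applyOrElse[Any, Unit](message,
      m => throw new IllegalArgumentException(s"Unsupported message: $m"))
    catch {
      case NonFatal(e) =>
        try endpoint.onError(e)
        catch { case NonFatal(ignored) => Console.err.println(s"Ignoring error from onError: $ignored") }
    }
}

// Usage: an endpoint that fails on Int messages and records what onError sees.
val seen = ListBuffer.empty[String]
val ep = new SketchEndpoint {
  def receive: PartialFunction[Any, Unit] = { case n: Int => throw new IllegalStateException(s"bad $n") }
  def onError(cause: Throwable): Unit = seen += cause.getMessage
}
SketchDispatcher.deliver(ep, 1)       // receive throws, so onError sees "bad 1"
SketchDispatcher.deliver(ep, "hello") // not handled, so onError sees an "Unsupported message" error
```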

RpcEnvConfig

RpcEnvConfig is a holder for an instance of SparkConf, the name of the RPC Environment, the bind and advertise addresses, the port, a security manager, and clientMode.

Creating RpcEnv — create Factory Methods

create(
  name: String,
  host: String,
  port: Int,
  conf: SparkConf,
  securityManager: SecurityManager,
  clientMode: Boolean = false): RpcEnv  (1)

create(
  name: String,
  bindAddress: String,
  advertiseAddress: String,
  port: Int,
  conf: SparkConf,
  securityManager: SecurityManager,
  clientMode: Boolean): RpcEnv
(1) The 6-argument create (clientMode defaults to false) simply passes its arguments on to the 7-argument create, using host as both bindAddress and advertiseAddress.

create creates an RpcEnvConfig (from the input arguments) and uses it to create a NettyRpcEnv.
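The relationship between the two create overloads and RpcEnvConfig can be sketched as follows. All names are hypothetical simplifications: SparkConf is replaced by a plain Map, the security manager is omitted, and the factory returns a trivial wrapper instead of a NettyRpcEnv.

```scala
// Hypothetical, simplified stand-in for RpcEnvConfig (no SparkConf, no SecurityManager).
final case class SketchRpcEnvConfig(
    conf: Map[String, String],
    name: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    clientMode: Boolean)

// Hypothetical stand-in for the environment that create returns.
final case class SketchRpcEnv(config: SketchRpcEnvConfig)

object SketchRpcEnvFactory {
  // Shorter overload: host doubles as both the bind and the advertise address.
  def create(name: String, host: String, port: Int, conf: Map[String, String],
             clientMode: Boolean = false): SketchRpcEnv =
    create(name, host, host, port, conf, clientMode)

  // Full overload: bundle the arguments into a config and build the environment from it.
  def create(name: String, bindAddress: String, advertiseAddress: String, port: Int,
             conf: Map[String, String], clientMode: Boolean): SketchRpcEnv =
    SketchRpcEnv(SketchRpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, clientMode))
}
```

Keeping a single full overload that does the real work means the shorter one is pure delegation, so the two can never disagree about how the config is built.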

"Client mode" means the RPC env will not listen for incoming connections.

This allows certain processes in the Spark stack (such as executors or the YARN client-mode AM) to act as pure clients when using the Netty-based RPC backend, reducing the number of sockets Spark apps need to use and also the number of open ports.

The AM connects to the driver in "client mode", and that connection is used for all driver-AM communication, so the AM is properly notified when the connection goes down.

In the general (non-YARN) case, the clientMode flag is therefore enabled for executors and disabled for the driver.

In Spark on YARN in client deploy mode, however, the clientMode flag is enabled explicitly when Spark on YARN’s ApplicationMaster creates the sparkYarnAM RPC Environment.
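The clientMode choices described above boil down to a mapping from process role to flag. ProcessRole and ClientModeSketch are hypothetical names for illustration; Spark itself simply passes the flag to RpcEnv.create (e.g. clientMode = !isDriver) rather than using a type like this.

```scala
// Hypothetical roles for the processes discussed above.
sealed trait ProcessRole
case object DriverRole extends ProcessRole
case object ExecutorRole extends ProcessRole
case object YarnClientModeAMRole extends ProcessRole

object ClientModeSketch {
  // true means a pure client: the RPC env does not listen for incoming connections.
  def clientMode(role: ProcessRole): Boolean = role match {
    case DriverRole           => false // the driver must accept incoming connections
    case ExecutorRole         => true  // executors only connect out to the driver
    case YarnClientModeAMRole => true  // the AM only connects out to the driver
  }
}
```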

create is used when:

  1. SparkEnv creates an RpcEnv (for the driver and executors).

  2. Spark on YARN’s ApplicationMaster creates the sparkYarnAM RPC Environment (with clientMode enabled).

  3. CoarseGrainedExecutorBackend creates the temporary driverPropsFetcher RPC Environment (to fetch the current Spark properties from the driver).

  4. org.apache.spark.deploy.Client standalone application creates the driverClient RPC Environment.

  5. Spark Standalone’s master creates the sparkMaster RPC Environment.

  6. Spark Standalone’s worker creates the sparkWorker RPC Environment.

  7. Spark Standalone’s DriverWrapper creates the Driver RPC Environment.

Settings

Table 1. Spark Properties
Spark Property          | Default Value | Description
spark.rpc.lookupTimeout | 120s          | Timeout to use for RPC remote endpoint lookup. Refer to Endpoint Lookup Timeout.
spark.rpc.numRetries    | 3             | Number of attempts to send a message to and receive a response from a remote endpoint.
spark.rpc.retry.wait    | 3s            | Time to wait between retries.
spark.rpc.askTimeout    | 120s          | Timeout for RPC ask calls. Refer to Ask Operation Timeout.
spark.network.timeout   | 120s          | Default network timeout. Fallback for spark.rpc.lookupTimeout and spark.rpc.askTimeout.
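Together, spark.rpc.numRetries and spark.rpc.retry.wait describe a retry loop: up to a fixed number of attempts, with a pause between failed attempts. RetrySketch.withRetries is a hypothetical helper showing that pattern, assuming numRetries counts total attempts (as its description in the table says); it is not Spark's own retry code.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Evaluate `attempt` up to `attemptsLeft` times, sleeping `retryWait` after each failure
  // except the last; the final failure is rethrown to the caller.
  @tailrec
  def withRetries[T](attemptsLeft: Int, retryWait: FiniteDuration)(attempt: => T): T =
    Try(attempt) match {
      case Success(value) => value
      case Failure(_) if attemptsLeft > 1 =>
        Thread.sleep(retryWait.toMillis)
        withRetries(attemptsLeft - 1, retryWait)(attempt)
      case Failure(e) => throw e
    }
}
```

With the defaults from the table, a call would look like RetrySketch.withRetries(3, 3.seconds) { sendAndAwaitReply() }, where sendAndAwaitReply is a hypothetical stand-in for one send-and-wait round trip.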