RpcEnv

RpcEnv is an abstraction of RPC systems.

Available RpcEnvs

NettyRpcEnv is the default and only known RpcEnv in Apache Spark.

Contract

address Method

address: RpcAddress

RpcAddress of the RPC system

asyncSetupEndpointRefByURI Method

asyncSetupEndpointRefByURI(
  uri: String): Future[RpcEndpointRef]

Sets up an RPC endpoint by URI (asynchronously) and returns a Future with the RpcEndpointRef (a reference to the RPC endpoint)

Used when:

  • RpcEnv is requested to setupEndpointRefByURI

awaitTermination Method

awaitTermination(): Unit

Waits until the RPC system terminates

Used when:

  • SparkEnv is requested to stop

  • ClientApp is requested to start

  • LocalSparkCluster is requested to stop

  • (Spark Standalone) Master and Worker are launched

  • CoarseGrainedExecutorBackend standalone application is launched
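The contract is a plain blocking call: the caller parks until the RPC system shuts down. A minimal sketch of that behavior, using a CountDownLatch as the termination signal (RpcSystem is a stand-in type, not Spark's implementation):

```scala
import java.util.concurrent.CountDownLatch

// Sketch of the awaitTermination contract: block the calling thread until
// the RPC system shuts down. A CountDownLatch models the termination signal.
class RpcSystem {
  private val terminated = new CountDownLatch(1)
  def shutdown(): Unit = terminated.countDown()
  def awaitTermination(): Unit = terminated.await()
}

val system = new RpcSystem
val waiter = new Thread(() => system.awaitTermination())
waiter.start()            // waiter blocks in awaitTermination
system.shutdown()         // release it
waiter.join(5000)
val stopped = !waiter.isAlive
```

This is the pattern behind, e.g., Master and Worker staying alive after launch: the main thread simply awaits termination of the RPC system.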

deserialize Method

deserialize[T](
  deserializationAction: () => T): T

Used when:

  • PersistenceEngine is requested to readPersistedData

  • NettyRpcEnv is requested to deserialize
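NettyRpcEnv uses this hook to run the deserialization action in a scope where the current RpcEnv is visible, so that RpcEndpointRefs materialized during deserialization can find their environment. The scoping pattern can be sketched with scala.util.DynamicVariable (an analogy with stand-in types, not Spark's exact code):

```scala
import scala.util.DynamicVariable

// Sketch of the deserialize contract: run a deserialization thunk within a
// scope where the "current" environment is visible, so objects created
// inside the thunk can capture it.
class Env(val name: String) {
  def deserialize[T](deserializationAction: () => T): T =
    Env.current.withValue(Some(this)) { deserializationAction() }
}

object Env {
  // The environment in scope for the current thread, if any
  val current = new DynamicVariable[Option[Env]](None)
}

val env = new Env("driver")
// Inside the thunk, the "current" env is set; outside, it is None again.
val seen = env.deserialize(() => Env.current.value.map(_.name))
```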

endpointRef Method

endpointRef(
  endpoint: RpcEndpoint): RpcEndpointRef

Used when RpcEndpoint is requested for the RpcEndpointRef to itself

fileServer Method

fileServer: RpcEnvFileServer

RpcEnvFileServer of the RPC system

Used when SparkContext is created (and registers the REPL’s output directory) and requested to addFile or addJar

openChannel Method

openChannel(
  uri: String): ReadableByteChannel

Opens a channel to download a file from the given URI

Used when:

  • Utils utility is used to doFetchFile

  • ExecutorClassLoader is requested to getClassFileInputStreamFromSparkRPC
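Callers such as Utils.doFetchFile drain the returned channel to obtain the file's bytes. A self-contained sketch of draining a ReadableByteChannel with plain java.nio (an in-memory channel stands in for the RPC download):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.ByteBuffer
import java.nio.channels.{Channels, ReadableByteChannel}

// Drain a ReadableByteChannel into a byte array, the way a caller of
// openChannel would consume the downloaded file's bytes.
def drain(channel: ReadableByteChannel): Array[Byte] = {
  val buf = ByteBuffer.allocate(8192)
  val out = new ByteArrayOutputStream()
  while (channel.read(buf) != -1) {
    buf.flip()                              // switch to reading what was filled
    out.write(buf.array(), 0, buf.limit())
    buf.clear()
  }
  out.toByteArray
}

// Demo: an in-memory channel standing in for a file served over RPC
val channel = Channels.newChannel(new ByteArrayInputStream("hello".getBytes("UTF-8")))
val bytes = drain(channel)
```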

setupEndpoint Method

setupEndpoint(
  name: String,
  endpoint: RpcEndpoint): RpcEndpointRef

Used when:

  • SparkContext is created (and registers the HeartbeatReceiver)

  • SparkEnv utility is used to create a SparkEnv (and register the BlockManagerMaster, MapOutputTracker and OutputCommitCoordinator RPC endpoints on the driver)

  • ClientApp is requested to start (and register the client RPC endpoint)

  • StandaloneAppClient is requested to start (and register the AppClient RPC endpoint)

  • (Spark Standalone) Master is requested to startRpcEnvAndEndpoint (and register the Master RPC endpoint)

  • (Spark Standalone) Worker is requested to startRpcEnvAndEndpoint (and register the Worker RPC endpoint)

  • DriverWrapper standalone application is launched (and registers the workerWatcher RPC endpoint)

  • CoarseGrainedExecutorBackend standalone application is launched (and registers the Executor and WorkerWatcher RPC endpoints)

  • TaskSchedulerImpl is requested to maybeInitBarrierCoordinator

  • CoarseGrainedSchedulerBackend is requested to createDriverEndpointRef (and registers the CoarseGrainedScheduler RPC endpoint)

  • LocalSchedulerBackend is requested to start (and registers the LocalSchedulerBackendEndpoint RPC endpoint)

  • BlockManager is created (and registers the BlockManagerEndpoint RPC endpoint)

  • (Spark on YARN) ApplicationMaster is requested to createAllocator (and registers the YarnAM RPC endpoint)

  • (Spark on YARN) YarnSchedulerBackend is created (and registers the YarnScheduler RPC endpoint)
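Conceptually, setupEndpoint registers a named endpoint with the RPC system and hands back a reference that can be used to send it messages. An in-memory sketch of that contract (Endpoint, EndpointRef and Dispatcher are stand-in types, not Spark's API):

```scala
import scala.collection.mutable

// In-memory sketch of the name -> endpoint registration behind setupEndpoint.
trait Endpoint { def receive(msg: Any): Unit }
case class EndpointRef(name: String, send: Any => Unit)

class Dispatcher {
  private val endpoints = mutable.Map.empty[String, Endpoint]

  def setupEndpoint(name: String, endpoint: Endpoint): EndpointRef = {
    require(!endpoints.contains(name), s"Endpoint $name already registered")
    endpoints(name) = endpoint
    // The returned ref routes messages to the registered endpoint by name
    EndpointRef(name, msg => endpoints(name).receive(msg))
  }
}

val dispatcher = new Dispatcher
var heard = List.empty[Any]
val ref = dispatcher.setupEndpoint("HeartbeatReceiver",
  new Endpoint { def receive(msg: Any): Unit = heard ::= msg })
ref.send("ping")
```

The real dispatcher delivers messages through per-endpoint inboxes on a thread pool rather than synchronously, but the name-to-endpoint registration is the same idea.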

setupEndpointRef Method

setupEndpointRef(
  address: RpcAddress,
  endpointName: String): RpcEndpointRef

setupEndpointRef creates an RpcEndpointAddress (for the given RpcAddress and endpoint name) and requests setupEndpointRefByURI to resolve it.

setupEndpointRef is used when:

  • ClientApp is requested to start

  • ClientEndpoint is requested to tryRegisterAllMasters

  • Worker is requested to tryRegisterAllMasters and reregisterWithMaster

  • RpcUtils utility is used to makeDriverRef

  • (Spark on YARN) ApplicationMaster is requested to runDriver and runExecutorLauncher
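An RpcEndpointAddress renders as a spark://name@host:port URI, which is what setupEndpointRefByURI then resolves. A minimal sketch of that mapping (the real RpcEndpointAddress also supports client-mode refs without a host and port):

```scala
// Minimal sketch of how an endpoint address maps to a spark:// URI,
// as used by setupEndpointRef to delegate to setupEndpointRefByURI.
case class RpcAddress(host: String, port: Int)

case class RpcEndpointAddress(rpcAddress: RpcAddress, name: String) {
  def toUri: String = s"spark://$name@${rpcAddress.host}:${rpcAddress.port}"
}

val uri = RpcEndpointAddress(RpcAddress("192.168.1.1", 7077), "Master").toUri
```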

setupEndpointRefByURI Method

setupEndpointRefByURI(
  uri: String): RpcEndpointRef

setupEndpointRefByURI requests asyncSetupEndpointRefByURI to resolve the given URI and waits for the result up to the default endpoint lookup timeout.

setupEndpointRefByURI is used when:

  • RpcEnv is requested to setupEndpointRef
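The blocking-over-asynchronous pattern can be sketched with plain scala.concurrent (EndpointRef and the timeout value are stand-ins; the real method uses a timeout derived from the lookup timeout settings):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Sketch: a blocking lookup that wraps an asynchronous one and waits up to
// a default lookup timeout, mirroring setupEndpointRefByURI delegating to
// asyncSetupEndpointRefByURI.
case class EndpointRef(uri: String)

val defaultLookupTimeout: FiniteDuration = 120.seconds

def asyncSetupEndpointRefByURI(uri: String): Future[EndpointRef] =
  Future(EndpointRef(uri))  // the real code resolves the remote endpoint here

def setupEndpointRefByURI(uri: String): EndpointRef =
  Await.result(asyncSetupEndpointRefByURI(uri), defaultLookupTimeout)

val ref = setupEndpointRefByURI("spark://Master@localhost:7077")
```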

shutdown Method

shutdown(): Unit

Shuts down the RPC system

Used when:

  • SparkEnv is requested to stop

  • LocalSparkCluster is requested to stop

  • DriverWrapper is launched

  • CoarseGrainedExecutorBackend is launched

  • NettyRpcEnvFactory is requested to create an RpcEnv (in server mode and failed to assign a port)

stop Method

stop(
  endpoint: RpcEndpointRef): Unit

Used when:

  • SparkContext is requested to stop

  • RpcEndpoint is requested to stop

  • BlockManager is requested to stop

Default Endpoint Lookup Timeout

RpcEnv defines a default endpoint lookup timeout (defaultLookupTimeout) that setupEndpointRefByURI uses while waiting for an endpoint reference to be resolved.

When a remote endpoint is resolved, a local RPC environment connects to the remote one. This is called endpoint lookup. To configure the time allowed for an endpoint lookup, use the following settings.

It is a prioritized list of lookup timeout properties (the higher on the list, the more important):

  • spark.rpc.lookupTimeout

  • spark.network.timeout

Their value can be a number alone (seconds) or any number with a time suffix, e.g. 50s, 100ms, or 250us. See Settings.
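A minimal sketch of parsing such time strings (Spark's actual parser, JavaUtils.timeStringAs, supports more units such as m, h, and d; this is illustrative only):

```scala
import scala.concurrent.duration._

// Minimal sketch of parsing Spark-style time strings ("50s", "100ms",
// "250us", or a bare number, taken as seconds) into a Duration.
def parseTimeString(s: String): FiniteDuration = {
  val Pattern = """(\d+)\s*([a-z]*)""".r
  s.trim.toLowerCase match {
    case Pattern(n, "")   => n.toLong.seconds   // bare number: seconds
    case Pattern(n, "us") => n.toLong.micros
    case Pattern(n, "ms") => n.toLong.millis
    case Pattern(n, "s")  => n.toLong.seconds
    case other => throw new IllegalArgumentException(s"Invalid time string: $other")
  }
}
```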

Creating Instance

RpcEnv takes the following to be created:

  • SparkConf

RpcEnv is created using the RpcEnv.create utility.

RpcEnv is an abstract class and cannot be created directly; a concrete RpcEnv (NettyRpcEnv) is created instead.

Creating RpcEnv

create(
  name: String,
  host: String,
  port: Int,
  conf: SparkConf,
  securityManager: SecurityManager,
  clientMode: Boolean = false): RpcEnv (1)
create(
  name: String,
  bindAddress: String,
  advertiseAddress: String,
  port: Int,
  conf: SparkConf,
  securityManager: SecurityManager,
  numUsableCores: Int,
  clientMode: Boolean): RpcEnv
(1) Uses 0 for numUsableCores

create creates a NettyRpcEnvFactory and requests to create an RpcEnv (with an RpcEnvConfig with all the given arguments).

create is used when:

  • SparkEnv utility is requested to create a SparkEnv (clientMode flag is turned on for executors and off for the driver)

  • With clientMode flag turned on:

    • (Spark on YARN) ApplicationMaster is requested to runExecutorLauncher (in client deploy mode)

    • ClientApp is requested to start

    • (Spark Standalone) Master is requested to startRpcEnvAndEndpoint

    • DriverWrapper standalone application is launched

    • (Spark Standalone) Worker is requested to startRpcEnvAndEndpoint

    • CoarseGrainedExecutorBackend is requested to run
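The two create overloads differ only in defaults: per the callout above, the shorter one uses 0 for numUsableCores (and uses the single host as both the bind and advertise address). A sketch of that delegation, with RpcEnvConfig as a stand-in record rather than Spark's actual class:

```scala
// Stand-in for the configuration that create hands to NettyRpcEnvFactory.
case class RpcEnvConfig(
    name: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    numUsableCores: Int,
    clientMode: Boolean)

// Short overload: the host doubles as bind and advertise address,
// and numUsableCores defaults to 0.
def create(name: String, host: String, port: Int, clientMode: Boolean = false): RpcEnvConfig =
  create(name, host, host, port, 0, clientMode)

// Full overload: all knobs explicit.
def create(
    name: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    numUsableCores: Int,
    clientMode: Boolean): RpcEnvConfig =
  RpcEnvConfig(name, bindAddress, advertiseAddress, port, numUsableCores, clientMode)

val config = create("sparkDriver", "localhost", 7077)
```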