RpcEnv

RpcEnv is an abstraction of RPC environments.

Contract

address

address: RpcAddress

RpcAddress of this RPC environment

asyncSetupEndpointRefByURI

asyncSetupEndpointRefByURI(
  uri: String): Future[RpcEndpointRef]

Looks up the RpcEndpointRef of the RPC endpoint at the given URI (asynchronously)
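
The asynchronous lookup can be sketched as follows (the URI, endpoint name and timeout are made-up placeholders; `rpcEnv` is an already-created RpcEnv):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Resolve an RpcEndpointRef by its spark:// URI (hypothetical endpoint and host)
val uri = "spark://BlockManagerMaster@driver-host:7077"
val refFuture: Future[RpcEndpointRef] = rpcEnv.asyncSetupEndpointRefByURI(uri)

// Block only when the ref is actually needed
// (the synchronous setupEndpointRefByURI does this internally)
val ref: RpcEndpointRef = Await.result(refFuture, 120.seconds)
```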

Used when:

awaitTermination

awaitTermination(): Unit

Blocks the current thread until the RPC environment terminates

Used when:

  • SparkEnv is requested to stop
  • ClientApp (Spark Standalone) is requested to start
  • LocalSparkCluster (Spark Standalone) is requested to stop
  • Master (Spark Standalone) and Worker (Spark Standalone) are launched
  • CoarseGrainedExecutorBackend is requested to run

deserialize

deserialize[T](
  deserializationAction: () => T): T

Used when:

  • PersistenceEngine is requested to readPersistedData
  • NettyRpcEnv is requested to deserialize
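
deserialize is meant to run a deserialization action in a scope where this RpcEnv is accessible (so that deserialized objects, e.g. RpcEndpointRefs, can be wired back to it). A minimal sketch, where `MyState`, `bytes` and `serializerInstance` are made-up placeholders:

```scala
// Hypothetical: restore a previously-serialized value "inside" this RpcEnv
val restored: MyState = rpcEnv.deserialize { () =>
  serializerInstance.deserialize[MyState](bytes)
}
```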

endpointRef

endpointRef(
  endpoint: RpcEndpoint): RpcEndpointRef

Used when:

fileServer

fileServer: RpcEnvFileServer

RpcEnvFileServer of this RPC environment

Used when:

  • SparkContext is requested to addFile, addJar and is created (and registers the REPL's output directory)

openChannel

openChannel(
  uri: String): ReadableByteChannel

Opens a channel to download a file at the given URI

Used when:

  • Utils utility is used to doFetchFile
  • ExecutorClassLoader is requested to getClassFileInputStreamFromSparkRPC
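
A sketch of downloading a file over the channel (the URI and local file name are made up; the copy loop uses plain java.nio):

```scala
import java.io.FileOutputStream
import java.nio.channels.ReadableByteChannel

// Hypothetical: fetch a jar published by the RpcEnvFileServer
val in: ReadableByteChannel = rpcEnv.openChannel("spark://driver-host:7077/jars/some.jar")
val out = new FileOutputStream("some.jar").getChannel
try {
  // Copy everything the channel delivers into the local file
  out.transferFrom(in, 0, Long.MaxValue)
} finally {
  in.close()
  out.close()
}
```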

setupEndpoint

setupEndpoint(
  name: String,
  endpoint: RpcEndpoint): RpcEndpointRef
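
Registering an endpoint can be sketched as below (`EchoEndpoint` and the name `echo` are made up; RpcEndpoint and RpcCallContext are Spark's):

```scala
// Hypothetical minimal endpoint that replies with whatever it receives
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg => context.reply(msg)
  }
}

// Register the endpoint under a name and get its RpcEndpointRef back
val echoRef: RpcEndpointRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
```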

shutdown

shutdown(): Unit

Shuts down this RPC environment asynchronously (use awaitTermination to make sure this RpcEnv exits successfully)

Used when:

  • SparkEnv is requested to stop
  • LocalSparkCluster (Spark Standalone) is requested to stop
  • DriverWrapper is launched
  • CoarseGrainedExecutorBackend is launched
  • NettyRpcEnvFactory is requested to create an RpcEnv (in server mode, when it fails to assign a port)
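
Since shutdown is asynchronous, the usual pattern pairs it with awaitTermination:

```scala
// Request shutdown, then block until the RPC environment has actually exited
rpcEnv.shutdown()
rpcEnv.awaitTermination()
```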

Stopping RpcEndpointRef

stop(
  endpoint: RpcEndpointRef): Unit

Used when:

  • SparkContext is requested to stop
  • RpcEndpoint is requested to stop
  • BlockManager is requested to stop
  • in Spark SQL

Implementations

  • NettyRpcEnv

Creating Instance

RpcEnv takes the following to be created:

  • SparkConf

RpcEnv is created using RpcEnv.create utility.

Abstract Class

RpcEnv is an abstract class and cannot be created directly. It is created indirectly as one of the concrete RpcEnvs.

Creating RpcEnv

create(
  name: String,
  host: String,
  port: Int,
  conf: SparkConf,
  securityManager: SecurityManager,
  clientMode: Boolean = false): RpcEnv // (1)
create(
  name: String,
  bindAddress: String,
  advertiseAddress: String,
  port: Int,
  conf: SparkConf,
  securityManager: SecurityManager,
  numUsableCores: Int,
  clientMode: Boolean): RpcEnv

1. Uses 0 for numUsableCores

create creates a NettyRpcEnvFactory and requests it to create an RpcEnv (with a new RpcEnvConfig with all the given arguments).

create is used when:

  • SparkEnv utility is requested to create a SparkEnv (clientMode flag is turned on for executors and off for the driver)
  • With clientMode flag true:

    • CoarseGrainedExecutorBackend is requested to run
    • ClientApp (Spark Standalone) is requested to start
    • Master (Spark Standalone) is requested to startRpcEnvAndEndpoint
    • Worker (Spark Standalone) is requested to startRpcEnvAndEndpoint
    • DriverWrapper is launched
    • ApplicationMaster (Spark on YARN) is requested to runExecutorLauncher (in client deploy mode)
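
A sketch of creating an RpcEnv with the first (shorter) variant of create (the name, host and port are made up):

```scala
import org.apache.spark.{SparkConf, SecurityManager}
import org.apache.spark.rpc.RpcEnv

val conf = new SparkConf()
val securityMgr = new SecurityManager(conf)

// Hypothetical server-mode RpcEnv (clientMode defaults to false,
// numUsableCores defaults to 0 in this variant)
val rpcEnv = RpcEnv.create(
  "sparkDriver", // name
  "localhost",   // host
  7077,          // port
  conf,
  securityMgr)
```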

Default Endpoint Lookup Timeout

RpcEnv uses the default lookup timeout for...FIXME

When a remote endpoint is resolved, a local RPC environment connects to the remote one (endpoint lookup). To configure the time needed for the endpoint lookup you can use the following settings.

It is a prioritized list of lookup timeout properties (the higher on the list, the more important):

  • spark.rpc.lookupTimeout
  • spark.network.timeout
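
Assuming the standard Spark properties, configuring the lookup timeout can be sketched as:

```scala
import org.apache.spark.SparkConf

// spark.rpc.lookupTimeout takes precedence;
// spark.network.timeout is the fallback
val conf = new SparkConf()
  .set("spark.rpc.lookupTimeout", "30s")
  .set("spark.network.timeout", "120s")
```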