See RpcEnv (RPC Environment) for the concept of an RPC Environment in Spark.
The class org.apache.spark.rpc.netty.NettyRpcEnv is the implementation of RpcEnv that uses Netty, "an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients".
A Netty-based RPC Environment is created by NettyRpcEnvFactory when spark.rpc selects the Netty implementation.
It uses Java's built-in serialization (through JavaSerializerInstance).
FIXME What other choices of JavaSerializerInstance are available in Spark?
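To make "Java's built-in serialization" concrete, here is a minimal round-trip using only `java.io.ObjectOutputStream` and `java.io.ObjectInputStream`. It is a sketch of the underlying JDK mechanism, not of Spark's `JavaSerializerInstance`; `HypotheticalMessage` is an illustrative payload type, not a Spark class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class JavaSerializationSketch {

    // Hypothetical RPC payload; any Serializable type works with Java serialization.
    static final class HypotheticalMessage implements Serializable {
        private static final long serialVersionUID = 1L;
        final String endpointName;
        final int value;

        HypotheticalMessage(String endpointName, int value) {
            this.endpointName = endpointName;
            this.value = value;
        }
    }

    // Turns an object graph into bytes with the JDK's ObjectOutputStream.
    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Rebuilds the object graph from bytes with the JDK's ObjectInputStream.
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        HypotheticalMessage original = new HypotheticalMessage("endpoint-verifier", 42);
        HypotheticalMessage copy = (HypotheticalMessage) deserialize(serialize(original));
        System.out.println(copy.endpointName + " " + copy.value); // endpoint-verifier 42
    }
}
```

Java serialization needs no schema, only `Serializable` classes on both sides, which is convenient for RPC but ties both ends to the same class definitions.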
The default port to listen to is
When NettyRpcEnv starts, the following INFO message is printed out in the logs:
INFO Utils: Successfully started service 'NettyRpcEnv' on port 0.
FIXME: The message above in TransportServer has a space before
Refer to Client Mode (is this an executor or the driver?) for an introduction to client mode. This applies only to the Netty-based RpcEnv.
When created, a Netty-based RpcEnv starts the RPC server and registers the necessary endpoints in non-client mode, i.e. when client mode is disabled.
FIXME What endpoints?
This means that the services required for remote communication with NettyRpcEnv are started only on the driver (not on executors).
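A hypothetical sketch of the client-mode rule: the class and field names are invented, and "starting the server" is reduced to a boolean flag. Registering `endpoint-verifier` (described later in this section) as one of the endpoints is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ClientModeSketch {

    // Hypothetical stand-in for an RPC environment; not Spark's NettyRpcEnv.
    static final class HypotheticalRpcEnv {
        final boolean clientMode;
        boolean serverStarted = false;
        final List<String> registeredEndpoints = new ArrayList<>();

        HypotheticalRpcEnv(boolean clientMode) {
            this.clientMode = clientMode;
            if (!clientMode) {
                // Non-client mode: accept incoming connections and expose endpoints.
                serverStarted = true; // placeholder for starting a real server
                registeredEndpoints.add("endpoint-verifier"); // assumed endpoint
            }
            // Client mode: outbound connections only, so nothing is started here.
        }
    }

    public static void main(String[] args) {
        System.out.println(new HypotheticalRpcEnv(false).serverStarted); // true
        System.out.println(new HypotheticalRpcEnv(true).serverStarted);  // false
    }
}
```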
EventLoopGroup uses a daemon thread pool called
ID is a unique integer for
EPOLL) for the Shuffle server.
FIXME Review Netty’s
FIXME Where are
NettyRpcEnv’s Dispatcher uses the daemon fixed thread pool with spark.rpc.netty.dispatcher.numThreads threads.
Thread names are formatted as dispatcher-event-loop-ID, where ID is a unique, sequentially assigned integer.
It starts the message processing loop on all of the threads.
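The thread-pool arrangement described above can be sketched with plain `java.util.concurrent`: a `ThreadFactory` that creates daemon threads named `dispatcher-event-loop-ID` with sequential IDs, a fixed pool of `numThreads` such threads, and the same message loop submitted once per thread. The shared queue and the `POISON_PILL` stop marker are assumptions of this sketch, not details of Spark's Dispatcher.

```java
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class DispatcherPoolSketch {

    static final String POISON_PILL = "__stop__"; // hypothetical shutdown marker

    // Creates daemon threads named prefix + sequential ID (0, 1, 2, ...).
    static ThreadFactory namedDaemonFactory(String prefix) {
        AtomicInteger nextId = new AtomicInteger(0);
        return runnable -> {
            Thread t = new Thread(runnable, prefix + nextId.getAndIncrement());
            t.setDaemon(true); // daemon threads do not keep the JVM alive
            return t;
        };
    }

    // Starts one message loop per thread, feeds the messages, then stops the loops.
    // Returns the names of the threads that handled at least one message.
    static Set<String> dispatch(int numThreads, String... messages) {
        ExecutorService pool =
                Executors.newFixedThreadPool(numThreads, namedDaemonFactory("dispatcher-event-loop-"));
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Set<String> handledBy = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < numThreads; i++) {
            pool.execute(() -> {
                try {
                    while (true) { // the message processing loop
                        String msg = queue.take();
                        if (msg.equals(POISON_PILL)) return;
                        handledBy.add(Thread.currentThread().getName());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (String m : messages) queue.add(m);
        for (int i = 0; i < numThreads; i++) queue.add(POISON_PILL); // one stop marker per loop
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handledBy;
    }

    public static void main(String[] args) {
        // The text gives the default of spark.rpc.netty.dispatcher.numThreads
        // as the number of processors available to the JVM.
        int numThreads = Runtime.getRuntime().availableProcessors();
        System.out.println(dispatch(numThreads, "a", "b", "c"));
    }
}
```

Because the queue is FIFO and the stop markers are enqueued after all messages, every message is handled before any loop exits.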
NettyRpcEnv uses the daemon single-thread scheduled thread pool netty-rpc-env-timeout, which shows up in a thread dump as follows:
"netty-rpc-env-timeout" #87 daemon prio=5 os_prio=31 tid=0x00007f887775a000 nid=0xc503 waiting on condition [0x0000000123397000]
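A single daemon thread is enough for a scheduler whose tasks are short, such as failing a pending request once its deadline passes. The sketch below assumes that use case; the thread name comes from the thread dump above, while `TIMEOUT_SCHEDULER` and `withTimeout` are hypothetical names.

```java
import java.util.concurrent.*;

public class TimeoutSchedulerSketch {

    // Daemon single-thread scheduled executor, named like the thread in the dump.
    static final ScheduledExecutorService TIMEOUT_SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "netty-rpc-env-timeout");
                t.setDaemon(true);
                return t;
            });

    // Fails 'reply' with a TimeoutException unless it completes within 'millis'.
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> reply, long millis) {
        ScheduledFuture<?> timeoutTask = TIMEOUT_SCHEDULER.schedule(
                () -> reply.completeExceptionally(new TimeoutException("no reply in " + millis + " ms")),
                millis, TimeUnit.MILLISECONDS);
        // Once the reply (or a failure) arrives, the pending timeout is no longer needed.
        reply.whenComplete((value, error) -> timeoutTask.cancel(false));
        return reply;
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> fast = withTimeout(new CompletableFuture<>(), 1000);
        fast.complete("pong");
        System.out.println(fast.get()); // pong

        CompletableFuture<String> slow = withTimeout(new CompletableFuture<>(), 50);
        try {
            slow.get();
        } catch (ExecutionException e) {
            System.out.println(e.getCause().getClass().getSimpleName()); // TimeoutException
        }
    }
}
```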
The Netty-based implementation uses the following properties:
… EPOLL for low-level IO. NIO is always available, while EPOLL is only available on Linux.
… 8) - the number of threads to use for the Netty client and server thread pools.
spark.shuffle.io.serverThreads (default: the value of …)
spark.shuffle.io.clientThreads (default: the value of …)
spark.rpc.netty.dispatcher.numThreads (default: the number of processors available to the JVM)
… 64) - used in cluster mode to communicate with a remote RPC endpoint
… 100 for testing when spark.testing is set) controls the maximum number of binding attempts/retries to a port before giving up.
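The last property above caps how many times binding to a port is retried. Here is a minimal sketch of such a loop with `java.net.ServerSocket`: after a `BindException` it tries the next port, and it gives up after 1 + `maxRetries` attempts. The increment-with-wraparound policy, the method name `bindWithRetries`, and the message wording (modelled on the INFO line quoted earlier) are assumptions, not Spark's code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.BindException;
import java.net.ServerSocket;

public class PortRetrySketch {

    // Returns a bound ServerSocket after at most 1 + maxRetries attempts.
    // Port 0 asks the OS for any free port. Assumes startPort is 0 or >= 1024.
    static ServerSocket bindWithRetries(String serviceName, int startPort, int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            // Try successive ports, wrapping around within 1024..65535.
            int tryPort = (startPort == 0) ? 0 : ((startPort + attempt - 1024) % (65536 - 1024)) + 1024;
            try {
                ServerSocket socket = new ServerSocket(tryPort);
                System.out.println("Successfully started service '" + serviceName
                        + "' on port " + socket.getLocalPort() + ".");
                return socket;
            } catch (BindException e) {
                // Port already in use: move on to the next attempt.
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        throw new IllegalStateException("Could not bind service '" + serviceName
                + "' after " + maxRetries + " retries");
    }

    public static void main(String[] args) throws IOException {
        // 3 retries is an arbitrary value for this example.
        try (ServerSocket socket = bindWithRetries("NettyRpcEnv", 0, 3)) {
            System.out.println(socket.isBound()); // true
        }
    }
}
```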
endpoint-verifier (RpcEndpointVerifier) - an RpcEndpoint for remote RpcEnvs to query whether an RpcEndpoint exists or not. It uses the Dispatcher, which keeps track of registered endpoints, to answer such queries.
endpoint-verifier is used to check whether a given endpoint exists before the endpoint's reference is given back to clients.
One use case is when an AppClient connects to standalone Masters before it registers the application it acts for.
FIXME Who’d like to use
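The existence check can be sketched as a registry (playing the Dispatcher's role of tracking endpoints), a verifier that answers "does an endpoint with this name exist?", and a client that hands back a reference only after a positive answer. All class and method names here are hypothetical, and the string `"ref:" + name` stands in for a real endpoint reference.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class EndpointVerifierSketch {

    // Hypothetical stand-in for the endpoints a dispatcher keeps track of.
    static final class Registry {
        private final Map<String, Object> endpoints = new ConcurrentHashMap<>();
        void register(String name, Object endpoint) { endpoints.put(name, endpoint); }
        boolean exists(String name) { return endpoints.containsKey(name); }
    }

    // Hypothetical stand-in for the verifier endpoint that remote environments query.
    static final class Verifier {
        private final Registry registry;
        Verifier(Registry registry) { this.registry = registry; }
        boolean answerExists(String name) { return registry.exists(name); }
    }

    // Client side: only return a reference if the remote side confirms the endpoint.
    static String lookupReference(Verifier remote, String name) {
        if (!remote.answerExists(name)) {
            throw new IllegalArgumentException("Cannot find endpoint: " + name);
        }
        return "ref:" + name; // placeholder for a real endpoint reference
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.register("Master", new Object());
        Verifier verifier = new Verifier(registry);
        System.out.println(lookupReference(verifier, "Master")); // ref:Master
    }
}
```

Checking first means a client such as the AppClient above fails fast with a clear error instead of holding a reference to an endpoint that is not there.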
A message dispatcher is responsible for routing RPC messages to the appropriate endpoint(s).
It uses the daemon fixed thread pool dispatcher-event-loop with spark.rpc.netty.dispatcher.numThreads threads for dispatching messages.
"dispatcher-event-loop-0" #26 daemon prio=5 os_prio=31 tid=0x00007f8877153800 nid=0x7103 waiting on condition [0x000000011f78b000]
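Routing can be sketched as a map from endpoint name to handler, with each delivery running on a fixed pool of daemon threads. This is a hypothetical illustration of "routing RPC messages to the appropriate endpoint"; the class, method names, and `Consumer<String>` handlers are assumptions, not Spark's Dispatcher API.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Consumer;

public class MessageRoutingSketch {

    private final Map<String, Consumer<String>> endpoints = new ConcurrentHashMap<>();
    private final ExecutorService pool;

    MessageRoutingSketch(int numThreads) {
        pool = Executors.newFixedThreadPool(numThreads, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // daemon threads, as in the text
            return t;
        });
    }

    void register(String name, Consumer<String> handler) { endpoints.put(name, handler); }

    // Routes 'message' to the endpoint named 'target'; false if no such endpoint.
    boolean post(String target, String message) {
        Consumer<String> handler = endpoints.get(target);
        if (handler == null) return false;
        pool.execute(() -> handler.accept(message));
        return true;
    }

    // Lets already-posted messages finish, then stops the pool.
    void stop() {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        MessageRoutingSketch dispatcher = new MessageRoutingSketch(2);
        List<String> received = new CopyOnWriteArrayList<>();
        dispatcher.register("echo", received::add);
        dispatcher.post("echo", "hello");
        dispatcher.stop();
        System.out.println(received); // [hello]
    }
}
```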