NettyBlockRpcServer

NettyBlockRpcServer is an RpcHandler to handle messages for NettyBlockTransferService.

Figure 1. NettyBlockRpcServer and NettyBlockTransferService

Creating Instance

NettyBlockRpcServer takes the following to be created: an application ID, a Serializer, and a BlockDataManager.

NettyBlockRpcServer is created when NettyBlockTransferService is requested to initialize.

OneForOneStreamManager

NettyBlockRpcServer uses a OneForOneStreamManager for…​FIXME

Receiving RPC Messages

receive(
  client: TransportClient,
  rpcMessage: ByteBuffer,
  responseContext: RpcResponseCallback): Unit

receive…​FIXME

receive is part of RpcHandler abstraction.

NettyBlockRpcServer Messages

OpenBlocks Message Handler

When OpenBlocks arrives, NettyBlockRpcServer requests the BlockDataManager for the block data of every block id in the message. The result is a collection of ManagedBuffers, one per block id.

NettyBlockRpcServer then registers the block data as a stream with the internal StreamManager, which is a OneForOneStreamManager created when NettyBlockRpcServer is created.

You should see the following TRACE message in the logs:

NettyBlockRpcServer: Registered streamId [streamId] with [size] buffers

In the end, NettyBlockRpcServer responds with a StreamHandle (with the streamId and the number of blocks). The response is serialized as a ByteBuffer.
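The OpenBlocks steps above can be sketched as follows. This is a minimal, self-contained model and not Spark's code: Buffer, BlockSource, StreamRegistry and Handle are hypothetical stand-ins for ManagedBuffer, BlockDataManager, OneForOneStreamManager and StreamHandle, and the 12-byte response layout is an assumption made only for illustration.

```scala
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for ManagedBuffer.
final case class Buffer(bytes: Array[Byte])

// Hypothetical stand-in for StreamHandle.
final case class Handle(streamId: Long, numChunks: Int) {
  // Assumed layout: 8-byte stream id followed by a 4-byte chunk count.
  def toByteBuffer: ByteBuffer = {
    val buf = ByteBuffer.allocate(12)
    buf.putLong(streamId)
    buf.putInt(numChunks)
    buf.flip() // switch the buffer from writing to reading
    buf
  }
}

// Hypothetical stand-in for BlockDataManager (read side only).
trait BlockSource {
  def getBlockData(blockId: String): Buffer
}

// Hypothetical stand-in for OneForOneStreamManager:
// every registered stream is a sequence of buffers served one chunk at a time.
final class StreamRegistry {
  private val nextId = new AtomicLong(0L)
  private val streams = TrieMap.empty[Long, Seq[Buffer]]

  def registerStream(buffers: Seq[Buffer]): Long = {
    val id = nextId.getAndIncrement()
    streams.put(id, buffers)
    id
  }

  def chunk(streamId: Long, index: Int): Buffer = streams(streamId)(index)
}

object OpenBlocksSketch {
  def handleOpenBlocks(
      blockIds: Seq[String],
      source: BlockSource,
      registry: StreamRegistry): ByteBuffer = {
    // 1. Look up the block data for every block id in the message.
    val buffers = blockIds.map(source.getBlockData)
    // 2. Register the buffers as a single stream.
    val streamId = registry.registerStream(buffers)
    // 3. Respond with a handle serialized as a ByteBuffer.
    Handle(streamId, buffers.size).toByteBuffer
  }
}
```

In this model, the client would use the stream id and the chunk count from the response to fetch the blocks one chunk at a time.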

UploadBlock Message Handler

When UploadBlock arrives, NettyBlockRpcServer deserializes the metadata of the input message to get the StorageLevel and ClassTag of the block being uploaded.

NettyBlockRpcServer creates a BlockId from the block id in the message and requests the BlockDataManager to store the block data under that BlockId.

In the end, NettyBlockRpcServer responds with a 0-capacity ByteBuffer.

UploadBlock is sent when NettyBlockTransferService is requested to upload a block.
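The UploadBlock handling can be modelled in the same spirit. The sketch below is an assumption-laden illustration rather than Spark's implementation: Level, BlockKey, Upload and InMemoryBlockStore are hypothetical stand-ins for StorageLevel, BlockId, UploadBlock and BlockDataManager, and the metadata encoding (two flag bytes followed by a UTF-8 class name) is invented here in place of Spark's serializer.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import scala.collection.mutable

// Hypothetical stand-ins for StorageLevel, BlockId and the UploadBlock message.
final case class Level(useDisk: Boolean, useMemory: Boolean)
final case class BlockKey(name: String)
final case class Upload(blockId: String, metadata: Array[Byte], blockData: Array[Byte])

// Hypothetical stand-in for BlockDataManager (write side only).
final class InMemoryBlockStore {
  private val blocks = mutable.Map.empty[BlockKey, (Level, String, Array[Byte])]

  def putBlockData(key: BlockKey, data: Array[Byte], level: Level, classTagName: String): Boolean = {
    blocks.update(key, (level, classTagName, data))
    true
  }

  def get(key: BlockKey): Option[(Level, String, Array[Byte])] = blocks.get(key)
}

object UploadBlockSketch {
  // Assumed metadata layout: one byte per storage flag, then the class name in UTF-8.
  def encodeMeta(level: Level, classTagName: String): Array[Byte] = {
    val name = classTagName.getBytes(StandardCharsets.UTF_8)
    val buf = ByteBuffer.allocate(2 + name.length)
    buf.put((if (level.useDisk) 1 else 0).toByte)
    buf.put((if (level.useMemory) 1 else 0).toByte)
    buf.put(name)
    buf.array()
  }

  def decodeMeta(bytes: Array[Byte]): (Level, String) = {
    val buf = ByteBuffer.wrap(bytes)
    val useDisk = buf.get() == 1
    val useMemory = buf.get() == 1
    val name = new Array[Byte](buf.remaining())
    buf.get(name)
    (Level(useDisk, useMemory), new String(name, StandardCharsets.UTF_8))
  }

  def handleUploadBlock(msg: Upload, store: InMemoryBlockStore): ByteBuffer = {
    // 1. Deserialize the metadata to recover the storage level and the class tag.
    val (level, classTagName) = decodeMeta(msg.metadata)
    // 2. Build the block key from the block id and store the block.
    store.putBlockData(BlockKey(msg.blockId), msg.blockData, level, classTagName)
    // 3. Acknowledge with a 0-capacity ByteBuffer.
    ByteBuffer.allocate(0)
  }
}
```

The empty response carries no payload; in this model it only signals to the sender that the block has been stored.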

Receiving RPC Message with Streamed Data

receiveStream(
  client: TransportClient,
  messageHeader: ByteBuffer,
  responseContext: RpcResponseCallback): StreamCallbackWithID

receiveStream…​FIXME

receiveStream is part of RpcHandler abstraction.

Logging

Enable ALL logging level for org.apache.spark.network.netty.NettyBlockRpcServer logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.network.netty.NettyBlockRpcServer=ALL

Refer to Logging.