NettyBlockRpcServer

NettyBlockRpcServer is an RpcHandler that handles messages for NettyBlockTransferService.

Figure: NettyBlockRpcServer and NettyBlockTransferService

Creating Instance

NettyBlockRpcServer takes the following to be created:

NettyBlockRpcServer is created when:

  • NettyBlockTransferService is requested to initialize

OneForOneStreamManager

NettyBlockRpcServer uses a OneForOneStreamManager.

Receiving RPC Messages

receive(
  client: TransportClient,
  rpcMessage: ByteBuffer,
  responseContext: RpcResponseCallback): Unit

receive deserializes the incoming RPC message (from ByteBuffer to BlockTransferMessage) and prints out the following TRACE message to the logs:

Received request: [message]

receive handles the message.

receive is part of the RpcHandler abstraction.
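The deserialize-log-handle flow can be sketched with stand-in types. This is a minimal illustration, not the Spark implementation: ReceiveSketch, the two case classes, the one-byte-tag wire format and the reply strings are all hypothetical, and the log callback stands in for the TRACE logger.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

object ReceiveSketch {
  // Hypothetical stand-ins for BlockTransferMessage subtypes (not the Spark classes).
  sealed trait Message
  final case class OpenBlocks(appId: String, execId: String, blockIds: Seq[String]) extends Message
  final case class UploadBlock(appId: String, execId: String, blockId: String) extends Message

  // Toy wire format (an assumption): one tag byte, then comma-separated UTF-8 fields.
  def encode(message: Message): ByteBuffer = {
    val (tag, fields) = message match {
      case OpenBlocks(app, exec, ids) => (0, app +: exec +: ids)
      case UploadBlock(app, exec, id) => (1, Seq(app, exec, id))
    }
    val payload = fields.mkString(",").getBytes(UTF_8)
    val buf = ByteBuffer.allocate(1 + payload.length)
    buf.put(tag.toByte).put(payload)
    buf.flip()
    buf
  }

  def decode(buf: ByteBuffer): Message = {
    val tag = buf.get().toInt
    val bytes = new Array[Byte](buf.remaining())
    buf.get(bytes)
    val fields = new String(bytes, UTF_8).split(",").toList
    (tag, fields) match {
      case (0, app :: exec :: ids)       => OpenBlocks(app, exec, ids)
      case (1, app :: exec :: id :: Nil) => UploadBlock(app, exec, id)
      case _ => throw new UnsupportedOperationException(s"Unexpected message tag: $tag")
    }
  }

  // Same shape as receive: deserialize, log, then handle by message type.
  def receive(rpcMessage: ByteBuffer, log: String => Unit): String = {
    val message = decode(rpcMessage)
    log(s"Received request: $message")
    message match {
      case OpenBlocks(_, _, ids) => s"stream with ${ids.size} blocks"
      case UploadBlock(_, _, id) => s"stored $id"
    }
  }
}
```

Pattern matching on a sealed trait lets the compiler flag message types that are not handled, which is why the dispatch is written as a single match expression in this sketch.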

FetchShuffleBlocks

FetchShuffleBlocks carries the following:

  • Application ID
  • Executor ID
  • Shuffle ID
  • Map IDs (long[])
  • Reduce IDs (int[][])
  • batchFetchEnabled flag

When received, receive...FIXME

receive prints out the following TRACE message in the logs:

Registered streamId [streamId] with [numBlockIds] buffers

In the end, receive responds with a StreamHandle (with the streamId and the number of blocks). The response is serialized to a ByteBuffer.
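How the number of buffers might follow from the message fields can be sketched as below. This is an illustration under stated assumptions, not the Spark code: FetchShuffleBlocksSketch and its case class are hypothetical stand-ins, reduce IDs are modeled as Int arrays, and with batch fetching each inner array is assumed to hold a start and an end reduce ID, so that every map ID yields a single block.

```scala
object FetchShuffleBlocksSketch {
  // Hypothetical stand-in for the message; only the fields needed here.
  final case class FetchShuffleBlocks(
      shuffleId: Int,
      mapIds: Array[Long],
      reduceIds: Array[Array[Int]],
      batchFetchEnabled: Boolean)

  // One block per (mapId, reduceId) pair, or one batch block per mapId
  // when reduceIds(i) is a (start, end) pair.
  def blockNames(msg: FetchShuffleBlocks): Seq[String] =
    msg.mapIds.toSeq.zipWithIndex.flatMap { case (mapId, i) =>
      val ids = msg.reduceIds(i)
      if (msg.batchFetchEnabled) {
        require(ids.length == 2, s"Expected a start and an end reduce ID, got ${ids.length} values")
        Seq(s"shuffle_${msg.shuffleId}_${mapId}_${ids(0)}_${ids(1)}")
      } else {
        ids.toSeq.map(reduceId => s"shuffle_${msg.shuffleId}_${mapId}_$reduceId")
      }
    }

  // The count reported as [numBlockIds] in the TRACE message.
  def numBlocks(msg: FetchShuffleBlocks): Int =
    if (msg.batchFetchEnabled) msg.mapIds.length
    else msg.reduceIds.map(_.length).sum
}
```

For example, two map IDs with reduce IDs (0, 1) and (2) give three blocks without batch fetching, while the same two map IDs with batch fetching give two.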

FetchShuffleBlocks is posted when:

GetLocalDirsForExecutors

OpenBlocks

OpenBlocks carries the following:

  • Application ID
  • Executor ID
  • Block IDs

When received, receive...FIXME

receive prints out the following TRACE message in the logs:

Registered streamId [streamId] with [blocksNum] buffers

In the end, receive responds with a StreamHandle (with the streamId and the number of blocks). The response is serialized to a ByteBuffer.
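Serializing such a reply to a ByteBuffer can be sketched as a round trip. The layout used here (a one-byte type tag, an 8-byte stream ID and a 4-byte block count) and the tag value are assumptions made for illustration; StreamHandleSketch and its case class are hypothetical stand-ins rather than the Spark classes.

```scala
import java.nio.ByteBuffer

object StreamHandleSketch {
  // Hypothetical stand-in for the reply message.
  final case class StreamHandle(streamId: Long, numChunks: Int)

  // Illustrative tag value only; not taken from the Spark wire protocol.
  val HandleTag: Byte = 3

  // Assumed layout: 1-byte tag, 8-byte stream ID, 4-byte chunk count.
  def toByteBuffer(handle: StreamHandle): ByteBuffer = {
    val buf = ByteBuffer.allocate(1 + 8 + 4)
    buf.put(HandleTag).putLong(handle.streamId).putInt(handle.numChunks)
    buf.flip() // switch from writing to reading
    buf
  }

  def fromByteBuffer(buf: ByteBuffer): StreamHandle = {
    val tag = buf.get()
    require(tag == HandleTag, s"Unexpected type tag: $tag")
    StreamHandle(buf.getLong(), buf.getInt())
  }
}
```

A client that receives the handle can then request the chunks of the stream one by one, from 0 up to numChunks - 1.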

OpenBlocks is posted when:

  • OneForOneBlockFetcher is requested to start

UploadBlock

UploadBlock carries the following:

  • Application ID
  • Executor ID
  • Block ID
  • Metadata (byte[])
  • Block Data (byte[])

When received, receive deserializes the metadata to get the StorageLevel and ClassTag of the block being uploaded.
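The metadata round trip can be sketched with plain Java serialization. This is an assumption-laden illustration: UploadMetadataSketch is hypothetical, the storage level is modeled as a String instead of a StorageLevel, and the serializer that Spark actually uses is not stated in this document.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.reflect.ClassTag

object UploadMetadataSketch {
  // Sender side: pack the (level, classTag) pair into the metadata bytes.
  def serialize(level: String, tag: ClassTag[_]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject((level, tag)) finally out.close()
    bytes.toByteArray
  }

  // Receiver side: recover the pair from the metadata bytes.
  def deserialize(metadata: Array[Byte]): (String, ClassTag[_]) = {
    val in = new ObjectInputStream(new ByteArrayInputStream(metadata))
    try in.readObject().asInstanceOf[(String, ClassTag[_])] finally in.close()
  }
}
```

The ClassTag travels with the block so that the receiver knows the element type of the data without inspecting the block bytes.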

receive...FIXME

UploadBlock is posted when:

Logging

Enable the ALL logging level for the org.apache.spark.network.netty.NettyBlockRpcServer logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.network.netty.NettyBlockRpcServer=ALL

Refer to Logging.