
NettyBlockTransferService

NettyBlockTransferService is a BlockTransferService that uses Netty for uploading and fetching blocks of data.

Figure: NettyBlockTransferService, SparkEnv and BlockManager

Creating Instance

NettyBlockTransferService takes the following to be created:

NettyBlockTransferService is created when:

Initializing

init(
  blockDataManager: BlockDataManager): Unit

init is part of the BlockTransferService abstraction.

init creates a NettyBlockRpcServer (with the application ID, a JavaSerializer and the given BlockDataManager).

init creates a TransportContext (with the NettyBlockRpcServer just created) and requests it for a TransportClientFactory.

Next, init creates the block transfer server (createServer).

In the end, init prints out the following INFO message to the logs:

Server created on [hostName]:[port]
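The order of the init steps can be sketched as follows. This is not Spark code: RpcServerStub, TransportContextStub, ClientFactoryStub, ServerStub and TransferServiceSketch are hypothetical stand-ins that only mirror the sequence described above (RPC server, then transport context and client factory, then server, then the INFO line). The sketch returns the log line instead of logging it so the result is easy to check.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-ins; none of these are Spark classes.
final case class RpcServerStub(appId: String, serializer: String, blockDataManager: String)
final class ClientFactoryStub
final class TransportContextStub(val rpcHandler: RpcServerStub) {
  def createClientFactory(): ClientFactoryStub = new ClientFactoryStub
}
final case class ServerStub(hostName: String, port: Int)

final class TransferServiceSketch(appId: String, hostName: String, port: Int) {
  // Records the order in which init performs its steps.
  val steps: ListBuffer[String] = ListBuffer.empty[String]
  private var clientFactory: Option[ClientFactoryStub] = None
  private var server: Option[ServerStub] = None

  // Returns the INFO message instead of logging it.
  def init(blockDataManager: String): String = {
    // 1. RPC server with the app ID, a serializer name and the block data manager
    val rpcServer = RpcServerStub(appId, "JavaSerializer", blockDataManager)
    steps += "rpcServer"
    // 2. Transport context wrapping the RPC server, asked for a client factory
    val context = new TransportContextStub(rpcServer)
    clientFactory = Some(context.createClientFactory())
    steps += "clientFactory"
    // 3. The server itself (createServer in the text above)
    server = Some(ServerStub(hostName, port))
    steps += "server"
    s"Server created on $hostName:$port"
  }
}
```

Calling new TransferServiceSketch("app-1", "localhost", 7337).init("bdm") yields "Server created on localhost:7337".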

Fetching Blocks

fetchBlocks(
  host: String,
  port: Int,
  execId: String,
  blockIds: Array[String],
  listener: BlockFetchingListener,
  tempFileManager: DownloadFileManager): Unit

fetchBlocks prints out the following TRACE message to the logs:

Fetch blocks from [host]:[port] (executor id [execId])

fetchBlocks requests the TransportConf for the maxIORetries.

fetchBlocks creates a BlockTransferStarter.

When maxIORetries is greater than zero, fetchBlocks creates a RetryingBlockFetcher (with the BlockTransferStarter, the blockIds and the BlockFetchingListener) and requests it to start.

Otherwise, fetchBlocks requests the BlockTransferStarter to createAndStart directly (with the blockIds and the BlockFetchingListener).

If any Exception is thrown, fetchBlocks prints out the following ERROR message to the logs and notifies the given BlockFetchingListener about a fetch failure (onBlockFetchFailure) for every requested block ID:

Exception while beginning fetchBlocks

fetchBlocks is part of the BlockStoreClient abstraction.
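The control flow of fetchBlocks can be sketched like this. FetchListener, TransferStarter, startWithRetries and fetchBlocksSketch are hypothetical simplifications. In particular, startWithRetries is a synchronous loop that retries the whole request on IOException; it is not how Spark's retrying fetcher works (that one is asynchronous and retries only outstanding blocks). It only shows where the retry wrapper plugs in and how a failure is fanned out to every block ID.

```scala
import java.io.IOException
import scala.collection.mutable.ListBuffer

// Hypothetical, reduced versions of the listener and starter.
trait FetchListener {
  def onBlockFetchFailure(blockId: String, e: Throwable): Unit
}
trait TransferStarter {
  def createAndStart(blockIds: Array[String], listener: FetchListener): Unit
}

// Hypothetical synchronous retry loop: up to maxRetries extra attempts on IOException,
// after which the last IOException propagates to the caller.
def startWithRetries(maxRetries: Int, starter: TransferStarter,
                     blockIds: Array[String], listener: FetchListener): Unit = {
  var attempt = 0
  var done = false
  while (!done) {
    try {
      starter.createAndStart(blockIds, listener)
      done = true
    } catch {
      case _: IOException if attempt < maxRetries => attempt += 1
    }
  }
}

def fetchBlocksSketch(maxIORetries: Int, starter: TransferStarter,
                      blockIds: Array[String], listener: FetchListener,
                      log: ListBuffer[String]): Unit = {
  try {
    // Retrying path only when retries are configured; otherwise start directly.
    if (maxIORetries > 0) startWithRetries(maxIORetries, starter, blockIds, listener)
    else starter.createAndStart(blockIds, listener)
  } catch {
    case e: Exception =>
      log += "Exception while beginning fetchBlocks"
      // Every requested block is reported as failed.
      blockIds.foreach(listener.onBlockFetchFailure(_, e))
  }
}
```

With maxIORetries = 2 and a starter that always throws an IOException, the starter is invoked three times before the listener is told that every block failed.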

BlockTransferStarter

fetchBlocks creates a BlockTransferStarter. When requested to createAndStart, the BlockTransferStarter requests the TransportClientFactory to create a TransportClient.

createAndStart creates a OneForOneBlockFetcher and requests it to start.

IOException

In case of an IOException, createAndStart requests the driver RpcEndpointRef to send an IsExecutorAlive message synchronously (with the given execId).

If the driver replies false (the executor is no longer alive), createAndStart throws an ExecutorDeadException:

The relative remote executor(Id: [execId]),
which maintains the block data to fetch is dead.

Otherwise, createAndStart (re)throws the IOException.
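The IOException handling can be sketched as below. ExecutorDeadExceptionSketch and createAndStartSketch are hypothetical names; startFetch stands in for creating the TransportClient and starting the fetcher, and isExecutorAlive stands in for the synchronous IsExecutorAlive request to the driver. Only a definite false reply turns the error into an executor-dead exception; a true reply, or a failure of the check itself, leads to rethrowing the original IOException, which matches the "Otherwise" branch above.

```scala
import java.io.IOException
import scala.util.Try

// Hypothetical stand-in for ExecutorDeadException.
final class ExecutorDeadExceptionSketch(message: String) extends Exception(message)

def createAndStartSketch(execId: String,
                         startFetch: () => Unit,
                         isExecutorAlive: String => Boolean): Unit = {
  try {
    startFetch()
  } catch {
    case e: IOException =>
      // Some(false) only when the check completed and said "not alive".
      val confirmedDead = Try(isExecutorAlive(execId)).toOption.contains(false)
      if (confirmedDead) {
        throw new ExecutorDeadExceptionSketch(
          s"The relative remote executor(Id: $execId), " +
            "which maintains the block data to fetch is dead.")
      }
      throw e
  }
}
```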

Uploading Block

uploadBlock(
  hostname: String,
  port: Int,
  execId: String,
  blockId: BlockId,
  blockData: ManagedBuffer,
  level: StorageLevel,
  classTag: ClassTag[_]): Future[Unit]

uploadBlock is part of the BlockTransferService abstraction.

uploadBlock requests the TransportClientFactory to create a TransportClient (for the given hostname and port).

uploadBlock serializes the given StorageLevel and ClassTag (using a JavaSerializer).

uploadBlock uploads the block as a stream when either of the following holds:

  1. The size of the block data (ManagedBuffer) exceeds the spark.network.maxRemoteBlockSizeFetchToMem configuration property
  2. The given BlockId is a shuffle block
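The stream-or-RPC decision above boils down to a single predicate. In this sketch, block IDs are reduced to their string names and isShuffleBlock is a hypothetical approximation (a "shuffle_" name prefix) rather than Spark's BlockId.isShuffle; maxRemoteBlockSizeFetchToMem is passed in explicitly instead of being read from the configuration.

```scala
// Hypothetical approximation of a shuffle block check based on the name prefix.
def isShuffleBlock(blockId: String): Boolean = blockId.startsWith("shuffle_")

// Stream when the block is too large to fetch into memory or when it is a shuffle block.
def uploadAsStream(blockId: String, blockSize: Long, maxRemoteBlockSizeFetchToMem: Long): Boolean =
  blockSize > maxRemoteBlockSizeFetchToMem || isShuffleBlock(blockId)
```

For example, a small non-shuffle block goes over RPC, while the same block above the threshold, or any shuffle block regardless of size, goes as a stream.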

For a stream transfer, uploadBlock requests the TransportClient to uploadStream. Otherwise, uploadBlock requests the TransportClient to sendRpc an UploadBlock message.

Note

UploadBlock message is processed by NettyBlockRpcServer.

When the upload succeeds, uploadBlock prints out the following TRACE message to the logs:

Successfully uploaded block [blockId] [as stream]

When the upload fails, uploadBlock prints out the following ERROR message to the logs:

Error while uploading block [blockId] [as stream]
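The dispatch between uploadStream and sendRpc, together with the optional " as stream" suffix in both log messages, can be sketched as follows. UploadClient and uploadBlockSketch are hypothetical. The sketch is synchronous and returns the log line it would emit, whereas uploadBlock itself returns a Future[Unit] and reports the outcome asynchronously.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical client exposing the two upload paths.
trait UploadClient {
  def uploadStream(blockId: String, data: Array[Byte]): Unit
  def sendRpc(blockId: String, data: Array[Byte]): Unit
}

// Returns the message that would be logged (TRACE on success, ERROR on failure).
def uploadBlockSketch(client: UploadClient, blockId: String,
                      data: Array[Byte], asStream: Boolean): String = {
  val suffix = if (asStream) " as stream" else ""
  val attempt = Try {
    if (asStream) client.uploadStream(blockId, data) else client.sendRpc(blockId, data)
  }
  attempt match {
    case Success(_) => s"Successfully uploaded block $blockId$suffix"
    case Failure(_) => s"Error while uploading block $blockId$suffix"
  }
}
```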

Logging

Enable ALL logging level for org.apache.spark.network.netty.NettyBlockTransferService logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.network.netty.NettyBlockTransferService=ALL

Refer to Logging.