NettyBlockTransferService
NettyBlockTransferService is a BlockTransferService that uses Netty for uploading and fetching blocks of data.

Creating Instance
NettyBlockTransferService takes the following to be created:
- SparkConf
- SecurityManager
- Bind Address
- Host Name
- Port
- Number of CPU Cores
- Driver RpcEndpointRef
NettyBlockTransferService is created when:
- SparkEnv utility is used to create a SparkEnv for the driver and executors (and creates a BlockManager)
Initializing
init(
blockDataManager: BlockDataManager): Unit
init is part of the BlockTransferService abstraction.
init creates a NettyBlockRpcServer (with the application ID, a JavaSerializer and the given BlockDataManager).
init creates a TransportContext (with the NettyBlockRpcServer just created) and requests it for a TransportClientFactory.
init then calls createServer to create the server.
In the end, init prints out the following INFO message to the logs:
Server created on [hostName]:[port]
Fetching Blocks
fetchBlocks(
host: String,
port: Int,
execId: String,
blockIds: Array[String],
listener: BlockFetchingListener,
tempFileManager: DownloadFileManager): Unit
fetchBlocks prints out the following TRACE message to the logs:
Fetch blocks from [host]:[port] (executor id [execId])
fetchBlocks requests the TransportConf for the maxIORetries.
fetchBlocks creates a BlockTransferStarter.
With the maxIORetries above zero, fetchBlocks creates a RetryingBlockFetcher (with the BlockTransferStarter, the blockIds and the BlockFetchingListener) and starts it.
Otherwise, fetchBlocks requests the BlockTransferStarter to createAndStart (with the blockIds and the BlockFetchingListener).
In case of any Exception, fetchBlocks prints out the following ERROR message to the logs and notifies the given BlockFetchingListener of the failure.
Exception while beginning fetchBlocks
fetchBlocks is part of the BlockStoreClient abstraction.
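The choice between the retrying and the direct path can be sketched without any Spark dependency. Everything below is hypothetical and for illustration only: FetchListener and Starter stand in for BlockFetchingListener and the starter, and startWithRetries is a simplified, synchronous substitute for RetryingBlockFetcher (the real class is more involved), so treat this as a sketch of the control flow rather than Spark's implementation.

```scala
import java.io.IOException

// Hypothetical stand-ins for BlockFetchingListener and the block transfer starter.
trait FetchListener {
  def onBlockFetchSuccess(blockId: String): Unit
  def onBlockFetchFailure(blockId: String, cause: Throwable): Unit
}

trait Starter {
  def createAndStart(blockIds: Array[String], listener: FetchListener): Unit
}

object FetchBlocksSketch {

  // Simplified, synchronous stand-in for RetryingBlockFetcher (an assumption of this sketch):
  // retries only on IOException, with at most maxRetries extra attempts.
  private def startWithRetries(
      maxRetries: Int,
      starter: Starter,
      blockIds: Array[String],
      listener: FetchListener): Unit = {
    var retries = 0
    var started = false
    while (!started) {
      try {
        starter.createAndStart(blockIds, listener)
        started = true
      } catch {
        // Once the retry budget is spent, the guard fails and the exception propagates.
        case _: IOException if retries < maxRetries => retries += 1
      }
    }
  }

  def fetchBlocks(
      maxIORetries: Int,
      starter: Starter,
      blockIds: Array[String],
      listener: FetchListener): Unit = {
    try {
      if (maxIORetries > 0) {
        // Retries enabled: go through the retrying wrapper.
        startWithRetries(maxIORetries, starter, blockIds, listener)
      } else {
        // Retries disabled: a single direct attempt.
        starter.createAndStart(blockIds, listener)
      }
    } catch {
      case e: Exception =>
        // The documented method also logs "Exception while beginning fetchBlocks" here.
        blockIds.foreach(blockId => listener.onBlockFetchFailure(blockId, e))
    }
  }
}
```

With maxIORetries set to 0, a single IOException from the starter is reported to the listener for every requested block. With a positive value, the starter gets further attempts before the failure is reported.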
BlockTransferStarter
fetchBlocks creates a BlockTransferStarter. When requested to createAndStart, the BlockTransferStarter requests the TransportClientFactory to create a TransportClient.
createAndStart then creates a OneForOneBlockFetcher and requests it to start.
IOException
In case of an IOException, createAndStart requests the driver RpcEndpointRef to send an IsExecutorAlive message synchronously (with the given execId).
If the driver RpcEndpointRef replied false, createAndStart throws an ExecutorDeadException:
The relative remote executor(Id: [execId]), which maintains the block data to fetch is dead.
Otherwise, createAndStart (re)throws the IOException.
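The IOException handling above can be modelled in a few lines. This is a minimal sketch under stated assumptions, not Spark's code: startFetch stands in for creating the TransportClient and starting the OneForOneBlockFetcher, isExecutorAlive stands in for the synchronous IsExecutorAlive request to the driver RpcEndpointRef, and ExecutorDeadSketchException is a hypothetical substitute for ExecutorDeadException. Treating a failed liveness check the same as "alive" (rethrowing the IOException) is a choice made for this sketch.

```scala
import java.io.IOException
import scala.util.{Success, Try}

// Hypothetical stand-in for Spark's ExecutorDeadException.
final class ExecutorDeadSketchException(message: String) extends RuntimeException(message)

object CreateAndStartSketch {

  def createAndStart(
      execId: String,
      startFetch: () => Unit,               // stand-in: create client, start the fetcher
      isExecutorAlive: String => Boolean    // stand-in: ask the driver about the executor
  ): Unit = {
    try {
      startFetch()
    } catch {
      case e: IOException =>
        Try(isExecutorAlive(execId)) match {
          // The driver answered "false": report the executor as dead.
          case Success(false) =>
            throw new ExecutorDeadSketchException(
              s"The relative remote executor(Id: $execId), " +
                "which maintains the block data to fetch is dead.")
          // The executor is alive (or the check itself failed): rethrow the original error.
          case _ =>
            throw e
        }
    }
  }
}
```

Only an explicit "not alive" answer replaces the IOException. In every other case the caller sees the original exception, so retry logic that reacts to IOException keeps working.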
Uploading Block
uploadBlock(
hostname: String,
port: Int,
execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
level: StorageLevel,
classTag: ClassTag[_]): Future[Unit]
uploadBlock is part of the BlockTransferService abstraction.
uploadBlock creates a TransportClient (with the given hostname and port).
uploadBlock serializes the given StorageLevel and ClassTag (using a JavaSerializer).
uploadBlock uses a stream to transfer the block when one of the following holds:
- The size of the block data (ManagedBuffer) is above the spark.network.maxRemoteBlockSizeFetchToMem configuration property
- The given BlockId is a shuffle block
For stream transfer, uploadBlock requests the TransportClient to uploadStream. Otherwise, uploadBlock requests the TransportClient to sendRpc an UploadBlock message.
Note
UploadBlock message is processed by NettyBlockRpcServer.
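The stream-or-RPC decision reduces to a single boolean. The sketch below is illustrative only: UploadDecisionSketch and its method names are hypothetical, block sizes are plain Long values rather than a ManagedBuffer, and the returned strings merely name the TransportClient call described above.

```scala
object UploadDecisionSketch {

  // True when the block should be uploaded as a stream:
  // either it is larger than the configured threshold or it is a shuffle block.
  def asStream(
      blockSize: Long,
      maxRemoteBlockSizeFetchToMem: Long,
      isShuffleBlock: Boolean): Boolean =
    blockSize > maxRemoteBlockSizeFetchToMem || isShuffleBlock

  // Names the TransportClient operation that would be requested.
  def transferVia(stream: Boolean): String =
    if (stream) "uploadStream" else "sendRpc(UploadBlock)"

  // Mirrors the shape of the documented TRACE message, with its optional suffix.
  def successMessage(blockId: String, stream: Boolean): String =
    s"Successfully uploaded block $blockId" + (if (stream) " as stream" else "")
}
```

For example, a small non-shuffle block goes through sendRpc, while a shuffle block of any size goes through uploadStream.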
With the upload successful, uploadBlock prints out the following TRACE message to the logs:
Successfully uploaded block [blockId] [as stream]
With the upload failed, uploadBlock prints out the following ERROR message to the logs:
Error while uploading block [blockId] [as stream]
Logging
Enable ALL logging level for org.apache.spark.network.netty.NettyBlockTransferService logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.network.netty.NettyBlockTransferService=ALL
Refer to Logging.