NettyBlockTransferService¶
NettyBlockTransferService is a BlockTransferService that uses Netty for uploading and fetching blocks of data.
Creating Instance¶
NettyBlockTransferService takes the following to be created:
- SparkConf
- SecurityManager
- Bind Address
- Host Name
- Port
- Number of CPU Cores
- Driver RpcEndpointRef
NettyBlockTransferService is created when:
- SparkEnv utility is used to create a SparkEnv for the driver and executors (and creates a BlockManager)
Initializing¶
init(
blockDataManager: BlockDataManager): Unit
init is part of the BlockTransferService abstraction.
init creates a NettyBlockRpcServer (with the application ID, a JavaSerializer and the given BlockDataManager).
init creates a TransportContext (with the NettyBlockRpcServer just created) and requests it for a TransportClientFactory.
init creates the server (createServer).
In the end, init prints out the following INFO message to the logs:
Server created on [hostName]:[port]
Fetching Blocks¶
fetchBlocks(
host: String,
port: Int,
execId: String,
blockIds: Array[String],
listener: BlockFetchingListener,
tempFileManager: DownloadFileManager): Unit
fetchBlocks prints out the following TRACE message to the logs:
Fetch blocks from [host]:[port] (executor id [execId])
fetchBlocks requests the TransportConf for the maxIORetries.
fetchBlocks creates a BlockTransferStarter.
With the maxIORetries above zero, fetchBlocks creates a RetryingBlockFetcher (with the BlockFetchStarter, the blockIds and the BlockFetchingListener) and starts it.
Otherwise, fetchBlocks requests the BlockFetchStarter to createAndStart (with the blockIds and the BlockFetchingListener).
In case of any Exception, fetchBlocks notifies the given BlockFetchingListener of the failure and prints out the following ERROR message to the logs:
Exception while beginning fetchBlocks
fetchBlocks is part of the BlockStoreClient abstraction.
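The branching on maxIORetries described above can be sketched in plain Scala. This is a simplified, self-contained model and not Spark's code: `Starter`, `RetryingFetcher` and the `onFailure` callback are hypothetical stand-ins for the starter, RetryingBlockFetcher and BlockFetchingListener, and the retry loop here is synchronous and retries the whole batch, which is an assumption made for brevity.

```scala
import java.io.IOException

object FetchDispatchSketch {
  // Hypothetical stand-in for the starter that fetchBlocks creates.
  trait Starter {
    def createAndStart(blockIds: Array[String]): Unit
  }

  // Hypothetical, simplified stand-in for a retrying fetcher:
  // re-runs the starter on IOException, at most maxRetries extra times.
  final class RetryingFetcher(maxRetries: Int, starter: Starter, blockIds: Array[String]) {
    def start(): Unit = {
      var retries = 0
      var done = false
      while (!done) {
        try {
          starter.createAndStart(blockIds)
          done = true
        } catch {
          // Once the retry budget is used up, the guard fails and the
          // IOException propagates to the caller.
          case _: IOException if retries < maxRetries => retries += 1
        }
      }
    }
  }

  // Mirrors the described flow: retry wrapper when maxIORetries > 0,
  // a direct createAndStart otherwise, and a failure notification per
  // block on any Exception.
  def fetchBlocks(
      maxIORetries: Int,
      starter: Starter,
      blockIds: Array[String],
      onFailure: (String, Throwable) => Unit): Unit = {
    try {
      if (maxIORetries > 0) {
        new RetryingFetcher(maxIORetries, starter, blockIds).start()
      } else {
        starter.createAndStart(blockIds)
      }
    } catch {
      case e: Exception => blockIds.foreach(id => onFailure(id, e))
    }
  }
}
```

With a starter that fails twice before succeeding, a retry budget of 3 hides the failures from the callback, while a budget of 0 reports every requested block as failed after the first attempt.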
BlockTransferStarter¶
fetchBlocks creates a BlockTransferStarter. When requested to createAndStart, the BlockTransferStarter requests the TransportClientFactory to create a TransportClient.
createAndStart creates a OneForOneBlockFetcher and requests it to start.
IOException¶
In case of an IOException, createAndStart requests the driver RpcEndpointRef to send an IsExecutorAlive message synchronously (with the given execId).
If the driver RpcEndpointRef replies false, createAndStart throws an ExecutorDeadException:
The relative remote executor(Id: [execId]),
which maintains the block data to fetch is dead.
Otherwise, createAndStart (re)throws the IOException.
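The IOException handling above amounts to translating a low-level I/O failure into a more specific exception when the driver reports the executor as gone. A minimal sketch of that pattern follows. The `ExecutorDeadException` class defined here and the `isExecutorAlive` function are hypothetical stand-ins (for Spark's exception and for the synchronous IsExecutorAlive request to the driver), and rethrowing the original IOException when the liveness query itself fails is an assumption of this sketch.

```scala
import java.io.IOException
import scala.util.{Success, Try}

object ExecutorLivenessSketch {
  // Hypothetical stand-in for Spark's ExecutorDeadException.
  final class ExecutorDeadException(message: String) extends Exception(message)

  // Runs `fetch`; on IOException asks `isExecutorAlive` about execId.
  // A definite "false" becomes ExecutorDeadException, anything else
  // (alive, or the query failed) rethrows the original IOException.
  def createAndStart(execId: String, isExecutorAlive: String => Boolean)(fetch: () => Unit): Unit = {
    try {
      fetch()
    } catch {
      case e: IOException =>
        Try(isExecutorAlive(execId)) match {
          case Success(false) =>
            throw new ExecutorDeadException(
              s"The relative remote executor(Id: $execId), " +
                "which maintains the block data to fetch is dead.")
          case _ =>
            throw e
        }
    }
  }
}
```

The benefit of the translation is that callers can tell a dead executor (no point in retrying) apart from a transient network error (worth retrying).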
Uploading Block¶
uploadBlock(
hostname: String,
port: Int,
execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
level: StorageLevel,
classTag: ClassTag[_]): Future[Unit]
uploadBlock is part of the BlockTransferService abstraction.
uploadBlock creates a TransportClient (with the given hostname and port).
uploadBlock serializes the given StorageLevel and ClassTag (using a JavaSerializer).
uploadBlock uses a stream to transfer the block when one of the following holds:
- The size of the block data (ManagedBuffer) is above spark.network.maxRemoteBlockSizeFetchToMem configuration property
- The given BlockId is a shuffle block
For stream transfer, uploadBlock requests the TransportClient to uploadStream. Otherwise, uploadBlock requests the TransportClient to sendRpc an UploadBlock message.
Note
The UploadBlock message is processed by NettyBlockRpcServer.
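The choice between the two upload paths in uploadBlock reduces to a single boolean condition. The following sketch models it in isolation. `UploadMode`, `AsStream`, `AsRpc` and `uploadMode` are hypothetical names introduced for illustration, and the threshold parameter stands for the value of spark.network.maxRemoteBlockSizeFetchToMem.

```scala
object UploadModeSketch {
  // Hypothetical labels for the two transfer paths described above.
  sealed trait UploadMode
  case object AsStream extends UploadMode // TransportClient.uploadStream path
  case object AsRpc extends UploadMode    // TransportClient.sendRpc path

  // Stream when the block is strictly larger than the threshold
  // or when it is a shuffle block; plain RPC otherwise.
  def uploadMode(
      blockSize: Long,
      maxRemoteBlockSizeFetchToMem: Long,
      isShuffle: Boolean): UploadMode =
    if (blockSize > maxRemoteBlockSizeFetchToMem || isShuffle) AsStream else AsRpc
}
```

Note that a block whose size equals the threshold exactly still goes through the RPC path in this sketch, since the source says "above".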
When the upload succeeds, uploadBlock prints out the following TRACE message to the logs:
Successfully uploaded block [blockId] [as stream]
When the upload fails, uploadBlock prints out the following ERROR message to the logs:
Error while uploading block [blockId] [as stream]
Logging¶
Enable ALL logging level for org.apache.spark.network.netty.NettyBlockTransferService logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.network.netty.NettyBlockTransferService=ALL
Refer to Logging.