

NettyBlockTransferService is a BlockTransferService that uses Netty for uploading and fetching blocks of data.

NettyBlockTransferService, SparkEnv and BlockManager

Creating Instance

NettyBlockTransferService takes the following to be created:

NettyBlockTransferService is created when:


Enable ALL logging level for logger to see what happens inside.

Add the following line to conf/

Refer to Logging.

Review Me

== [[transportConf]][[transportContext]] TransportConf, TransportContext

NettyBlockTransferService creates a TransportConf for the shuffle module (using the SparkTransportConf utility) when created.

NettyBlockTransferService uses the TransportConf for the following:

  • Create a TransportContext when requested to initialize

  • Create a OneForOneBlockFetcher and a RetryingBlockFetcher when requested to fetch blocks

NettyBlockTransferService uses the TransportContext to create the TransportClientFactory and the TransportServer.

== [[clientFactory]] TransportClientFactory

NettyBlockTransferService creates a TransportClientFactory when requested to initialize.

NettyBlockTransferService uses the TransportClientFactory for the following:

  • fetchBlocks

  • shuffleMetrics

  • uploadBlock

NettyBlockTransferService requests the TransportClientFactory to close when requested to close.

== [[server]] TransportServer

NettyBlockTransferService creates a TransportServer when requested to initialize.

NettyBlockTransferService uses the TransportServer for the following:

  • port

  • shuffleMetrics

NettyBlockTransferService requests the TransportServer to close when requested to close.

== [[port]] Port

NettyBlockTransferService simply requests the TransportServer for the port.

== [[shuffleMetrics]] Shuffle Metrics


shuffleMetrics(): MetricSet


shuffleMetrics is part of the ShuffleClient abstraction.

== [[fetchBlocks]] Fetching Blocks

[source, scala]

fetchBlocks( host: String, port: Int, execId: String, blockIds: Array[String], listener: BlockFetchingListener): Unit

When executed, fetchBlocks prints out the following TRACE message in the logs:

TRACE Fetch blocks from [host]:[port] (executor id [execId])

fetchBlocks then creates a RetryingBlockFetcher.BlockFetchStarter where createAndStart method...FIXME

If the maximum number of acceptable IO exceptions (such as connection timeouts) per request is greater than 0, fetchBlocks creates a RetryingBlockFetcher and starts it immediately.

NOTE: RetryingBlockFetcher is created with the RetryingBlockFetcher.BlockFetchStarter created earlier, the input blockIds and listener.

Otherwise (the number of retries is 0 or less), the RetryingBlockFetcher.BlockFetchStarter created earlier is started directly (with the input blockIds and listener).

In case of any Exception, you should see the following ERROR message in the logs, and the input BlockFetchingListener is notified (using onBlockFetchFailure for every block id).

ERROR Exception while beginning fetchBlocks
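The branching described above can be sketched without any Spark dependency. This is a minimal, synchronous model only: FetchListener, FetchStarter, RetryingFetcher and FetchBlocksSketch are hypothetical stand-ins, not Spark's BlockFetchingListener, RetryingBlockFetcher.BlockFetchStarter or RetryingBlockFetcher, and the retry loop is an assumption about the general idea rather than the real (asynchronous) implementation.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-ins for the listener and starter (not the real Spark types).
trait FetchListener {
  def onBlockFetchSuccess(blockId: String): Unit
  def onBlockFetchFailure(blockId: String, cause: Throwable): Unit
}

trait FetchStarter {
  def createAndStart(blockIds: Array[String], listener: FetchListener): Unit
}

object FetchBlocksSketch {
  // Simplified retrying wrapper: re-runs the starter after an IOException,
  // at most maxRetries extra times, then lets the exception propagate.
  final class RetryingFetcher(
      maxRetries: Int,
      starter: FetchStarter,
      blockIds: Array[String],
      listener: FetchListener) {
    def start(): Unit = {
      var attempt = 0
      var done = false
      while (!done) {
        try {
          starter.createAndStart(blockIds, listener)
          done = true
        } catch {
          case _: java.io.IOException if attempt < maxRetries => attempt += 1
        }
      }
    }
  }

  def fetchBlocks(
      maxRetries: Int,
      starter: FetchStarter,
      blockIds: Array[String],
      listener: FetchListener): Unit = {
    try {
      if (maxRetries > 0) {
        // Retries allowed: wrap the starter and start the wrapper immediately.
        new RetryingFetcher(maxRetries, starter, blockIds, listener).start()
      } else {
        // No retries: start the starter directly.
        starter.createAndStart(blockIds, listener)
      }
    } catch {
      case NonFatal(e) =>
        // As described above: every block id is reported as failed.
        blockIds.foreach(id => listener.onBlockFetchFailure(id, e))
    }
  }
}
```

With a starter that fails twice before succeeding, a call with maxRetries = 3 reports success for every block, while maxRetries = 0 reports a failure for every block after the first error.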

fetchBlocks is part of the BlockTransferService abstraction.

== [[init]] Initializing NettyBlockTransferService

[source, scala]

init( blockDataManager: BlockDataManager): Unit

init creates a NettyBlockRpcServer (for the application id, a JavaSerializer and the given BlockDataManager) that is used to create a TransportContext.

init creates the internal clientFactory and a server.

CAUTION: FIXME What's the "a server"?

In the end, you should see the INFO message in the logs:

Server created on [hostName]:[port]
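The order of steps in init can be sketched as follows. Every type here (RpcHandlerSketch, ContextSketch, ClientFactorySketch, ServerSketch, TransferServiceSketch) is a hypothetical, heavily simplified stand-in and not a Spark class; the sketch only shows the wiring of an RPC handler into a context, which then produces the client factory and the server, and how port can simply delegate to the server.

```scala
// Hypothetical stand-ins; none of these are the real Spark classes.
final case class RpcHandlerSketch(appId: String)
final case class ClientFactorySketch(handler: RpcHandlerSketch)
final case class ServerSketch(host: String, port: Int, handler: RpcHandlerSketch)

final class ContextSketch(handler: RpcHandlerSketch) {
  def createClientFactory(): ClientFactorySketch = ClientFactorySketch(handler)
  def createServer(host: String, port: Int): ServerSketch =
    ServerSketch(host, port, handler)
}

final class TransferServiceSketch(hostName: String, requestedPort: Int) {
  private var clientFactory: Option[ClientFactorySketch] = None
  private var server: Option[ServerSketch] = None

  // Returns the message that would be logged at INFO level.
  def init(appId: String): String = {
    val rpcHandler = RpcHandlerSketch(appId)      // 1. RPC handler for the application
    val context = new ContextSketch(rpcHandler)   // 2. context built around the handler
    clientFactory = Some(context.createClientFactory()) // 3. client side
    server = Some(context.createServer(hostName, requestedPort)) // 4. server side
    s"Server created on $hostName:$port"
  }

  // The port is whatever the server reports.
  def port: Int = server.map(_.port).getOrElse(
    throw new IllegalStateException("init has not been called"))

  def isInitialized: Boolean = clientFactory.isDefined && server.isDefined
}
```

Keeping the client factory and the server as fields initialized in init (rather than in the constructor) mirrors the fact that the service is only usable after initialization.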

NOTE: hostname is given when NettyBlockTransferService is created. For the driver, it is controlled by a Spark property; for executors, it differs per deployment environment (as controlled by --hostname for CoarseGrainedExecutorBackend).

init is part of the BlockTransferService abstraction.

== [[uploadBlock]] Uploading Block

[source, scala]

uploadBlock( hostname: String, port: Int, execId: String, blockId: BlockId, blockData: ManagedBuffer, level: StorageLevel, classTag: ClassTag[_]): Future[Unit]

Internally, uploadBlock creates a TransportClient to send an UploadBlock message (to the input hostname and port).

NOTE: UploadBlock message is processed by NettyBlockRpcServer.

The UploadBlock message holds the application id, the input execId and blockId. It also holds the serialized bytes for block metadata (level and classTag, serialized using the internal JavaSerializer) as well as the bytes of the input blockData itself (obtained using the ManagedBuffer.nioByteBuffer method rather than the serializer).

The entire UploadBlock message is further serialized before sending (using TransportClient.sendRpc).
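The two kinds of payload described above (Java-serialized metadata and raw block bytes taken from a NIO buffer) can be illustrated with plain JDK classes. This is a sketch under stated assumptions: UploadPayloadSketch and its methods are hypothetical helpers, the metadata is modelled as a pair of strings instead of a StorageLevel and a ClassTag, and plain java.io object streams stand in for Spark's JavaSerializer.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

object UploadPayloadSketch {
  // Java-serializes a value into a byte array (stand-in for JavaSerializer).
  def javaSerialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.toByteArray
  }

  // Reverses javaSerialize; the caller states the expected type.
  def javaDeserialize[T](data: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Copies the remaining bytes of a buffer without moving the caller's position.
  def toBytes(buffer: ByteBuffer): Array[Byte] = {
    val copy = buffer.duplicate()
    val array = new Array[Byte](copy.remaining())
    copy.get(array)
    array
  }
}
```

For example, `UploadPayloadSketch.javaSerialize(("MEMORY_ONLY", "java.lang.String"))` yields the metadata bytes, and `UploadPayloadSketch.toBytes(buffer)` yields the block bytes; only the former goes through object serialization.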

CAUTION: FIXME Describe TransportClient and clientFactory.createClient.

When the block is uploaded successfully, you should see the following TRACE message in the logs:

TRACE NettyBlockTransferService: Successfully uploaded block [blockId]

When an upload fails, you should see the following ERROR message in the logs:

ERROR Error while uploading block [blockId]

uploadBlock is part of the BlockTransferService abstraction.

== [[UploadBlock]] UploadBlock Message

UploadBlock is a BlockTransferMessage that describes a block being uploaded, i.e. sent over the wire from a NettyBlockTransferService to a NettyBlockRpcServer.

.UploadBlock Attributes
[cols="1,2",options="header",width="100%"]
|===
| Attribute
| Description

| appId
| The application id (the block belongs to)

| execId
| The executor id

| blockId
| The block id

| metadata
|

| blockData
| The block data as an array of bytes
|===

As an Encodable, UploadBlock can calculate the encoded size and do encoding and decoding itself to or from a ByteBuf, respectively.
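What "calculate the encoded size and do encoding and decoding itself" means can be sketched as a round trip through a buffer. The sketch below is an illustration only: UploadBlockSketch is a hypothetical class, java.nio.ByteBuffer is swapped in for Netty's ByteBuf so that the example has no dependencies, and the length-prefixed layout is an assumption made for the example.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical model of a self-encoding message (not the real UploadBlock).
final case class UploadBlockSketch(
    appId: String,
    execId: String,
    blockId: String,
    metadata: Array[Byte],
    blockData: Array[Byte]) {

  // All fields as byte arrays, in wire order.
  private def fields: Seq[Array[Byte]] =
    Seq(appId, execId, blockId).map(_.getBytes(UTF_8)) ++ Seq(metadata, blockData)

  // Each field takes a 4-byte length prefix plus its bytes.
  def encodedLength: Int = fields.map(4 + _.length).sum

  def encode(buf: ByteBuffer): Unit = fields.foreach { bytes =>
    buf.putInt(bytes.length)
    buf.put(bytes)
  }
}

object UploadBlockSketch {
  // Reads one length-prefixed field.
  private def readBytes(buf: ByteBuffer): Array[Byte] = {
    val bytes = new Array[Byte](buf.getInt())
    buf.get(bytes)
    bytes
  }

  def decode(buf: ByteBuffer): UploadBlockSketch = {
    val appId = new String(readBytes(buf), UTF_8)
    val execId = new String(readBytes(buf), UTF_8)
    val blockId = new String(readBytes(buf), UTF_8)
    val metadata = readBytes(buf)
    val blockData = readBytes(buf)
    UploadBlockSketch(appId, execId, blockId, metadata, blockData)
  }
}
```

Knowing the encoded length up front lets the sender allocate a buffer of exactly the right size before calling encode, which is the main reason for exposing it alongside encode and decode.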

== [[createServer]] createServer Internal Method

[source, scala]

createServer( bootstraps: List[TransportServerBootstrap]): TransportServer


createServer is used when NettyBlockTransferService is requested to initialize.

Last update: 2021-07-23