ExternalShuffleBlockHandler

ExternalShuffleBlockHandler is a RpcHandler.

When created, ExternalShuffleBlockHandler requires a OneForOneStreamManager and TransportConf with a registeredExecutorFile to create a ExternalShuffleBlockResolver.

Handling Messages

handleMessage(
  BlockTransferMessage msgObj,
  TransportClient client,
  RpcResponseCallback callback)

handleMessage handles two types of BlockTransferMessage messages:

For any other BlockTransferMessage message it throws a UnsupportedOperationException:

Unexpected message: [msgObj]

OpenBlocks

OpenBlocks(
  String appId,
  String execId,
  String[] blockIds)

When OpenBlocks is received, handleMessage authorizes the client.

FIXME checkAuth?

It then gets block data for each block id in blockIds (using ExternalShuffleBlockResolver).

Finally, it registers a stream and does callback.onSuccess with a serialized byte buffer (for the streamId and the number of blocks in msg).

FIXME callback.onSuccess?

You should see the following TRACE message in the logs:

Registered streamId [streamId] with [length] buffers for client [clientId] from host [remoteAddress]

RegisterExecutor

RegisterExecutor(
  String appId,
  String execId,
  ExecutorShuffleInfo executorInfo)

RegisterExecutor…​FIXME

Logging

Enable ALL logging level for org.apache.spark.network.shuffle.ExternalShuffleBlockHandler logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.network.shuffle.ExternalShuffleBlockHandler=ALL

Refer to Logging.