BlockManagerSlaveEndpoint

BlockManagerSlaveEndpoint is a ThreadSafeRpcEndpoint for BlockManager.

Creating Instance

BlockManagerSlaveEndpoint takes the following to be created: an RpcEnv, a BlockManager, and a MapOutputTracker.

BlockManagerSlaveEndpoint is created for BlockManager (and registered under the name BlockManagerEndpoint[ID]).

Messages

GetBlockStatus

GetBlockStatus(
  blockId: BlockId,
  askSlaves: Boolean = true)

When received, BlockManagerSlaveEndpoint requests the BlockManager for the status of the given block (by BlockId) and sends it back to the sender.

Posted when…​FIXME

GetMatchingBlockIds

GetMatchingBlockIds(
  filter: BlockId => Boolean,
  askSlaves: Boolean = true)

When received, BlockManagerSlaveEndpoint requests the BlockManager to find the IDs of existing blocks that match the given filter and sends them back to the sender.

Posted when…​FIXME
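As a rough illustration of how the two query messages above could be handled, the following sketch pattern-matches on the message and replies with whatever the block manager returns. SlaveMessage, FakeBlockManager, the simplified BlockId and BlockStatus, and the reply callback are hypothetical stand-ins for this example only and are not Spark's own classes.

```scala
object MessageDispatchSketch {
  // Hypothetical, simplified stand-ins (NOT Spark's BlockId and BlockStatus)
  final case class BlockId(name: String)
  final case class BlockStatus(memSize: Long, diskSize: Long)

  // Messages mirroring the signatures shown above
  sealed trait SlaveMessage
  final case class GetBlockStatus(
      blockId: BlockId,
      askSlaves: Boolean = true) extends SlaveMessage
  final case class GetMatchingBlockIds(
      filter: BlockId => Boolean,
      askSlaves: Boolean = true) extends SlaveMessage

  // Hypothetical in-memory block manager used only by this sketch
  final class FakeBlockManager(blocks: Map[BlockId, BlockStatus]) {
    def getStatus(blockId: BlockId): Option[BlockStatus] = blocks.get(blockId)
    def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] =
      blocks.keys.filter(filter).toSeq.sortBy(_.name)
  }

  // reply stands in for sending the answer back to the sender;
  // askSlaves is ignored in this sketch
  def handle(bm: FakeBlockManager, reply: Any => Unit): PartialFunction[SlaveMessage, Unit] = {
    case GetBlockStatus(blockId, _)     => reply(bm.getStatus(blockId))
    case GetMatchingBlockIds(filter, _) => reply(bm.getMatchingBlockIds(filter))
  }
}
```

The handler is a partial function over the message type, so supporting another message in this sketch is a matter of adding one more case.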

RemoveBlock

RemoveBlock(
  blockId: BlockId)

When received, BlockManagerSlaveEndpoint prints out the following DEBUG message to the logs:

removing block [blockId]

BlockManagerSlaveEndpoint then requests the BlockManager to remove the blockId block.

When the computation is successful, you should see the following DEBUG in the logs:

Done removing block [blockId], response is [response]

A true response is then sent back to the sender. You should see the following DEBUG message in the logs:

Sent response: true to [senderAddress]

In case of failure, you should see the following ERROR message in the logs, followed by the stack trace:

Error in removing block [blockId]

RemoveBroadcast

RemoveBroadcast(
  broadcastId: Long,
  removeFromDriver: Boolean = true)

When received, BlockManagerSlaveEndpoint prints out the following DEBUG message to the logs:

removing broadcast [broadcastId]

When the computation is successful, you should see the following DEBUG in the logs:

Done removing broadcast [broadcastId], response is [response]

And the result is sent back. You should see the following DEBUG in the logs:

Sent response: [response] to [senderAddress]

In case of failure, you should see the following ERROR message in the logs, followed by the stack trace:

Error in removing broadcast [broadcastId]

RemoveRdd

RemoveRdd(
  rddId: Int)

When received, BlockManagerSlaveEndpoint prints out the following DEBUG message to the logs:

removing RDD [rddId]

Handling RemoveRdd messages happens on a separate thread. See BlockManagerSlaveEndpoint Thread Pool.

When the computation is successful, you should see the following DEBUG in the logs:

Done removing RDD [rddId], response is [response]

And the number of blocks removed is sent back. You should see the following DEBUG in the logs:

Sent response: [#blocks] to [senderAddress]

In case of failure, you should see the following ERROR message in the logs, followed by the stack trace:

Error in removing RDD [rddId]

RemoveShuffle

RemoveShuffle(
  shuffleId: Int)

When received, BlockManagerSlaveEndpoint prints out the following DEBUG message to the logs:

removing shuffle [shuffleId]

If MapOutputTracker was given (when the RPC endpoint was created), it calls MapOutputTracker to unregister the shuffleId shuffle.

Handling RemoveShuffle messages happens on a separate thread. See BlockManagerSlaveEndpoint Thread Pool.

When the computation is successful, you should see the following DEBUG in the logs:

Done removing shuffle [shuffleId], response is [response]

And the result is sent back. You should see the following DEBUG in the logs:

Sent response: [response] to [senderAddress]

In case of failure, you should see the following ERROR message in the logs, followed by the stack trace:

Error in removing shuffle [shuffleId]

Posted when BlockManagerMaster and BlockManagerMasterEndpoint are requested to remove all blocks of a shuffle.
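The optional MapOutputTracker step described above can be sketched as follows. This is only an illustration under stated assumptions: FakeMapOutputTracker is a hypothetical stand-in that merely records which shuffles were unregistered, and removeShuffleBlocks is a hypothetical callback standing in for whatever actually removes the shuffle blocks and produces the response.

```scala
object RemoveShuffleSketch {
  // Hypothetical stand-in for MapOutputTracker; it only records unregistered shuffles
  final class FakeMapOutputTracker {
    private var unregistered = Vector.empty[Int]
    def unregisterShuffle(shuffleId: Int): Unit = unregistered = unregistered :+ shuffleId
    def unregisteredShuffles: Vector[Int] = unregistered
  }

  // tracker is optional, mirroring "If MapOutputTracker was given".
  // removeShuffleBlocks is an assumption of this sketch, not a Spark API.
  def removeShuffle(
      shuffleId: Int,
      tracker: Option[FakeMapOutputTracker],
      removeShuffleBlocks: Int => Boolean): Boolean = {
    // Unregister the shuffle first, but only when a tracker is available
    tracker.foreach(_.unregisterShuffle(shuffleId))
    // The result of the removal is what would be sent back as the response
    removeShuffleBlocks(shuffleId)
  }
}
```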

ReplicateBlock

ReplicateBlock(
  blockId: BlockId,
  replicas: Seq[BlockManagerId],
  maxReplicas: Int)

When received, BlockManagerSlaveEndpoint…​FIXME

Posted when…​FIXME

TriggerThreadDump

When received, BlockManagerSlaveEndpoint collects the thread info of all live threads (with stack traces and synchronization information) and sends it back to the sender.
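For reference, the JDK exposes this kind of thread information through ThreadMXBean. The sketch below shows one way such a thread dump could be collected with plain JDK calls. ThreadDumpSketch and threadDump are names made up for this example, and the sketch does not claim to reproduce Spark's own helper.

```scala
import java.lang.management.{ManagementFactory, ThreadInfo}

object ThreadDumpSketch {
  // dumpAllThreads(lockedMonitors = true, lockedSynchronizers = true) asks the JVM
  // to include synchronization information next to each thread's stack trace
  def threadDump(): Seq[ThreadInfo] =
    ManagementFactory.getThreadMXBean
      .dumpAllThreads(true, true)
      .toSeq
      .filter(_ != null)
      .sortBy(_.getThreadId)
}
```

Each ThreadInfo gives access to the thread name (getThreadName), its state (getThreadState) and its stack trace (getStackTrace).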

block-manager-slave-async-thread-pool Thread Pool

BlockManagerSlaveEndpoint creates a thread pool of at most 100 daemon threads whose names start with the block-manager-slave-async-thread-pool prefix (using java.util.concurrent.ThreadPoolExecutor).

BlockManagerSlaveEndpoint uses the thread pool (as a Scala implicit value) when requested to doAsync to communicate in a non-blocking, asynchronous way.

The thread pool is shut down when BlockManagerSlaveEndpoint is requested to stop.

The reason for the async thread pool is that block-related operations might take quite some time. To keep the main RPC thread free, the work is handed over to other threads that talk to the external services and pass the responses on to the clients.
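A pool with these properties can be approximated with plain JDK classes, as in the sketch below. The prefix and the limit of 100 threads come from the description above. The 60-second keep-alive, the unbounded work queue, and the names AsyncPoolSketch, newDaemonPool and newAsyncContext are assumptions of this example.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}

object AsyncPoolSketch {
  // Builds a pool of at most maxThreads daemon threads named "<prefix>-<n>"
  def newDaemonPool(prefix: String, maxThreads: Int): ThreadPoolExecutor = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        t.setDaemon(true) // daemon threads do not keep the JVM alive
        t
      }
    }
    val pool = new ThreadPoolExecutor(
      maxThreads, maxThreads,        // core and maximum pool size
      60L, TimeUnit.SECONDS,         // assumed keep-alive for idle threads
      new LinkedBlockingQueue[Runnable](),
      factory)
    pool.allowCoreThreadTimeOut(true) // let idle threads terminate
    pool
  }

  // Wraps the pool so that it can serve as an (implicit) ExecutionContext for Futures
  def newAsyncContext(): ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(
      newDaemonPool("block-manager-slave-async-thread-pool", 100))
}
```

Declaring the result of newAsyncContext() as an implicit value makes Scala Futures run on the pool, and calling shutdown() on it stops the pool.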

doAsync Internal Method

doAsync[T](
  actionMessage: String,
  context: RpcCallContext)(
  body: => T)

doAsync creates a Scala Future to execute the following asynchronously (i.e. on a separate thread from the Thread Pool):

  1. Prints out the given actionMessage as a DEBUG message to the logs

  2. Executes the given body

When the body completes successfully, doAsync prints out the following DEBUG messages to the logs and requests the given RpcCallContext to send the response back to the sender.

Done [actionMessage], response is [response]
Sent response: [response] to [senderAddress]

In case of a failure, doAsync prints out the following ERROR message to the logs and requests the given RpcCallContext to send the failure to the sender.

Error in [actionMessage]

doAsync is used when BlockManagerSlaveEndpoint is requested to handle RemoveBlock, RemoveRdd, RemoveShuffle and RemoveBroadcast messages.
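The behaviour described above can be sketched with a Scala Future as follows. This is a minimal model under stated assumptions, not Spark's code: ReplyContext is a hypothetical stand-in for RpcCallContext, the log parameter replaces the logger so that messages can be captured, and the method returns the Future (completed only after the reply or failure has been sent) purely to make the sketch easy to await.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object DoAsyncSketch {
  // Hypothetical, minimal stand-in for RpcCallContext
  trait ReplyContext {
    def reply(response: Any): Unit
    def sendFailure(e: Throwable): Unit
    def senderAddress: String
  }

  def doAsync[T](
      actionMessage: String,
      context: ReplyContext,
      log: String => Unit)(
      body: => T)(implicit ec: ExecutionContext): Future[T] =
    Future {
      // Runs on a thread of the implicit execution context
      log(s"DEBUG $actionMessage")
      body
    }.andThen {
      case Success(response) =>
        log(s"DEBUG Done $actionMessage, response is $response")
        context.reply(response)
        log(s"DEBUG Sent response: $response to ${context.senderAddress}")
      case Failure(t) =>
        log(s"ERROR Error in $actionMessage")
        context.sendFailure(t)
    }
}
```

With an implicit ExecutionContext in scope, a call such as doAsync("removing block " + blockId, context, println) { true } would produce the same sequence of messages as listed for RemoveBlock, assuming blockId and context are defined by the caller.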

Logging

Enable ALL logging level for org.apache.spark.storage.BlockManagerSlaveEndpoint logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.BlockManagerSlaveEndpoint=ALL

Refer to Logging.