BlockManagerMasterEndpoint — BlockManagerMaster RPC Endpoint

BlockManagerMasterEndpoint is a ThreadSafeRpcEndpoint for BlockManagerMaster.

BlockManagerMasterEndpoint is registered under the name BlockManagerMaster.

BlockManagerMasterEndpoint tracks the status of the BlockManagers (on the executors) in a Spark application.

Creating Instance

BlockManagerMasterEndpoint takes the following to be created:

BlockManagerMasterEndpoint is created for the SparkEnv on the driver (to create a BlockManagerMaster for a BlockManager).

When created, BlockManagerMasterEndpoint prints out the following INFO message to the logs:

BlockManagerMasterEndpoint up

Messages

As an RpcEndpoint, BlockManagerMasterEndpoint handles RPC messages.
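The request/reply pattern can be pictured with a minimal sketch in plain Scala. It is not Spark's API: `MiniMasterEndpoint`, the `Message` trait and the simplified `BlockManagerId`/`BlockId` case classes are hypothetical, and the handler simply returns the reply value (Spark's `receiveAndReply` replies through an `RpcCallContext` instead). Only two of the messages described below are modelled, and RemoveExecutor is reduced to dropping the executor's entry.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's BlockManagerId and BlockId.
final case class BlockManagerId(executorId: String, host: String, port: Int)
final case class BlockId(name: String)

// Two of the messages below, modelled as a closed family of case classes.
sealed trait Message
final case class GetLocations(blockId: BlockId) extends Message
final case class RemoveExecutor(execId: String) extends Message

class MiniMasterEndpoint {
  // Simplified blockLocations and blockManagerIdByExecutor registries.
  val blockLocations = mutable.Map.empty[BlockId, mutable.Set[BlockManagerId]]
  val blockManagerIdByExecutor = mutable.Map.empty[String, BlockManagerId]

  // Pattern-match on the incoming message and return the value to reply with.
  def receiveAndReply: PartialFunction[Message, Any] = {
    case GetLocations(blockId) =>
      // Unknown blocks have no locations.
      blockLocations.get(blockId).map(_.toSeq).getOrElse(Seq.empty)
    case RemoveExecutor(execId) =>
      // Simplification: only the executor's entry is dropped here.
      blockManagerIdByExecutor.remove(execId)
      true
  }
}
```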

BlockManagerHeartbeat

BlockManagerHeartbeat(
  blockManagerId: BlockManagerId)

When received, BlockManagerMasterEndpoint…FIXME

Posted when…FIXME

GetLocations

GetLocations(
  blockId: BlockId)

When received, BlockManagerMasterEndpoint replies with the locations of blockId.

GetLocationsAndStatus

GetLocationsAndStatus(
  blockId: BlockId)

When received, BlockManagerMasterEndpoint…FIXME

Posted when…FIXME

GetLocationsMultipleBlockIds

GetLocationsMultipleBlockIds(
  blockIds: Array[BlockId])

When received, BlockManagerMasterEndpoint replies with the locations of the given BlockIds (using getLocationsMultipleBlockIds).

GetPeers

GetPeers(
  blockManagerId: BlockManagerId)

When received, BlockManagerMasterEndpoint replies with the peers of blockManagerId.

Peers of a BlockManager are the other BlockManagers in the cluster (excluding the driver’s BlockManager). Peers tell a BlockManager which other executors are available in a Spark application.

GetExecutorEndpointRef

GetExecutorEndpointRef(
  executorId: String)

When received, BlockManagerMasterEndpoint…FIXME

Posted when…FIXME

GetMemoryStatus

GetMemoryStatus

When received, BlockManagerMasterEndpoint…FIXME

Posted when…FIXME

GetStorageStatus

GetStorageStatus

When received, BlockManagerMasterEndpoint…FIXME

Posted when…FIXME

GetBlockStatus

GetBlockStatus(
  blockId: BlockId,
  askSlaves: Boolean = true)

When received, BlockManagerMasterEndpoint replies with the result of blockStatus (for the given blockId and askSlaves flag).

Posted when…FIXME

GetMatchingBlockIds

GetMatchingBlockIds(
  filter: BlockId => Boolean,
  askSlaves: Boolean = true)

When received, BlockManagerMasterEndpoint…FIXME

Posted when…FIXME

HasCachedBlocks

HasCachedBlocks(
  executorId: String)

When received, BlockManagerMasterEndpoint…FIXME

Posted when…FIXME

RegisterBlockManager

RegisterBlockManager(
  blockManagerId: BlockManagerId,
  maxOnHeapMemSize: Long,
  maxOffHeapMemSize: Long,
  sender: RpcEndpointRef)

When received, BlockManagerMasterEndpoint registers the BlockManager (by the given BlockManagerId) and replies with the BlockManagerId that register returns.

Posted when BlockManagerMaster is requested to register a BlockManager.

RemoveRdd

RemoveRdd(
  rddId: Int)

When received, BlockManagerMasterEndpoint…FIXME

Posted when…FIXME

RemoveShuffle

RemoveShuffle(
  shuffleId: Int)

When received, BlockManagerMasterEndpoint…FIXME

Posted when…FIXME

RemoveBroadcast

RemoveBroadcast(
  broadcastId: Long,
  removeFromDriver: Boolean = true)

When received, BlockManagerMasterEndpoint…FIXME

Posted when…FIXME

RemoveBlock

RemoveBlock(
  blockId: BlockId)

When received, BlockManagerMasterEndpoint…FIXME

Posted when…FIXME

RemoveExecutor

RemoveExecutor(
  execId: String)

When received, BlockManagerMasterEndpoint removes the execId executor (using removeExecutor) and replies with true.

StopBlockManagerMaster

StopBlockManagerMaster

When received, BlockManagerMasterEndpoint…FIXME

Posted when…FIXME

UpdateBlockInfo

UpdateBlockInfo(
  blockManagerId: BlockManagerId,
  blockId: BlockId,
  storageLevel: StorageLevel,
  memSize: Long,
  diskSize: Long)

When received, BlockManagerMasterEndpoint…FIXME

Posted when BlockManagerMaster is requested to handle a block status update (from BlockManager on an executor).

storageStatus Internal Method

storageStatus: Array[StorageStatus]

storageStatus…FIXME

storageStatus is used when BlockManagerMasterEndpoint is requested to handle a GetStorageStatus message.

getLocationsMultipleBlockIds Internal Method

getLocationsMultipleBlockIds(
  blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]]

getLocationsMultipleBlockIds…FIXME

getLocationsMultipleBlockIds is used when BlockManagerMasterEndpoint is requested to handle a GetLocationsMultipleBlockIds message.

removeShuffle Internal Method

removeShuffle(
  shuffleId: Int): Future[Seq[Boolean]]

removeShuffle…FIXME

removeShuffle is used when BlockManagerMasterEndpoint is requested to handle a RemoveShuffle message.

getPeers Internal Method

getPeers(
  blockManagerId: BlockManagerId): Seq[BlockManagerId]

getPeers finds all the registered BlockManagers (using the blockManagerInfo internal registry) and checks whether the input blockManagerId is among them.

If the input blockManagerId is registered, getPeers returns all the registered BlockManagers except the driver’s and blockManagerId itself.

Otherwise, getPeers returns no BlockManagers.

getPeers is used when BlockManagerMasterEndpoint is requested to handle a GetPeers message.
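The getPeers logic above can be sketched in plain Scala as follows. `PeersSketch`, the simplified `BlockManagerId` and the `String` values standing in for BlockManagerInfo are hypothetical, and the sketch assumes the driver's BlockManager is recognized by the executor ID `"driver"`.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's BlockManagerId.
final case class BlockManagerId(executorId: String, host: String, port: Int) {
  // Assumption: the driver's BlockManager uses the executor ID "driver".
  def isDriver: Boolean = executorId == "driver"
}

object PeersSketch {
  // Stand-in for blockManagerInfo; only its keys (the registered ids) matter here.
  val blockManagerInfo = mutable.Map.empty[BlockManagerId, String]

  def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
    val registered = blockManagerInfo.keySet
    if (registered.contains(blockManagerId)) {
      // Every registered BlockManager except the driver's and the requester itself.
      registered.filterNot(id => id.isDriver || id == blockManagerId).toSeq
    } else {
      // An unregistered BlockManager gets no peers.
      Seq.empty
    }
  }
}
```

Checking membership first means an unknown (for example, already removed) BlockManager learns nothing about the cluster instead of receiving a list of every executor.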

register Internal Method

register(
  idWithoutTopologyInfo: BlockManagerId,
  maxOnHeapMemSize: Long,
  maxOffHeapMemSize: Long,
  slaveEndpoint: RpcEndpointRef): BlockManagerId

register registers a BlockManager (based on the given BlockManagerId) in the blockManagerIdByExecutor and blockManagerInfo registries and posts a SparkListenerBlockManagerAdded message (to the LiveListenerBus).

Only one BlockManager can be registered per executor (identified by BlockManagerId.executorId in the blockManagerIdByExecutor internal registry).

If another BlockManager has already been registered for the executor, register prints out the following ERROR message to the logs and removes the executor (using removeExecutor) before registering the new BlockManager:

Got two different block manager registrations on same executor - will replace old one [oldId] with new one [id]

register prints out the following INFO message to the logs:

Registering block manager [hostPort] with [bytes] RAM, [id]

The BlockManager is then recorded in the blockManagerIdByExecutor and blockManagerInfo internal registries.

In the end, register requests the LiveListenerBus to post a SparkListenerBlockManagerAdded message.

register is used when BlockManagerMasterEndpoint is requested to handle a RegisterBlockManager message.
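The registration steps can be sketched in plain Scala, under several simplifying assumptions: `RegistrySketch`, the cut-down `BlockManagerInfo` and the `BlockManagerAdded` event are hypothetical (the latter stands in for SparkListenerBlockManagerAdded), posted events and log lines go into in-memory buffers instead of the LiveListenerBus and a logger, topology information is ignored, and memory is logged as raw bytes.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's classes.
final case class BlockManagerId(executorId: String, host: String, port: Int) {
  def hostPort: String = s"$host:$port"
}
final case class BlockManagerInfo(id: BlockManagerId, maxOnHeapMem: Long, maxOffHeapMem: Long)
final case class BlockManagerAdded(id: BlockManagerId, maxMem: Long)

class RegistrySketch {
  val blockManagerIdByExecutor = mutable.Map.empty[String, BlockManagerId]
  val blockManagerInfo = mutable.Map.empty[BlockManagerId, BlockManagerInfo]
  val postedEvents = mutable.ArrayBuffer.empty[BlockManagerAdded] // instead of LiveListenerBus
  val logs = mutable.ArrayBuffer.empty[String]                    // instead of a logger

  // Simplified removeExecutor: drop the executor's BlockManager from both registries.
  private def removeExecutor(execId: String): Unit =
    blockManagerIdByExecutor.remove(execId).foreach(oldId => blockManagerInfo.remove(oldId))

  def register(id: BlockManagerId, maxOnHeapMemSize: Long, maxOffHeapMemSize: Long): BlockManagerId = {
    if (!blockManagerInfo.contains(id)) {
      // One BlockManager per executor: an older registration is replaced.
      blockManagerIdByExecutor.get(id.executorId).foreach { oldId =>
        logs += s"ERROR Got two different block manager registrations on same executor - will replace old one $oldId with new one $id"
        removeExecutor(id.executorId)
      }
      logs += s"INFO Registering block manager ${id.hostPort} with ${maxOnHeapMemSize + maxOffHeapMemSize} RAM, $id"
      blockManagerIdByExecutor(id.executorId) = id
      blockManagerInfo(id) = BlockManagerInfo(id, maxOnHeapMemSize, maxOffHeapMemSize)
    }
    // In the end, announce the BlockManager.
    postedEvents += BlockManagerAdded(id, maxOnHeapMemSize + maxOffHeapMemSize)
    id
  }
}
```

Registering a second BlockManager for executor `"1"` leaves only the newer one in both registries and records the ERROR line once.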

removeExecutor Internal Method

removeExecutor(
  execId: String): Unit

removeExecutor prints the following INFO message to the logs:

Trying to remove executor [execId] from BlockManagerMaster.

If the execId executor is registered (in the blockManagerIdByExecutor internal registry), removeExecutor removes the corresponding BlockManager (using removeBlockManager).

removeExecutor is used when BlockManagerMasterEndpoint is requested to handle RemoveExecutor or RegisterBlockManager messages.

removeBlockManager Internal Method

removeBlockManager(
  blockManagerId: BlockManagerId): Unit

removeBlockManager looks up blockManagerId and removes the executor it was working on from the blockManagerIdByExecutor and blockManagerInfo internal registries.

It then goes over all the blocks of the BlockManager and removes the BlockManager from the locations of every block in the blockLocations internal registry.

You should then see the following INFO message in the logs:

Removing block manager [blockManagerId]

removeBlockManager is used when BlockManagerMasterEndpoint is requested to removeExecutor (to handle RemoveExecutor or RegisterBlockManager messages).
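removeExecutor and removeBlockManager together can be sketched in plain Scala. `RemovalSketch`, the simplified case classes and the in-memory `logs` buffer are hypothetical, `BlockManagerInfo` is reduced to the set of blocks it holds, and dropping a block once it has no location left is this sketch's own choice.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's classes.
final case class BlockManagerId(executorId: String, host: String, port: Int)
final case class BlockId(name: String)
// Assumption: BlockManagerInfo only needs to know which blocks it holds.
final case class BlockManagerInfo(blocks: mutable.Set[BlockId])

class RemovalSketch {
  val blockManagerIdByExecutor = mutable.Map.empty[String, BlockManagerId]
  val blockManagerInfo = mutable.Map.empty[BlockManagerId, BlockManagerInfo]
  val blockLocations = mutable.Map.empty[BlockId, mutable.Set[BlockManagerId]]
  val logs = mutable.ArrayBuffer.empty[String] // instead of a logger

  def removeExecutor(execId: String): Unit = {
    logs += s"Trying to remove executor $execId from BlockManagerMaster."
    // Only an executor with a registered BlockManager has anything to remove.
    blockManagerIdByExecutor.get(execId).foreach(id => removeBlockManager(id))
  }

  def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
    val info = blockManagerInfo(blockManagerId)
    blockManagerIdByExecutor -= blockManagerId.executorId
    blockManagerInfo -= blockManagerId
    // Drop this BlockManager from the locations of every block it held.
    for (blockId <- info.blocks; locations <- blockLocations.get(blockId)) {
      locations -= blockManagerId
      // Sketch's choice: forget a block that has no location left.
      if (locations.isEmpty) blockLocations -= blockId
    }
    logs += s"Removing block manager $blockManagerId"
  }
}
```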

getLocations Internal Method

getLocations(
  blockId: BlockId): Seq[BlockManagerId]

getLocations looks up the given BlockId in the blockLocations internal registry and returns the locations (as a collection of BlockManagerId) or an empty collection.

getLocations is used when BlockManagerMasterEndpoint is requested to handle GetLocations and GetLocationsMultipleBlockIds messages.
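A minimal sketch of getLocations in plain Scala, together with a getLocationsMultipleBlockIds that assumes (as the note above on getLocations suggests) that it runs getLocations once per requested block. `LocationsSketch` and the simplified `BlockManagerId`/`BlockId` case classes are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's classes.
final case class BlockManagerId(executorId: String, host: String, port: Int)
final case class BlockId(name: String)

object LocationsSketch {
  // Simplified blockLocations registry.
  val blockLocations = mutable.Map.empty[BlockId, mutable.Set[BlockManagerId]]

  // Locations of a block, or an empty collection for an unknown block.
  def getLocations(blockId: BlockId): Seq[BlockManagerId] =
    blockLocations.get(blockId).map(_.toSeq).getOrElse(Seq.empty)

  // Assumption: one getLocations lookup per requested block, in request order.
  def getLocationsMultipleBlockIds(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] =
    blockIds.toIndexedSeq.map(id => getLocations(id))
}
```

Because every requested block yields exactly one entry (possibly empty), a caller can zip the result with its own `blockIds` to pair each block with its locations.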

Logging

Enable ALL logging level for org.apache.spark.storage.BlockManagerMasterEndpoint logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.BlockManagerMasterEndpoint=ALL

Refer to Logging.

Internal Properties

blockManagerIdByExecutor Lookup Table

blockManagerIdByExecutor: Map[String, BlockManagerId]

Lookup table of BlockManagerIds by executor ID

A new executor is added when BlockManagerMasterEndpoint is requested to handle a RegisterBlockManager message (and registers a new BlockManager).

An executor is removed when BlockManagerMasterEndpoint is requested to handle RemoveExecutor and RegisterBlockManager messages (via removeBlockManager).

Used when BlockManagerMasterEndpoint is requested to handle a HasCachedBlocks message, and by removeExecutor, register and getExecutorEndpointRef.

blockManagerInfo Lookup Table

blockManagerInfo: Map[BlockManagerId, BlockManagerInfo]

Lookup table of BlockManagerInfo by BlockManagerId

A new BlockManagerInfo is added when BlockManagerMasterEndpoint is requested to handle a RegisterBlockManager message (and registers a new BlockManager).

A BlockManagerInfo is removed when BlockManagerMasterEndpoint is requested to remove a BlockManager (to handle RemoveExecutor and RegisterBlockManager messages).

blockLocations

blockLocations: Map[BlockId, Set[BlockManagerId]]

Lookup table of block locations (as BlockManagerIds) by BlockId.

Used in removeRdd to remove the blocks of an RDD, in removeBlockManager to remove blocks after a BlockManager is removed, and in removeBlockFromWorkers, updateBlockInfo and getLocations.