BlockInfoManager

BlockInfoManager is used by BlockManager (and MemoryStore) to manage metadata of memory blocks and control concurrent access by locks for reading and writing.

Locks are the mechanism to control concurrent access to data and prevent destructive interaction between operations that use the same resource.

BlockInfoManager is used to create a MemoryStore and a BlockManagerManagedBuffer.

Creating Instance

BlockInfoManager takes no parameters to be created.

BlockInfoManager is created for BlockManager.

BlockInfoManager BlockManager
Figure 1. BlockInfoManager and BlockManager

Block Metadata

infos: Map[BlockId, BlockInfo]

BlockInfoManager uses a registry of block metadatas per block.

Read and Write Locks By Task

Tracks tasks (by TaskAttemptId) and the blocks they locked for reading (as BlockId).

Tracks tasks (by TaskAttemptId) and the blocks they locked for writing (as BlockId).

Registering Task (Start of Execution)

registerTask(
  taskAttemptId: Long): Unit

registerTask merely adds a new "empty" entry for the given task (by the task attempt ID) to readLocksByTask internal registry.

registerTask is used when:

Downgrading Exclusive Write Lock For Block to Shared Read Lock

downgradeLock(
  blockId: BlockId): Unit

downgradeLock prints out the following TRACE message to the logs:

Task [currentTaskAttemptId] downgrading write lock for [blockId]

downgradeLock…​FIXME

downgradeLock is used when BlockManager is requested to doPut and downgradeLock.

Obtaining Read Lock For Block

lockForReading(
  blockId: BlockId,
  blocking: Boolean = true): Option[BlockInfo]

lockForReading locks blockId memory block for reading when the block was registered earlier and no writer tasks use it.

When executed, lockForReading prints out the following TRACE message to the logs:

Task [currentTaskAttemptId] trying to acquire read lock for [blockId]

lockForReading looks up the metadata of the blockId block (in infos registry).

If no metadata could be found, it returns None which means that the block does not exist or was removed (and anybody could acquire a write lock).

Otherwise, when the metadata was found, i.e. registered, it checks so-called writerTask. Only when the block has no writer tasks, a read lock can be acquired. If so, the readerCount of the block metadata is incremented and the block is recorded (in the internal readLocksByTask registry). You should see the following TRACE message in the logs:

Task [taskAttemptId] acquired read lock for [blockId]

The BlockInfo for the blockId block is returned.

-1024 is a special taskAttemptId, aka NON_TASK_WRITER, used to mark a non-task thread, e.g. by a driver thread or by unit test code.

For blocks with writerTask other than NO_WRITER, when blocking is enabled, lockForReading waits (until another thread invokes the Object.notify method or the Object.notifyAll methods for this object).

With blocking enabled, it will repeat the waiting-for-read-lock sequence until either None or the lock is obtained.

When blocking is disabled and the lock could not be obtained, None is returned immediately.

lockForReading is a synchronized method, i.e. no two objects can use this and other instance methods.

lockForReading is used when:

Obtaining Write Lock for Block

lockForWriting(
  blockId: BlockId,
  blocking: Boolean = true): Option[BlockInfo]

lockForWriting prints out the following TRACE message to the logs:

Task [currentTaskAttemptId] trying to acquire write lock for [blockId]

lockForWriting looks up blockId in the internal infos registry. When no BlockInfo could be found, None is returned. Otherwise, blockId block is checked for writerTask to be BlockInfo.NO_WRITER with no readers (i.e. readerCount is 0) and only then the lock is returned.

When the write lock can be returned, BlockInfo.writerTask is set to currentTaskAttemptId and a new binding is added to the internal writeLocksByTask registry. You should see the following TRACE message in the logs:

Task [currentTaskAttemptId] acquired write lock for [blockId]

If, for some reason, blockId has a writer or the number of readers is positive (i.e. BlockInfo.readerCount is greater than 0), the method will wait (based on the input blocking flag) and attempt the write lock acquisition process until it finishes with a write lock.

(deadlock possible) The method is synchronized and can block, i.e. wait that causes the current thread to wait until another thread invokes Object.notify or Object.notifyAll methods for this object.

lockForWriting returns None for no blockId in the internal infos registry or when blocking flag is disabled and the write lock could not be acquired.

lockForWriting is used when:

Obtaining Write Lock for New Block

lockNewBlockForWriting(
  blockId: BlockId,
  newBlockInfo: BlockInfo): Boolean

lockNewBlockForWriting obtains a write lock for blockId but only when the method could register the block.

lockNewBlockForWriting is similar to lockForWriting method but for brand new blocks.

When executed, lockNewBlockForWriting prints out the following TRACE message to the logs:

Task [currentTaskAttemptId] trying to put [blockId]

If some other thread has already created the block, it finishes returning false. Otherwise, when the block does not exist, newBlockInfo is recorded in the internal infos registry and the block is locked for this client for writing. It then returns true.

lockNewBlockForWriting executes itself in synchronized block so once the BlockInfoManager is locked the other internal registries should be available only for the currently-executing thread.

lockNewBlockForWriting is used when BlockManager is requested to doPut.

Releasing Lock on Block

unlock(
  blockId: BlockId): Unit

unlock prints out the following TRACE message to the logs:

Task [currentTaskAttemptId] releasing lock for [blockId]

unlock gets the metadata for blockId. It may throw a IllegalStateException if the block was not found.

If the writer task for the block is not NO_WRITER, it becomes so and the blockId block is removed from the internal writeLocksByTask registry for the current task attempt.

Otherwise, if the writer task is indeed NO_WRITER, it is assumed that the blockId block is locked for reading. The readerCount counter is decremented for the blockId block and the read lock removed from the internal readLocksByTask registry for the current task attempt.

In the end, unlock wakes up all the threads waiting for the BlockInfoManager (using Java’s Object.notifyAll).

FIXME What threads could wait?

unlock is used when:

Releasing All Locks Obtained by Task

releaseAllLocksForTask(
  taskAttemptId: TaskAttemptId): Seq[BlockId]

releaseAllLocksForTask…​FIXME

releaseAllLocksForTask is used when BlockManager is requested to releaseAllLocksForTask.

Removing Block

removeBlock(
  blockId: BlockId): Unit

removeBlock…​FIXME

removeBlock is used when:

assertBlockIsLockedForWriting Method

assertBlockIsLockedForWriting(
  blockId: BlockId): BlockInfo

assertBlockIsLockedForWriting…​FIXME

assertBlockIsLockedForWriting is used when BlockManager is requested to dropFromMemory and removeBlockInternal.

currentTaskAttemptId Internal Method

currentTaskAttemptId: Long /* TaskAttemptId */

currentTaskAttemptId…​FIXME

currentTaskAttemptId is used when…​FIXME

Deleting All State

clear(): Unit

clear…​FIXME

clear is used when BlockManager is requested to stop.

Logging

Enable ALL logging level for org.apache.spark.storage.BlockInfoManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.BlockInfoManager=ALL

Refer to Logging.