BlockInfoManager

BlockInfoManager manages memory blocks (aka memory pages). It controls concurrent access to memory blocks by read and write locks (for existing and new ones).

Locks are the mechanism to control concurrent access to data and prevent destructive interaction between operations that use the same resource.
Table 1. BlockInfoManager Internal Registries and Counters
Name              Description
infos             Tracks BlockInfo per block (as BlockId).
readLocksByTask   Tracks tasks (by TaskAttemptId) and the blocks they locked for reading (as BlockId).
writeLocksByTask  Tracks tasks (by TaskAttemptId) and the blocks they locked for writing (as BlockId).
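As a rough illustration of the three registries, the following Scala sketch models them with plain mutable collections. RegistriesSketch, the type aliases and the simplified BlockInfo are hypothetical stand-ins for Spark's own private types, and -1 as the "no writer" marker is an assumption made for the sketch.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the registries; not Spark's actual code.
object RegistriesSketch {
  type BlockId = String       // stand-in for org.apache.spark.storage.BlockId
  type TaskAttemptId = Long

  // Only the two lock-related fields are modelled here.
  final class BlockInfo {
    var readerCount: Int = 0  // number of read locks currently held
    var writerTask: Long = -1L // assumed value standing in for NO_WRITER
  }

  // infos: metadata per registered block
  val infos = mutable.HashMap.empty[BlockId, BlockInfo]

  // readLocksByTask: a task may read-lock the same block more than once,
  // so a ListBuffer (duplicates allowed) is used in this sketch
  val readLocksByTask =
    mutable.HashMap.empty[TaskAttemptId, mutable.ListBuffer[BlockId]]

  // writeLocksByTask: at most one write lock per block, so a Set is enough
  val writeLocksByTask =
    mutable.HashMap.empty[TaskAttemptId, mutable.Set[BlockId]]
}
```

The sketches for the individual methods use the same simplified shapes.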

BlockInfoManager is a private[storage] class that belongs to the org.apache.spark.storage package.

Enable the TRACE logging level for the org.apache.spark.storage.BlockInfoManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.BlockInfoManager=TRACE

Refer to Logging.

registerTask Method

FIXME

Downgrading Exclusive Write Lock For Block to Shared Read Lock — downgradeLock Method

downgradeLock(blockId: BlockId): Unit

downgradeLock…​FIXME

Obtaining Read Lock For Block — lockForReading Method

lockForReading(
  blockId: BlockId,
  blocking: Boolean = true): Option[BlockInfo]

lockForReading locks blockId memory block for reading when the block was registered earlier and no writer tasks use it.

When executed, lockForReading prints out the following TRACE message to the logs:

TRACE BlockInfoManager: Task [currentTaskAttemptId] trying to acquire read lock for [blockId]

lockForReading looks up the metadata of the blockId block (in infos registry).

If no metadata could be found, it returns None which means that the block does not exist or was removed (and anybody could acquire a write lock).

Otherwise, when the metadata is found (i.e. the block is registered), lockForReading checks the block's writerTask. A read lock can be acquired only when the block has no writer task (i.e. writerTask is NO_WRITER). If so, the readerCount of the block metadata is incremented and the block is recorded (in the internal readLocksByTask registry). You should see the following TRACE message in the logs:

TRACE BlockInfoManager: Task [taskAttemptId] acquired read lock for [blockId]

The BlockInfo for the blockId block is returned.

-1024 is a special taskAttemptId, aka NON_TASK_WRITER, used to mark a non-task thread, e.g. a driver thread or unit test code.

For blocks with writerTask other than NO_WRITER, when blocking is enabled, lockForReading waits (until another thread invokes the Object.notify or Object.notifyAll method for this object).

With blocking enabled, it repeats the waiting-for-read-lock sequence until either the block is no longer registered (and None is returned) or the lock is obtained.

When blocking is disabled and the lock could not be obtained, None is returned immediately.

lockForReading is a synchronized method, i.e. no two threads can execute this or any other synchronized instance method of the same BlockInfoManager at the same time.
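The read-lock steps described above can be sketched in Scala as follows. This is a minimal model under stated assumptions, not Spark's implementation: ReadLockSketch, Info and Manager are hypothetical names, block IDs are plain strings, the task attempt ID is passed in explicitly as taskId (instead of being looked up with currentTaskAttemptId), and NoWriter = -1 is an assumed stand-in for NO_WRITER.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the read-lock path; not Spark's actual code.
object ReadLockSketch {
  val NoWriter: Long = -1L // assumed stand-in for BlockInfo.NO_WRITER

  final class Info {
    var readerCount: Int = 0
    var writerTask: Long = NoWriter
  }

  class Manager {
    val infos = mutable.HashMap.empty[String, Info]
    val readLocksByTask = mutable.HashMap.empty[Long, mutable.ListBuffer[String]]

    def lockForReading(
        taskId: Long, // hypothetical parameter replacing currentTaskAttemptId
        blockId: String,
        blocking: Boolean = true): Option[Info] = synchronized {
      var result: Option[Info] = None
      var done = false
      while (!done) {
        infos.get(blockId) match {
          case None =>
            // Block was never registered or has been removed.
            done = true
          case Some(info) if info.writerTask == NoWriter =>
            // No writer: take the read lock and record it for the task.
            info.readerCount += 1
            readLocksByTask.getOrElseUpdate(taskId, mutable.ListBuffer.empty[String]) += blockId
            result = Some(info)
            done = true
          case Some(_) =>
            // A writer holds the block: wait for a notify and retry, or give up.
            if (blocking) wait() else done = true
        }
      }
      result
    }
  }
}
```

With blocking set to false, the call returns None straight away when a writer holds the block, which is the non-blocking behaviour described above.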

Obtaining Write Lock for Block — lockForWriting Method

lockForWriting(
  blockId: BlockId,
  blocking: Boolean = true): Option[BlockInfo]

When executed, lockForWriting prints out the following TRACE message to the logs:

TRACE BlockInfoManager: Task [currentTaskAttemptId] trying to acquire write lock for [blockId]

It looks up blockId in the internal infos registry. When no BlockInfo could be found, None is returned. Otherwise, the write lock is granted only when the block has no writer (i.e. writerTask is BlockInfo.NO_WRITER) and no readers (i.e. readerCount is 0).

When the write lock can be returned, BlockInfo.writerTask is set to currentTaskAttemptId and a new binding is added to the internal writeLocksByTask registry. You should see the following TRACE message in the logs:

TRACE BlockInfoManager: Task [currentTaskAttemptId] acquired write lock for [blockId]

If blockId has a writer or the number of readers is positive (i.e. BlockInfo.readerCount is greater than 0), and the blocking flag is enabled, the method waits and retries the write lock acquisition until it obtains the write lock (or the block is no longer registered).

(deadlock possible) The method is synchronized and can block, i.e. the current thread waits until another thread invokes the Object.notify or Object.notifyAll method for this object.

lockForWriting returns None when blockId is not in the internal infos registry or when the blocking flag is disabled and the write lock could not be acquired.
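The write-lock path can be sketched along the same lines. Again this is only an illustration under assumptions: WriteLockSketch, Info and Manager are hypothetical names, the explicit taskId parameter replaces currentTaskAttemptId, and NoWriter = -1 stands in for BlockInfo.NO_WRITER.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the write-lock path; not Spark's actual code.
object WriteLockSketch {
  val NoWriter: Long = -1L // assumed stand-in for BlockInfo.NO_WRITER

  final class Info {
    var readerCount: Int = 0
    var writerTask: Long = NoWriter
  }

  class Manager {
    val infos = mutable.HashMap.empty[String, Info]
    val writeLocksByTask = mutable.HashMap.empty[Long, mutable.Set[String]]

    def lockForWriting(
        taskId: Long, // hypothetical parameter replacing currentTaskAttemptId
        blockId: String,
        blocking: Boolean = true): Option[Info] = synchronized {
      var result: Option[Info] = None
      var done = false
      while (!done) {
        infos.get(blockId) match {
          case None =>
            // Unknown or removed block: nothing to lock.
            done = true
          case Some(info) if info.writerTask == NoWriter && info.readerCount == 0 =>
            // No writer and no readers: take the exclusive lock.
            info.writerTask = taskId
            writeLocksByTask.getOrElseUpdate(taskId, mutable.Set.empty[String]) += blockId
            result = Some(info)
            done = true
          case Some(_) =>
            // Block is in use: wait for a notify and retry, or give up.
            if (blocking) wait() else done = true
        }
      }
      result
    }
  }
}
```

The only difference from the read-lock sketch is the stricter condition: a write lock needs both no writer and a readerCount of 0.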

Obtaining Write Lock for New Block — lockNewBlockForWriting Method

lockNewBlockForWriting(
  blockId: BlockId,
  newBlockInfo: BlockInfo): Boolean

lockNewBlockForWriting obtains a write lock for blockId but only when the method could register the block.

lockNewBlockForWriting is similar to lockForWriting method but for brand new blocks.

When executed, lockNewBlockForWriting prints out the following TRACE message to the logs:

TRACE BlockInfoManager: Task [currentTaskAttemptId] trying to put [blockId]

If the block already exists (e.g. some other thread has already created it), lockNewBlockForWriting returns false. Otherwise, when the block does not exist, newBlockInfo is recorded in the internal infos registry and the block is locked for writing for this client. It then returns true.

lockNewBlockForWriting executes in a synchronized block, so while a thread holds the BlockInfoManager lock, the internal registries are accessible only to that thread.
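A minimal Scala sketch of the register-then-lock behaviour described in this section is shown here. It follows only the description above and simplifies heavily: NewBlockSketch, Info and Manager are hypothetical names, taskId is an explicit parameter standing in for currentTaskAttemptId, and NoWriter = -1 is an assumed stand-in for NO_WRITER. Any waiting on a concurrent writer is left out.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of registering a new block; not Spark's actual code.
object NewBlockSketch {
  val NoWriter: Long = -1L // assumed stand-in for BlockInfo.NO_WRITER

  final class Info {
    var readerCount: Int = 0
    var writerTask: Long = NoWriter
  }

  class Manager {
    val infos = mutable.HashMap.empty[String, Info]
    val writeLocksByTask = mutable.HashMap.empty[Long, mutable.Set[String]]

    def lockNewBlockForWriting(
        taskId: Long, // hypothetical parameter replacing currentTaskAttemptId
        blockId: String,
        newBlockInfo: Info): Boolean = synchronized {
      if (infos.contains(blockId)) {
        // Someone else registered the block first.
        false
      } else {
        // Register the metadata and take the write lock in one critical section.
        infos(blockId) = newBlockInfo
        newBlockInfo.writerTask = taskId
        writeLocksByTask.getOrElseUpdate(taskId, mutable.Set.empty[String]) += blockId
        true
      }
    }
  }
}
```

Because the existence check and the registration happen inside the same synchronized block, two threads racing to create the same block cannot both get true.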

currentTaskAttemptId Method

FIXME

Releasing Lock on Block — unlock Method

unlock(blockId: BlockId): Unit

unlock releases…​FIXME

When executed, unlock starts by printing out the following TRACE message to the logs:

TRACE BlockInfoManager: Task [currentTaskAttemptId] releasing lock for [blockId]

unlock gets the metadata for blockId. It may throw an IllegalStateException if the block was not found.

If the writer task for the block is not NO_WRITER, the block is locked for writing: the writer task is reset to NO_WRITER and the blockId block is removed from the internal writeLocksByTask registry for the current task attempt.

Otherwise, if the writer task is indeed NO_WRITER, it is assumed that the blockId block is locked for reading. The readerCount counter is decremented for the blockId block and the read lock is removed from the internal readLocksByTask registry for the current task attempt.

In the end, unlock wakes up all the threads waiting for the BlockInfoManager (using Java’s Object.notifyAll).
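The release steps above can be sketched in Scala as follows. As with the other sketches, this is a simplified model and not Spark's code: UnlockSketch, Info and Manager are hypothetical names, taskId is passed explicitly instead of using currentTaskAttemptId, NoWriter = -1 is an assumed stand-in for NO_WRITER, and the exception message is made up for the example.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of releasing a lock; not Spark's actual code.
object UnlockSketch {
  val NoWriter: Long = -1L // assumed stand-in for BlockInfo.NO_WRITER

  final class Info {
    var readerCount: Int = 0
    var writerTask: Long = NoWriter
  }

  class Manager {
    val infos = mutable.HashMap.empty[String, Info]
    val readLocksByTask = mutable.HashMap.empty[Long, mutable.ListBuffer[String]]
    val writeLocksByTask = mutable.HashMap.empty[Long, mutable.Set[String]]

    def unlock(taskId: Long, blockId: String): Unit = synchronized {
      val info = infos.getOrElse(
        blockId,
        throw new IllegalStateException(s"Block $blockId not found"))
      if (info.writerTask != NoWriter) {
        // Held for writing: clear the writer and drop the task's write lock.
        info.writerTask = NoWriter
        writeLocksByTask.get(taskId).foreach(locks => locks -= blockId)
      } else {
        // Held for reading: one reader fewer, drop one read-lock entry.
        info.readerCount -= 1
        readLocksByTask.get(taskId).foreach(locks => locks -= blockId)
      }
      // Wake up every thread waiting on this manager so it can retry.
      notifyAll()
    }
  }
}
```

In this model the woken threads would be those parked in wait() inside a blocking lock request on the same manager.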

FIXME What threads could wait?

Releasing All Locks Obtained by Task — releaseAllLocksForTask Method

FIXME

Removing Memory Block — removeBlock Method

FIXME

assertBlockIsLockedForWriting Method

FIXME