Skip to content

BlockInfoManager

BlockInfoManager is used by BlockManager (and MemoryStore) to manage metadata of memory blocks and control concurrent access by locks for reading and writing.

BlockInfoManager is used to create a MemoryStore and a BlockManagerManagedBuffer.

Creating Instance

BlockInfoManager takes no arguments to be created.

BlockInfoManager is created for BlockManager

BlockInfoManager and BlockManager

Block Metadata

infos: HashMap[BlockId, BlockInfo]

BlockInfoManager uses a registry of block metadatas per block.

Locks

Locks are the mechanism to control concurrent access to data and prevent destructive interaction between operations that use the same resource.

BlockInfoManager uses read and write locks by task attempts.

Read Locks

readLocksByTask: HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]

BlockInfoManager uses readLocksByTask registry to track tasks (by TaskAttemptId) and the blocks they locked for reading (as BlockIds).

A new entry is added when BlockInfoManager is requested to register a task (attempt).

A new BlockId is added to an existing task attempt in lockForReading.

Write Locks

Tracks tasks (by TaskAttemptId) and the blocks they locked for writing (as BlockId.md[]).

Registering Task (Execution Attempt)

registerTask(
  taskAttemptId: Long): Unit

registerTask registers a new "empty" entry for the given task (by the task attempt ID) to the readLocksByTask internal registry.

registerTask is used when:

Downgrading Exclusive Write Lock to Shared Read Lock

downgradeLock(
  blockId: BlockId): Unit

downgradeLock prints out the following TRACE message to the logs:

Task [currentTaskAttemptId] downgrading write lock for [blockId]

downgradeLock...FIXME

downgradeLock is used when:

Obtaining Read Lock for Block

lockForReading(
  blockId: BlockId,
  blocking: Boolean = true): Option[BlockInfo]

lockForReading locks a given memory block for reading when the block was registered earlier and no writer tasks use it.

When executed, lockForReading prints out the following TRACE message to the logs:

Task [currentTaskAttemptId] trying to acquire read lock for [blockId]

lockForReading looks up the metadata of the blockId block (in the infos registry).

If no metadata could be found, lockForReading returns None which means that the block does not exist or was removed (and anybody could acquire a write lock).

Otherwise, when the metadata was found (i.e. registered) lockForReading checks so-called writerTask. Only when the block has no writer tasks, a read lock can be acquired. If so, the readerCount of the block metadata is incremented and the block is recorded (in the internal readLocksByTask registry). lockForReading prints out the following TRACE message to the logs:

Task [currentTaskAttemptId] acquired read lock for [blockId]

The BlockInfo for the blockId block is returned.

Note

-1024 is a special taskAttemptId (NON_TASK_WRITER) used to mark a non-task thread, e.g. by a driver thread or by unit test code.

For blocks with writerTask other than NO_WRITER, when blocking is enabled, lockForReading waits (until another thread invokes the Object.notify method or the Object.notifyAll methods for this object).

With blocking enabled, it will repeat the waiting-for-read-lock sequence until either None or the lock is obtained.

When blocking is disabled and the lock could not be obtained, None is returned immediately.

Note

lockForReading is a synchronized method, i.e. no two objects can use this and other instance methods.

lockForReading is used when:

Obtaining Write Lock for Block

lockForWriting(
  blockId: BlockId,
  blocking: Boolean = true): Option[BlockInfo]

lockForWriting prints out the following TRACE message to the logs:

Task [currentTaskAttemptId] trying to acquire write lock for [blockId]

lockForWriting finds the blockId (in the infos registry). When no BlockInfo could be found, None is returned. Otherwise, blockId block is checked for writerTask to be BlockInfo.NO_WRITER with no readers (i.e. readerCount is 0) and only then the lock is returned.

When the write lock can be returned, BlockInfo.writerTask is set to currentTaskAttemptId and a new binding is added to the internal writeLocksByTask registry. lockForWriting prints out the following TRACE message to the logs:

Task [currentTaskAttemptId] acquired write lock for [blockId]

If, for some reason, BlockInfo.md#writerTask[blockId has a writer] or the number of readers is positive (i.e. BlockInfo.readerCount is greater than 0), the method will wait (based on the input blocking flag) and attempt the write lock acquisition process until it finishes with a write lock.

NOTE: (deadlock possible) The method is synchronized and can block, i.e. wait that causes the current thread to wait until another thread invokes Object.notify or Object.notifyAll methods for this object.

lockForWriting returns None for no blockId in the internal infos registry or when blocking flag is disabled and the write lock could not be acquired.

lockForWriting is used when:

Obtaining Write Lock for New Block

lockNewBlockForWriting(
  blockId: BlockId,
  newBlockInfo: BlockInfo): Boolean

lockNewBlockForWriting obtains a write lock for blockId but only when the method could register the block.

Note

lockNewBlockForWriting is similar to lockForWriting method but for brand new blocks.

When executed, lockNewBlockForWriting prints out the following TRACE message to the logs:

Task [currentTaskAttemptId] trying to put [blockId]

If some other thread has already created the block, lockNewBlockForWriting finishes returning false. Otherwise, when the block does not exist, newBlockInfo is recorded in the infos internal registry and the block is locked for this client for writing. lockNewBlockForWriting then returns true.

Note

lockNewBlockForWriting executes itself in synchronized block so once the BlockInfoManager is locked the other internal registries should be available for the current thread only.

lockNewBlockForWriting is used when:

  • BlockManager is requested to doPut

Releasing Lock on Block

unlock(
  blockId: BlockId,
  taskAttemptId: Option[TaskAttemptId] = None): Unit

unlock prints out the following TRACE message to the logs:

Task [currentTaskAttemptId] releasing lock for [blockId]

unlock gets the metadata for blockId (and throws an IllegalStateException if the block was not found).

If the writer task for the block is not NO_WRITER, it becomes so and the blockId block is removed from the internal writeLocksByTask registry for the current task attempt.

Otherwise, if the writer task is indeed NO_WRITER, the block is assumed locked for reading. The readerCount counter is decremented for the blockId block and the read lock removed from the internal readLocksByTask registry for the task attempt.

In the end, unlock wakes up all the threads waiting for the BlockInfoManager.

unlock is used when:

Logging

Enable ALL logging level for org.apache.spark.storage.BlockInfoManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.BlockInfoManager=ALL

Refer to Logging.