BlockInfoManager
BlockInfoManager is used by BlockManager (and MemoryStore) to manage the metadata of blocks and to control concurrent access to them with read and write locks.
BlockInfoManager is used to create a MemoryStore and a BlockManagerManagedBuffer.
Creating Instance
BlockInfoManager takes no arguments to be created.
BlockInfoManager is created for a BlockManager.
Block Metadata
infos: HashMap[BlockId, BlockInfo]
BlockInfoManager uses the infos registry to keep the metadata (BlockInfo) of every block (by BlockId).
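A minimal Scala sketch of the infos registry follows. BlockId and BlockInfo below are simplified, hypothetical stand-ins that model only the two fields this page discusses (Spark's BlockInfo carries more, e.g. the storage level), and the NO_WRITER value of -1 is an assumption of the sketch.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's BlockId.
final case class BlockId(name: String)

// Simplified stand-in for BlockInfo: only the lock-related fields.
final class BlockInfo {
  var readerCount: Int = 0                   // number of read locks held on the block
  var writerTask: Long = BlockInfo.NO_WRITER // task attempt holding the write lock
}

object BlockInfo {
  val NO_WRITER: Long = -1L          // assumed value: nobody holds the write lock
  val NON_TASK_WRITER: Long = -1024L // marks a non-task thread (driver, unit tests)
}

// The infos registry: the metadata of every registered block, by BlockId.
val infos = mutable.HashMap.empty[BlockId, BlockInfo]

infos(BlockId("rdd_0_0")) = new BlockInfo
```

A freshly registered block starts unlocked: no readers and no writer task.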
Locks
Locks are the mechanism to control concurrent access to data and prevent destructive interaction between operations that use the same resource.
BlockInfoManager tracks read and write locks per task attempt.
Read Locks
readLocksByTask: HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]
BlockInfoManager uses the readLocksByTask registry to track tasks (by TaskAttemptId) and the blocks they locked for reading (as BlockIds). A multiset is used because read locks are re-entrant, i.e. a task attempt can lock the same block for reading more than once.
A new entry is added when BlockInfoManager is requested to register a task (attempt).
A BlockId is added to the entry of an existing task attempt in lockForReading.
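The per-task bookkeeping of read locks can be sketched as follows. To stay within the Scala standard library, Guava's ConcurrentHashMultiset is swapped for a plain block-to-count map, and registerTask and recordReadLock are hypothetical helpers, not Spark's API. The count shows how one task attempt can hold several read locks on the same block.

```scala
import scala.collection.mutable

type TaskAttemptId = Long
final case class BlockId(name: String)

// Substitute for Guava's ConcurrentHashMultiset[BlockId]: block -> number of read locks.
val readLocksByTask = mutable.HashMap.empty[TaskAttemptId, mutable.HashMap[BlockId, Int]]

// Hypothetical helper: registering a task attempt adds an "empty" entry.
def registerTask(taskAttemptId: TaskAttemptId): Unit =
  readLocksByTask(taskAttemptId) = mutable.HashMap.empty[BlockId, Int]

// Hypothetical helper: a read lock bumps the count in the task attempt's existing entry.
def recordReadLock(taskAttemptId: TaskAttemptId, blockId: BlockId): Unit = {
  val locks = readLocksByTask(taskAttemptId)
  locks(blockId) = locks.getOrElse(blockId, 0) + 1
}

registerTask(1L)
recordReadLock(1L, BlockId("rdd_0_0"))
recordReadLock(1L, BlockId("rdd_0_0")) // the same block locked twice by the same task attempt
```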
Write Locks
BlockInfoManager uses the writeLocksByTask registry to track tasks (by TaskAttemptId) and the blocks they locked for writing (as BlockIds).
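A sketch of writeLocksByTask, assuming a plain map from task attempt to a set of blocks (this page does not state the registry's exact type). recordWriteLock and releaseWriteLock are hypothetical helpers. A set is enough in this model because a block has at most one writer at a time.

```scala
import scala.collection.mutable

type TaskAttemptId = Long
final case class BlockId(name: String)

// Assumed shape: task attempt -> blocks it currently holds write locks on.
val writeLocksByTask = mutable.HashMap.empty[TaskAttemptId, mutable.Set[BlockId]]

// Hypothetical helper: record a write lock, creating the task attempt's entry on first use.
def recordWriteLock(taskAttemptId: TaskAttemptId, blockId: BlockId): Unit =
  writeLocksByTask.getOrElseUpdate(taskAttemptId, mutable.Set.empty[BlockId]) += blockId

// Hypothetical helper: drop the write lock (if the task attempt is known).
def releaseWriteLock(taskAttemptId: TaskAttemptId, blockId: BlockId): Unit =
  writeLocksByTask.get(taskAttemptId).foreach(_ -= blockId)

recordWriteLock(7L, BlockId("broadcast_0"))
```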
Registering Task (Execution Attempt)
registerTask(
  taskAttemptId: Long): Unit
registerTask registers a new "empty" entry for the given task (by the task attempt ID) in the readLocksByTask internal registry.
registerTask is used when:
- BlockInfoManager is created
- BlockManager is requested to registerTask
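The registration step can be sketched as below. TaskRegistry is a hypothetical class covering registration only; it assumes that creating the manager registers the NON_TASK_WRITER pseudo task attempt (-1024), which is one way to read "registerTask is used when BlockInfoManager is created", and it assumes that registering the same task attempt twice is rejected.

```scala
import scala.collection.mutable

type TaskAttemptId = Long
final case class BlockId(name: String)

// Hypothetical, simplified model of registration only.
final class TaskRegistry {
  val NON_TASK_WRITER: TaskAttemptId = -1024L
  val readLocksByTask = mutable.HashMap.empty[TaskAttemptId, mutable.HashMap[BlockId, Int]]

  // Assumption: non-task threads (driver, tests) get an entry as soon as the registry is created.
  registerTask(NON_TASK_WRITER)

  def registerTask(taskAttemptId: TaskAttemptId): Unit = synchronized {
    // Assumption: a duplicate registration is treated as a programming error.
    require(!readLocksByTask.contains(taskAttemptId),
      s"Task attempt $taskAttemptId is already registered")
    readLocksByTask(taskAttemptId) = mutable.HashMap.empty[BlockId, Int] // "empty" entry
  }
}
```

The constructor call works because readLocksByTask is initialized before the registerTask(NON_TASK_WRITER) line runs.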
Downgrading Exclusive Write Lock to Shared Read Lock
downgradeLock(
  blockId: BlockId): Unit
downgradeLock prints out the following TRACE message to the logs:
Task [currentTaskAttemptId] downgrading write lock for [blockId]
downgradeLock ...FIXME
downgradeLock is used when:
- BlockManager is requested to doPut and downgradeLock
Obtaining Read Lock for Block
lockForReading(
  blockId: BlockId,
  blocking: Boolean = true): Option[BlockInfo]
lockForReading locks the given block for reading, provided the block has been registered and no task holds a write lock on it.
When executed, lockForReading prints out the following TRACE message to the logs:
Task [currentTaskAttemptId] trying to acquire read lock for [blockId]
lockForReading looks up the metadata of the blockId block (in the infos registry).
If no metadata can be found, lockForReading returns None, which means that the block does not exist or was removed (and anybody could acquire a write lock on it).
Otherwise, when the metadata is found (i.e. the block is registered), lockForReading checks the block's writerTask. A read lock can be acquired only when the block has no writer task (i.e. writerTask is NO_WRITER). In that case, lockForReading increments the readerCount of the block metadata, records the block for the current task attempt in the internal readLocksByTask registry, and prints out the following TRACE message to the logs:
Task [currentTaskAttemptId] acquired read lock for [blockId]
lockForReading then returns the BlockInfo of the blockId block.
Note
-1024 is a special taskAttemptId (NON_TASK_WRITER) used to mark a non-task thread, e.g. a driver thread or unit test code.
For a block whose writerTask is other than NO_WRITER, when blocking is enabled, lockForReading waits until another thread invokes Object.notify or Object.notifyAll on this BlockInfoManager.
With blocking enabled, lockForReading repeats this waiting-for-read-lock sequence until it either returns None (the block is gone) or obtains the read lock.
When blocking is disabled and the lock cannot be obtained, lockForReading returns None immediately.
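Note
lockForReading is a synchronized method, i.e. only one thread at a time can execute it (or any other synchronized method) on the same BlockInfoManager instance. A thread that waits for a lock releases the monitor until it is woken up.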
lockForReading is used when:
- BlockInfoManager is requested to downgradeLock and lockNewBlockForWriting
- BlockManager is requested to getLocalValues, getLocalBytes and replicateBlock
- BlockManagerManagedBuffer is requested to retain
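The lookup-check-wait loop of lockForReading can be sketched in Scala as follows. This is a simplified model, not Spark's code: MiniBlockInfoManager is a hypothetical name, the task attempt is an explicit taskAttemptId parameter instead of currentTaskAttemptId, NO_WRITER is assumed to be -1, a block-to-count map stands in for Guava's multiset, and logging is omitted.

```scala
import scala.collection.mutable

type TaskAttemptId = Long
final case class BlockId(name: String)

final class BlockInfo {
  var readerCount: Int = 0
  var writerTask: TaskAttemptId = -1L // -1 assumed as NO_WRITER
}

// Hypothetical model: the task attempt is passed explicitly rather than taken from the running task.
final class MiniBlockInfoManager {
  val NO_WRITER: TaskAttemptId = -1L
  val infos = mutable.HashMap.empty[BlockId, BlockInfo]
  val readLocksByTask = mutable.HashMap.empty[TaskAttemptId, mutable.HashMap[BlockId, Int]]

  def lockForReading(blockId: BlockId, taskAttemptId: TaskAttemptId,
                     blocking: Boolean = true): Option[BlockInfo] = synchronized {
    var result: Option[BlockInfo] = None
    var done = false
    while (!done) {
      infos.get(blockId) match {
        case None =>
          done = true // block does not exist or was removed
        case Some(info) if info.writerTask == NO_WRITER =>
          // No writer: take a shared read lock and record it for the task attempt.
          info.readerCount += 1
          val locks = readLocksByTask.getOrElseUpdate(taskAttemptId, mutable.HashMap.empty[BlockId, Int])
          locks(blockId) = locks.getOrElse(blockId, 0) + 1
          result = Some(info)
          done = true
        case Some(_) =>
          // A writer holds the block: wait for notify/notifyAll, or give up at once.
          if (blocking) wait() else done = true
      }
    }
    result
  }
}
```

Since wait releases the monitor, a blocked reader does not stop other threads from releasing locks, and the while loop re-checks the block after every wake-up (which also covers spurious wake-ups).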
Obtaining Write Lock for Block
lockForWriting(
  blockId: BlockId,
  blocking: Boolean = true): Option[BlockInfo]
lockForWriting prints out the following TRACE message to the logs:
Task [currentTaskAttemptId] trying to acquire write lock for [blockId]
lockForWriting looks up the blockId block in the infos registry. When no BlockInfo can be found, lockForWriting returns None. Otherwise, the write lock can be acquired only when the block's writerTask is BlockInfo.NO_WRITER and the block has no readers (i.e. readerCount is 0).
When the write lock can be acquired, lockForWriting sets BlockInfo.writerTask to currentTaskAttemptId, adds a new binding to the internal writeLocksByTask registry, and prints out the following TRACE message to the logs:
Task [currentTaskAttemptId] acquired write lock for [blockId]
lockForWriting then returns the BlockInfo of the blockId block.
If the blockId block already has a writer (BlockInfo.writerTask) or its number of readers is positive (i.e. BlockInfo.readerCount is greater than 0), lockForWriting waits (when the blocking flag is enabled) and repeats the write lock acquisition until it obtains the write lock or the block is no longer registered.
Note
(deadlock possible) lockForWriting is synchronized and can block: it calls wait, which makes the current thread wait until another thread invokes Object.notify or Object.notifyAll on this object.
lockForWriting returns None when the blockId block is not in the internal infos registry, or when the blocking flag is disabled and the write lock cannot be acquired.
lockForWriting is used when:
- BlockInfoManager is requested to lockNewBlockForWriting
- BlockManager is requested to removeBlock
- MemoryStore is requested to evictBlocksToFreeSpace
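A matching sketch of lockForWriting, under the same assumptions as a simplified model: MiniBlockInfoManager is hypothetical, the task attempt is an explicit parameter, NO_WRITER is assumed to be -1, and logging is omitted.

```scala
import scala.collection.mutable

type TaskAttemptId = Long
final case class BlockId(name: String)

final class BlockInfo {
  var readerCount: Int = 0
  var writerTask: TaskAttemptId = -1L // -1 assumed as NO_WRITER
}

// Hypothetical model: the task attempt is an explicit parameter.
final class MiniBlockInfoManager {
  val NO_WRITER: TaskAttemptId = -1L
  val infos = mutable.HashMap.empty[BlockId, BlockInfo]
  val writeLocksByTask = mutable.HashMap.empty[TaskAttemptId, mutable.Set[BlockId]]

  def lockForWriting(blockId: BlockId, taskAttemptId: TaskAttemptId,
                     blocking: Boolean = true): Option[BlockInfo] = synchronized {
    var result: Option[BlockInfo] = None
    var done = false
    while (!done) {
      infos.get(blockId) match {
        case None =>
          done = true // unknown or removed block
        case Some(info) if info.writerTask == NO_WRITER && info.readerCount == 0 =>
          // Exclusive access: no writer and no readers.
          info.writerTask = taskAttemptId
          writeLocksByTask.getOrElseUpdate(taskAttemptId, mutable.Set.empty[BlockId]) += blockId
          result = Some(info)
          done = true
        case Some(_) =>
          // A writer or readers are present: wait for a wake-up, or give up at once.
          if (blocking) wait() else done = true
      }
    }
    result
  }
}
```

Requiring both writerTask == NO_WRITER and readerCount == 0 is what makes the write lock exclusive, while read locks stay shared among readers.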
Obtaining Write Lock for New Block
lockNewBlockForWriting(
  blockId: BlockId,
  newBlockInfo: BlockInfo): Boolean
lockNewBlockForWriting obtains a write lock for blockId, but only when it manages to register the block.
Note
lockNewBlockForWriting is similar to lockForWriting, but for brand new blocks.
When executed, lockNewBlockForWriting prints out the following TRACE message to the logs:
Task [currentTaskAttemptId] trying to put [blockId]
If some other thread has already created the block, lockNewBlockForWriting acquires a read lock on the existing block (using lockForReading) and returns false. Otherwise, when the block does not exist, lockNewBlockForWriting records newBlockInfo in the infos internal registry, locks the block for writing (using lockForWriting) and returns true.
Note
lockNewBlockForWriting runs inside a synchronized block, so while the current thread holds the BlockInfoManager's monitor, no other thread can access the internal registries through the synchronized methods.
lockNewBlockForWriting is used when:
- BlockManager is requested to doPut
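The first-writer-wins behavior can be sketched as a single synchronized loop. This is a simplified, hypothetical model (MiniBlockInfoManager, an explicit taskAttemptId parameter, NO_WRITER assumed to be -1): the read lock taken on an existing block only bumps readerCount and is not recorded in readLocksByTask, and logging is omitted.

```scala
import scala.collection.mutable

type TaskAttemptId = Long
final case class BlockId(name: String)

final class BlockInfo {
  var readerCount: Int = 0
  var writerTask: TaskAttemptId = -1L // -1 assumed as NO_WRITER
}

// Hypothetical, simplified model of lockNewBlockForWriting.
final class MiniBlockInfoManager {
  val NO_WRITER: TaskAttemptId = -1L
  val infos = mutable.HashMap.empty[BlockId, BlockInfo]
  val writeLocksByTask = mutable.HashMap.empty[TaskAttemptId, mutable.Set[BlockId]]

  def lockNewBlockForWriting(blockId: BlockId, taskAttemptId: TaskAttemptId,
                             newBlockInfo: BlockInfo): Boolean = synchronized {
    var outcome: Option[Boolean] = None
    while (outcome.isEmpty) {
      infos.get(blockId) match {
        case Some(existing) if existing.writerTask == NO_WRITER =>
          // First writer wins: the block exists, so take a read lock and report false.
          existing.readerCount += 1
          outcome = Some(false)
        case Some(_) =>
          // The existing block is still being written: wait, then look again.
          wait()
        case None =>
          // Register the new block and write-lock it in the same critical section.
          infos(blockId) = newBlockInfo
          newBlockInfo.writerTask = taskAttemptId
          writeLocksByTask.getOrElseUpdate(taskAttemptId, mutable.Set.empty[BlockId]) += blockId
          outcome = Some(true)
      }
    }
    outcome.get
  }
}
```

Checking for the block and registering it within one synchronized call avoids the race in which two threads both see the block as missing and both register it.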
Releasing Lock on Block
unlock(
  blockId: BlockId,
  taskAttemptId: Option[TaskAttemptId] = None): Unit
unlock prints out the following TRACE message to the logs:
Task [currentTaskAttemptId] releasing lock for [blockId]
unlock gets the metadata of the blockId block (and throws an IllegalStateException if the block is not found).
If the writer task of the block is not NO_WRITER, unlock resets it to NO_WRITER and removes the blockId block from the internal writeLocksByTask registry for the task attempt.
Otherwise, if the writer task is NO_WRITER, the block is assumed to be locked for reading. unlock decrements the readerCount of the blockId block and removes the read lock from the internal readLocksByTask registry for the task attempt.
The task attempt is the given taskAttemptId or, when not specified, the current task attempt.
In the end, unlock wakes up all the threads waiting on the BlockInfoManager (using notifyAll).
unlock is used when:
- BlockInfoManager is requested to downgradeLock
- BlockManager is requested to releaseLock and doPut
- BlockManagerManagedBuffer is requested to release
- MemoryStore is requested to evictBlocksToFreeSpace
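A sketch of unlock under the same simplifying assumptions: MiniBlockInfoManager is hypothetical, the task attempt is a required Long instead of an Option that defaults to the current task attempt, NO_WRITER is assumed to be -1, a block-to-count map stands in for Guava's multiset, and the readerCount check is an assumption of the sketch.

```scala
import scala.collection.mutable

type TaskAttemptId = Long
final case class BlockId(name: String)

final class BlockInfo {
  var readerCount: Int = 0
  var writerTask: TaskAttemptId = -1L // -1 assumed as NO_WRITER
}

// Hypothetical, simplified model of unlock.
final class MiniBlockInfoManager {
  val NO_WRITER: TaskAttemptId = -1L
  val infos = mutable.HashMap.empty[BlockId, BlockInfo]
  val readLocksByTask = mutable.HashMap.empty[TaskAttemptId, mutable.HashMap[BlockId, Int]]
  val writeLocksByTask = mutable.HashMap.empty[TaskAttemptId, mutable.Set[BlockId]]

  def unlock(blockId: BlockId, taskAttemptId: TaskAttemptId): Unit = synchronized {
    val info = infos.getOrElse(blockId,
      throw new IllegalStateException(s"Block $blockId not found"))
    if (info.writerTask != NO_WRITER) {
      // Release the write lock.
      info.writerTask = NO_WRITER
      writeLocksByTask.get(taskAttemptId).foreach(_ -= blockId)
    } else {
      // Otherwise the block is assumed to be read-locked: release one read lock.
      require(info.readerCount > 0, s"Block $blockId is not locked for reading")
      info.readerCount -= 1
      readLocksByTask.get(taskAttemptId).foreach { locks =>
        val remaining = locks.getOrElse(blockId, 0) - 1
        if (remaining > 0) locks(blockId) = remaining else locks -= blockId
      }
    }
    notifyAll() // wake up every thread waiting for a read or write lock
  }
}
```

notifyAll (rather than notify) fits here because readers and writers wait on the same monitor for different conditions; every woken thread re-checks its own condition and goes back to waiting if it still cannot proceed.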
Logging
Enable ALL logging level for the org.apache.spark.storage.BlockInfoManager logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.storage.BlockInfoManager=ALL
Refer to Logging.