TaskMemoryManager assumes that:
The number of bits to address pages is 13.
The number of bits to encode offsets in data pages is 51 (i.e. 64 bits - 13).
The maximum page size (aka MAXIMUM_PAGE_SIZE_BYTES) is ((1L << 31) - 1) * 8L bytes.
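The arithmetic behind these assumptions can be checked with a short sketch. The class AddressLayoutSketch and its constant names are hypothetical and are not Spark's API. Only the value 51 and the expression ((1L << 31) - 1) * 8L come from the list above. The 13 page bits follow from 64 - 51, and the 8192-entry figure is an assumption that the page table is sized to exactly those 13 bits.

```java
// Hypothetical helper for illustration only; not part of Spark.
final class AddressLayoutSketch {
    // Bits used for the offset inside a page (from the text above).
    static final int OFFSET_BITS = 51;
    // Remaining bits of a 64-bit address identify the page: 64 - 51 = 13.
    static final int PAGE_BITS = 64 - OFFSET_BITS;
    // Assumption: one page-table entry per addressable page number.
    static final int MAX_PAGES = 1 << PAGE_BITS;
    // Maximum page size expression quoted in the text above.
    static final long MAX_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L;

    private AddressLayoutSketch() {}
}
```

With these values, MAX_PAGES evaluates to 8192 and MAX_PAGE_SIZE_BYTES to 17179869176 bytes, i.e. just under 16 GiB.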
TaskMemoryManager tracks spillable memory consumers.
TaskMemoryManager registers a new memory consumer when requested to acquire execution memory.
TaskMemoryManager removes (clears) all registered memory consumers when requested to clean up all allocated memory.
Memory consumers are used to report memory usage when TaskMemoryManager is requested to show memory usage.
cleanUpAllAllocatedMemory clears page table.
All recorded consumers are queried for the size of used memory. If the memory used is greater than 0, the following WARN message is printed out to the logs:
WARN TaskMemoryManager: leak [bytes] memory from [consumer]
The consumers collection is then cleared.
MemoryManager.releaseExecutionMemory is executed to release the memory that is not used by any consumer.
Before cleanUpAllAllocatedMemory returns, it calls MemoryManager.releaseAllExecutionMemoryForTask, whose result becomes the return value.
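The leak-reporting part of the clean-up steps above can be sketched as follows. This is a simplified model, not Spark's code: the class LeakReportSketch is hypothetical, a plain map of consumer name to bytes still held stands in for the consumers registry, and the warnings are returned as strings rather than logged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of the leak check; not part of Spark.
final class LeakReportSketch {
    private LeakReportSketch() {}

    // Reports every consumer that still holds memory, then clears the registry.
    static List<String> cleanUp(Map<String, Long> consumers) {
        List<String> warnings = new ArrayList<>();
        for (Map.Entry<String, Long> entry : consumers.entrySet()) {
            long used = entry.getValue();
            if (used > 0) {
                // Same shape as the WARN message quoted in the text.
                warnings.add("leak " + used + " memory from " + entry.getKey());
            }
        }
        consumers.clear();
        return warnings;
    }
}
```

Only consumers with a non-zero usage produce a warning; the registry is empty afterwards regardless.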
FIXME Image with the interactions to
long acquireExecutionMemory(long required, MemoryConsumer consumer)
acquireExecutionMemory allocates up to required size of memory for the MemoryConsumer. When the required memory could not be fully allocated, it calls spill on every consumer, including the requesting one. Finally, acquireExecutionMemory returns the amount of memory allocated.
MemoryConsumer knows its mode (on- or off-heap).
acquireExecutionMemory first calls
memoryManager.acquireExecutionMemory(required, taskAttemptId, mode).
TaskMemoryManager is a mere wrapper of
You may see the following DEBUG message when
spill released some memory:
DEBUG Task [taskAttemptId] released [bytes] from [consumer] for [consumer]
acquireExecutionMemory then calls memoryManager.acquireExecutionMemory(required, taskAttemptId, mode) again (as it did at the beginning).
It repeats the memory acquisition until it gets enough memory or there are no more consumers left to ask to spill.
You may also see the following ERROR message in the logs when there is an error while requesting a consumer to spill:
ERROR error while calling spill() on [consumer]
If the earlier spill on the consumers did not work out and there is still memory to be acquired, acquireExecutionMemory requests the input consumer (the very one that asked for more memory) to spill memory to disk.
If the consumer releases some memory, you should see the following DEBUG message in the logs:
DEBUG Task [taskAttemptId] released [bytes] from itself ([consumer])
acquireExecutionMemory calls memoryManager.acquireExecutionMemory(required, taskAttemptId, mode) once more.
It records the consumer in the consumers registry.
You should see the following DEBUG message in the logs:
DEBUG Task [taskAttemptId] acquired [bytes] for [consumer]
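The acquire-then-spill flow described above can be sketched with a toy memory pool. Everything here is a simplification under stated assumptions: AcquireSketch and its nested Consumer are hypothetical names, spill releases all memory a consumer holds, the first pass skips the requesting consumer, and the sketch itself updates the consumer's usage, which the text does not describe.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of acquire-with-spill; not Spark's implementation.
final class AcquireSketch {
    // Hypothetical stand-in for a spillable memory consumer.
    static final class Consumer {
        final String name;
        long used;
        Consumer(String name) { this.name = name; }
        // Assumption: spilling frees everything the consumer holds.
        long spill() { long released = used; used = 0; return released; }
    }

    long free;                                   // bytes left in the pool
    final List<Consumer> consumers = new ArrayList<>();

    AcquireSketch(long poolSize) { this.free = poolSize; }

    // Grants as much of the wanted amount as the pool currently allows.
    private long grant(long wanted) {
        long granted = Math.min(wanted, free);
        free -= granted;
        return granted;
    }

    long acquire(long required, Consumer requester) {
        long got = grant(required);              // first attempt
        for (Consumer c : consumers) {           // ask other consumers to spill
            if (got >= required) break;
            if (c != requester && c.used > 0) {
                free += c.spill();
                got += grant(required - got);    // try again after the spill
            }
        }
        if (got < required && requester.used > 0) {
            free += requester.spill();           // last resort: the requester
            got += grant(required - got);
        }
        if (!consumers.contains(requester)) consumers.add(requester);
        requester.used += got;                   // bookkeeping for this sketch only
        return got;
    }
}
```

The return value may be smaller than required when spilling every consumer still does not free enough memory.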
acquireExecutionMemory is used when:
MemoryBlock allocatePage(long size, MemoryConsumer consumer)
It only handles Tungsten Consumers, i.e. MemoryConsumers in
allocatePage allocates a block of memory (aka page) no larger than the MAXIMUM_PAGE_SIZE_BYTES maximum size.
It first checks size against the internal MAXIMUM_PAGE_SIZE_BYTES maximum size. If it is greater than the maximum size, the following IllegalArgumentException is thrown:
Cannot allocate a page with more than [MAXIMUM_PAGE_SIZE_BYTES] bytes
It then acquires execution memory (for the input size and consumer).
It finishes by returning null when no execution memory could be acquired.
With the execution memory acquired, it finds the smallest unallocated page index and records the page number (using allocatedPages registry).
If the index is
PAGE_TABLE_SIZE or higher, releaseExecutionMemory(acquired, consumer) is called and then the following
IllegalStateException is thrown:
Have already allocated a maximum of [PAGE_TABLE_SIZE] pages
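Finding the smallest unallocated page index can be sketched with java.util.BitSet. This is an illustration, not the actual implementation: the class PageNumberSketch and its method names are hypothetical, and the table size of 8192 is an assumption (1 << 13) that the text does not state.

```java
import java.util.BitSet;

// Hypothetical page-number tracker; not part of Spark.
final class PageNumberSketch {
    // Assumption: the page table holds 1 << 13 entries.
    static final int PAGE_TABLE_SIZE = 8192;

    // One bit per page number; a set bit means the page is allocated.
    private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE);

    // Reserves and returns the smallest free page number.
    int reserve() {
        int pageNumber = allocatedPages.nextClearBit(0);
        if (pageNumber >= PAGE_TABLE_SIZE) {
            throw new IllegalStateException(
                "Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
        }
        allocatedPages.set(pageNumber);
        return pageNumber;
    }

    // Makes a page number available again.
    void release(int pageNumber) {
        allocatedPages.clear(pageNumber);
    }
}
```

Because nextClearBit always scans from index 0, a released page number is reused before any higher number is handed out.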
It then attempts to allocate a
Tungsten MemoryAllocator (calling
FIXME What is
The MemoryBlock gets assigned the pageNumber and is added to the internal pageTable registry.
You should see the following TRACE message in the logs:
TRACE Allocate page number [pageNumber] ([acquired] bytes)
The page is returned.
If an OutOfMemoryError is thrown when allocating a MemoryBlock page, the following WARN message is printed out to the logs:
WARN Failed to allocate a page ([acquired] bytes), try again.
acquired memory space with the
pageNumber cleared in allocatedPages (i.e. the index for
FIXME Why is the code tracking
Another allocatePage attempt is recursively tried.
FIXME Why is there a hope for being able to allocate a page?
showMemoryUsage prints out the following INFO message to the logs (with the taskAttemptId):
Memory used in task [taskAttemptId]
showMemoryUsage requests every MemoryConsumer to report memory used. showMemoryUsage prints out the following INFO message to the logs for a MemoryConsumer with some memory usage (and excludes zero-memory consumers):
Acquired by [consumer]: [memUsage]
showMemoryUsage prints out the following INFO messages to the logs:
[amount] bytes of memory were used by task [taskAttemptId] but are not associated with specific consumers
[executionMemoryUsed] bytes of memory are used for execution and [storageMemoryUsed] bytes of memory are used for storage
showMemoryUsage is used when MemoryConsumer is requested to throw an OutOfMemoryError.
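The per-consumer part of the report can be sketched as below. The class MemoryUsageReportSketch is hypothetical, the lines are returned rather than logged, and the "not associated" amount is computed as the task's total usage minus the sum reported by consumers, which is an assumption about how that figure is derived.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical report builder; not part of Spark.
final class MemoryUsageReportSketch {
    private MemoryUsageReportSketch() {}

    static List<String> report(long taskAttemptId,
                               Map<String, Long> usedByConsumer,
                               long usedByTask) {
        List<String> lines = new ArrayList<>();
        lines.add("Memory used in task " + taskAttemptId);
        long accounted = 0L;
        for (Map.Entry<String, Long> entry : usedByConsumer.entrySet()) {
            long used = entry.getValue();
            if (used > 0) {                      // zero-memory consumers are skipped
                lines.add("Acquired by " + entry.getKey() + ": " + used);
                accounted += used;
            }
        }
        // Assumption: unaccounted memory = task total - sum over consumers.
        lines.add((usedByTask - accounted) + " bytes of memory were used by task "
            + taskAttemptId + " but are not associated with specific consumers");
        return lines;
    }
}
```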
Object getPage(long pagePlusOffsetAddress)
long getOffsetInPage(long pagePlusOffsetAddress)
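Both signatures take a single long named pagePlusOffsetAddress, which suggests that a page number and an offset are packed into one 64-bit value. A minimal sketch of such packing follows, assuming the upper 13 bits hold the page number and the lower 51 bits hold the offset. The class PageAddressSketch and its method names are hypothetical; the sketch only decodes numbers and does not return a page object as getPage does.

```java
// Hypothetical encoder/decoder for a packed page address; not part of Spark.
final class PageAddressSketch {
    static final int OFFSET_BITS = 51;
    // Mask that keeps only the lower 51 bits.
    static final long OFFSET_MASK = (1L << OFFSET_BITS) - 1;

    private PageAddressSketch() {}

    // Packs the page number into the upper bits and the offset into the lower bits.
    static long encode(int pageNumber, long offsetInPage) {
        return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & OFFSET_MASK);
    }

    // Unsigned shift so that page numbers using the top bit decode correctly.
    static int pageNumberOf(long pagePlusOffsetAddress) {
        return (int) (pagePlusOffsetAddress >>> OFFSET_BITS);
    }

    static long offsetOf(long pagePlusOffsetAddress) {
        return pagePlusOffsetAddress & OFFSET_MASK;
    }
}
```

The unsigned shift (>>>) matters here: with 13 page bits, the highest page numbers set the sign bit of the long, and a signed shift would decode them as negative values.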
Enable ALL logging level for the org.apache.spark.memory.TaskMemoryManager logger to see what happens inside.
Add the following line to
Refer to Logging.
When allocatePage is called, it will record the page in the registry by setting the bit at the specified index (that corresponds to the allocated page) to
When allocating a