TaskMemoryManager manages the memory allocated to execute a single <
TaskMemoryManager is <
TaskRunner is requested to executor:TaskRunner.md#run[run].
TaskMemoryManager assumes that:
- The number of bits to address pages (aka
- The number of bits to encode offsets in data pages (aka
51 (i.e. 64 bits -
- The number of entries in the <
> and < > (aka
8192 (i.e. 1 <<
- The maximum page size (aka
((1L << 31) - 1) * 8L)
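The arithmetic behind these assumptions can be sketched as follows. This is a minimal illustration, not the actual source: `PAGE_NUMBER_BITS` and `OFFSET_BITS` are names assumed for the example, and 13 is derived from the values quoted above (64 - 51 = 13, and 1 << 13 = 8192).

```java
// A sketch of the addressing arithmetic, under the assumptions stated above.
// PAGE_NUMBER_BITS and OFFSET_BITS are names assumed for this example.
final class AddressingConstants {
    // 13 bits select a page, which leaves 64 - 13 = 51 bits for the offset.
    static final int PAGE_NUMBER_BITS = 13;
    static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS;

    // One page-table entry per addressable page: 1 << 13 = 8192.
    static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS;

    // Maximum page size: (2^31 - 1) * 8 = 17179869176 bytes.
    static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L;

    public static void main(String[] args) {
        System.out.println("offset bits        = " + OFFSET_BITS);
        System.out.println("page table size    = " + PAGE_TABLE_SIZE);
        System.out.println("max page size (B)  = " + MAXIMUM_PAGE_SIZE_BYTES);
    }
}
```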
== [[creating-instance]] Creating Instance
TaskMemoryManager takes the following to be created:
- [[taskAttemptId]] executor:TaskRunner.md#taskId[Task attempt ID]
TaskMemoryManager initializes the <
== [[consumers]] Spillable Memory Consumers
TaskMemoryManager tracks memory:MemoryConsumer.md[spillable memory consumers].
TaskMemoryManager registers a new memory consumer when requested to <
TaskMemoryManager removes (clears) all registered memory consumers when requested to <
Memory consumers are used to report memory usage when TaskMemoryManager is requested to <
== [[memoryManager]] MemoryManager
TaskMemoryManager is given a memory:MemoryManager.md[MemoryManager] when <
TaskMemoryManager uses the MemoryManager for the following:
>, < > or < > execution memory
== [[cleanUpAllAllocatedMemory]] Cleaning Up All Allocated Memory
cleanUpAllAllocatedMemory clears <
All recorded <
WARN TaskMemoryManager: leak [bytes] memory from [consumer]
The consumers collection is then cleared.
MemoryManager.md#releaseExecutionMemory[MemoryManager.releaseExecutionMemory] is executed to release the memory that is not used by any consumer.
Before cleanUpAllAllocatedMemory returns, it calls MemoryManager.md#releaseAllExecutionMemoryForTask[MemoryManager.releaseAllExecutionMemoryForTask], whose result becomes the return value.
CAUTION: FIXME Image with the interactions to
cleanUpAllAllocatedMemory is used exclusively when
TaskRunner is requested to executor:TaskRunner.md#run[run] (and cleans up after itself).
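The cleanup steps described in this section can be sketched roughly as below. This is a simplified model under stated assumptions: the `consumers` map, the `taskExecutionMemory` counter and the `cleanUpAll` method are hypothetical stand-ins for the real bookkeeping, and page handling is left out entirely.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the cleanup sequence; not the actual Spark code.
final class CleanupSketch {
    // Consumer name -> bytes the consumer still holds (assumed bookkeeping).
    final Map<String, Long> consumers = new LinkedHashMap<>();
    // Memory acquired from the manager but not handed to any consumer.
    long acquiredButNotUsed;
    // What the (hypothetical) memory manager still attributes to this task.
    long taskExecutionMemory;

    long cleanUpAll() {
        // 1. Report every consumer that still holds memory as a leak.
        for (Map.Entry<String, Long> e : consumers.entrySet()) {
            if (e.getValue() > 0) {
                System.out.println("WARN leak " + e.getValue() + " memory from " + e.getKey());
            }
        }
        // 2. Forget all registered consumers.
        consumers.clear();
        // 3. Give back the memory that no consumer was using.
        taskExecutionMemory -= acquiredButNotUsed;
        acquiredButNotUsed = 0L;
        // 4. Release whatever is left for the task; that amount is the result.
        long released = taskExecutionMemory;
        taskExecutionMemory = 0L;
        return released;
    }
}
```

In this model a non-zero return value means that some execution memory was still attributed to the task when it finished.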
== [[acquireExecutionMemory]] Acquiring Execution Memory
long acquireExecutionMemory(long required, MemoryConsumer consumer)
acquireExecutionMemory allocates up to required bytes of memory for the memory:MemoryConsumer.md[MemoryConsumer].
When not enough memory could be allocated, it calls spill on the registered consumers, the requesting one included.
In the end, acquireExecutionMemory returns the amount of memory actually allocated.
acquireExecutionMemory synchronizes on the TaskMemoryManager itself, so no other synchronized call on the same object can proceed until it completes.
NOTE: memory:MemoryConsumer.md[MemoryConsumer] knows its mode -- on- or off-heap.
acquireExecutionMemory first calls
memoryManager.acquireExecutionMemory(required, taskAttemptId, mode).
TIP: TaskMemoryManager is a mere wrapper of
MemoryManager to track <
When the memory obtained is less than requested (by
acquireExecutionMemory requests all <
acquireExecutionMemory requests memory from consumers that work in the same mode except the requesting one.
You may see the following DEBUG message when
spill released some memory:
DEBUG Task [taskAttemptId] released [bytes] from [consumer] for [consumer]
acquireExecutionMemory then calls memoryManager.acquireExecutionMemory(required, taskAttemptId, mode) again (as it did at the beginning).
It repeats the acquisition until it gets enough memory or there are no more consumers to ask to spill.
You may also see the following ERROR message in the logs when requesting a consumer to spill fails:
ERROR error while calling spill() on [consumer]
If the earlier spill on the other consumers did not free enough and there is still memory to be acquired, acquireExecutionMemory MemoryConsumer.md#spill[requests the input consumer to spill memory to disk] (that is, the very consumer that requested more memory).
If the consumer releases some memory, you should see the following DEBUG message in the logs:
DEBUG Task [taskAttemptId] released [bytes] from itself ([consumer])
acquireExecutionMemory calls memoryManager.acquireExecutionMemory(required, taskAttemptId, mode) once more.
NOTE: memoryManager.acquireExecutionMemory(required, taskAttemptId, mode) can therefore be called in "three" places: at the very beginning, after every other consumer that spilled, and after the requesting consumer spilled itself.
It records the
consumer in <
You should see the following DEBUG message in the logs:
DEBUG Task [taskAttemptId] acquired [bytes] for [consumer]
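The acquire-then-spill flow above can be sketched with a small self-contained model. Everything here is a hypothetical stand-in (`Pool`, `Consumer`, `AcquireSketch` are not Spark classes), the memory-mode filter and error handling are omitted, and the retries ask only for the bytes still missing, which is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the acquire/spill loop; not the actual Spark code.
final class AcquireSketch {
    /** Stand-in for the shared execution memory pool. */
    static final class Pool {
        long free;
        Pool(long free) { this.free = free; }
        /** Grants at most what is free and returns the granted amount. */
        long acquire(long bytes) {
            long got = Math.min(bytes, free);
            free -= got;
            return got;
        }
    }

    /** Stand-in for a spillable memory consumer. */
    static final class Consumer {
        long used;
        /** Pretends to write everything to disk; returns the bytes freed. */
        long spill(Pool pool) {
            long freed = used;
            used = 0L;
            pool.free += freed;
            return freed;
        }
    }

    private final Pool pool;
    private final List<Consumer> consumers = new ArrayList<>();

    AcquireSketch(long poolSize) { this.pool = new Pool(poolSize); }

    synchronized long acquire(long required, Consumer requester) {
        // First attempt: plain request against the pool.
        long got = pool.acquire(required);
        if (got < required) {
            // Second attempt: spill the other consumers, one at a time.
            for (Consumer c : consumers) {
                if (c != requester && c.used > 0 && c.spill(pool) > 0) {
                    got += pool.acquire(required - got);
                    if (got >= required) break;
                }
            }
        }
        // Last resort: ask the requesting consumer to spill itself.
        if (got < required && requester.used > 0 && requester.spill(pool) > 0) {
            got += pool.acquire(required - got);
        }
        // Register the consumer and account for what it received.
        if (!consumers.contains(requester)) consumers.add(requester);
        requester.used += got;
        return got;
    }
}
```

For example, with a pool of 100 bytes, a first consumer holding 80 is spilled when a second consumer asks for 50, so the second request is fully satisfied.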
acquireExecutionMemory is used when:
* MemoryConsumer is requested to memory:MemoryConsumer.md#acquireMemory[acquire execution memory]
* TaskMemoryManager is requested to <
== [[allocatePage]] Allocating Memory Block for Tungsten Consumers
MemoryBlock allocatePage(long size, MemoryConsumer consumer)
NOTE: It only handles Tungsten Consumers, i.e. MemoryConsumer.md[MemoryConsumers] in
allocatePage allocates a block of memory (aka page) no larger than MAXIMUM_PAGE_SIZE_BYTES.
It first checks size against the internal MAXIMUM_PAGE_SIZE_BYTES maximum size. If it is greater than the maximum size, the following IllegalArgumentException is thrown:
Cannot allocate a page with more than [MAXIMUM_PAGE_SIZE_BYTES] bytes
It then <
It finishes by returning null when no execution memory could be acquired.
With the execution memory acquired, it finds the smallest unallocated page index and records the page number (using <
If the index is
PAGE_TABLE_SIZE or higher, <
IllegalStateException is thrown:
Have already allocated a maximum of [PAGE_TABLE_SIZE] pages
It then attempts to allocate a
Tungsten MemoryAllocator (calling
CAUTION: FIXME What is
MemoryBlock gets assigned
pageNumber and it gets added to the internal <
You should see the following TRACE message in the logs:
TRACE Allocate page number [pageNumber] ([acquired] bytes)
The page is returned.
If an OutOfMemoryError is thrown when allocating a MemoryBlock page, the following WARN message is printed out to the logs:
WARN Failed to allocate a page ([acquired] bytes), try again.
acquired memory space with the
pageNumber cleared in <
CAUTION: FIXME Why is the code tracking
CAUTION: FIXME Why is there a hope for being able to allocate a page?
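The page-number bookkeeping described above can be illustrated with a `java.util.BitSet`. This is a sketch under assumptions: `PageTableSketch` and its methods are hypothetical, a plain `long[]` stands in for a MemoryBlock, and an OutOfMemoryError is simply rethrown after the page number is cleared instead of the allocation being retried.

```java
import java.util.BitSet;

// Hypothetical model of page-number allocation; not the actual Spark code.
final class PageTableSketch {
    static final int PAGE_TABLE_SIZE = 8192;

    // One flag per page number; all flags start out false (unallocated).
    private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE);
    // A long[] stands in for the MemoryBlock page in this sketch.
    private final long[][] pageTable = new long[PAGE_TABLE_SIZE][];

    /** Reserves the smallest free page number and stores a page of the given length. */
    synchronized int allocate(int words) {
        int pageNumber = allocatedPages.nextClearBit(0);
        if (pageNumber >= PAGE_TABLE_SIZE) {
            throw new IllegalStateException(
                "Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
        }
        allocatedPages.set(pageNumber);
        try {
            pageTable[pageNumber] = new long[words];
        } catch (OutOfMemoryError e) {
            // Undo the reservation so that the page number can be reused.
            allocatedPages.clear(pageNumber);
            throw e;
        }
        return pageNumber;
    }

    /** Drops the page and makes its number available again. */
    synchronized void free(int pageNumber) {
        pageTable[pageNumber] = null;
        allocatedPages.clear(pageNumber);
    }

    synchronized boolean isAllocated(int pageNumber) {
        return allocatedPages.get(pageNumber);
    }
}
```

Because `nextClearBit(0)` always returns the lowest unset index, a freed page number is the first one to be handed out again.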
void releaseExecutionMemory(long size, MemoryConsumer consumer)
releaseExecutionMemory is used when:
* MemoryConsumer is requested to MemoryConsumer.md#freeMemory[freeMemory]
* TaskMemoryManager is requested to <
> and < >
getMemoryConsumptionForThisTask is used exclusively in Spark tests.
== [[showMemoryUsage]] Displaying Memory Usage
showMemoryUsage prints out the following INFO message to the logs (with the <
Memory used in task [taskAttemptId]
showMemoryUsage requests every <
Acquired by [consumer]: [memUsage]
showMemoryUsage prints out the following INFO messages to the logs:
[amount] bytes of memory were used by task [taskAttemptId] but are not associated with specific consumers
[executionMemoryUsed] bytes of memory are used for execution and [storageMemoryUsed] bytes of memory are used for storage
showMemoryUsage is used when MemoryConsumer is requested to memory:MemoryConsumer.md#throwOom[throw an OutOfMemoryError].
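How the per-consumer lines and the "not associated with specific consumers" amount relate can be sketched as below. This is an illustration only: `MemoryUsageReportSketch` is a hypothetical helper, the usage figures are passed in rather than read from a memory manager, and values are printed as raw byte counts.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the usage report; not the actual Spark code.
final class MemoryUsageReportSketch {
    /**
     * Builds the report lines for one task.
     * usedByConsumer: consumer name -> bytes it holds (assumed input).
     * usedByTask: total execution memory attributed to the task (assumed input).
     */
    static List<String> report(long taskAttemptId,
                               Map<String, Long> usedByConsumer,
                               long usedByTask) {
        List<String> lines = new ArrayList<>();
        lines.add("Memory used in task " + taskAttemptId);
        long accounted = 0L;
        for (Map.Entry<String, Long> e : usedByConsumer.entrySet()) {
            lines.add("Acquired by " + e.getKey() + ": " + e.getValue());
            accounted += e.getValue();
        }
        // Whatever the task holds beyond the consumers' total is unaccounted for.
        lines.add((usedByTask - accounted) + " bytes of memory were used by task "
            + taskAttemptId + " but are not associated with specific consumers");
        return lines;
    }

    public static void main(String[] args) {
        Map<String, Long> used = new LinkedHashMap<>();
        used.put("sorter", 300L);
        used.put("hashMap", 0L);
        report(7L, used, 350L).forEach(System.out::println);
    }
}
```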
pageSizeBytes simply requests the <
pageSizeBytes is used when...FIXME
== [[freePage]] Freeing Memory Page
void freePage(MemoryBlock page, MemoryConsumer consumer)
freePage is used when MemoryConsumer is requested to MemoryConsumer.md#freePage[freePage] and MemoryConsumer.md#throwOom[throwOom].
== [[getPage]] Getting Page
Object getPage(long pagePlusOffsetAddress)
getPage is used when...FIXME
== [[getOffsetInPage]] Getting Page Offset
long getOffsetInPage(long pagePlusOffsetAddress)
getOffsetInPage is used when...FIXME
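Both getPage and getOffsetInPage take a single pagePlusOffsetAddress. Assuming the layout from the assumptions at the top of this page (the upper 13 bits hold the page number and the lower 51 bits hold the offset), the split could look like the sketch below. The class and method names are hypothetical and only illustrate the bit arithmetic.

```java
// Hypothetical model of the page/offset address layout; not the actual Spark code.
final class PageAddressSketch {
    static final int OFFSET_BITS = 51;
    // Mask with the lower 51 bits set.
    static final long OFFSET_MASK = (1L << OFFSET_BITS) - 1;

    /** Packs a page number and an in-page offset into one 64-bit address. */
    static long encode(int pageNumber, long offsetInPage) {
        return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & OFFSET_MASK);
    }

    /** Upper 13 bits; the unsigned shift keeps large page numbers positive. */
    static int pageNumber(long pagePlusOffsetAddress) {
        return (int) (pagePlusOffsetAddress >>> OFFSET_BITS);
    }

    /** Lower 51 bits. */
    static long offsetInPage(long pagePlusOffsetAddress) {
        return pagePlusOffsetAddress & OFFSET_MASK;
    }

    public static void main(String[] args) {
        long address = encode(5, 1234L);
        System.out.println(pageNumber(address) + " / " + offsetInPage(address));
    }
}
```

In this model, looking up a page amounts to using `pageNumber(address)` as an index into the page table, while `offsetInPage(address)` locates the data within that page.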
== [[logging]] Logging
Enable ALL logging level for org.apache.spark.memory.TaskMemoryManager logger to see what happens inside.
Add the following line to
Refer to spark-logging.md[Logging].
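As an illustration only, and assuming a log4j 1.x style properties file (the file to edit is not named in this section and is not assumed here), the logger name and level given above would translate into a line like this:

```
# Assumed log4j 1.x syntax; logger name and level are taken from the text above.
log4j.logger.org.apache.spark.memory.TaskMemoryManager=ALL
```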
== [[internal-properties]] Internal Properties
[cols="30m,70",options="header",width="100%"]
|===
| Name | Description
| acquiredButNotUsed | [[acquiredButNotUsed]] The size of memory allocated but not used.
| allocatedPages | [[allocatedPages]] Collection of flags (true or false values) of size PAGE_TABLE_SIZE with all bits initially disabled (i.e. false). allocatedPages is https://docs.oracle.com/javase/8/docs/api/java/util/BitSet.html[java.util.BitSet].
| pageTable | [[pageTable]] The array of size PAGE_TABLE_SIZE, indexed by pageNumber, where each entry points to the allocated MemoryBlock page.
| tungstenMemoryMode | [[tungstenMemoryMode]]
Set to the MemoryManager.md#tungstenMemoryMode[tungstenMemoryMode] of the <