Skip to content

TaskMemoryManager

TaskMemoryManager manages the memory allocated to execute a single <> (using <>).

TaskMemoryManager is <> when TaskRunner is requested to executor:TaskRunner.md#run[run].

Creating TaskMemoryManager for Task

TaskMemoryManager assumes that:

  • The number of bits to address pages (aka PAGE_NUMBER_BITS) is 13
  • The number of bits to encode offsets in data pages (aka OFFSET_BITS) is 51 (i.e. 64 bits - PAGE_NUMBER_BITS)
  • The number of entries in the <> and <> (aka PAGE_TABLE_SIZE) is 8192 (i.e. 1 << PAGE_NUMBER_BITS)
  • The maximum page size (aka MAXIMUM_PAGE_SIZE_BYTES) is 15GB (i.e. ((1L << 31) - 1) * 8L)

== [[creating-instance]] Creating Instance

TaskMemoryManager takes the following to be created:

  • <>
  • [[taskAttemptId]] executor:TaskRunner.md#taskId[Task attempt ID]

TaskMemoryManager initializes the <>.

== [[consumers]] Spillable Memory Consumers

TaskMemoryManager tracks memory:MemoryConsumer.md[spillable memory consumers].

TaskMemoryManager registers a new memory consumer when requested to <>.

TaskMemoryManager removes (clears) all registered memory consumer when requested to <>.

Memory consumers are used to report memory usage when TaskMemoryManager is requested to <>.

== [[memoryManager]] MemoryManager

TaskMemoryManager is given a memory:MemoryManager.md[MemoryManager] when <>.

TaskMemoryManager uses the MemoryManager for the following:

  • <>, <> or <> execution memory

  • <>

  • <>

  • <>

  • <>

  • <>

== [[cleanUpAllAllocatedMemory]] Cleaning Up All Allocated Memory

[source, java]

long cleanUpAllAllocatedMemory()

cleanUpAllAllocatedMemory clears <>.

CAUTION: FIXME

All recorded <> are queried for the size of used memory. If the memory used is greater than 0, the following WARN message is printed out to the logs:

WARN TaskMemoryManager: leak [bytes] memory from [consumer]

The consumers collection is then cleared.

MemoryManager.md#releaseExecutionMemory[MemoryManager.releaseExecutionMemory] is executed to release the memory that is not used by any consumer.

Before cleanUpAllAllocatedMemory returns, it calls MemoryManager.md#releaseAllExecutionMemoryForTask[MemoryManager.releaseAllExecutionMemoryForTask] that in turn becomes the return value.

CAUTION: FIXME Image with the interactions to MemoryManager.

NOTE: cleanUpAllAllocatedMemory is used exclusively when TaskRunner is requested to executor:TaskRunner.md#run[run] (and cleans up after itself).

== [[acquireExecutionMemory]] Acquiring Execution Memory

[source, java]

long acquireExecutionMemory( long required, MemoryConsumer consumer)


acquireExecutionMemory allocates up to required size of memory for the memory:MemoryConsumer.md[MemoryConsumer].

When no memory could be allocated, it calls spill on every consumer, itself including. Finally, acquireExecutionMemory returns the allocated memory.

NOTE: acquireExecutionMemory synchronizes on itself, and so no other calls on the object could be completed.

NOTE: memory:MemoryConsumer.md[MemoryConsumer] knows its mode -- on- or off-heap.

acquireExecutionMemory first calls memoryManager.acquireExecutionMemory(required, taskAttemptId, mode).

TIP: TaskMemoryManager is a mere wrapper of MemoryManager to track <>?

CAUTION: FIXME

When the memory obtained is less than requested (by required), acquireExecutionMemory requests all <> to MemoryConsumer.md#spill[release memory (by spilling it to disk)].

NOTE: acquireExecutionMemory requests memory from consumers that work in the same mode except the requesting one.

You may see the following DEBUG message when spill released some memory:

DEBUG Task [taskAttemptId] released [bytes] from [consumer] for [consumer]

acquireExecutionMemory calls memoryManager.acquireExecutionMemory(required, taskAttemptId, mode) again (it called it at the beginning).

It does the memory acquisition until it gets enough memory or there are no more consumers to request spill from.

You may also see the following ERROR message in the logs when there is an error while requesting spill with OutOfMemoryError followed.

ERROR error while calling spill() on [consumer]

If the earlier spill on the consumers did not work out and there is still memory to be acquired, acquireExecutionMemory MemoryConsumer.md#spill[requests the input consumer to spill memory to disk] (that in fact requested more memory!)

If the consumer releases some memory, you should see the following DEBUG message in the logs:

DEBUG Task [taskAttemptId] released [bytes] from itself ([consumer])

acquireExecutionMemory calls memoryManager.acquireExecutionMemory(required, taskAttemptId, mode) once more.

NOTE: memoryManager.acquireExecutionMemory(required, taskAttemptId, mode) could have been called "three" times, i.e. at the very beginning, for each consumer, and on itself.

It records the consumer in <> registry.

You should see the following DEBUG message in the logs:

DEBUG Task [taskAttemptId] acquired [bytes] for [consumer]

acquireExecutionMemory is used when:

  • MemoryConsumer is requested to memory:MemoryConsumer.md#acquireMemory[acquire execution memory]

  • TaskMemoryManager is requested to <>

== [[allocatePage]] Allocating Memory Block for Tungsten Consumers

[source, java]

MemoryBlock allocatePage( long size, MemoryConsumer consumer)


NOTE: It only handles Tungsten Consumers, i.e. MemoryConsumer.md[MemoryConsumers] in tungstenMemoryMode mode.

allocatePage allocates a block of memory (aka page) smaller than MAXIMUM_PAGE_SIZE_BYTES maximum size.

It checks size against the internal MAXIMUM_PAGE_SIZE_BYTES maximum size. If it is greater than the maximum size, the following IllegalArgumentException is thrown:

Cannot allocate a page with more than [MAXIMUM_PAGE_SIZE_BYTES] bytes

It then <> (for the input size and consumer).

It finishes by returning null when no execution memory could be acquired.

With the execution memory acquired, it finds the smallest unallocated page index and records the page number (using <> registry).

If the index is PAGE_TABLE_SIZE or higher, <> is called and then the following IllegalStateException is thrown:

Have already allocated a maximum of [PAGE_TABLE_SIZE] pages

It then attempts to allocate a MemoryBlock from Tungsten MemoryAllocator (calling memoryManager.tungstenMemoryAllocator().allocate(acquired)).

CAUTION: FIXME What is MemoryAllocator?

When successful, MemoryBlock gets assigned pageNumber and it gets added to the internal <> registry.

You should see the following TRACE message in the logs:

TRACE Allocate page number [pageNumber] ([acquired] bytes)

The page is returned.

If a OutOfMemoryError is thrown when allocating a MemoryBlock page, the following WARN message is printed out to the logs:

WARN Failed to allocate a page ([acquired] bytes), try again.

And acquiredButNotUsed gets acquired memory space with the pageNumber cleared in <> (i.e. the index for pageNumber gets false).

CAUTION: FIXME Why is the code tracking acquiredButNotUsed?

Another <> attempt is recursively tried.

CAUTION: FIXME Why is there a hope for being able to allocate a page?

== [[releaseExecutionMemory]] releaseExecutionMemory Method

[source, java]

void releaseExecutionMemory(long size, MemoryConsumer consumer)

releaseExecutionMemory...FIXME

[NOTE]

releaseExecutionMemory is used when:

  • MemoryConsumer is requested to MemoryConsumer.md#freeMemory[freeMemory]

* TaskMemoryManager is requested to <> and <>

== [[getMemoryConsumptionForThisTask]] getMemoryConsumptionForThisTask Method

[source, java]

long getMemoryConsumptionForThisTask()

getMemoryConsumptionForThisTask...FIXME

NOTE: getMemoryConsumptionForThisTask is used exclusively in Spark tests.

== [[showMemoryUsage]] Displaying Memory Usage

[source, java]

void showMemoryUsage()

showMemoryUsage prints out the following INFO message to the logs (with the <>):

[source,plaintext]

Memory used in task [taskAttemptId]

showMemoryUsage requests every <> to memory:MemoryConsumer.md#getUsed[report memory used]. showMemoryUsage prints out the following INFO message to the logs for a MemoryConsumer with some memory usage (and excludes zero-memory consumers):

[source,plaintext]

Acquired by [consumer]: [memUsage]

showMemoryUsage prints out the following INFO messages to the logs:

[source,plaintext]

[amount] bytes of memory were used by task [taskAttemptId] but are not associated with specific consumers

[source,plaintext]

[executionMemoryUsed] bytes of memory are used for execution and [storageMemoryUsed] bytes of memory are used for storage

showMemoryUsage is used when MemoryConsumer is requested to memory:MemoryConsumer.md#throwOom[throw an OutOfMemoryError].

== [[pageSizeBytes]] pageSizeBytes Method

[source, java]

long pageSizeBytes()

pageSizeBytes simply requests the <> for MemoryManager.md#pageSizeBytes[pageSizeBytes].

NOTE: pageSizeBytes is used when...FIXME

== [[freePage]] Freeing Memory Page -- freePage Method

[source, java]

void freePage(MemoryBlock page, MemoryConsumer consumer)

pageSizeBytes simply requests the <> for MemoryManager.md#pageSizeBytes[pageSizeBytes].

NOTE: pageSizeBytes is used when MemoryConsumer is requested to MemoryConsumer.md#freePage[freePage] and MemoryConsumer.md#throwOom[throwOom].

== [[getPage]] Getting Page -- getPage Method

[source, java]

Object getPage(long pagePlusOffsetAddress)

getPage...FIXME

NOTE: getPage is used when...FIXME

== [[getOffsetInPage]] Getting Page Offset -- getOffsetInPage Method

[source, java]

long getOffsetInPage(long pagePlusOffsetAddress)

getPage...FIXME

NOTE: getPage is used when...FIXME

== [[logging]] Logging

Enable ALL logging level for org.apache.spark.memory.TaskMemoryManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

[source]

log4j.logger.org.apache.spark.memory.TaskMemoryManager=ALL

Refer to ROOT:spark-logging.md[Logging].

== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"] |=== | Name | Description

| acquiredButNotUsed | [[acquiredButNotUsed]] The size of memory allocated but not used.

| allocatedPages | [[allocatedPages]] Collection of flags (true or false values) of size PAGE_TABLE_SIZE with all bits initially disabled (i.e. false).

TIP: allocatedPages is https://docs.oracle.com/javase/8/docs/api/java/util/BitSet.html[java.util.BitSet].

When <> is called, it will record the page in the registry by setting the bit at the specified index (that corresponds to the allocated page) to true.

| pageTable | [[pageTable]] The array of size PAGE_TABLE_SIZE with indices being MemoryBlock objects.

When <MemoryBlock page for Tungsten consumers>>, the index corresponds to pageNumber that points to the MemoryBlock page allocated.

| tungstenMemoryMode | [[tungstenMemoryMode]] MemoryMode (i.e. OFF_HEAP or ON_HEAP)

Set to the MemoryManager.md#tungstenMemoryMode[tungstenMemoryMode] of the <> while TaskMemoryManager is <> |===


Last update: 2020-10-09