MemoryManager

MemoryManager is an abstraction of memory managers that manage shared memory for task execution (using TaskMemoryManager) and block storage (using BlockManager).

MemoryManager splits available memory into two regions:

  • Execution memory for computations in shuffles, joins, sorts and aggregations

  • Storage memory for caching and propagating internal data across Spark nodes

MemoryManager is used to create BlockManager (and then MemoryStore) and TaskMemoryManager.

Figure 1. MemoryManager and Core Services

MemoryManager is available as SparkEnv.memoryManager.

import org.apache.spark.SparkEnv
val mm = SparkEnv.get.memoryManager

When SparkEnv is created for the driver or an executor, the concrete MemoryManager is chosen based on the spark.memory.useLegacyMode configuration property.

MemoryManagers

MemoryManager           Description
StaticMemoryManager     Legacy memory manager
UnifiedMemoryManager    The default memory manager
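The choice between the two implementations can be sketched as a small function over configuration settings. This is a simplified illustration, not Spark's code: the configuration is modeled as a plain Map[String, String] instead of SparkConf (an assumption), and the function returns the class name instead of creating the manager.

```scala
// Simplified sketch: SparkConf is modeled as a plain Map (assumption),
// and only the name of the chosen MemoryManager is returned.
object MemoryManagerChoice {
  val LegacyModeKey = "spark.memory.useLegacyMode"

  def chooseMemoryManager(conf: Map[String, String]): String = {
    // Legacy mode is off unless the property is explicitly set to true
    val useLegacyMode = conf.get(LegacyModeKey).exists(_.trim.toBoolean)
    if (useLegacyMode) "StaticMemoryManager" else "UnifiedMemoryManager"
  }
}
```

With no setting at all, the sketch falls back to UnifiedMemoryManager, matching the table above.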

Creating Instance

MemoryManager takes the following to be created:

  • SparkConf

  • Number of CPU cores

  • Size of the on-heap storage memory

  • Size of the on-heap execution memory

MemoryManager initializes the internal properties.
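A minimal sketch of the creation step, assuming that initializing the internal properties means sizing an on-heap storage pool and an on-heap execution pool from the constructor arguments. ToyPool and ToyMemoryManager are hypothetical names, and SparkConf is replaced by a plain Map.

```scala
// Hypothetical pool that tracks its size and usage (in bytes)
class ToyPool(val name: String) {
  private var _poolSize: Long = 0L
  private var _memoryUsed: Long = 0L
  def poolSize: Long = _poolSize
  def memoryUsed: Long = _memoryUsed
  def incrementPoolSize(delta: Long): Unit = {
    require(delta >= 0, s"invalid pool size delta: $delta")
    _poolSize += delta
  }
}

// Hypothetical stand-in for MemoryManager; conf replaces SparkConf (assumption)
abstract class ToyMemoryManager(
    conf: Map[String, String],
    numCores: Int,
    onHeapStorageMemory: Long,
    onHeapExecutionMemory: Long) {
  // Internal properties initialized when the instance is created
  protected val onHeapStorageMemoryPool = new ToyPool("on-heap storage")
  protected val onHeapExecutionMemoryPool = new ToyPool("on-heap execution")
  onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
  onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)

  def storagePoolSize: Long = onHeapStorageMemoryPool.poolSize
  def executionPoolSize: Long = onHeapExecutionMemoryPool.poolSize
}
```

A concrete manager would extend the abstract class, e.g. `new ToyMemoryManager(Map.empty, 4, 100L, 200L) {}`.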

Acquiring Execution Memory for Task

acquireExecutionMemory(
  numBytes: Long,
  taskAttemptId: Long,
  memoryMode: MemoryMode): Long

acquireExecutionMemory tries to acquire up to numBytes bytes of execution memory for the current task (identified by taskAttemptId) and returns the number of bytes obtained, or 0 if none can be allocated.

acquireExecutionMemory is used when TaskMemoryManager is requested to acquire execution memory.
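The "up to numBytes" contract can be sketched with a single execution pool shared by tasks. This is a simplified illustration with a hypothetical ToyExecutionPool: it ignores memoryMode and, unlike Spark's own execution pools, does no per-task fair sharing or blocking. It simply grants whatever is free, up to the request.

```scala
import scala.collection.mutable

// Hypothetical, simplified execution pool (not Spark's API).
// Grants as much as is free, up to numBytes; no fairness, no blocking.
final class ToyExecutionPool(poolSize: Long) {
  private val memoryForTask = mutable.HashMap.empty[Long, Long] // taskAttemptId -> bytes

  def memoryUsed: Long = synchronized(memoryForTask.values.sum)

  /** Returns the number of bytes granted (0 if nothing is free). */
  def acquireMemory(numBytes: Long, taskAttemptId: Long): Long = synchronized {
    require(numBytes >= 0, s"invalid number of bytes requested: $numBytes")
    val granted = math.min(numBytes, poolSize - memoryForTask.values.sum)
    if (granted > 0) {
      memoryForTask(taskAttemptId) = memoryForTask.getOrElse(taskAttemptId, 0L) + granted
    }
    granted
  }

  def memoryUsedBy(taskAttemptId: Long): Long = synchronized {
    memoryForTask.getOrElse(taskAttemptId, 0L)
  }
}
```

A caller must therefore check the returned value, since a partial grant is a normal outcome.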

Acquiring Storage Memory for Block

acquireStorageMemory(
  blockId: BlockId,
  numBytes: Long,
  memoryMode: MemoryMode): Boolean

acquireStorageMemory tries to acquire numBytes bytes of memory to cache the given block, evicting existing ones if necessary.
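The all-or-nothing nature of acquireStorageMemory (a Boolean rather than a byte count) and the "evicting existing ones if necessary" step can be sketched as follows. This is a hypothetical toy: block ids are plain Strings standing in for BlockId, memoryMode is ignored, and eviction simply drops the oldest cached blocks first, which is an assumption and not Spark's eviction logic.

```scala
import scala.collection.mutable

// Hypothetical storage pool: String block ids stand in for BlockId, and
// eviction drops the oldest cached blocks first (an assumed policy).
final class ToyStoragePool(poolSize: Long) {
  private val blocks = mutable.LinkedHashMap.empty[String, Long] // blockId -> size

  def memoryUsed: Long = synchronized(blocks.values.sum)
  def cachedBlocks: Seq[String] = synchronized(blocks.keys.toList)

  /** Tries to reserve numBytes for blockId, evicting older blocks if necessary. */
  def acquireStorageMemory(blockId: String, numBytes: Long): Boolean = synchronized {
    require(numBytes >= 0, s"invalid number of bytes requested: $numBytes")
    if (numBytes > poolSize) {
      false // can never fit, so nothing is evicted
    } else {
      // Evict blocks in insertion order until the new block fits
      while (poolSize - blocks.values.sum < numBytes) {
        val (oldest, _) = blocks.head
        blocks.remove(oldest)
      }
      blocks(blockId) = blocks.getOrElse(blockId, 0L) + numBytes
      true
    }
  }
}
```

Rejecting an impossible request up front avoids evicting blocks for a block that would never fit anyway.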

acquireStorageMemory is used when:

Acquiring Unroll Memory for Block

acquireUnrollMemory(
  blockId: BlockId,
  numBytes: Long,
  memoryMode: MemoryMode): Boolean

acquireUnrollMemory tries to acquire numBytes bytes of memory to unroll the given block, evicting existing ones if necessary.

acquireUnrollMemory is used when MemoryStore is requested to reserveUnrollMemoryForThisTask.
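In UnifiedMemoryManager, acquireUnrollMemory delegates to acquireStorageMemory, so memory reserved for unrolling is accounted as storage memory. The sketch below mirrors only that delegation; ToyUnifiedStorage is hypothetical, uses String block ids, and does not evict anything.

```scala
// Hypothetical toy; only the unroll-to-storage delegation is modeled on
// UnifiedMemoryManager. No eviction, no memory modes.
class ToyUnifiedStorage(maxStorageMemory: Long) {
  private var storageMemoryUsed: Long = 0L

  def memoryUsed: Long = synchronized(storageMemoryUsed)

  def acquireStorageMemory(blockId: String, numBytes: Long): Boolean = synchronized {
    require(numBytes >= 0, s"invalid number of bytes requested: $numBytes")
    if (storageMemoryUsed + numBytes <= maxStorageMemory) {
      storageMemoryUsed += numBytes
      true
    } else {
      false // a real implementation would try to evict blocks first
    }
  }

  // Unroll memory is storage memory reserved while a block is being unrolled
  def acquireUnrollMemory(blockId: String, numBytes: Long): Boolean = synchronized {
    acquireStorageMemory(blockId, numBytes)
  }
}
```

StaticMemoryManager (legacy mode) treats unroll memory differently and caps it separately, so this delegation applies to the default manager only.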

Total Available Off-Heap Storage Memory

maxOffHeapStorageMemory: Long

maxOffHeapStorageMemory is the total available off-heap memory for storage (in bytes).

maxOffHeapStorageMemory may vary over time.

maxOffHeapStorageMemory is used when:

Total Available On-Heap Storage Memory

maxOnHeapStorageMemory: Long

maxOnHeapStorageMemory is the total available on-heap memory for storage (in bytes).

maxOnHeapStorageMemory may vary over time.
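Why the maximum can vary over time is easiest to see in the unified model: UnifiedMemoryManager computes maxOnHeapStorageMemory as the total on-heap memory minus the execution memory currently in use. The hypothetical ToyUnifiedRegion below reproduces only that rule.

```scala
// Hypothetical toy of a unified on-heap region shared by execution and storage.
// Rule modeled on UnifiedMemoryManager: max storage = total - execution in use.
final class ToyUnifiedRegion(maxHeapMemory: Long) {
  private var executionMemoryUsed: Long = 0L

  def useExecutionMemory(numBytes: Long): Unit = synchronized {
    require(numBytes >= 0 && executionMemoryUsed + numBytes <= maxHeapMemory)
    executionMemoryUsed += numBytes
  }

  def releaseExecutionMemory(numBytes: Long): Unit = synchronized {
    require(numBytes >= 0 && numBytes <= executionMemoryUsed)
    executionMemoryUsed -= numBytes
  }

  // Changes whenever execution memory is acquired or released
  def maxOnHeapStorageMemory: Long = synchronized(maxHeapMemory - executionMemoryUsed)
}
```

The off-heap variant follows the same pattern with the off-heap totals.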

maxOnHeapStorageMemory is used when:

releaseExecutionMemory Method

releaseExecutionMemory(
  numBytes: Long,
  taskAttemptId: Long,
  memoryMode: MemoryMode): Unit

releaseExecutionMemory…FIXME

releaseExecutionMemory is used when TaskMemoryManager is requested to releaseExecutionMemory and cleanUpAllAllocatedMemory.
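A minimal sketch of what releasing execution memory could look like, assuming the release is dispatched on memoryMode to a separate on-heap or off-heap execution pool that tracks bytes per task attempt. ToyMemoryMode, ToyTaskPool and ToyExecutionMemory are hypothetical; Spark's MemoryMode is a Java enum.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's MemoryMode enum
sealed trait ToyMemoryMode
case object OnHeap extends ToyMemoryMode
case object OffHeap extends ToyMemoryMode

// Tracks execution memory per task attempt (hypothetical, simplified)
final class ToyTaskPool {
  val memoryForTask = mutable.HashMap.empty[Long, Long]

  def acquire(numBytes: Long, taskAttemptId: Long): Unit =
    memoryForTask(taskAttemptId) = memoryForTask.getOrElse(taskAttemptId, 0L) + numBytes

  def release(numBytes: Long, taskAttemptId: Long): Unit = {
    val current = memoryForTask.getOrElse(taskAttemptId, 0L)
    // Never release more than the task holds
    val remaining = current - math.min(numBytes, current)
    if (remaining == 0L) {
      memoryForTask.remove(taskAttemptId)
    } else {
      memoryForTask(taskAttemptId) = remaining
    }
  }
}

final class ToyExecutionMemory {
  val onHeapPool = new ToyTaskPool
  val offHeapPool = new ToyTaskPool

  def releaseExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: ToyMemoryMode): Unit =
    synchronized {
      memoryMode match {
        case OnHeap  => onHeapPool.release(numBytes, taskAttemptId)
        case OffHeap => offHeapPool.release(numBytes, taskAttemptId)
      }
    }
}
```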

releaseAllExecutionMemoryForTask Method

releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long

releaseAllExecutionMemoryForTask…FIXME

releaseAllExecutionMemoryForTask is used exclusively when TaskRunner is requested to run (and cleans up after itself).
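A minimal sketch of the cleanup, assuming releaseAllExecutionMemoryForTask frees every byte the task attempt still holds in the on-heap and off-heap execution pools and returns the total freed as its Long result. ToyPerTaskPool and ToyTaskCleanup are hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical per-task bookkeeping for one execution pool (bytes per task attempt)
final class ToyPerTaskPool {
  private val memoryForTask = mutable.HashMap.empty[Long, Long]

  def acquire(numBytes: Long, taskAttemptId: Long): Unit = synchronized {
    memoryForTask(taskAttemptId) = memoryForTask.getOrElse(taskAttemptId, 0L) + numBytes
  }

  /** Frees everything the task holds and returns the number of bytes freed. */
  def releaseAllMemoryForTask(taskAttemptId: Long): Long = synchronized {
    memoryForTask.remove(taskAttemptId).getOrElse(0L)
  }
}

final class ToyTaskCleanup {
  val onHeapExecutionPool = new ToyPerTaskPool
  val offHeapExecutionPool = new ToyPerTaskPool

  // Total freed across both pools for the given task attempt
  def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
    onHeapExecutionPool.releaseAllMemoryForTask(taskAttemptId) +
      offHeapExecutionPool.releaseAllMemoryForTask(taskAttemptId)
  }
}
```

A non-zero result means the task finished without freeing all of its execution memory, which is why a caller running after the task (such as TaskRunner) can treat it as a managed memory leak.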

tungstenMemoryMode Flag

tungstenMemoryMode: MemoryMode

tungstenMemoryMode returns OFF_HEAP when spark.memory.offHeap.enabled configuration property is enabled (it is not by default). In that case, the following must also hold, or creating the MemoryManager fails with an IllegalArgumentException:

  • spark.memory.offHeap.size configuration property is greater than 0 (it is 0 by default)

  • JVM supports unaligned memory access (aka unaligned Unsafe, i.e. the sun.misc.Unsafe class is available and the underlying system has unaligned-access capability)

Otherwise (spark.memory.offHeap.enabled is disabled), tungstenMemoryMode returns ON_HEAP.
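The decision can be sketched as a pure function. One detail follows Spark's MemoryManager: once spark.memory.offHeap.enabled is true, the other two conditions are enforced with require, so a misconfiguration fails fast instead of silently falling back to ON_HEAP. The unaligned-access check is passed in as a Boolean parameter (an assumption; Spark queries Platform.unaligned()), and Mode, ON_HEAP and OFF_HEAP are hypothetical stand-ins for Spark's MemoryMode.

```scala
// Hypothetical stand-in for Spark's MemoryMode enum
sealed trait Mode
case object ON_HEAP extends Mode
case object OFF_HEAP extends Mode

object TungstenMode {
  // offHeapEnabled  ~ spark.memory.offHeap.enabled (false by default)
  // offHeapSize     ~ spark.memory.offHeap.size in bytes (0 by default)
  // unalignedAccess ~ whether unaligned Unsafe access is supported (assumed input)
  def tungstenMemoryMode(offHeapEnabled: Boolean, offHeapSize: Long, unalignedAccess: Boolean): Mode =
    if (offHeapEnabled) {
      require(offHeapSize > 0, "off-heap size must be > 0 when off-heap memory is enabled")
      require(unalignedAccess, "unaligned Unsafe access is required for off-heap memory")
      OFF_HEAP
    } else {
      ON_HEAP
    }
}
```

With the default settings (disabled, size 0) the function returns ON_HEAP without checking the other two conditions.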

Given that spark.memory.offHeap.enabled configuration property is disabled (false) by default and spark.memory.offHeap.size configuration property is 0 by default, Spark allocates Tungsten memory on the JVM heap (ON_HEAP) unless off-heap memory is explicitly configured.

tungstenMemoryMode is a Scala final val and cannot be changed by custom MemoryManagers.

tungstenMemoryMode is used when:

freePage Method

void freePage(MemoryBlock page)

freePage…FIXME

freePage is used when…FIXME

storageMemoryUsed Method

storageMemoryUsed: Long

storageMemoryUsed gives the total of the memory used by the on-heap StorageMemoryPool and off-heap StorageMemoryPool.
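The sum can be sketched with two hypothetical pools, one per memory mode. ToyStorageMemoryPool and ToyStorageAccounting are illustrative names, not Spark's classes.

```scala
// Hypothetical storage pool that only tracks bytes in use
final class ToyStorageMemoryPool {
  private var used: Long = 0L
  def memoryUsed: Long = synchronized(used)
  def acquire(numBytes: Long): Unit = synchronized {
    require(numBytes >= 0, s"invalid number of bytes requested: $numBytes")
    used += numBytes
  }
}

final class ToyStorageAccounting {
  val onHeapStorageMemoryPool = new ToyStorageMemoryPool
  val offHeapStorageMemoryPool = new ToyStorageMemoryPool

  // Total storage memory in use across both memory modes
  def storageMemoryUsed: Long = synchronized {
    onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed
  }
}
```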

storageMemoryUsed is used when…FIXME

Internal Properties

Name                        Description
onHeapStorageMemoryPool     FIXME
offHeapStorageMemoryPool    FIXME
pageSizeBytes               FIXME
tungstenMemoryAllocator     FIXME