
MemoryManager

MemoryManager is an abstraction of memory managers that can share available memory between tasks (TaskMemoryManager) and storage (BlockManager).

MemoryManager and Core Services

MemoryManager splits assigned memory into two regions:

  • Execution Memory for shuffles, joins, sorts and aggregations

  • Storage Memory for caching and propagating internal data across Spark nodes (in on- and off-heap modes)

MemoryManager is used to create BlockManager (and MemoryStore) and TaskMemoryManager.

Contract

Acquiring Execution Memory for Task

acquireExecutionMemory(
  numBytes: Long,
  taskAttemptId: Long,
  memoryMode: MemoryMode): Long

Used when:

Acquiring Storage Memory for Block

acquireStorageMemory(
  blockId: BlockId,
  numBytes: Long,
  memoryMode: MemoryMode): Boolean

Used when:

Acquiring Unroll Memory for Block

acquireUnrollMemory(
  blockId: BlockId,
  numBytes: Long,
  memoryMode: MemoryMode): Boolean

Used when:

Total Available Off-Heap Storage Memory

maxOffHeapStorageMemory: Long

May vary over time

Used when:

Total Available On-Heap Storage Memory

maxOnHeapStorageMemory: Long

May vary over time

Used when:

Implementations

Creating Instance

MemoryManager takes the following to be created:

  • SparkConf
  • Number of CPU Cores
  • Size of the On-Heap Storage Memory
  • Size of the On-Heap Execution Memory

Abstract Class

MemoryManager is an abstract class and cannot be created directly. It is created indirectly, as part of creating one of the concrete MemoryManagers.

Accessing MemoryManager

MemoryManager is available as SparkEnv.memoryManager on the driver and executors.

import org.apache.spark.SparkEnv
val mm = SparkEnv.get.memoryManager
// MemoryManager is private[spark]
// the following won't work unless within org.apache.spark package
// import org.apache.spark.memory.MemoryManager
// assert(mm.isInstanceOf[MemoryManager])

// we have to resort to string comparison 😔
assert("UnifiedMemoryManager".equals(mm.getClass.getSimpleName))

Associating MemoryStore with Storage Memory Pools

setMemoryStore(
  store: MemoryStore): Unit

setMemoryStore requests the on-heap and off-heap storage memory pools to use the given MemoryStore.

setMemoryStore is used when:

Execution Memory Pools

On-Heap

onHeapExecutionMemoryPool: ExecutionMemoryPool

MemoryManager creates an ExecutionMemoryPool for ON_HEAP memory mode when created and immediately requests it to incrementPoolSize to onHeapExecutionMemory.

Off-Heap

offHeapExecutionMemoryPool: ExecutionMemoryPool

MemoryManager creates an ExecutionMemoryPool for OFF_HEAP memory mode when created and immediately requests it to incrementPoolSize to...FIXME

Storage Memory Pools

On-Heap

onHeapStorageMemoryPool: StorageMemoryPool

MemoryManager creates a StorageMemoryPool for ON_HEAP memory mode when created and immediately requests it to incrementPoolSize to onHeapStorageMemory.

onHeapStorageMemoryPool is requested to setMemoryStore when MemoryManager is requested to setMemoryStore.

onHeapStorageMemoryPool is requested to release memory when MemoryManager is requested to release on-heap storage memory.

onHeapStorageMemoryPool is requested to release all memory when MemoryManager is requested to release all storage memory.

onHeapStorageMemoryPool is used when:

Off-Heap

offHeapStorageMemoryPool: StorageMemoryPool

MemoryManager creates a StorageMemoryPool for OFF_HEAP memory mode when created and immediately requests it to incrementPoolSize to offHeapStorageMemory.

MemoryManager requests the MemoryPools to use a given MemoryStore when requested to setMemoryStore.

MemoryManager requests the MemoryPools to release memory when requested to releaseStorageMemory.

MemoryManager requests the MemoryPools to release all memory when requested to release all storage memory.

MemoryManager requests the MemoryPools for the memoryUsed when requested for storageMemoryUsed.

offHeapStorageMemoryPool is used when:

Total Storage Memory Used

storageMemoryUsed: Long

storageMemoryUsed is the sum of the memory used of the on-heap and off-heap storage memory pools.
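The sum can be pictured with a minimal, self-contained sketch. ToyStoragePool is a hypothetical stand-in and not Spark's StorageMemoryPool; only the names incrementPoolSize, memoryUsed and storageMemoryUsed are taken from the text, and the acquire method is an assumption added so that the pools have something to report.

```scala
// Hypothetical stand-in for a storage memory pool (not Spark's StorageMemoryPool).
final class ToyStoragePool {
  private var poolSize: Long = 0L
  private var used: Long = 0L

  // Grows the pool by the given number of bytes.
  def incrementPoolSize(delta: Long): Unit = {
    require(delta >= 0, "delta must be non-negative")
    poolSize += delta
  }

  // Assumed helper: grants the request only if it fits into the free part of the pool.
  def acquire(numBytes: Long): Boolean = {
    val fits = numBytes >= 0 && numBytes <= poolSize - used
    if (fits) used += numBytes
    fits
  }

  def memoryUsed: Long = used
}

object StorageMemoryUsedSketch {
  val onHeap = new ToyStoragePool
  val offHeap = new ToyStoragePool

  // Mirrors the description: on-heap usage plus off-heap usage.
  def storageMemoryUsed: Long = onHeap.memoryUsed + offHeap.memoryUsed
}
```

Keeping the two pools separate and summing on demand means the total never has to be kept in sync with per-pool bookkeeping.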

storageMemoryUsed is used when:

MemoryMode

tungstenMemoryMode: MemoryMode

tungstenMemoryMode tracks whether Tungsten memory will be allocated on the JVM heap or off-heap (using sun.misc.Unsafe).

final val

tungstenMemoryMode is a final value, so it is initialized only once, when MemoryManager is created.

tungstenMemoryMode is OFF_HEAP when the following are all met:

  • spark.memory.offHeap.enabled configuration property is enabled

  • spark.memory.offHeap.size configuration property is greater than 0

  • JVM supports unaligned memory access (aka unaligned Unsafe, i.e. the sun.misc.Unsafe class is available and the underlying system has unaligned-access capability)

Otherwise, tungstenMemoryMode is ON_HEAP.
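The three conditions can be read as a single boolean test. The following is a minimal sketch that follows the description literally; it uses local stand-ins for MemoryMode so that it runs without Spark, and the parameter names (offHeapEnabled, offHeapSizeBytes, unalignedSupported) are hypothetical. How Spark itself reacts to an inconsistent configuration (e.g. off-heap enabled with a zero size) is not covered here.

```scala
object TungstenMemoryModeSketch {
  // Local stand-ins for org.apache.spark.memory.MemoryMode (assumption: kept
  // local so the sketch is self-contained).
  sealed trait MemoryMode
  case object ON_HEAP extends MemoryMode
  case object OFF_HEAP extends MemoryMode

  // offHeapEnabled     ~ spark.memory.offHeap.enabled
  // offHeapSizeBytes   ~ spark.memory.offHeap.size
  // unalignedSupported ~ hypothetical flag for the unaligned-access check
  def tungstenMemoryMode(
      offHeapEnabled: Boolean,
      offHeapSizeBytes: Long,
      unalignedSupported: Boolean): MemoryMode =
    if (offHeapEnabled && offHeapSizeBytes > 0 && unalignedSupported) OFF_HEAP
    else ON_HEAP
}
```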

Note

Given that spark.memory.offHeap.enabled configuration property is turned off by default and spark.memory.offHeap.size configuration property is 0 by default, Apache Spark seems to encourage using Tungsten memory allocated on the JVM heap (ON_HEAP).
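For completeness, a configuration fragment that turns both properties on might look as follows (for spark-shell or a Scala script with Spark on the classpath). The size of 2g is an arbitrary example value, not a recommendation.

```scala
import org.apache.spark.SparkConf

// Both properties have to be set for off-heap Tungsten memory;
// 2g is an arbitrary illustrative size.
val conf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g")
```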

tungstenMemoryMode is used when:

MemoryAllocator

tungstenMemoryAllocator: MemoryAllocator

MemoryManager selects the MemoryAllocator to use based on the MemoryMode.

final val

tungstenMemoryAllocator is a final value, so it is initialized only once, when MemoryManager is created.

MemoryMode    MemoryAllocator
ON_HEAP       HeapMemoryAllocator
OFF_HEAP      UnsafeMemoryAllocator
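The mapping above amounts to a pattern match on the memory mode. A minimal sketch, assuming local stand-in types (the case objects below only borrow the names of Spark's classes and allocate nothing):

```scala
object TungstenAllocatorSketch {
  // Local stand-ins so the sketch runs without Spark on the classpath.
  sealed trait MemoryMode
  case object ON_HEAP extends MemoryMode
  case object OFF_HEAP extends MemoryMode

  sealed trait MemoryAllocator
  case object HeapMemoryAllocator extends MemoryAllocator
  case object UnsafeMemoryAllocator extends MemoryAllocator

  // One allocator per memory mode, following the mapping above.
  def allocatorFor(mode: MemoryMode): MemoryAllocator = mode match {
    case ON_HEAP  => HeapMemoryAllocator
    case OFF_HEAP => UnsafeMemoryAllocator
  }
}
```

Because MemoryMode is sealed in this sketch, the compiler can warn if a mode is left without an allocator.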

tungstenMemoryAllocator is used when:

Page Size

pageSizeBytes is either spark.buffer.pageSize, if defined, or the default page size.

pageSizeBytes is used when:

  • TaskMemoryManager is requested for the page size
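The "configured value or default" rule for pageSizeBytes can be sketched as below. A plain Map stands in for SparkConf and values are raw byte counts; both are simplifying assumptions, as is passing the default by name so that it is only computed when the property is missing.

```scala
object PageSizeSketch {
  // conf: stand-in for SparkConf, holding byte counts keyed by property name.
  // defaultPageSizeBytes: by-name, so it is evaluated only if the key is absent.
  def pageSizeBytes(conf: Map[String, Long], defaultPageSizeBytes: => Long): Long =
    conf.getOrElse("spark.buffer.pageSize", defaultPageSizeBytes)
}
```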

Default Page Size

defaultPageSizeBytes: Long

Lazy Value

defaultPageSizeBytes is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the Scala Language Specification.
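The once-only initialization of a lazy value can be observed with a small, self-contained sketch. The counter and the 1 MiB value are illustrative assumptions; they do not reflect how Spark computes the default page size.

```scala
object LazyValueSketch {
  // Counts how many times the initializer below actually runs.
  var initCount = 0

  // Placeholder computation standing in for the real default page size logic.
  lazy val defaultPageSizeBytes: Long = {
    initCount += 1
    1L << 20 // 1 MiB, an arbitrary illustrative value
  }
}
```

Accessing LazyValueSketch.defaultPageSizeBytes repeatedly runs the initializer on the first access only; later accesses return the cached value.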