UnifiedMemoryManager

UnifiedMemoryManager is a MemoryManager whose on-heap execution memory (onHeapExecutionMemory) is the Maximum Heap Memory minus the size of the on-heap storage region (onHeapStorageRegionSize).

UnifiedMemoryManager allows for soft boundaries between storage and execution memory (allowing requests for memory in one region to be fulfilled by borrowing memory from the other).

Creating Instance

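UnifiedMemoryManager takes the following to be created: a SparkConf, the Maximum Heap Memory, the Size of the On-Heap Storage Memory, and the number of CPU cores.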

While being created, UnifiedMemoryManager asserts the invariants.

UnifiedMemoryManager is created using the apply factory method.

Invariants

UnifiedMemoryManager asserts the following:

Total Available On-Heap Memory for Storage

maxOnHeapStorageMemory: Long

maxOnHeapStorageMemory is part of the MemoryManager abstraction.

maxOnHeapStorageMemory is the difference between Maximum Heap Memory and the memory used in the on-heap execution memory pool.
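
The subtraction described above can be illustrated with a minimal sketch. The names ExecutionPoolSketch and MaxOnHeapStorageSketch are hypothetical stand-ins (not Spark classes); only the arithmetic follows the description.

```scala
object MaxOnHeapStorageSketch {
  // Hypothetical stand-in for the on-heap execution memory pool;
  // only the amount of memory currently in use matters here.
  final case class ExecutionPoolSketch(memoryUsed: Long)

  // Storage may use whatever part of the maximum heap memory
  // execution is not using at the moment.
  def maxOnHeapStorageMemory(
      maxHeapMemory: Long,
      executionPool: ExecutionPoolSketch): Long =
    maxHeapMemory - executionPool.memoryUsed
}
```

With a maximum heap memory of 1000 bytes and 300 bytes held by execution, the sketch gives 700 bytes available to storage.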

Size of the On-Heap Storage Memory

UnifiedMemoryManager is given the size of the on-heap storage memory (region) when created.

The size is the fraction (based on spark.memory.storageFraction configuration property) of the maximum heap memory.

The remaining memory space (of the maximum heap memory) is used for the on-heap execution memory.
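
As a rough illustration of the split described above, the following sketch divides a given maximum heap memory into the storage region and the remaining execution memory. StorageRegionSketch and split are hypothetical names, and the fraction is passed in explicitly rather than read from a SparkConf.

```scala
object StorageRegionSketch {
  // Returns (size of the on-heap storage region, remaining on-heap execution memory).
  // storageFraction stands for the value of spark.memory.storageFraction.
  def split(maxHeapMemory: Long, storageFraction: Double): (Long, Long) = {
    val storageRegionSize = (maxHeapMemory * storageFraction).toLong
    val executionMemory = maxHeapMemory - storageRegionSize
    (storageRegionSize, executionMemory)
  }
}
```

For example, StorageRegionSketch.split(956615884L, 0.5) halves the maximum heap memory between the two regions.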

Creating UnifiedMemoryManager

apply(
  conf: SparkConf,
  numCores: Int): UnifiedMemoryManager

apply creates a UnifiedMemoryManager with the Maximum Heap Memory and the size of the on-heap storage region set to the spark.memory.storageFraction fraction of the Maximum Heap Memory.

apply is used when:

Maximum Heap Memory

UnifiedMemoryManager is given the maximum heap memory to use (for execution and storage) when created. The apply factory method computes it using getMaxMemory.

UnifiedMemoryManager makes sure that the driver's system memory is at least 1.5 times the Reserved System Memory (the minimum system memory). Otherwise, getMaxMemory throws an IllegalArgumentException:

System memory [systemMemory] must be at least [minSystemMemory].
Please increase heap size using the --driver-memory option or spark.driver.memory in Spark configuration.

UnifiedMemoryManager makes sure that the executor memory (spark.executor.memory) is at least the same minimum system memory (1.5 times the Reserved System Memory). Otherwise, getMaxMemory throws an IllegalArgumentException:

Executor memory [executorMemory] must be at least [minSystemMemory].
Please increase executor memory using the --executor-memory option or spark.executor.memory in Spark configuration.

UnifiedMemoryManager considers "usable" memory to be the system memory without the reserved memory.

UnifiedMemoryManager uses the fraction (based on spark.memory.fraction configuration property) of the "usable" memory for the maximum heap memory.
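
The checks and the calculation above can be put together in a small sketch. This is not Spark's getMaxMemory. MaxMemorySketch is a hypothetical name, the inputs are plain parameters instead of a SparkConf, and the executor check is assumed to compare against the same minimum that the error message names ([minSystemMemory]). Scala's require is used because it throws an IllegalArgumentException.

```scala
object MaxMemorySketch {
  // 300MB reserved for non-storage, non-execution purposes
  val ReservedSystemMemoryBytes: Long = 300L * 1024 * 1024

  def getMaxMemory(
      systemMemory: Long,
      executorMemory: Option[Long], // None when spark.executor.memory is not set (assumption)
      memoryFraction: Double): Long = {
    // minimum system memory required: 1.5 times the reserved memory
    val minSystemMemory = (ReservedSystemMemoryBytes * 1.5).ceil.toLong
    require(systemMemory >= minSystemMemory,
      s"System memory $systemMemory must be at least $minSystemMemory.")
    executorMemory.foreach { mem =>
      require(mem >= minSystemMemory,
        s"Executor memory $mem must be at least $minSystemMemory.")
    }
    // "usable" memory is the system memory without the reserved memory
    val usableMemory = systemMemory - ReservedSystemMemoryBytes
    (usableMemory * memoryFraction).toLong
  }
}
```

A system memory below 450MB (1.5 * 300MB) fails the first check. Larger values yield the given fraction of whatever is left after the reserved 300MB.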

Demo

// local mode with --conf spark.driver.memory=2g
scala> sc.getConf.getSizeAsBytes("spark.driver.memory")
res0: Long = 2147483648

scala> val systemMemory = Runtime.getRuntime.maxMemory

// fixed amount of memory for non-storage, non-execution purposes
// UnifiedMemoryManager.RESERVED_SYSTEM_MEMORY_BYTES
val reservedMemory = 300 * 1024 * 1024

// minimum system memory required
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong

val usableMemory = systemMemory - reservedMemory

val memoryFraction = sc.getConf.getDouble("spark.memory.fraction", 0.6)
scala> val maxMemory = (usableMemory * memoryFraction).toLong
maxMemory: Long = 956615884

import org.apache.spark.network.util.JavaUtils
scala> JavaUtils.byteStringAsMb(maxMemory + "b")
res1: Long = 912

Reserved System Memory

UnifiedMemoryManager considers 300MB (300 * 1024 * 1024 bytes) to be reserved system memory when calculating the maximum heap memory.

Acquiring Execution Memory for Task

acquireExecutionMemory(
  numBytes: Long,
  taskAttemptId: Long,
  memoryMode: MemoryMode): Long

acquireExecutionMemory asserts the invariants.

acquireExecutionMemory selects the execution and storage pools, the storage region size and the maximum memory for the given MemoryMode.

MemoryMode          ON_HEAP                     OFF_HEAP
executionPool       onHeapExecutionMemoryPool   offHeapExecutionMemoryPool
storagePool         onHeapStorageMemoryPool     offHeapStorageMemoryPool
storageRegionSize   onHeapStorageRegionSize     offHeapStorageMemory
maxMemory           maxHeapMemory               maxOffHeapMemory

In the end, acquireExecutionMemory requests the selected ExecutionMemoryPool to acquire numBytes bytes of memory for the task, passing in the maybeGrowExecutionPool and the maximum size of execution pool (computeMaxExecutionPoolSize) functions.
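
The per-mode selection in the table above can be sketched as a pattern match. All types here (MemoryModeSketch, Pool, Selected, ManagerState) are hypothetical stand-ins for illustration only. Pools are represented just by their names, and the sketch stops at the selection step without modelling the actual acquisition.

```scala
object PoolSelectionSketch {
  sealed trait MemoryModeSketch
  case object OnHeap extends MemoryModeSketch
  case object OffHeap extends MemoryModeSketch

  // A pool is identified only by its field name in this sketch.
  final case class Pool(name: String)

  final case class Selected(
      executionPool: Pool,
      storagePool: Pool,
      storageRegionSize: Long,
      maxMemory: Long)

  final case class ManagerState(
      onHeapStorageRegionSize: Long,
      offHeapStorageMemory: Long,
      maxHeapMemory: Long,
      maxOffHeapMemory: Long) {

    // Picks the four values that belong to the given memory mode.
    def select(mode: MemoryModeSketch): Selected = mode match {
      case OnHeap =>
        Selected(Pool("onHeapExecutionMemoryPool"), Pool("onHeapStorageMemoryPool"),
          onHeapStorageRegionSize, maxHeapMemory)
      case OffHeap =>
        Selected(Pool("offHeapExecutionMemoryPool"), Pool("offHeapStorageMemoryPool"),
          offHeapStorageMemory, maxOffHeapMemory)
    }
  }
}
```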


acquireExecutionMemory is part of the MemoryManager abstraction.

maybeGrowExecutionPool

maybeGrowExecutionPool(
  extraMemoryNeeded: Long): Unit

maybeGrowExecutionPool...FIXME

Maximum Size of Execution Pool

computeMaxExecutionPoolSize(): Long

computeMaxExecutionPoolSize takes the minimum size of the storage memory regions (based on the memory mode, ON_HEAP or OFF_HEAP, respectively):

In the end, computeMaxExecutionPoolSize returns the maximum memory (the maxHeapMemory or the maxOffHeapMemory for ON_HEAP or OFF_HEAP memory mode, respectively) minus that minimum size of the storage memory region.
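
A minimal sketch of this calculation follows. It assumes that the two sizes being compared are the memory currently used by the storage pool and the storage region size (the text above does not list them). MaxExecutionPoolSketch is a hypothetical name and all values are passed in as plain parameters.

```scala
object MaxExecutionPoolSketch {
  // Execution can claim everything except the smaller of:
  //  - the memory the storage pool currently uses (assumption), and
  //  - the storage region size.
  def computeMaxExecutionPoolSize(
      maxMemory: Long,
      storageMemoryUsed: Long,
      storageRegionSize: Long): Long =
    maxMemory - math.min(storageMemoryUsed, storageRegionSize)
}
```

Under that assumption, storage that has grown beyond its region (by borrowing from execution) only protects the region size, while storage that uses less than its region protects only what it actually uses.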