UnifiedMemoryManager¶
UnifiedMemoryManager is a MemoryManager with the onHeapExecutionMemory being the Maximum Heap Memory minus the onHeapStorageRegionSize.
UnifiedMemoryManager allows for soft boundaries between storage and execution memory (allowing requests for memory in one region to be fulfilled by borrowing memory from the other).
Creating Instance¶
UnifiedMemoryManager takes the following to be created:
- SparkConf
- Maximum Heap Memory
- Size of the On-Heap Storage Region
- Number of CPU Cores
While being created, UnifiedMemoryManager asserts the invariants.
UnifiedMemoryManager is created using the apply factory method.
Invariants¶
UnifiedMemoryManager asserts the following:
- Sum of the pool sizes of the on-heap ExecutionMemoryPool and the on-heap StorageMemoryPool is exactly the maximum heap memory
- Sum of the pool sizes of the off-heap ExecutionMemoryPool and the off-heap StorageMemoryPool is exactly the maximum off-heap memory
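The two checks can be sketched as below. This is a minimal illustration only: `PoolSizes` and `invariantsHold` are hypothetical names (not Spark's internal classes), and pool sizes are modelled as plain byte counts.

```scala
object InvariantsSketch {
  // Hypothetical stand-in for the four memory pools; all sizes are in bytes.
  final case class PoolSizes(
      onHeapExecution: Long,
      onHeapStorage: Long,
      offHeapExecution: Long,
      offHeapStorage: Long)

  // True only when both sums match their respective maximums exactly.
  def invariantsHold(
      pools: PoolSizes,
      maxHeapMemory: Long,
      maxOffHeapMemory: Long): Boolean =
    pools.onHeapExecution + pools.onHeapStorage == maxHeapMemory &&
      pools.offHeapExecution + pools.offHeapStorage == maxOffHeapMemory
}
```

Because memory moves between the execution and storage pools of the same mode, each sum stays constant even while the individual pool sizes change.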
Total Available On-Heap Memory for Storage¶
maxOnHeapStorageMemory: Long
maxOnHeapStorageMemory is part of the MemoryManager abstraction.
maxOnHeapStorageMemory is the difference between Maximum Heap Memory and the memory used in the on-heap execution memory pool.
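As a rough illustration of that difference, assuming both quantities are available as byte counts (the object and parameter names below are made up for the sketch):

```scala
object MaxStorageSketch {
  // maxHeapMemory: Maximum Heap Memory (bytes)
  // onHeapExecutionMemoryUsed: bytes currently used in the on-heap execution memory pool
  def maxOnHeapStorageMemory(
      maxHeapMemory: Long,
      onHeapExecutionMemoryUsed: Long): Long =
    maxHeapMemory - onHeapExecutionMemoryUsed
}
```

The value is not fixed: the more memory execution uses, the less is available for storage.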
Size of the On-Heap Storage Memory¶
UnifiedMemoryManager is given the size of the on-heap storage memory (region) when created.
The size is the fraction (based on spark.memory.storageFraction configuration property) of the maximum heap memory.
The remaining memory space (of the maximum heap memory) is used for the on-heap execution memory.
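The split can be sketched as follows. This is an illustration under assumptions rather than Spark's code: `split` is a hypothetical helper, and the fraction is passed in directly instead of being read from spark.memory.storageFraction.

```scala
object StorageRegionSketch {
  // Returns (size of the on-heap storage region, on-heap execution memory), both in bytes.
  def split(maxHeapMemory: Long, storageFraction: Double): (Long, Long) = {
    val storageRegionSize = (maxHeapMemory * storageFraction).toLong
    val executionMemory = maxHeapMemory - storageRegionSize
    (storageRegionSize, executionMemory)
  }
}
```

With a fraction of 0.5, for example, the maximum heap memory is divided evenly between the storage region and execution memory.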
Creating UnifiedMemoryManager¶
apply(
conf: SparkConf,
numCores: Int): UnifiedMemoryManager
apply creates a UnifiedMemoryManager with the Maximum Heap Memory and the size of the on-heap storage region (the spark.memory.storageFraction fraction of the Maximum Heap Memory).
apply is used when:
- SparkEnv utility is used to create a base SparkEnv (for the driver and executors)
Maximum Heap Memory¶
UnifiedMemoryManager is given the maximum heap memory to use (for execution and storage) when created. The apply factory method computes it using getMaxMemory.
UnifiedMemoryManager makes sure that the system memory is at least 1.5 times the Reserved System Memory (the minimum system memory). Otherwise, getMaxMemory throws an IllegalArgumentException:
System memory [systemMemory] must be at least [minSystemMemory].
Please increase heap size using the --driver-memory option or spark.driver.memory in Spark configuration.
UnifiedMemoryManager makes sure that the executor memory (spark.executor.memory), if defined, is at least the minimum system memory (1.5 times the Reserved System Memory). Otherwise, getMaxMemory throws an IllegalArgumentException:
Executor memory [executorMemory] must be at least [minSystemMemory].
Please increase executor memory using the --executor-memory option or spark.executor.memory in Spark configuration.
UnifiedMemoryManager considers "usable" memory to be the system memory without the reserved memory.
UnifiedMemoryManager uses the fraction (based on spark.memory.fraction configuration property) of the "usable" memory for the maximum heap memory.
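The checks and the calculation above can be put together in a small sketch. It is not Spark's getMaxMemory: the inputs are passed as parameters rather than read from the JVM and SparkConf, `require` stands in for the explicit exception (it throws an IllegalArgumentException whose message is prefixed with "requirement failed: "), and both checks compare against the same minimum, as the error messages above suggest.

```scala
object MaxMemorySketch {
  // 300MB reserved for non-storage, non-execution purposes
  val ReservedSystemMemoryBytes: Long = 300L * 1024 * 1024

  // systemMemory: memory available to the JVM, in bytes
  // executorMemory: value of spark.executor.memory in bytes, if set
  // memoryFraction: value of spark.memory.fraction
  def getMaxMemory(
      systemMemory: Long,
      executorMemory: Option[Long],
      memoryFraction: Double): Long = {
    val minSystemMemory = (ReservedSystemMemoryBytes * 1.5).ceil.toLong
    require(systemMemory >= minSystemMemory,
      s"System memory $systemMemory must be at least $minSystemMemory.")
    executorMemory.foreach { memory =>
      require(memory >= minSystemMemory,
        s"Executor memory $memory must be at least $minSystemMemory.")
    }
    // "usable" memory is the system memory without the reserved memory
    val usableMemory = systemMemory - ReservedSystemMemoryBytes
    (usableMemory * memoryFraction).toLong
  }
}
```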
Demo¶
// local mode with --conf spark.driver.memory=2g
scala> sc.getConf.getSizeAsBytes("spark.driver.memory")
res0: Long = 2147483648
scala> val systemMemory = Runtime.getRuntime.maxMemory
// fixed amount of memory for non-storage, non-execution purposes
// UnifiedMemoryManager.RESERVED_SYSTEM_MEMORY_BYTES
val reservedMemory = 300 * 1024 * 1024
// minimum system memory required
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
val usableMemory = systemMemory - reservedMemory
val memoryFraction = sc.getConf.getDouble("spark.memory.fraction", 0.6)
scala> val maxMemory = (usableMemory * memoryFraction).toLong
maxMemory: Long = 956615884
import org.apache.spark.network.util.JavaUtils
scala> JavaUtils.byteStringAsMb(maxMemory + "b")
res1: Long = 912
Reserved System Memory¶
UnifiedMemoryManager treats 300MB (300 * 1024 * 1024 bytes) as reserved system memory when calculating the maximum heap memory.
Acquiring Execution Memory for Task¶
acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long
acquireExecutionMemory asserts the invariants.
acquireExecutionMemory selects the execution and storage pools, the storage region size and the maximum memory for the given MemoryMode.
| MemoryMode | ON_HEAP | OFF_HEAP |
|---|---|---|
| executionPool | onHeapExecutionMemoryPool | offHeapExecutionMemoryPool |
| storagePool | onHeapStorageMemoryPool | offHeapStorageMemoryPool |
| storageRegionSize | onHeapStorageRegionSize | offHeapStorageMemory |
| maxMemory | maxHeapMemory | maxOffHeapMemory |
In the end, acquireExecutionMemory requests the ExecutionMemoryPool to acquire numBytes bytes of memory (passing in the maybeGrowExecutionPool and the maximum size of execution pool functions).
acquireExecutionMemory is part of the MemoryManager abstraction.
maybeGrowExecutionPool¶
maybeGrowExecutionPool(
extraMemoryNeeded: Long): Unit
maybeGrowExecutionPool...FIXME
Maximum Size of Execution Pool¶
computeMaxExecutionPoolSize(): Long
computeMaxExecutionPoolSize takes the smaller of the following two values (of the on-heap or the off-heap storage memory, based on the memory mode, ON_HEAP or OFF_HEAP, respectively):
- Memory used in the storage memory pool
- Size of the storage memory region
In the end, computeMaxExecutionPoolSize returns the maximum memory (the maxHeapMemory or the maxOffHeapMemory for ON_HEAP or OFF_HEAP memory mode, respectively) minus that smaller value.
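The computation described above can be sketched as a standalone function. In this illustration the three inputs are explicit parameters (an assumption made for self-containment), whereas the real function takes no arguments and reads the values selected for the given MemoryMode.

```scala
object ExecutionPoolSketch {
  // maxMemory: maxHeapMemory or maxOffHeapMemory (bytes)
  // storageMemoryUsed: bytes currently used in the storage memory pool
  // storageRegionSize: size of the storage memory region (bytes)
  def computeMaxExecutionPoolSize(
      maxMemory: Long,
      storageMemoryUsed: Long,
      storageRegionSize: Long): Long =
    maxMemory - math.min(storageMemoryUsed, storageRegionSize)
}
```

With a maximum memory of 1000 bytes and a storage region of 500 bytes, storage using 200 bytes leaves up to 800 bytes for the execution pool, while storage using 700 bytes leaves up to 500 bytes: storage use beyond the region size does not reduce the result.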