UnifiedMemoryManager¶
UnifiedMemoryManager is a MemoryManager whose onHeapExecutionMemory is the Maximum Heap Memory minus the onHeapStorageRegionSize.
UnifiedMemoryManager allows for soft boundaries between storage and execution memory: a request for memory in one region can be fulfilled by borrowing memory from the other.
Creating Instance¶
UnifiedMemoryManager takes the following to be created:
- SparkConf
- Maximum Heap Memory
- Size of the On-Heap Storage Region
- Number of CPU Cores
While being created, UnifiedMemoryManager asserts the invariants.
UnifiedMemoryManager is created using the apply factory method.
Invariants¶
UnifiedMemoryManager asserts the following:
- The sum of the pool sizes of the on-heap ExecutionMemoryPool and the on-heap StorageMemoryPool is exactly the maximum heap memory
- The sum of the pool sizes of the off-heap ExecutionMemoryPool and the off-heap StorageMemoryPool is exactly the maximum off-heap memory
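The two checks can be sketched as follows. This is a minimal illustration and not Spark's own code: `Pool` and `InvariantsSketch` are hypothetical stand-ins for the real memory pools, and only the pool size is modelled.

```scala
// Hypothetical stand-in for a memory pool; only the pool size matters here.
final case class Pool(poolSize: Long)

object InvariantsSketch {
  // Each pair of pools (execution + storage) must add up to its maximum.
  def assertInvariants(
      onHeapExecution: Pool,
      onHeapStorage: Pool,
      maxHeapMemory: Long,
      offHeapExecution: Pool,
      offHeapStorage: Pool,
      maxOffHeapMemory: Long): Unit = {
    assert(onHeapExecution.poolSize + onHeapStorage.poolSize == maxHeapMemory)
    assert(offHeapExecution.poolSize + offHeapStorage.poolSize == maxOffHeapMemory)
  }
}
```

Because memory only moves between the two pools of the same memory mode, growing one pool must always be matched by shrinking the other by the same amount for these checks to keep holding.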
Total Available On-Heap Memory for Storage¶
maxOnHeapStorageMemory: Long
maxOnHeapStorageMemory is part of the MemoryManager abstraction.
maxOnHeapStorageMemory is the difference between the Maximum Heap Memory and the memory used in the on-heap execution memory pool.
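As a rough sketch of that difference (the `MaxStorageSketch` object and its parameter names are hypothetical, and the numbers below are made up for illustration):

```scala
object MaxStorageSketch {
  // Storage may use whatever part of the heap region execution is not using right now.
  def maxOnHeapStorageMemory(
      maxHeapMemory: Long,
      onHeapExecutionMemoryUsed: Long): Long =
    maxHeapMemory - onHeapExecutionMemoryUsed
}
```

With a maximum heap memory of 1000 bytes and 300 bytes used for execution, the sketch gives 700 bytes. The value is not fixed: it shrinks as tasks acquire execution memory and grows again as they release it.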
Size of the On-Heap Storage Memory¶
UnifiedMemoryManager is given the size of the on-heap storage memory (region) when created.
The size is a fraction (the spark.memory.storageFraction configuration property) of the maximum heap memory.
The remaining space of the maximum heap memory is used for the on-heap execution memory.
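The split can be worked through with a small sketch. `StorageRegionSketch` is a hypothetical helper, and the fraction is passed in explicitly rather than read from a SparkConf.

```scala
object StorageRegionSketch {
  // Splits the maximum heap memory into (storage region size, execution memory).
  def split(maxHeapMemory: Long, storageFraction: Double): (Long, Long) = {
    val storageRegionSize = (maxHeapMemory * storageFraction).toLong
    (storageRegionSize, maxHeapMemory - storageRegionSize)
  }
}
```

For example, a maximum heap memory of 956615884 bytes and a storage fraction of 0.5 yield a storage region of 478307942 bytes, which leaves 478307942 bytes for on-heap execution memory.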
Creating UnifiedMemoryManager¶
apply(
  conf: SparkConf,
  numCores: Int): UnifiedMemoryManager
apply creates a UnifiedMemoryManager with the Maximum Heap Memory and the size of the on-heap storage region (the spark.memory.storageFraction fraction of the Maximum Heap Memory).
apply is used when:
- SparkEnv utility is used to create a base SparkEnv (for the driver and executors)
Maximum Heap Memory¶
UnifiedMemoryManager is given the maximum heap memory to use (for execution and storage) when created. The value is computed by getMaxMemory, which the apply factory method uses.
UnifiedMemoryManager makes sure that the driver's system memory is at least 1.5 times the Reserved System Memory (the minimum system memory). Otherwise, getMaxMemory throws an IllegalArgumentException:
System memory [systemMemory] must be at least [minSystemMemory].
Please increase heap size using the --driver-memory option or spark.driver.memory in Spark configuration.
UnifiedMemoryManager makes sure that the executor memory (spark.executor.memory), if set, is also at least the minimum system memory (1.5 times the Reserved System Memory). Otherwise, getMaxMemory throws an IllegalArgumentException:
Executor memory [executorMemory] must be at least [minSystemMemory].
Please increase executor memory using the --executor-memory option or spark.executor.memory in Spark configuration.
UnifiedMemoryManager considers "usable" memory to be the system memory minus the reserved system memory.
UnifiedMemoryManager uses a fraction (the spark.memory.fraction configuration property) of the "usable" memory as the maximum heap memory.
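The checks and the final calculation can be put together in one function. This is only a sketch under stated assumptions: `MaxMemorySketch` and its parameters are hypothetical, the system memory, executor memory and fraction are passed in explicitly instead of being read from the JVM and a SparkConf, and the error messages are shortened.

```scala
object MaxMemorySketch {
  // 300MB of reserved system memory.
  val ReservedSystemMemoryBytes: Long = 300L * 1024 * 1024

  // Hypothetical helper that mirrors the steps described above.
  def maxMemory(
      systemMemory: Long,
      executorMemory: Option[Long],
      memoryFraction: Double): Long = {
    // Minimum system memory: 1.5 times the reserved system memory.
    val minSystemMemory = (ReservedSystemMemoryBytes * 1.5).ceil.toLong
    // require throws an IllegalArgumentException when the condition is false.
    require(systemMemory >= minSystemMemory,
      s"System memory $systemMemory must be at least $minSystemMemory.")
    executorMemory.foreach { mem =>
      require(mem >= minSystemMemory,
        s"Executor memory $mem must be at least $minSystemMemory.")
    }
    // "Usable" memory is what is left after setting the reserved memory aside.
    val usableMemory = systemMemory - ReservedSystemMemoryBytes
    (usableMemory * memoryFraction).toLong
  }
}
```

With 300MB reserved, the minimum system memory in this sketch is 471859200 bytes (450MB), so a system memory of 400MB is rejected, while 1000MB with a fraction of 0.5 gives 367001600 bytes.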
Demo¶
// local mode with --conf spark.driver.memory=2g
scala> sc.getConf.getSizeAsBytes("spark.driver.memory")
res0: Long = 2147483648
scala> val systemMemory = Runtime.getRuntime.maxMemory
// fixed amount of memory for non-storage, non-execution purposes
// UnifiedMemoryManager.RESERVED_SYSTEM_MEMORY_BYTES
val reservedMemory = 300 * 1024 * 1024
// minimum system memory required
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
val usableMemory = systemMemory - reservedMemory
val memoryFraction = sc.getConf.getDouble("spark.memory.fraction", 0.6)
scala> val maxMemory = (usableMemory * memoryFraction).toLong
maxMemory: Long = 956615884
import org.apache.spark.network.util.JavaUtils
scala> JavaUtils.byteStringAsMb(maxMemory + "b")
res1: Long = 912
Reserved System Memory¶
UnifiedMemoryManager considers 300MB (300 * 1024 * 1024 bytes) as the reserved system memory while calculating the maximum heap memory.
Acquiring Execution Memory for Task¶
acquireExecutionMemory(
  numBytes: Long,
  taskAttemptId: Long,
  memoryMode: MemoryMode): Long
acquireExecutionMemory asserts the invariants.
acquireExecutionMemory selects the execution and storage pools, the storage region size and the maximum memory for the given MemoryMode.
MemoryMode | ON_HEAP | OFF_HEAP |
---|---|---|
executionPool | onHeapExecutionMemoryPool | offHeapExecutionMemoryPool |
storagePool | onHeapStorageMemoryPool | offHeapStorageMemoryPool |
storageRegionSize | onHeapStorageRegionSize | offHeapStorageMemory |
maxMemory | maxHeapMemory | maxOffHeapMemory |
In the end, acquireExecutionMemory requests the ExecutionMemoryPool to acquire numBytes bytes of memory (passing in the maybeGrowExecutionPool and the maximum size of execution pool functions).
acquireExecutionMemory is part of the MemoryManager abstraction.
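The per-mode selection from the table above amounts to a pattern match on the memory mode. The sketch below is self-contained and does not use Spark's classes: `Mode`, `ModeSettings` and `PoolSelectionSketch` are hypothetical stand-ins, and the pools are reduced to their sizes.

```scala
// Hypothetical stand-ins so the sketch runs without Spark on the classpath.
sealed trait Mode
case object OnHeap extends Mode
case object OffHeap extends Mode

// The four values that are selected together for a memory mode.
final case class ModeSettings(
    executionPoolSize: Long,
    storagePoolSize: Long,
    storageRegionSize: Long,
    maxMemory: Long)

final class PoolSelectionSketch(onHeap: ModeSettings, offHeap: ModeSettings) {
  // Picks the on-heap or the off-heap settings as one unit.
  def select(mode: Mode): ModeSettings = mode match {
    case OnHeap  => onHeap
    case OffHeap => offHeap
  }
}
```

Selecting all four values in one step keeps them consistent, so the rest of the acquisition logic never mixes an on-heap pool with an off-heap limit.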
maybeGrowExecutionPool¶
maybeGrowExecutionPool(
  extraMemoryNeeded: Long): Unit
maybeGrowExecutionPool
...FIXME
Maximum Size of Execution Pool¶
computeMaxExecutionPoolSize(): Long
computeMaxExecutionPoolSize takes the smaller of the following two values (for the on-heap or the off-heap storage, based on the memory mode, ON_HEAP or OFF_HEAP, respectively):
- Memory used in the on-heap or the off-heap storage memory pool
- Size of the on-heap or the off-heap storage region
In the end, computeMaxExecutionPoolSize returns the maximum memory (the maxHeapMemory or the maxOffHeapMemory for the ON_HEAP or OFF_HEAP memory mode, respectively) minus that smaller value.
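The calculation can be sketched as a single expression. `MaxExecutionPoolSketch` is a hypothetical wrapper that takes the three inputs as plain numbers instead of reading them from the pools.

```scala
object MaxExecutionPoolSketch {
  // Maximum memory minus the smaller of (storage memory used, storage region size).
  def computeMaxExecutionPoolSize(
      maxMemory: Long,
      storageMemoryUsed: Long,
      storageRegionSize: Long): Long =
    maxMemory - math.min(storageMemoryUsed, storageRegionSize)
}
```

With a maximum memory of 1000 bytes and a storage region of 500 bytes, 200 bytes of used storage memory give a maximum execution pool size of 800 bytes, while 700 bytes of used storage memory give 500 bytes. Under this formula the result never drops below the maximum memory minus the storage region size, however much storage memory is in use.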