UnifiedMemoryManager is a MemoryManager (with the onHeapExecutionMemory being the Maximum Heap Memory with the onHeapStorageRegionSize taken out).
UnifiedMemoryManager allows for soft boundaries between storage and execution memory (allowing requests for memory in one region to be fulfilled by borrowing memory from the other).
UnifiedMemoryManager takes the following to be created:
- Maximum Heap Memory
- Size of the On-Heap Storage Region
- Number of CPU Cores
While being created,
UnifiedMemoryManager asserts the invariants.
UnifiedMemoryManager is created using apply factory.
UnifiedMemoryManager asserts the following:
Sum of the pool size of the on-heap ExecutionMemoryPool and on-heap StorageMemoryPool is exactly the maximum heap memory
Sum of the pool size of the off-heap ExecutionMemoryPool and off-heap StorageMemoryPool is exactly the maximum off-heap memory
Total Available On-Heap Memory for Storage¶
maxOnHeapStorageMemory is part of the MemoryManager abstraction.
maxOnHeapStorageMemory is the difference between Maximum Heap Memory and the memory used in the on-heap execution memory pool.
Size of the On-Heap Storage Memory¶
UnifiedMemoryManager is given the size of the on-heap storage memory (region) when created.
The size is the fraction (based on spark.memory.storageFraction configuration property) of the maximum heap memory.
The remaining memory space (of the maximum heap memory) is used for the on-heap execution memory.
apply( conf: SparkConf, numCores: Int): UnifiedMemoryManager
apply creates a UnifiedMemoryManager with the Maximum Heap Memory and the size of the on-heap storage region as spark.memory.storageFraction of the Maximum Memory.
apply is used when:
SparkEnvutility is used to create a base SparkEnv (for the driver and executors)
Maximum Heap Memory¶
UnifiedMemoryManager is given the maximum heap memory to use (for execution and storage) when created (that uses apply factory method which uses
UnifiedMemoryManager makes sure that the driver's system memory is at least
1.5 of the Reserved System Memory. Otherwise,
getMaxMemory throws an
System memory [systemMemory] must be at least [minSystemMemory]. Please increase heap size using the --driver-memory option or spark.driver.memory in Spark configuration.
UnifiedMemoryManager makes sure that the executor memory (spark.executor.memory) is at least the Reserved System Memory. Otherwise,
getMaxMemory throws an
Executor memory [executorMemory] must be at least [minSystemMemory]. Please increase executor memory using the --executor-memory option or spark.executor.memory in Spark configuration.
UnifiedMemoryManager considers "usable" memory to be the system memory without the reserved memory.
UnifiedMemoryManager uses the fraction (based on spark.memory.fraction configuration property) of the "usable" memory for the maximum heap memory.
// local mode with --conf spark.driver.memory=2g scala> sc.getConf.getSizeAsBytes("spark.driver.memory") res0: Long = 2147483648 scala> val systemMemory = Runtime.getRuntime.maxMemory // fixed amount of memory for non-storage, non-execution purposes // UnifiedMemoryManager.RESERVED_SYSTEM_MEMORY_BYTES val reservedMemory = 300 * 1024 * 1024 // minimum system memory required val minSystemMemory = (reservedMemory * 1.5).ceil.toLong val usableMemory = systemMemory - reservedMemory val memoryFraction = sc.getConf.getDouble("spark.memory.fraction", 0.6) scala> val maxMemory = (usableMemory * memoryFraction).toLong maxMemory: Long = 956615884 import org.apache.spark.network.util.JavaUtils scala> JavaUtils.byteStringAsMb(maxMemory + "b") res1: Long = 912
Reserved System Memory¶
300 * 1024 * 1024 bytes) as a reserved system memory while calculating the maximum heap memory.
Acquiring Execution Memory for Task¶
acquireExecutionMemory( numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long
acquireExecutionMemory asserts the invariants.
acquireExecutionMemory selects the execution and storage pools, the storage region size and the maximum memory for the given
In the end,
acquireExecutionMemory requests the ExecutionMemoryPool to acquire memory of
numBytes bytes (with the maybeGrowExecutionPool and the maximum size of execution pool functions).
acquireExecutionMemory is part of the MemoryManager abstraction.
maybeGrowExecutionPool( extraMemoryNeeded: Long): Unit
Maximum Size of Execution Pool¶
computeMaxExecutionPoolSize takes the minimum size of the storage memory regions (based on the memory mode,
- Memory used of the on-heap or the off-heap storage memory pool
- On-heap or the off-heap storage memory size
In the end,
computeMaxExecutionPoolSize returns the size of the remaining memory space of the maximum memory (the maxHeapMemory or the maxOffHeapMemory for
OFF_HEAP memory mode, respectively) without (the minimum size of) the storage memory region.