MemoryManager¶
MemoryManager is an abstraction of memory managers that can share available memory between tasks (TaskMemoryManager) and storage (BlockManager).
MemoryManager splits assigned memory into two regions:
- Execution Memory for shuffles, joins, sorts and aggregations
- Storage Memory for caching and propagating internal data across Spark nodes (in on- and off-heap modes)
MemoryManager is required to create BlockManager (and MemoryStore) and TaskMemoryManager.
Contract¶
Acquiring Execution Memory for Task¶
acquireExecutionMemory(
  numBytes: Long,
  taskAttemptId: Long,
  memoryMode: MemoryMode): Long
Used when:
- TaskMemoryManager is requested to acquire execution memory
Acquiring Storage Memory for Block¶
acquireStorageMemory(
  blockId: BlockId,
  numBytes: Long,
  memoryMode: MemoryMode): Boolean
Used when:
- MemoryStore is requested to putBytes and putIterator
Acquiring Unroll Memory for Block¶
acquireUnrollMemory(
  blockId: BlockId,
  numBytes: Long,
  memoryMode: MemoryMode): Boolean
Used when:
- MemoryStore is requested to reserveUnrollMemoryForThisTask
Total Available Off-Heap Storage Memory¶
maxOffHeapStorageMemory: Long
May vary over time
Used when:
Total Available On-Heap Storage Memory¶
maxOnHeapStorageMemory: Long
May vary over time
Used when:
Implementations¶
- UnifiedMemoryManager
Creating Instance¶
MemoryManager takes the following to be created:
- SparkConf
- Number of CPU Cores
- Size of the On-Heap Storage Memory
- Size of the On-Heap Execution Memory
Abstract Class
MemoryManager is an abstract class and cannot be created directly. It is created indirectly, as part of creating a concrete MemoryManager.
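To make the contract and the abstract-class constraint concrete, here is a minimal Scala sketch. SketchMemoryManager, FixedBudgetMemoryManager, the String block IDs and the budget numbers are hypothetical simplifications, not Spark's classes; the toy subclass grants on-heap requests within fixed budgets and refuses off-heap ones.

```scala
// Hypothetical simplification of the contract above; not Spark's actual classes.
sealed trait MemoryMode
case object ON_HEAP extends MemoryMode
case object OFF_HEAP extends MemoryMode

// Abstract: constructor arguments mirror "Creating Instance" (minus SparkConf).
abstract class SketchMemoryManager(
    val numCores: Int,
    val onHeapStorageMemory: Long,
    val onHeapExecutionMemory: Long) {
  def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long
  def acquireStorageMemory(blockId: String, numBytes: Long, memoryMode: MemoryMode): Boolean
  def acquireUnrollMemory(blockId: String, numBytes: Long, memoryMode: MemoryMode): Boolean
  def maxOnHeapStorageMemory: Long
  def maxOffHeapStorageMemory: Long
}

// Toy concrete subclass: fixed on-heap budgets, no off-heap memory at all.
final class FixedBudgetMemoryManager(cores: Int, storage: Long, execution: Long)
    extends SketchMemoryManager(cores, storage, execution) {
  private var executionUsed = 0L
  private var storageUsed = 0L

  def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long =
    memoryMode match {
      case ON_HEAP =>
        // Grant as much as is left, possibly less than requested.
        val granted = math.min(numBytes, onHeapExecutionMemory - executionUsed)
        executionUsed += granted
        granted
      case OFF_HEAP => 0L
    }

  def acquireStorageMemory(blockId: String, numBytes: Long, memoryMode: MemoryMode): Boolean =
    if (memoryMode == ON_HEAP && storageUsed + numBytes <= onHeapStorageMemory) {
      storageUsed += numBytes
      true
    } else false

  // This sketch treats unroll memory as ordinary storage memory.
  def acquireUnrollMemory(blockId: String, numBytes: Long, memoryMode: MemoryMode): Boolean =
    acquireStorageMemory(blockId, numBytes, memoryMode)

  def maxOnHeapStorageMemory: Long = onHeapStorageMemory
  def maxOffHeapStorageMemory: Long = 0L
}

// new SketchMemoryManager(4, 100L, 200L) would not compile: the class is abstract.
val mm: SketchMemoryManager = new FixedBudgetMemoryManager(cores = 4, storage = 100L, execution = 200L)
```

Execution requests may be granted partially (the returned Long), while storage and unroll requests are all-or-nothing (the returned Boolean), matching the return types in the contract.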
Accessing MemoryManager¶
MemoryManager is available as SparkEnv.memoryManager on the driver and executors.
import org.apache.spark.SparkEnv
val mm = SparkEnv.get.memoryManager
// MemoryManager is private[spark]
// the following won't work unless within org.apache.spark package
// import org.apache.spark.memory.MemoryManager
// assert(mm.isInstanceOf[MemoryManager])
// we have to resort to string comparison 😔
assert("UnifiedMemoryManager".equals(mm.getClass.getSimpleName))
Associating MemoryStore with Storage Memory Pools¶
setMemoryStore(
  store: MemoryStore): Unit
setMemoryStore requests the on-heap and off-heap storage memory pools to use the given MemoryStore.
setMemoryStore is used when:
- BlockManager is created
Execution Memory Pools¶
On-Heap¶
onHeapExecutionMemoryPool: ExecutionMemoryPool
MemoryManager creates an ExecutionMemoryPool for ON_HEAP memory mode when created and immediately requests it to incrementPoolSize by onHeapExecutionMemory.
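A minimal sketch of this wiring, assuming a pool starts at size 0 and grows only through incrementPoolSize. SketchMemoryPool and the 512 MiB budget are hypothetical; Spark's ExecutionMemoryPool additionally tracks memory per task attempt.

```scala
sealed trait MemoryMode
case object ON_HEAP extends MemoryMode
case object OFF_HEAP extends MemoryMode

// Hypothetical pool with size accounting only (no per-task bookkeeping).
final class SketchMemoryPool(val memoryMode: MemoryMode) {
  private var _poolSize = 0L // every pool starts empty
  def poolSize: Long = _poolSize

  def incrementPoolSize(delta: Long): Unit = {
    require(delta >= 0, s"delta must be non-negative, got $delta")
    _poolSize += delta
  }
}

// Assumed on-heap execution budget of 512 MiB, for illustration only.
val onHeapExecutionMemory: Long = 512L * 1024 * 1024

// Create the pool, then immediately grow it to the budget.
val onHeapExecutionMemoryPool = new SketchMemoryPool(ON_HEAP)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
```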
Off-Heap¶
offHeapExecutionMemoryPool: ExecutionMemoryPool
MemoryManager creates an ExecutionMemoryPool for OFF_HEAP memory mode when created and immediately requests it to incrementPoolSize by...FIXME
Storage Memory Pools¶
On-Heap¶
onHeapStorageMemoryPool: StorageMemoryPool
MemoryManager creates a StorageMemoryPool for ON_HEAP memory mode when created and immediately requests it to incrementPoolSize by onHeapStorageMemory.
onHeapStorageMemoryPool is requested to setMemoryStore when MemoryManager is requested to setMemoryStore.
onHeapStorageMemoryPool is requested to release memory when MemoryManager is requested to release on-heap storage memory.
onHeapStorageMemoryPool is requested to release all memory when MemoryManager is requested to release all storage memory.
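These release paths amount to delegation keyed on the memory mode. A minimal sketch, with SketchStoragePool and SketchStorageManager as hypothetical stand-ins that only do byte accounting (Spark's StorageMemoryPool also coordinates with the MemoryStore).

```scala
sealed trait MemoryMode
case object ON_HEAP extends MemoryMode
case object OFF_HEAP extends MemoryMode

// Hypothetical storage pool: byte accounting only, no MemoryStore interaction.
final class SketchStoragePool(poolSize: Long) {
  private var used = 0L
  def memoryUsed: Long = used

  def acquireMemory(numBytes: Long): Boolean =
    if (used + numBytes <= poolSize) { used += numBytes; true } else false

  // Never drop below zero, even if asked to release more than is in use.
  def releaseMemory(numBytes: Long): Unit = { used -= math.min(numBytes, used) }

  def releaseAllMemory(): Unit = { used = 0L }
}

// Hypothetical manager that routes storage releases to the pool for the given mode.
final class SketchStorageManager(onHeapSize: Long, offHeapSize: Long) {
  val onHeapStorageMemoryPool = new SketchStoragePool(onHeapSize)
  val offHeapStorageMemoryPool = new SketchStoragePool(offHeapSize)

  private def poolFor(mode: MemoryMode): SketchStoragePool = mode match {
    case ON_HEAP  => onHeapStorageMemoryPool
    case OFF_HEAP => offHeapStorageMemoryPool
  }

  def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit =
    poolFor(memoryMode).releaseMemory(numBytes)

  // Releasing all storage memory empties both pools.
  def releaseAllStorageMemory(): Unit = {
    onHeapStorageMemoryPool.releaseAllMemory()
    offHeapStorageMemoryPool.releaseAllMemory()
  }
}
```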
onHeapStorageMemoryPool is used when:
- MemoryManager is requested for the storageMemoryUsed and onHeapStorageMemoryUsed
- UnifiedMemoryManager is requested to acquire on-heap execution and storage memory
Off-Heap¶
offHeapStorageMemoryPool: StorageMemoryPool
MemoryManager creates a StorageMemoryPool for OFF_HEAP memory mode when created and immediately requests it to incrementPoolSize by offHeapStorageMemory.
MemoryManager requests the MemoryPools to use a given MemoryStore when requested to setMemoryStore.
MemoryManager requests the MemoryPools to release memory when requested to releaseStorageMemory.
MemoryManager requests the MemoryPools to release all memory when requested to release all storage memory.
MemoryManager requests the MemoryPools for the memoryUsed when requested for storageMemoryUsed.
offHeapStorageMemoryPool is used when:
- MemoryManager is requested for the offHeapStorageMemoryUsed
- UnifiedMemoryManager is requested to acquire off-heap execution and storage memory
Total Storage Memory Used¶
storageMemoryUsed: Long
storageMemoryUsed is the sum of the memory used by the on-heap and off-heap storage memory pools.
storageMemoryUsed is used when:
- TaskMemoryManager is requested to showMemoryUsage
- MemoryStore is requested for memoryUsed
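In code, storageMemoryUsed is a plain sum. A minimal sketch with a hypothetical UsedPool case class standing in for the two storage memory pools; the byte counts are made up.

```scala
// Hypothetical stand-in for a storage memory pool: only memoryUsed matters here.
final case class UsedPool(memoryUsed: Long)

// Sum of the bytes used by the on-heap and off-heap storage memory pools.
def storageMemoryUsed(onHeapStorageMemoryPool: UsedPool, offHeapStorageMemoryPool: UsedPool): Long =
  onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed

// Made-up usage: 64 MiB cached on-heap and 16 MiB off-heap.
val used = storageMemoryUsed(UsedPool(64L * 1024 * 1024), UsedPool(16L * 1024 * 1024))
println(s"storage memory used: $used bytes")
```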
MemoryMode¶
tungstenMemoryMode: MemoryMode
tungstenMemoryMode tracks whether Tungsten memory will be allocated on the JVM heap or off-heap (using sun.misc.Unsafe).
final val
tungstenMemoryMode is a final value, so it is initialized once when MemoryManager is created.
tungstenMemoryMode is OFF_HEAP when the following are all met:
- spark.memory.offHeap.enabled configuration property is enabled
- spark.memory.offHeap.size configuration property is greater than 0
- JVM supports unaligned memory access (aka unaligned Unsafe, i.e. sun.misc.Unsafe package is available and the underlying system has unaligned-access capability)
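If spark.memory.offHeap.enabled is disabled, tungstenMemoryMode is ON_HEAP. If it is enabled but either of the other two conditions does not hold, MemoryManager cannot be created: those two conditions are enforced as requirements (failing with an IllegalArgumentException) rather than triggering a fallback to ON_HEAP.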
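A minimal sketch of this decision, assuming the checks run as in Spark's own source: spark.memory.offHeap.enabled alone selects the mode, while the size and unaligned-access conditions are enforced with require, so violating them fails fast instead of falling back to ON_HEAP. TungstenSettings and its fields are hypothetical stand-ins for the two configuration properties and the JVM check, and the error messages are illustrative.

```scala
sealed trait MemoryMode
case object ON_HEAP extends MemoryMode
case object OFF_HEAP extends MemoryMode

// Hypothetical stand-ins for the two configuration properties and the unaligned-access check.
final case class TungstenSettings(offHeapEnabled: Boolean, offHeapSize: Long, unalignedSupported: Boolean)

// Only offHeapEnabled picks the mode; the other two conditions are hard requirements
// (require throws IllegalArgumentException when they fail).
def tungstenMemoryMode(s: TungstenSettings): MemoryMode =
  if (s.offHeapEnabled) {
    require(s.offHeapSize > 0, "off-heap size must be positive when off-heap memory is enabled")
    require(s.unalignedSupported, "off-heap memory requires unaligned memory access")
    OFF_HEAP
  } else {
    ON_HEAP
  }
```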
Note
Given that the spark.memory.offHeap.enabled configuration property is turned off by default and the spark.memory.offHeap.size configuration property is 0 by default, Apache Spark defaults to Tungsten memory allocated on the JVM heap (ON_HEAP).
tungstenMemoryMode is used when:
- MemoryManager is created (and initializes the pageSizeBytes and tungstenMemoryAllocator internal properties)
- TaskMemoryManager is created
MemoryAllocator¶
tungstenMemoryAllocator: MemoryAllocator
MemoryManager selects the MemoryAllocator to use based on the tungstenMemoryMode.
final val
tungstenMemoryAllocator is a final value, so it is initialized once when MemoryManager is created.
MemoryMode | MemoryAllocator |
---|---|
ON_HEAP | HeapMemoryAllocator |
OFF_HEAP | UnsafeMemoryAllocator |
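The table translates to an exhaustive pattern match. A minimal sketch in which HeapMemoryAllocator and UnsafeMemoryAllocator are hypothetical case objects named after the real classes, not the classes themselves.

```scala
sealed trait MemoryMode
case object ON_HEAP extends MemoryMode
case object OFF_HEAP extends MemoryMode

// Hypothetical allocator stand-ins named after the classes in the table.
sealed trait SketchAllocator
case object HeapMemoryAllocator extends SketchAllocator
case object UnsafeMemoryAllocator extends SketchAllocator

// One case per table row; MemoryMode is sealed, so the match is checked for exhaustiveness.
def tungstenMemoryAllocator(mode: MemoryMode): SketchAllocator = mode match {
  case ON_HEAP  => HeapMemoryAllocator
  case OFF_HEAP => UnsafeMemoryAllocator
}
```

Because MemoryMode is sealed, the compiler warns if a new mode is added without a matching allocator.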
tungstenMemoryAllocator is used when:
- TaskMemoryManager is requested to allocate a memory page, release a memory page and clean up all the allocated memory
Page Size¶
pageSizeBytes is either spark.buffer.pageSize, if defined, or the default page size.
pageSizeBytes is used when:
- TaskMemoryManager is requested for the page size
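A minimal sketch of the fallback, assuming Option[Long] models an unset spark.buffer.pageSize. pageSizeBytes here is a hypothetical helper, and the 4 MiB default is a placeholder rather than Spark's computed default page size.

```scala
// Hypothetical helper: `configured` models spark.buffer.pageSize (None when unset).
// `default` is by-name, so it is evaluated only if no explicit value is configured.
def pageSizeBytes(configured: Option[Long], default: => Long): Long =
  configured.getOrElse(default)

// Placeholder default (4 MiB) that records whether it was evaluated.
var defaultEvaluated = false
def placeholderDefault(): Long = {
  defaultEvaluated = true
  4L * 1024 * 1024
}

val explicitPageSize = pageSizeBytes(Some(8L * 1024 * 1024), placeholderDefault())
```

Passing the default by name means that an explicitly configured page size never triggers the default computation.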
Default Page Size¶
defaultPageSizeBytes: Long
Lazy Value
defaultPageSizeBytes is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.
Learn more in the Scala Language Specification.
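A minimal demonstration of these lazy-value semantics. PageSizeHolder and its 1 MiB value are hypothetical; the counter only records how many times the initializer runs.

```scala
// Hypothetical class: the lazy initializer runs on first access only.
final class PageSizeHolder {
  var initCount = 0
  lazy val defaultPageSizeBytes: Long = {
    initCount += 1
    1L * 1024 * 1024 // placeholder value, not Spark's computation
  }
}

val holder = new PageSizeHolder
val before = holder.initCount            // 0: not initialized yet
val first = holder.defaultPageSizeBytes  // triggers initialization
val second = holder.defaultPageSizeBytes // reuses the cached value
```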