MemoryManager

MemoryManager is an abstraction of memory managers that manage shared memory for task execution (TaskMemoryManager) and block storage (BlockManager).

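MemoryManager splits available memory into two regions:

  • Execution memory (used by tasks for computation in shuffles, joins, sorts and aggregations)

  • Storage memory (used for caching blocks and propagating internal data across the cluster)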

MemoryManager is used to create BlockManager (and MemoryStore) and TaskMemoryManager.

Figure 1. MemoryManager and Core Services

Contract

Acquiring Execution Memory for Task

acquireExecutionMemory(
  numBytes: Long,
  taskAttemptId: Long,
  memoryMode: MemoryMode): Long

acquireExecutionMemory tries to acquire up to numBytes of execution memory for the current task (by taskAttemptId) and returns the number of bytes obtained, or 0 if none can be allocated.

acquireExecutionMemory is used when TaskMemoryManager is requested to acquire execution memory.
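The "up to numBytes, possibly 0" semantics can be illustrated with a minimal sketch, assuming a single fixed-size pool shared by all tasks. ToyExecutionMemory and its poolSize parameter are hypothetical and not part of Spark; the real execution memory pools also deal with memory modes and balance memory between active tasks.

```scala
import scala.collection.mutable

// Hypothetical toy model of the acquireExecutionMemory contract (not Spark's code).
// One fixed-size pool; each request is granted at most what is still free.
class ToyExecutionMemory(poolSize: Long) {
  // bytes currently held, per task attempt
  private val held = mutable.Map.empty[Long, Long].withDefaultValue(0L)

  def memoryUsed: Long = held.values.sum

  /** Grants at most numBytes and returns the bytes granted (0 when the pool is exhausted). */
  def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long): Long = synchronized {
    require(numBytes >= 0, "numBytes must be non-negative")
    val granted = math.min(numBytes, poolSize - memoryUsed)
    if (granted > 0) held(taskAttemptId) += granted
    granted
  }
}

val pool = new ToyExecutionMemory(poolSize = 100L)
val first  = pool.acquireExecutionMemory(60L, taskAttemptId = 1L)
val second = pool.acquireExecutionMemory(60L, taskAttemptId = 2L) // only 40 bytes left
val third  = pool.acquireExecutionMemory(10L, taskAttemptId = 3L) // pool exhausted
println(s"$first $second $third") // 60 40 0
```

A caller such as TaskMemoryManager has to be prepared for a partial grant (or none at all) and react, e.g. by spilling, rather than assume the full request succeeded.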

Acquiring Storage Memory for Block

acquireStorageMemory(
  blockId: BlockId,
  numBytes: Long,
  memoryMode: MemoryMode): Boolean

acquireStorageMemory tries to acquire numBytes bytes of memory to cache the given block, evicting existing ones if necessary.
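"Acquire, evicting existing blocks if necessary" can be sketched as follows, assuming blocks are evicted oldest first and that reserving memory is the same as recording the block. ToyStorageMemory is hypothetical; in Spark, eviction is carried out by MemoryStore with its own selection rules.

```scala
import scala.collection.mutable

// Hypothetical toy model of acquireStorageMemory (not Spark's code):
// blocks are evicted in insertion order (oldest first) until the new block fits.
class ToyStorageMemory(poolSize: Long) {
  // blockId -> size in bytes; LinkedHashMap keeps insertion order
  val cached = mutable.LinkedHashMap.empty[String, Long]

  def memoryUsed: Long = cached.values.sum

  def acquireStorageMemory(blockId: String, numBytes: Long): Boolean = synchronized {
    if (numBytes > poolSize) {
      false // can never fit, even after evicting everything
    } else {
      // evict the oldest blocks until there is enough free space
      while (poolSize - memoryUsed < numBytes) {
        val (oldest, _) = cached.head
        cached.remove(oldest)
      }
      cached(blockId) = numBytes
      true
    }
  }
}

val store = new ToyStorageMemory(poolSize = 100L)
val a = store.acquireStorageMemory("rdd_0_0", 50L)
val b = store.acquireStorageMemory("rdd_0_1", 40L)
val c = store.acquireStorageMemory("rdd_1_0", 30L)  // evicts rdd_0_0 to make room
val d = store.acquireStorageMemory("rdd_2_0", 200L) // larger than the whole pool
println(store.cached.keys.mkString(", ")) // rdd_0_1, rdd_1_0
```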

acquireStorageMemory is used when:

Acquiring Unroll Memory for Block

acquireUnrollMemory(
  blockId: BlockId,
  numBytes: Long,
  memoryMode: MemoryMode): Boolean

acquireUnrollMemory tries to acquire numBytes bytes of memory to unroll the given block, evicting existing ones if necessary.

acquireUnrollMemory is used when MemoryStore is requested to reserveUnrollMemoryForThisTask.
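UnifiedMemoryManager serves acquireUnrollMemory by delegating to acquireStorageMemory, so unroll memory comes out of the storage region. The sketch mimics that delegation with a hypothetical ToyUnifiedStorage class that has no eviction and no memory modes.

```scala
// Hypothetical toy (not Spark's code): unroll requests are served from the
// same storage pool by delegating to acquireStorageMemory.
class ToyUnifiedStorage(poolSize: Long) {
  private var used = 0L
  def memoryUsed: Long = used

  def acquireStorageMemory(blockId: String, numBytes: Long): Boolean = synchronized {
    // no eviction in this toy: succeed only if the bytes are free right now
    if (numBytes <= poolSize - used) { used += numBytes; true } else false
  }

  // unrolling reserves memory from the storage pool (JVM monitors are reentrant)
  def acquireUnrollMemory(blockId: String, numBytes: Long): Boolean = synchronized {
    acquireStorageMemory(blockId, numBytes)
  }
}

val mem = new ToyUnifiedStorage(poolSize = 64L)
val unrolled = mem.acquireUnrollMemory("rdd_3_0", 48L)
val stored = mem.acquireStorageMemory("rdd_3_1", 32L) // only 16 bytes left
println(s"$unrolled $stored ${mem.memoryUsed}") // true false 48
```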

Total Available Off-Heap Storage Memory

maxOffHeapStorageMemory: Long

maxOffHeapStorageMemory is the total available off-heap memory for storage (in bytes).

maxOffHeapStorageMemory may vary over time.

maxOffHeapStorageMemory is used when:

Total Available On-Heap Storage Memory

maxOnHeapStorageMemory: Long

maxOnHeapStorageMemory is the total available on-heap memory for storage (in bytes).

maxOnHeapStorageMemory may vary over time.
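Why the maximum may vary: in UnifiedMemoryManager, maxOnHeapStorageMemory (and likewise maxOffHeapStorageMemory) is the size of the unified region minus the execution memory currently in use, so it shrinks and grows as tasks acquire and release execution memory. A toy model of that rule, with hypothetical names:

```scala
// Toy illustration (hypothetical names, not Spark's code): the storage limit
// is the unified region minus the execution memory currently in use.
class ToyUnifiedRegion(val maxMemory: Long) {
  var executionMemoryUsed: Long = 0L
  def maxStorageMemory: Long = maxMemory - executionMemoryUsed
}

val onHeap = new ToyUnifiedRegion(maxMemory = 1000L)
val before = onHeap.maxStorageMemory // nothing used for execution yet
onHeap.executionMemoryUsed = 300L    // tasks start using execution memory
val during = onHeap.maxStorageMemory
onHeap.executionMemoryUsed = 0L      // tasks release it again
val after = onHeap.maxStorageMemory
println(s"$before $during $after")   // 1000 700 1000
```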

maxOnHeapStorageMemory is used when:

Available MemoryManagers

MemoryManager          Description
StaticMemoryManager    Legacy memory manager
UnifiedMemoryManager   Default memory manager

Creating Instance

MemoryManager takes the following to be created:

  • SparkConf

  • Number of CPU cores

  • Size of the on-heap storage memory

  • Size of the on-heap execution memory

MemoryManager is an abstract class and cannot be created directly. It is created indirectly, as part of creating one of the concrete MemoryManagers.
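A minimal sketch of the constructor shape, assuming SparkConf can be stood in for by a Map. ToyMemoryManager and ToyFixedMemoryManager are hypothetical names; the point is only that the abstract class is instantiated through a concrete subclass.

```scala
// Hypothetical sketch (not Spark's source) of the four constructor parameters.
abstract class ToyMemoryManager(
    conf: Map[String, String],        // stand-in for SparkConf
    numCores: Int,                    // number of CPU cores
    protected val onHeapStorageMemory: Long,
    onHeapExecutionMemory: Long) {
  // concrete managers decide how much storage memory is currently available
  def maxOnHeapStorageMemory: Long
}

// A concrete subclass is the only way to get an instance.
// Arguments: conf, numCores, on-heap storage (600), on-heap execution (400).
class ToyFixedMemoryManager(conf: Map[String, String], numCores: Int)
    extends ToyMemoryManager(conf, numCores, 600L, 400L) {
  // a fixed storage limit, loosely in the spirit of a static split
  override def maxOnHeapStorageMemory: Long = onHeapStorageMemory
}

// `new ToyMemoryManager(...)` would not compile because the class is abstract.
val mm: ToyMemoryManager = new ToyFixedMemoryManager(Map.empty, numCores = 4)
println(mm.maxOnHeapStorageMemory) // 600
```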

MemoryPools for Storage

MemoryManager creates two StorageMemoryPools for on- and off-heap storage (ON_HEAP and OFF_HEAP memory modes, respectively) when created.

MemoryManager immediately requests them to incrementPoolSize as follows:

MemoryManager requests the MemoryPools to use a given MemoryStore when requested to setMemoryStore.

MemoryManager requests the MemoryPools to releaseMemory when requested to releaseStorageMemory.

MemoryManager requests the MemoryPools to releaseAllMemory when requested to releaseAllStorageMemory.

MemoryManager requests the MemoryPools for the memoryUsed when requested for storageMemoryUsed.
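The delegation to the two storage pools can be sketched as follows. All class names are hypothetical, the MemoryStore association (setMemoryStore) is left out, and pool sizes are passed in directly rather than derived from configuration.

```scala
// Hypothetical sketch, not Spark's source: one storage pool per memory mode,
// with the manager forwarding release/used requests to the pools.
sealed trait Mode
case object OnHeap extends Mode
case object OffHeap extends Mode

class ToyStoragePool(val mode: Mode) {
  private var poolSize = 0L
  private var used = 0L

  def incrementPoolSize(delta: Long): Unit = {
    require(delta >= 0, "delta must be non-negative")
    poolSize += delta
  }
  def size: Long = poolSize
  def memoryUsed: Long = used

  def acquireMemory(numBytes: Long): Boolean =
    if (used + numBytes <= poolSize) { used += numBytes; true } else false

  // releasing more than is held simply empties the pool in this toy
  def releaseMemory(numBytes: Long): Unit = { used = math.max(0L, used - numBytes) }
  def releaseAllMemory(): Unit = { used = 0L }
}

class ToyManager(onHeapStorageMemory: Long, offHeapStorageMemory: Long) {
  val onHeapStoragePool = new ToyStoragePool(OnHeap)
  val offHeapStoragePool = new ToyStoragePool(OffHeap)

  // size the pools right after creating them
  onHeapStoragePool.incrementPoolSize(onHeapStorageMemory)
  offHeapStoragePool.incrementPoolSize(offHeapStorageMemory)

  private def poolFor(mode: Mode): ToyStoragePool = mode match {
    case OnHeap  => onHeapStoragePool
    case OffHeap => offHeapStoragePool
  }

  def releaseStorageMemory(numBytes: Long, mode: Mode): Unit =
    poolFor(mode).releaseMemory(numBytes)

  def releaseAllStorageMemory(): Unit = {
    onHeapStoragePool.releaseAllMemory()
    offHeapStoragePool.releaseAllMemory()
  }

  def storageMemoryUsed: Long =
    onHeapStoragePool.memoryUsed + offHeapStoragePool.memoryUsed
}

val mgr = new ToyManager(onHeapStorageMemory = 500L, offHeapStorageMemory = 200L)
val gotOnHeap = mgr.onHeapStoragePool.acquireMemory(300L)
val gotOffHeap = mgr.offHeapStoragePool.acquireMemory(100L)
mgr.releaseStorageMemory(50L, OnHeap)
val usedAfterRelease = mgr.storageMemoryUsed
mgr.releaseAllStorageMemory()
println(s"$usedAfterRelease ${mgr.storageMemoryUsed}") // 350 0
```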

Accessing MemoryManager Using SparkEnv

MemoryManager is available as SparkEnv.memoryManager on the driver and executors.

import org.apache.spark.SparkEnv
val mm = SparkEnv.get.memoryManager

scala> :type mm
org.apache.spark.memory.MemoryManager

spark.memory.useLegacyMode Configuration Property

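A concrete MemoryManager is chosen based on the spark.memory.useLegacyMode configuration property when SparkEnv is created for the driver and executors: StaticMemoryManager when the property is enabled, and UnifiedMemoryManager otherwise (the property is disabled by default).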
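A minimal sketch of the selection, assuming the configuration is a plain Map. The function memoryManagerFor is hypothetical, returns only the class name, and is not SparkEnv's code.

```scala
// Hypothetical sketch of choosing a MemoryManager from configuration.
def memoryManagerFor(conf: Map[String, String]): String = {
  // spark.memory.useLegacyMode is treated as false when absent
  val useLegacyMode = conf.get("spark.memory.useLegacyMode").exists(_.trim.toBoolean)
  if (useLegacyMode) "StaticMemoryManager" else "UnifiedMemoryManager"
}

val byDefault = memoryManagerFor(Map.empty)
val legacy = memoryManagerFor(Map("spark.memory.useLegacyMode" -> "true"))
println(s"$byDefault $legacy") // UnifiedMemoryManager StaticMemoryManager
```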

executionMemoryUsed Method

executionMemoryUsed: Long

executionMemoryUsed…FIXME

executionMemoryUsed is used when…FIXME

releaseAllStorageMemory Method

releaseAllStorageMemory(): Unit

releaseAllStorageMemory…FIXME

releaseAllStorageMemory is used when…FIXME

releaseUnrollMemory Method

releaseUnrollMemory(
  numBytes: Long,
  memoryMode: MemoryMode): Unit

releaseUnrollMemory…FIXME

releaseUnrollMemory is used when…FIXME

Associating MemoryStore with Storage MemoryPools

setMemoryStore(
  store: MemoryStore): Unit

setMemoryStore requests the onHeapStorageMemoryPool and offHeapStorageMemoryPool to use the given MemoryStore.

setMemoryStore is used when BlockManager is created.

releaseExecutionMemory Method

releaseExecutionMemory(
  numBytes: Long,
  taskAttemptId: Long,
  memoryMode: MemoryMode): Unit

releaseExecutionMemory…FIXME

releaseExecutionMemory is used when TaskMemoryManager is requested to releaseExecutionMemory and cleanUpAllAllocatedMemory.

releaseAllExecutionMemoryForTask Method

releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long

releaseAllExecutionMemoryForTask…FIXME

releaseAllExecutionMemoryForTask is used exclusively when TaskRunner is requested to run (and cleans up after itself).

tungstenMemoryMode Flag

tungstenMemoryMode: MemoryMode

tungstenMemoryMode returns OFF_HEAP when spark.memory.offHeap.enabled configuration property is enabled (it is not by default). Off-heap mode additionally requires the following (a violation fails with an IllegalArgumentException rather than falling back to ON_HEAP):

  • spark.memory.offHeap.size configuration property is greater than 0 (it is 0 by default)

  • JVM supports unaligned memory access (aka unaligned Unsafe, i.e. the sun.misc.Unsafe class is available and the underlying system has unaligned-access capability)

Otherwise, i.e. when spark.memory.offHeap.enabled is disabled, tungstenMemoryMode returns ON_HEAP.
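The decision can be restated as a small function. ToyMemoryMode and its case objects are hypothetical stand-ins for org.apache.spark.memory.MemoryMode, the three inputs are passed in explicitly, and the error messages are illustrative. In the Spark sources the size and unaligned-access conditions are enforced with require once off-heap memory is enabled, so the sketch fails fast on a misconfiguration as well.

```scala
// Hypothetical re-statement of the tungstenMemoryMode decision (not Spark's source).
sealed trait ToyMemoryMode
case object ON_HEAP extends ToyMemoryMode
case object OFF_HEAP extends ToyMemoryMode

def tungstenMemoryMode(
    offHeapEnabled: Boolean,        // spark.memory.offHeap.enabled
    offHeapSize: Long,              // spark.memory.offHeap.size (bytes)
    unalignedSupported: Boolean): ToyMemoryMode =
  if (offHeapEnabled) {
    // require throws IllegalArgumentException on inconsistent configuration
    require(offHeapSize > 0, "spark.memory.offHeap.size must be > 0 when off-heap memory is enabled")
    require(unalignedSupported, "unaligned memory access is not supported")
    OFF_HEAP
  } else {
    ON_HEAP
  }

println(tungstenMemoryMode(offHeapEnabled = false, offHeapSize = 0L, unalignedSupported = true))       // ON_HEAP
println(tungstenMemoryMode(offHeapEnabled = true, offHeapSize = 1L << 30, unalignedSupported = true))  // OFF_HEAP
```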

Given that spark.memory.offHeap.enabled configuration property is disabled (false) by default and spark.memory.offHeap.size configuration property is 0 by default, Tungsten uses memory allocated on the JVM heap (ON_HEAP) unless off-heap memory is explicitly configured.

tungstenMemoryMode is a Scala final val and cannot be overridden by custom MemoryManagers.

tungstenMemoryMode is used when:

freePage Method

void freePage(MemoryBlock page)

freePage…FIXME

freePage is used when…FIXME

storageMemoryUsed Method

storageMemoryUsed: Long

storageMemoryUsed gives the total of the memory used by the on-heap and off-heap StorageMemoryPools.

storageMemoryUsed is used when:

releaseStorageMemory Method

releaseStorageMemory(
  numBytes: Long,
  memoryMode: MemoryMode): Unit

releaseStorageMemory…FIXME

releaseStorageMemory is used when:

getExecutionMemoryUsageForTask Method

getExecutionMemoryUsageForTask(
  taskAttemptId: Long): Long

getExecutionMemoryUsageForTask…FIXME

getExecutionMemoryUsageForTask is used when…FIXME

maxOffHeapMemory

maxOffHeapMemory: Long

maxOffHeapMemory…FIXME

maxOffHeapMemory is used when…FIXME

Internal Properties

Name                      Description
pageSizeBytes             FIXME
tungstenMemoryAllocator   FIXME