MemoryStore

MemoryStore manages blocks of data stored in memory on behalf of BlockManager.

Figure 1. MemoryStore and BlockManager

The idiom to access the current MemoryStore is to request SparkEnv for the BlockManager that manages the MemoryStore:

SparkEnv.get.blockManager.memoryStore
Figure 2. Creating MemoryStore

MemoryStore uses spark.storage.unrollMemoryThreshold configuration property when requested to putIteratorAsValues and putIteratorAsBytes.
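
The property can be set on a SparkConf before the SparkContext is created. A minimal sketch (assuming the standard SparkConf API; the value shown is illustrative, not a recommendation):

```scala
import org.apache.spark.SparkConf

// spark.storage.unrollMemoryThreshold is the initial per-task amount of
// memory (in bytes) reserved before unrolling a block. Raising it can reduce
// repeated re-reservations for large blocks at the cost of more up-front
// memory per task.
val conf = new SparkConf()
  .set("spark.storage.unrollMemoryThreshold", (4 * 1024 * 1024).toString)
```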

Creating Instance

MemoryStore takes the following to be created:

releaseUnrollMemoryForThisTask Method

releaseUnrollMemoryForThisTask(
  memoryMode: MemoryMode,
  memory: Long = Long.MaxValue): Unit

releaseUnrollMemoryForThisTask…​FIXME

releaseUnrollMemoryForThisTask is used when:

  • Task is requested to run (and cleans up after itself)

  • MemoryStore is requested to putIteratorAsValues and putIteratorAsBytes

  • PartiallyUnrolledIterator is requested to releaseUnrollMemory

  • PartiallySerializedBlock is requested to discard and finishWritingToStream

getValues Method

getValues(blockId: BlockId): Option[Iterator[_]]

getValues does…​FIXME

getBytes Method

getBytes(blockId: BlockId): Option[ChunkedByteBuffer]

getBytes does…​FIXME

putIteratorAsBytes Method

putIteratorAsBytes[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T],
  memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long]

putIteratorAsBytes tries to put the blockId block in memory store as bytes.

FIXME
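
The general idea of unrolling an iterator as bytes can be sketched with plain Scala (illustrative only, not Spark's implementation; `reserve` is a hypothetical stand-in for reserveUnrollMemoryForThisTask): values are serialized one at a time, more unroll memory is requested periodically, and a partially-serialized result is returned when a request is denied.

```scala
import scala.collection.mutable.ArrayBuffer

object UnrollAsBytesSketch {
  // Simplified stand-in for PartiallySerializedBlock: the bytes written so
  // far plus the values that were never consumed.
  final case class Partial(bytesSoFar: Array[Byte], rest: Iterator[String])

  def putIteratorAsBytes(
      values: Iterator[String],
      reserve: Long => Boolean,      // stand-in for reserveUnrollMemoryForThisTask
      initialThreshold: Long = 16L,
      checkPeriod: Int = 4): Either[Partial, Long] = {
    val buffer = new ArrayBuffer[Byte]
    if (!reserve(initialThreshold)) return Left(Partial(buffer.toArray, values))
    var reserved = initialThreshold
    var count = 0
    var keepUnrolling = true
    while (values.hasNext && keepUnrolling) {
      buffer ++= values.next().getBytes("UTF-8")
      count += 1
      // Every few elements, check whether the reservation is exhausted and,
      // if so, ask for more (doubling, to amortize the number of requests).
      if (count % checkPeriod == 0 && buffer.length >= reserved) {
        val extra = reserved
        keepUnrolling = reserve(extra)
        if (keepUnrolling) reserved += extra
      }
    }
    if (!values.hasNext) Right(buffer.length.toLong)
    else Left(Partial(buffer.toArray, values))
  }
}
```

On success the caller gets the serialized size; on failure it gets everything needed to either discard the partial result or finish writing it elsewhere.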

Removing Block

FIXME

Acquiring Storage Memory for Blocks

putBytes[T](
  blockId: BlockId,
  size: Long,
  memoryMode: MemoryMode,
  _bytes: () => ChunkedByteBuffer): Boolean

putBytes requests storage memory for blockId from MemoryManager and registers the block in entries internal registry.

Internally, putBytes first makes sure that blockId block has not been registered already in entries internal registry.

memoryMode can be ON_HEAP or OFF_HEAP and is a property of a StorageLevel.

scala> import org.apache.spark.storage.StorageLevel._
scala> MEMORY_AND_DISK.useOffHeap
res0: Boolean = false

scala> OFF_HEAP.useOffHeap
res1: Boolean = true

If successful, putBytes "materializes" _bytes byte buffer and makes sure that the size is exactly size. It then registers a SerializedMemoryEntry (for the bytes and memoryMode) for blockId in the internal entries registry.

You should see the following INFO message in the logs:

INFO Block [blockId] stored as bytes in memory (estimated size [size], free [bytes])

putBytes returns true only after blockId was successfully registered in the internal entries registry.
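
The control flow described above can be sketched in plain Scala (hypothetical, simplified types; not Spark's code): fail fast if the block is already registered, acquire storage memory first, materialize the bytes only once memory is granted, and register the entry last.

```scala
import scala.collection.mutable

object PutBytesSketch {
  private val entries = mutable.LinkedHashMap.empty[String, Array[Byte]]

  def putBytes(
      blockId: String,
      size: Long,
      acquireStorageMemory: Long => Boolean, // stand-in for MemoryManager
      bytes: () => Array[Byte]): Boolean = {
    require(!entries.contains(blockId), s"Block $blockId is already present")
    if (acquireStorageMemory(size)) {
      val data = bytes() // materialize only after memory is granted
      assert(data.length == size, "materialized size must match the requested size")
      entries.synchronized { entries(blockId) = data }
      true
    } else false
  }
}
```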

Evicting Blocks From Memory

evictBlocksToFreeSpace(
  blockId: Option[BlockId],
  space: Long,
  memoryMode: MemoryMode): Long

evictBlocksToFreeSpace…​FIXME

evictBlocksToFreeSpace is used when StorageMemoryPool is requested to acquireMemory and freeSpaceToShrinkPool.
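
A hedged sketch of the eviction idea (not Spark's implementation, which also has to skip blocks of the RDD being cached and honor the memory mode): walk the access-ordered entries map from least-recently to most-recently accessed, collecting victims until enough space would be freed, and evict only if the target can actually be met.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import scala.collection.mutable.ArrayBuffer

object EvictionSketch {
  def evictBlocksToFreeSpace(
      entries: JLinkedHashMap[String, Long], // blockId -> size in bytes
      space: Long): Long = {
    var freed = 0L
    val selected = ArrayBuffer.empty[String]
    // The map is access-ordered, so iteration yields LRU blocks first.
    val it = entries.entrySet().iterator()
    while (freed < space && it.hasNext) {
      val e = it.next()
      selected += e.getKey
      freed += e.getValue
    }
    // Evict only if the requested space can actually be freed.
    if (freed >= space) { selected.foreach(k => entries.remove(k)); freed }
    else 0L
  }
}
```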

Checking Whether Block Exists In MemoryStore

contains(blockId: BlockId): Boolean

contains is positive (true) when the entries internal registry contains the blockId key.

contains is used when…​FIXME

putIteratorAsValues Method

putIteratorAsValues[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]

putIteratorAsValues makes sure that the blockId block is not already registered, or throws an IllegalArgumentException:

requirement failed: Block [blockId] is already present in the MemoryStore

putIteratorAsValues reserves unroll memory for the current task (reserveUnrollMemoryForThisTask) with the initial memory threshold and ON_HEAP memory mode.

FIXME

putIteratorAsValues tries to put the blockId block in memory store as values.

putIteratorAsValues is used when BlockManager stores the bytes or an iterator of values of a block, or when attempting to cache spilled values read from disk.
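
The Either result contract can be sketched in plain Scala (illustrative; `reserve` is a hypothetical stand-in for reserveUnrollMemoryForThisTask): on success the caller gets the number of unrolled values; on failure it still gets every value back, because the partially-unrolled values are stitched together with the unconsumed rest of the input, akin to PartiallyUnrolledIterator.

```scala
import scala.collection.mutable.ArrayBuffer

object UnrollAsValuesSketch {
  def putIteratorAsValues[T](
      values: Iterator[T],
      reserve: Int => Boolean, // stand-in for reserveUnrollMemoryForThisTask
      batch: Int = 4): Either[Iterator[T], Long] = {
    val unrolled = ArrayBuffer.empty[T]
    var ok = reserve(batch)
    while (ok && values.hasNext) {
      unrolled += values.next()
      // Periodically ask for more unroll memory; stop when denied.
      if (unrolled.size % batch == 0) ok = reserve(batch)
    }
    if (!values.hasNext && ok) Right(unrolled.size.toLong)
    else Left(unrolled.iterator ++ values) // no value is lost on failure
  }
}
```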

reserveUnrollMemoryForThisTask Method

reserveUnrollMemoryForThisTask(
  blockId: BlockId,
  memory: Long,
  memoryMode: MemoryMode): Boolean

reserveUnrollMemoryForThisTask acquires a lock on MemoryManager and requests it to acquireUnrollMemory.

reserveUnrollMemoryForThisTask is used when MemoryStore is requested to putIteratorAsValues and putIteratorAsBytes.
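
The per-task bookkeeping behind reserveUnrollMemoryForThisTask and releaseUnrollMemoryForThisTask can be sketched as follows (hypothetical, simplified; not Spark's code): both sides synchronize on the shared memory-manager lock, and reservations are tracked per task attempt so a task can release at most what it actually holds (releasing everything by default, matching the Long.MaxValue default seen above).

```scala
import scala.collection.mutable

object UnrollAccountingSketch {
  private val lock = new Object
  private var poolFree = 100L                         // stand-in for the memory pool
  private val perTask = mutable.Map.empty[Long, Long] // taskAttemptId -> reserved bytes

  def reserve(taskId: Long, memory: Long): Boolean = lock.synchronized {
    if (memory <= poolFree) {
      poolFree -= memory
      perTask(taskId) = perTask.getOrElse(taskId, 0L) + memory
      true
    } else false
  }

  def release(taskId: Long, memory: Long = Long.MaxValue): Unit = lock.synchronized {
    val held = perTask.getOrElse(taskId, 0L)
    val toRelease = math.min(memory, held) // never release more than is held
    poolFree += toRelease
    if (held - toRelease <= 0) perTask.remove(taskId)
    else perTask(taskId) = held - toRelease
  }
}
```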

Total Amount Of Memory Available For Storage

maxMemory: Long

maxMemory requests the MemoryManager for the current maxOnHeapStorageMemory and maxOffHeapStorageMemory, and simply returns their sum.

Enable INFO logging to find the maxMemory in the logs when MemoryStore is created:

MemoryStore started with capacity [maxMemory] MB

maxMemory is used for logging purposes only.

Logging

Enable ALL logging level for org.apache.spark.storage.memory.MemoryStore logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.memory.MemoryStore=ALL

Refer to Logging.

Internal Registries

MemoryEntries by BlockId

entries: LinkedHashMap[BlockId, MemoryEntry[_]]

When created, MemoryStore creates a Java LinkedHashMap of MemoryEntries per BlockId (with the initial capacity of 32 and the load factor of 0.75).

entries uses access-order ordering mode where the order of iteration is the order in which the entries were last accessed (from least-recently accessed to most-recently). That gives LRU cache behaviour when evicting blocks.
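
The access-order mode can be demonstrated with a self-contained snippet using the same constructor arguments as entries (capacity 32, load factor 0.75, accessOrder = true); the block ids are made up for illustration. Reading a key moves it to the most-recently-used end, so the head of the iteration is always the best eviction candidate.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}

object AccessOrderDemo {
  def iterationOrder(): List[String] = {
    // Same arguments as the entries registry: capacity 32, load factor
    // 0.75, accessOrder = true.
    val m = new JLinkedHashMap[String, Int](32, 0.75f, true)
    m.put("rdd_0_0", 1)
    m.put("rdd_0_1", 2)
    m.put("rdd_0_2", 3)
    m.get("rdd_0_0") // the access moves this block to the MRU end
    val it = m.keySet().iterator()
    var keys = List.empty[String]
    while (it.hasNext) keys = keys :+ it.next()
    keys // least-recently accessed first
  }
}
```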