MemoryStore

MemoryStore keeps blocks of data in memory, either as deserialized values or as serialized bytes.

MemoryStore is created exclusively when BlockManager is created.

Figure 1. Creating MemoryStore

The "idiom" to access the current MemoryStore is to request SparkEnv for the BlockManager that manages the MemoryStore.

SparkEnv.get.blockManager.memoryStore

MemoryStore uses Java’s java.util.LinkedHashMap with access-order ordering mode. In access-order, the order of iteration is the order in which the entries were last accessed, from least-recently accessed to most-recently. That gives LRU cache behaviour when evicting blocks.
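The access-order behaviour can be tried out with a plain java.util.LinkedHashMap outside Spark. The following is a minimal sketch only: the object name AccessOrderSketch, the block names and the sizes are hypothetical stand-ins (the real registry maps BlockId to MemoryEntry), and only the constructor arguments mirror what the text describes.

```scala
import java.util.LinkedHashMap

object AccessOrderSketch {
  // Builds a small access-ordered map and returns the key that iteration
  // visits first, i.e. the least-recently accessed entry.
  def firstEvictionCandidate(): String = {
    // Initial capacity 32, load factor 0.75, accessOrder = true.
    // Hypothetical content: block name -> size in bytes.
    val entries = new LinkedHashMap[String, Long](32, 0.75f, true)
    entries.put("rdd_0_0", 100L)
    entries.put("rdd_0_1", 200L)
    entries.put("rdd_0_2", 300L)

    // Reading an entry moves it to the most-recently-accessed end.
    entries.get("rdd_0_0")

    // Iteration starts at the least-recently accessed entry,
    // which an LRU policy would consider first for eviction.
    entries.keySet.iterator.next()
  }
}
```

Calling AccessOrderSketch.firstEvictionCandidate() returns "rdd_0_1", because "rdd_0_0" was touched after being inserted and so moved to the end of the iteration order.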

MemoryStore uses the spark.storage.unrollMemoryThreshold configuration property (default: 1024 * 1024 bytes) when requested to putIteratorAsValues and putIteratorAsBytes.

Table 1. MemoryStore’s Internal Properties (e.g. Registries, Counters and Flags)
Name: entries

Description: Java’s java.util.LinkedHashMap of MemoryEntries per BlockId (with the initial capacity of 32, the load factor of 0.75 and access-order ordering mode, i.e. iteration is in the order in which its entries were last accessed, from least-recently accessed to most-recently).

FIXME Where are these dependencies used?

Enable INFO or DEBUG logging level for the org.apache.spark.storage.memory.MemoryStore logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.memory.MemoryStore=DEBUG

Refer to Logging.

releaseUnrollMemoryForThisTask Method

releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue): Unit

releaseUnrollMemoryForThisTask…​FIXME

releaseUnrollMemoryForThisTask is used when:

  • Task is requested to run (and cleans up after itself)

  • MemoryStore is requested to putIteratorAsValues and putIteratorAsBytes

  • PartiallyUnrolledIterator is requested to releaseUnrollMemory

  • PartiallySerializedBlock is requested to discard and finishWritingToStream

getValues Method

getValues(blockId: BlockId): Option[Iterator[_]]

getValues does…​FIXME

getBytes Method

getBytes(blockId: BlockId): Option[ChunkedByteBuffer]

getBytes does…​FIXME

putIteratorAsBytes Method

putIteratorAsBytes[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T],
  memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long]

putIteratorAsBytes tries to put the blockId block in memory store as bytes.

FIXME

Removing Block

FIXME

Acquiring Storage Memory for Blocks — putBytes Method

putBytes[T](
  blockId: BlockId,
  size: Long,
  memoryMode: MemoryMode,
  _bytes: () => ChunkedByteBuffer): Boolean

putBytes requests storage memory for blockId from MemoryManager and, if granted, registers the block in the entries internal registry.

Internally, putBytes first makes sure that the blockId block has not already been registered in the entries internal registry.

memoryMode can be ON_HEAP or OFF_HEAP and is a property of a StorageLevel.

scala> import org.apache.spark.storage.StorageLevel._

scala> MEMORY_AND_DISK.useOffHeap
res0: Boolean = false

scala> OFF_HEAP.useOffHeap
res1: Boolean = true

If the MemoryManager grants the requested storage memory, putBytes "materializes" the _bytes byte buffer and asserts that its size is exactly size. It then registers a SerializedMemoryEntry (for the bytes and memoryMode) for blockId in the entries internal registry.

You should see the following INFO message in the logs:

INFO Block [blockId] stored as bytes in memory (estimated size [size], free [bytes])

putBytes returns true only after blockId was successfully registered in the internal entries registry.
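The putBytes flow described above can be modelled without Spark. This is a simplified sketch, not Spark's implementation: ToyMemoryManager, ToyMemoryStore, the String block ids and the Array[Byte] payload are all hypothetical stand-ins, memoryMode is left out, and the require message is illustrative.

```scala
import java.util.LinkedHashMap

object PutBytesSketch {
  // Hypothetical stand-in for MemoryManager: a fixed storage budget in bytes.
  final class ToyMemoryManager(maxStorage: Long) {
    private var used = 0L
    def acquireStorageMemory(size: Long): Boolean = synchronized {
      if (used + size <= maxStorage) { used += size; true } else false
    }
  }

  // Hypothetical stand-in for MemoryStore that keeps raw byte arrays.
  final class ToyMemoryStore(memoryManager: ToyMemoryManager) {
    private val entries = new LinkedHashMap[String, Array[Byte]](32, 0.75f, true)

    def contains(blockId: String): Boolean =
      entries.synchronized { entries.containsKey(blockId) }

    def putBytes(blockId: String, size: Long, bytes: () => Array[Byte]): Boolean = {
      // Step 1: the block must not be registered yet.
      require(!contains(blockId), s"Block $blockId is already registered")
      // Step 2: ask the memory manager for storage memory.
      if (memoryManager.acquireStorageMemory(size)) {
        // Step 3: materialize the bytes only after memory was granted
        // and check that the buffer has exactly the announced size.
        val materialized = bytes()
        assert(materialized.length == size)
        // Step 4: register the entry.
        entries.synchronized { entries.put(blockId, materialized) }
        true
      } else {
        false
      }
    }
  }
}
```

Passing the bytes as a function (() => Array[Byte]) means the buffer is not created at all when the memory request is refused, which is the point of the lazy _bytes parameter in the signature above.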

Settings

Table 2. Spark Properties
Spark Property: spark.storage.unrollMemoryThreshold

Default Value: 1024 * 1024 (bytes)

Description: Initial per-task memory size needed to store a block in memory.

spark.storage.unrollMemoryThreshold should be at most the total amount of memory available for storage. If not, you should see the following WARN message in the logs:

Max memory [maxMemory] is less than the initial memory threshold [unrollMemoryThreshold] needed to store a block in memory. Please configure Spark with more memory.

Used when MemoryStore is requested to putIteratorAsValues and putIteratorAsBytes.
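The relationship between the threshold and the storage memory can be expressed as a small check. This is a sketch under assumptions: UnrollThresholdSketch and thresholdWarning are hypothetical names, and the sizes are printed as raw byte counts, which may differ from how Spark formats them in the actual log line.

```scala
object UnrollThresholdSketch {
  // Default taken from the table above: 1024 * 1024 bytes.
  val DefaultUnrollMemoryThreshold: Long = 1024L * 1024L

  // Returns the warning text when the storage memory is smaller than the
  // initial unroll threshold, and None when the configuration is fine.
  def thresholdWarning(
      maxMemory: Long,
      unrollMemoryThreshold: Long = DefaultUnrollMemoryThreshold): Option[String] =
    if (maxMemory < unrollMemoryThreshold) {
      Some(s"Max memory $maxMemory is less than the initial memory threshold " +
        s"$unrollMemoryThreshold needed to store a block in memory. " +
        "Please configure Spark with more memory.")
    } else {
      None
    }
}
```

For example, UnrollThresholdSketch.thresholdWarning(512L) yields a warning, while a maxMemory equal to or larger than the threshold yields None.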

Evicting Blocks From Memory — evictBlocksToFreeSpace Method

evictBlocksToFreeSpace(
  blockId: Option[BlockId],
  space: Long,
  memoryMode: MemoryMode): Long

evictBlocksToFreeSpace…​FIXME

evictBlocksToFreeSpace is used when StorageMemoryPool is requested to acquireMemory and freeSpaceToShrinkPool.

Checking Whether Block Exists In MemoryStore — contains Method

contains(blockId: BlockId): Boolean

contains returns true when the entries internal registry contains the blockId key.

contains is used when…​FIXME

putIteratorAsValues Method

putIteratorAsValues[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]

putIteratorAsValues requires that blockId is not already registered, and otherwise throws an IllegalArgumentException:

requirement failed: Block [blockId] is already present in the MemoryStore
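The "requirement failed:" prefix in that message is what Scala's require adds to the message it is given. The sketch below reproduces the message shape outside Spark; RequireSketch, ensureAbsent and failureMessage are hypothetical names, and a Set[String] stands in for the entries registry.

```scala
object RequireSketch {
  // Throws IllegalArgumentException when the block is already registered.
  def ensureAbsent(registered: Set[String], blockId: String): Unit =
    require(
      !registered.contains(blockId),
      s"Block $blockId is already present in the MemoryStore")

  // Captures the exception message, if any, for inspection.
  def failureMessage(registered: Set[String], blockId: String): Option[String] =
    try {
      ensureAbsent(registered, blockId)
      None
    } catch {
      case e: IllegalArgumentException => Some(e.getMessage)
    }
}
```

RequireSketch.failureMessage(Set("rdd_0_0"), "rdd_0_0") returns Some("requirement failed: Block rdd_0_0 is already present in the MemoryStore").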

putIteratorAsValues then reserves unroll memory for the task using reserveUnrollMemoryForThisTask (with the initial memory threshold and ON_HEAP memory mode).

FIXME

putIteratorAsValues tries to put the blockId block in memory store as values.

putIteratorAsValues is used when BlockManager stores bytes of a block or iterator of values of a block or when attempting to cache spilled values read from disk.

Creating MemoryStore Instance

MemoryStore takes the following when created:

MemoryStore initializes the internal registries and counters.

reserveUnrollMemoryForThisTask Method

reserveUnrollMemoryForThisTask(
  blockId: BlockId,
  memory: Long,
  memoryMode: MemoryMode): Boolean

reserveUnrollMemoryForThisTask acquires a lock on MemoryManager and requests it to acquireUnrollMemory.

reserveUnrollMemoryForThisTask is used when MemoryStore is requested to putIteratorAsValues and putIteratorAsBytes.
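The locking pattern can be illustrated with a toy model. This is a sketch under several assumptions rather than Spark's code: ToyMemoryManager and ToyMemoryStore are hypothetical, the explicit taskId parameter replaces the blockId and memoryMode parameters of the real signature, and the per-task bookkeeping map is an assumption added so that a release counterpart (with the Long.MaxValue default shown earlier) can be demonstrated.

```scala
import scala.collection.mutable

object UnrollMemorySketch {
  // Hypothetical stand-in for MemoryManager with a simple byte budget.
  final class ToyMemoryManager(private var freeBytes: Long) {
    def acquireUnrollMemory(bytes: Long): Boolean =
      if (bytes <= freeBytes) { freeBytes -= bytes; true } else false
    def releaseUnrollMemory(bytes: Long): Unit = freeBytes += bytes
    def free: Long = freeBytes
  }

  final class ToyMemoryStore(memoryManager: ToyMemoryManager) {
    // Assumed bookkeeping: unroll memory currently held per task id.
    private val unrollMemoryByTask = mutable.HashMap.empty[Long, Long]

    // Takes the lock on the memory manager, then asks it for unroll memory.
    def reserveUnrollMemoryForThisTask(taskId: Long, memory: Long): Boolean =
      memoryManager.synchronized {
        val granted = memoryManager.acquireUnrollMemory(memory)
        if (granted) {
          unrollMemoryByTask(taskId) = unrollMemoryByTask.getOrElse(taskId, 0L) + memory
        }
        granted
      }

    // Releases at most what the task holds; the default releases everything.
    def releaseUnrollMemoryForThisTask(taskId: Long, memory: Long = Long.MaxValue): Unit =
      memoryManager.synchronized {
        val held = unrollMemoryByTask.getOrElse(taskId, 0L)
        val toRelease = math.min(memory, held)
        if (toRelease > 0) {
          memoryManager.releaseUnrollMemory(toRelease)
          if (held == toRelease) {
            unrollMemoryByTask.remove(taskId)
          } else {
            unrollMemoryByTask(taskId) = held - toRelease
          }
        }
      }

    def unrollMemoryHeldBy(taskId: Long): Long =
      memoryManager.synchronized { unrollMemoryByTask.getOrElse(taskId, 0L) }
  }
}
```

Synchronizing on the memory manager in both methods keeps the manager's budget and the store's per-task map consistent with each other when several tasks reserve and release concurrently.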

Requesting Total Amount Of Memory Available For Storage (In Bytes) — maxMemory Internal Method

maxMemory: Long

maxMemory requests the MemoryManager for the current maxOnHeapStorageMemory and maxOffHeapStorageMemory, and simply returns their sum.

Enable INFO logging to find the maxMemory in the logs when MemoryStore is created:

MemoryStore started with capacity [maxMemory] MB

maxMemory is used for logging purposes only.