MemoryStore

MemoryStore manages blocks of data in memory for BlockManager.

Figure 1. MemoryStore and BlockManager

Creating Instance

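MemoryStore takes the following to be created:

  • SparkConf

  • BlockInfoManager

  • SerializerManager

  • MemoryManager

  • BlockEvictionHandler

MemoryStore is created when BlockManager is created.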

Figure 2. Creating MemoryStore

BlockInfoManager

MemoryStore is given a BlockInfoManager when created.

MemoryStore uses the BlockInfoManager when requested to evictBlocksToFreeSpace.

Accessing MemoryStore

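Other Spark services can access MemoryStore using the BlockManager.memoryStore reference.

import org.apache.spark.SparkEnv
// SparkEnv.get returns the SparkEnv of the current JVM (the driver or an executor)
// memoryStore is private[spark] in BlockManager, so this compiles only in code under the org.apache.spark package
SparkEnv.get.blockManager.memoryStore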

spark.storage.unrollMemoryThreshold Configuration Property

MemoryStore uses spark.storage.unrollMemoryThreshold configuration property for putIterator and putIteratorAsBytes.

releaseUnrollMemoryForThisTask Method

releaseUnrollMemoryForThisTask(
  memoryMode: MemoryMode,
  memory: Long = Long.MaxValue): Unit

releaseUnrollMemoryForThisTask…FIXME

releaseUnrollMemoryForThisTask is used when:

  • Task is requested to run (and cleans up after itself)

  • MemoryStore is requested to putIterator

  • PartiallyUnrolledIterator is requested to releaseUnrollMemory

  • PartiallySerializedBlock is requested to discard and finishWritingToStream
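The description of releaseUnrollMemoryForThisTask is still a FIXME, so the following is only a simplified model, not Spark's code. It assumes that unroll memory is tracked per task attempt and that a release never gives back more than the task currently holds; UnrollLedger, reserve, release, heldBy and releasedToManager are hypothetical names. The default memory = Long.MaxValue from the signature above then reads as "release everything this task holds".

```scala
import scala.collection.mutable

// Hypothetical, simplified model (not Spark's code): unroll memory reserved per task attempt.
// taskAttemptId is passed explicitly here; it stands in for "the current task".
class UnrollLedger {
  private val reserved = mutable.HashMap.empty[Long, Long] // taskAttemptId -> bytes held
  var releasedToManager = 0L // bytes that would be handed back to the memory manager

  def reserve(taskAttemptId: Long, bytes: Long): Unit =
    reserved(taskAttemptId) = reserved.getOrElse(taskAttemptId, 0L) + bytes

  // The default Long.MaxValue means "release everything this task holds".
  def release(taskAttemptId: Long, memory: Long = Long.MaxValue): Unit =
    reserved.get(taskAttemptId).foreach { held =>
      val toRelease = math.min(memory, held) // never release more than is held
      if (toRelease > 0) {
        reserved(taskAttemptId) = held - toRelease
        releasedToManager += toRelease
      }
      // Forget tasks that no longer hold any unroll memory.
      if (reserved(taskAttemptId) == 0) reserved.remove(taskAttemptId)
    }

  def heldBy(taskAttemptId: Long): Long = reserved.getOrElse(taskAttemptId, 0L)
}

val ledger = new UnrollLedger
ledger.reserve(taskAttemptId = 7L, bytes = 1024L * 1024)
ledger.release(7L, memory = 256L * 1024) // partial release
ledger.release(7L)                       // release whatever is left
```

A partial release followed by a release with the default argument leaves the task holding nothing, which matches the "cleans up after itself" use by Task listed above.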

getValues Method

getValues(
  blockId: BlockId): Option[Iterator[_]]

getValues…FIXME

getValues is used when BlockManager is requested to doGetLocalBytes, getLocalValues and maybeCacheDiskValuesInMemory.

getBytes Method

getBytes(
  blockId: BlockId): Option[ChunkedByteBuffer]

getBytes…FIXME

getBytes is used when BlockManager is requested to doGetLocalBytes, getLocalValues and maybeCacheDiskBytesInMemory.
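getValues and getBytes are both lookups in the entries registry; they differ in the kind of entry they expect (deserialized values versus serialized bytes). The sketch below models that split with hypothetical ValuesEntry and BytesEntry classes (not Spark's MemoryEntry types), String block ids, and an Array[Byte] in place of ChunkedByteBuffer. Failing with an IllegalArgumentException when the entry has the wrong kind is an assumption of the sketch.

```scala
import java.util.LinkedHashMap

// Hypothetical, simplified entries (not Spark's MemoryEntry classes).
sealed trait Entry
final case class ValuesEntry(values: Vector[Any]) extends Entry
final case class BytesEntry(bytes: Array[Byte]) extends Entry

class MiniStore {
  // Access-ordered, like the entries registry described under Internal Registries.
  private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)

  def put(blockId: String, entry: Entry): Unit = entries.put(blockId, entry)

  // Deserialized blocks come back as an iterator of values; unknown ids give None.
  def getValues(blockId: String): Option[Iterator[Any]] =
    Option(entries.get(blockId)).map {
      case ValuesEntry(values) => values.iterator
      case BytesEntry(_) => throw new IllegalArgumentException("getValues on a serialized block")
    }

  // Serialized blocks come back as raw bytes; unknown ids give None.
  def getBytes(blockId: String): Option[Array[Byte]] =
    Option(entries.get(blockId)).map {
      case BytesEntry(bytes) => bytes
      case ValuesEntry(_) => throw new IllegalArgumentException("getBytes on a deserialized block")
    }
}

val store = new MiniStore
store.put("rdd_0_0", ValuesEntry(Vector(1, 2, 3)))
store.put("rdd_0_1", BytesEntry(Array[Byte](10, 20)))
```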

putIteratorAsBytes Method

putIteratorAsBytes[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T],
  memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long]

putIteratorAsBytes…FIXME

putIteratorAsBytes is used when BlockManager is requested to doPutIterator.

Dropping Block from Memory

remove(
  blockId: BlockId): Boolean

remove removes the given BlockId from the entries internal registry and branches off based on whether the block was found and removed or not.

Block Removed

When found and removed, remove requests the MemoryManager to releaseStorageMemory and prints out the following DEBUG message to the logs:

Block [blockId] of size [size] dropped from memory (free [memory])

remove returns true.

No Block Removed

If the BlockId was not registered (and so nothing was removed), remove returns false.

Usage

remove is used when BlockManager is requested to dropFromMemory and removeBlockInternal.
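The two branches of remove can be modelled with a plain access-ordered LinkedHashMap. In this hypothetical sketch, Entry, MiniStore and freeBytes stand in for MemoryEntry, MemoryStore and the free storage memory the MemoryManager would report, and println only mimics the DEBUG message. The value type is a case class so that a missing key comes back as null rather than being silently unboxed to 0.

```scala
import java.util.LinkedHashMap

// Hypothetical stand-in for a MemoryEntry: only the size matters here.
final case class Entry(size: Long)

// Hypothetical, simplified model of remove (not Spark's code).
class MiniStore(capacity: Long) {
  private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
  private var used = 0L
  def freeBytes: Long = capacity - used

  def put(blockId: String, size: Long): Unit = {
    entries.put(blockId, Entry(size))
    used += size
  }

  def remove(blockId: String): Boolean = {
    val removed = entries.remove(blockId) // null when blockId was not registered
    if (removed != null) {
      used -= removed.size // stands in for releaseStorageMemory
      println(s"Block $blockId of size ${removed.size} dropped from memory (free $freeBytes)")
      true
    } else {
      false
    }
  }
}

val store = new MiniStore(capacity = 1000L)
store.put("rdd_0_0", 300L)
val first = store.remove("rdd_0_0")  // block found: memory released, returns true
val second = store.remove("rdd_0_0") // already gone: returns false
```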

Acquiring Storage Memory for Blocks

putBytes[T: ClassTag](
  blockId: BlockId,
  size: Long,
  memoryMode: MemoryMode,
  _bytes: () => ChunkedByteBuffer): Boolean

putBytes requests storage memory for blockId from the MemoryManager and registers the block in the entries internal registry.

Internally, putBytes first makes sure that the blockId block has not already been registered in the entries internal registry (or throws an IllegalArgumentException).

memoryMode can be ON_HEAP or OFF_HEAP and is a property of a StorageLevel.

import org.apache.spark.storage.StorageLevel._
scala> MEMORY_AND_DISK.useOffHeap
res0: Boolean = false

scala> OFF_HEAP.useOffHeap
res1: Boolean = true

If the MemoryManager grants the storage memory, putBytes "materializes" the _bytes byte buffer and makes sure its size is exactly size. It then registers a SerializedMemoryEntry (for the bytes and memoryMode) for blockId in the entries internal registry.

You should see the following INFO message in the logs:

Block [blockId] stored as bytes in memory (estimated size [size], free [bytes])

putBytes returns true only after blockId has been successfully registered in the entries internal registry. If the MemoryManager does not grant the storage memory, putBytes returns false.

putBytes is used when BlockManager is requested to doPutBytes and maybeCacheDiskBytesInMemory.
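The following hypothetical sketch walks through the putBytes steps described above: reject a duplicate block id, acquire memory, only then call the lazy _bytes function, check the size, and register the buffer. A java.nio.ByteBuffer stands in for ChunkedByteBuffer and a byte counter stands in for the MemoryManager; unlike Spark, the sketch never evicts other blocks to make room.

```scala
import java.nio.ByteBuffer
import java.util.LinkedHashMap

// Hypothetical, simplified model of putBytes (not Spark's code).
class MiniStore(capacity: Long) {
  private val entries = new LinkedHashMap[String, ByteBuffer](32, 0.75f, true)
  private var used = 0L

  def contains(blockId: String): Boolean = entries.containsKey(blockId)

  def putBytes(blockId: String, size: Long, bytes: () => ByteBuffer): Boolean = {
    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    if (used + size > capacity) {
      false // storage memory not granted: the bytes are never materialized
    } else {
      used += size
      val buffer = bytes() // materialize only after the memory was granted
      assert(buffer.remaining() == size, s"expected $size bytes, got ${buffer.remaining()}")
      entries.put(blockId, buffer)
      true
    }
  }
}

var materialized = 0 // counts how often a _bytes function was actually called
val store = new MiniStore(capacity = 8L)
val ok = store.putBytes("rdd_1_0", 4L, () => { materialized += 1; ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)) })
val tooBig = store.putBytes("rdd_1_1", 16L, () => { materialized += 1; ByteBuffer.allocate(16) })
```

Passing the bytes as a function means a block that does not fit never has its buffer built at all, which is why materialized stays at 1.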

Evicting Blocks

evictBlocksToFreeSpace(
  blockId: Option[BlockId],
  space: Long,
  memoryMode: MemoryMode): Long

evictBlocksToFreeSpace…FIXME

evictBlocksToFreeSpace is used when StorageMemoryPool is requested to acquireMemory and freeSpaceToShrinkPool.
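evictBlocksToFreeSpace is still a FIXME above. The sketch below only illustrates LRU eviction over an access-ordered LinkedHashMap like entries, under its own assumptions: it walks blocks from least- to most-recently accessed, never selects the block being stored (the blockId argument), and drops nothing unless it can free at least space bytes. Locking through BlockInfoManager and any filtering by memoryMode are left out; MiniStore and Entry are hypothetical.

```scala
import java.util.LinkedHashMap
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a MemoryEntry: only the size matters here.
final case class Entry(size: Long)

// Hypothetical, simplified LRU eviction (not Spark's evictBlocksToFreeSpace).
class MiniStore {
  // accessOrder = true: iteration runs from least- to most-recently accessed.
  val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)

  /** Returns the bytes freed, or 0 (dropping nothing) if `space` cannot be freed. */
  def evictBlocksToFreeSpace(blockId: Option[String], space: Long): Long = {
    val selected = ArrayBuffer.empty[String]
    var freed = 0L
    val it = entries.entrySet().iterator()
    while (freed < space && it.hasNext) {
      val e = it.next()
      if (!blockId.contains(e.getKey)) { // never evict the block being stored
        selected += e.getKey
        freed += e.getValue.size
      }
    }
    if (freed >= space) {
      selected.foreach(id => entries.remove(id)) // drop after iterating, not during
      freed
    } else {
      0L // not enough evictable memory: drop nothing
    }
  }
}

val store = new MiniStore
store.entries.put("rdd_0_0", Entry(100L))
store.entries.put("rdd_0_1", Entry(100L))
store.entries.put("rdd_0_2", Entry(100L))
store.entries.get("rdd_0_0") // touch: rdd_0_0 becomes most-recently used
val freed = store.evictBlocksToFreeSpace(Some("rdd_9_9"), 150L)
```

Because rdd_0_0 was read last, the two other blocks are the least-recently used ones and are the ones dropped.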

Checking Whether Block Exists In MemoryStore

contains(
  blockId: BlockId): Boolean

contains returns true when the entries internal registry contains the blockId key.

contains is used when…FIXME

putIteratorAsValues Method

putIteratorAsValues[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]

putIteratorAsValues makes sure that the BlockId is not already in the MemoryStore or throws an IllegalArgumentException:

requirement failed: Block [blockId] is already present in the MemoryStore

putIteratorAsValues calls reserveUnrollMemoryForThisTask (with the initial memory threshold and ON_HEAP memory mode).

FIXME

putIteratorAsValues tries to put the blockId block in the MemoryStore as (deserialized) values.

putIteratorAsValues is used when BlockManager is requested to store bytes or values of a block or when attempting to cache spilled values read from disk.
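The Either result can be read as follows: Right when the whole iterator fit in memory, Left when it did not, in which case the caller gets back an iterator over all the values (in Spark, a PartiallyUnrolledIterator). The function below is a hypothetical sketch of that contract only: its budget counts values rather than bytes, and Right carries a value count instead of a size in bytes.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of the Either contract (not Spark's putIteratorAsValues).
def putIteratorAsValuesSketch[T](values: Iterator[T], budget: Int): Either[Iterator[T], Long] = {
  val unrolled = ArrayBuffer.empty[T]
  // Unroll until the values run out or the (value-count) budget is used up.
  while (values.hasNext && unrolled.size < budget) unrolled += values.next()
  if (!values.hasNext) Right(unrolled.size.toLong) // everything fit
  else Left(unrolled.iterator ++ values)           // already-unrolled values, then the rest
}

putIteratorAsValuesSketch(Iterator(1, 2, 3), budget = 2) match {
  case Right(size) => println(s"stored, size $size")
  case Left(rest)  => println(s"did not fit, ${rest.size} values handed back")
}
```

Handing back the unrolled prefix together with the unconsumed rest means the caller loses no values when the block does not fit.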

reserveUnrollMemoryForThisTask Method

reserveUnrollMemoryForThisTask(
  blockId: BlockId,
  memory: Long,
  memoryMode: MemoryMode): Boolean

reserveUnrollMemoryForThisTask acquires a lock on MemoryManager and requests it to acquireUnrollMemory.

reserveUnrollMemoryForThisTask is used when MemoryStore is requested to putIteratorAsValues and putIteratorAsBytes.
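A minimal model of the locking pattern described above: the reservation happens inside memoryManager.synchronized, so two tasks cannot both be granted the same free bytes. ToyMemoryManager, ToyStore and reservedBy are hypothetical; the sketch passes taskAttemptId explicitly and drops the blockId and memoryMode parameters of the real method.

```scala
import scala.collection.mutable

// Hypothetical stand-in for MemoryManager: a fixed pool of unroll memory (not Spark's API).
class ToyMemoryManager(var freeBytes: Long) {
  def acquireUnrollMemory(bytes: Long): Boolean =
    if (bytes <= freeBytes) { freeBytes -= bytes; true } else false
}

// Hypothetical, simplified reserveUnrollMemoryForThisTask: lock the manager, then ask it.
class ToyStore(memoryManager: ToyMemoryManager) {
  private val reservedByTask = mutable.HashMap.empty[Long, Long] // taskAttemptId -> bytes

  def reserveUnrollMemoryForThisTask(taskAttemptId: Long, memory: Long): Boolean =
    memoryManager.synchronized {
      val granted = memoryManager.acquireUnrollMemory(memory)
      if (granted) {
        reservedByTask(taskAttemptId) = reservedByTask.getOrElse(taskAttemptId, 0L) + memory
      }
      granted
    }

  def reservedBy(taskAttemptId: Long): Long = reservedByTask.getOrElse(taskAttemptId, 0L)
}

val manager = new ToyMemoryManager(freeBytes = 1024L * 1024)
val store = new ToyStore(manager)
val first = store.reserveUnrollMemoryForThisTask(taskAttemptId = 1L, memory = 768L * 1024)
val second = store.reserveUnrollMemoryForThisTask(taskAttemptId = 2L, memory = 512L * 1024)
```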

Total Amount Of Memory Available For Storage

maxMemory: Long

maxMemory requests the MemoryManager for the current maxOnHeapStorageMemory and maxOffHeapStorageMemory, and simply returns their sum.

Enable INFO logging level to find the maxMemory in the logs when MemoryStore is created:

MemoryStore started with capacity [maxMemory] MB

maxMemory is used for logging purposes only.
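As arithmetic, with made-up limits of 384 MB on-heap and 128 MB off-heap storage memory (hypothetical values, not Spark defaults), maxMemory and the figure in the log message work out as below. Spark itself may format the number differently in the actual log line.

```scala
// Hypothetical limits in bytes, not Spark defaults.
val maxOnHeapStorageMemory = 384L * 1024 * 1024
val maxOffHeapStorageMemory = 128L * 1024 * 1024

// maxMemory is the sum of the two limits.
val maxMemory = maxOnHeapStorageMemory + maxOffHeapStorageMemory
val maxMemoryMB = maxMemory / (1024 * 1024)

println(s"MemoryStore started with capacity $maxMemoryMB MB")
```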

putIterator Internal Method

putIterator[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T],
  memoryMode: MemoryMode,
  valuesHolder: ValuesHolder[T]): Either[Long, Long]

putIterator…FIXME

putIterator is used when MemoryStore is requested to putIteratorAsValues and putIteratorAsBytes.
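putIterator is still a FIXME above. As a rough illustration only, the hypothetical unroll function below shows the general idea of unrolling under a memory budget: reserve an initial threshold (the role spark.storage.unrollMemoryThreshold plays), request more whenever the values seen so far outgrow the reservation, and stop once a request is denied. The growthFactor, the per-value sizeOf function, checking after every value, ToyPool, and the meaning given to Left and Right are all assumptions of this sketch, not a description of Spark's implementation.

```scala
// Hypothetical stand-in for the memory manager's unroll pool (not Spark's API).
final class ToyPool(var free: Long) {
  def acquire(bytes: Long): Boolean =
    if (bytes <= free) { free -= bytes; true } else false
}

// Hypothetical sketch of unrolling under a memory budget (not Spark's putIterator).
// Right(total size) when every value fit, Left(unroll memory reserved so far) otherwise.
def unroll[T](
    values: Iterator[T],
    sizeOf: T => Long,
    initialThreshold: Long, // plays the role of spark.storage.unrollMemoryThreshold
    growthFactor: Double,   // assumed >= 1.0
    pool: ToyPool): Either[Long, Long] = {
  var reserved = if (pool.acquire(initialThreshold)) initialThreshold else 0L
  var keepUnrolling = reserved > 0
  var used = 0L
  while (keepUnrolling && values.hasNext) {
    used += sizeOf(values.next())
    if (used > reserved) {
      // Request enough to cover what is used so far, scaled up by the growth factor.
      val request = (used * growthFactor).toLong - reserved
      if (pool.acquire(request)) reserved += request else keepUnrolling = false
    }
  }
  if (keepUnrolling) Right(used) else Left(reserved)
}
```

For example, with ten 100-byte values, a 256-byte threshold and a growth factor of 1.5, a pool of 10000 bytes yields Right(1000), while a pool of 600 bytes runs out after reserving 450 bytes and yields Left(450).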

logUnrollFailureMessage Internal Method

logUnrollFailureMessage(
  blockId: BlockId,
  finalVectorSize: Long): Unit

logUnrollFailureMessage…FIXME

logUnrollFailureMessage is used when MemoryStore is requested to putIterator.

logMemoryUsage Internal Method

logMemoryUsage(): Unit

logMemoryUsage…FIXME

logMemoryUsage is used when MemoryStore is requested to logUnrollFailureMessage.

Total Memory Used

memoryUsed: Long

memoryUsed requests the MemoryManager for the storageMemoryUsed.

memoryUsed is used when MemoryStore is requested for blocksMemoryUsed and to logMemoryUsage.

Memory Used for Caching Blocks

blocksMemoryUsed: Long

blocksMemoryUsed is the total memory used (memoryUsed) minus the memory currently used for unrolling.

blocksMemoryUsed is used for logging purposes when MemoryStore is requested to putBytes, putIterator, remove, evictBlocksToFreeSpace and logMemoryUsage.
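With hypothetical figures, the relationship between memoryUsed and blocksMemoryUsed is a plain subtraction; the names below are local to this example.

```scala
// Hypothetical figures in bytes.
val memoryUsed = 300L * 1024 * 1024         // what the MemoryManager reports as storageMemoryUsed
val currentUnrollMemory = 20L * 1024 * 1024 // memory currently reserved for unrolling

// Memory actually occupied by cached blocks.
val blocksMemoryUsed = memoryUsed - currentUnrollMemory
```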

Logging

Enable ALL logging level for the org.apache.spark.storage.memory.MemoryStore logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.memory.MemoryStore=ALL

Refer to Logging.

Internal Registries

MemoryEntries by BlockId

entries: LinkedHashMap[BlockId, MemoryEntry[_]]

MemoryStore creates a Java LinkedHashMap of MemoryEntries per BlockId (with an initial capacity of 32 and a load factor of 0.75) when it is created.

entries uses the access-order ordering mode, in which the iteration order is the order in which the entries were last accessed (from least-recently to most-recently accessed). This gives an LRU cache behaviour when MemoryStore is requested to evict blocks.
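The access-order behaviour can be seen with java.util.LinkedHashMap directly, using the same constructor arguments (initial capacity 32, load factor 0.75, accessOrder = true). String keys and values are stand-ins for BlockIds and MemoryEntries.

```scala
import java.util.LinkedHashMap

// initialCapacity = 32, loadFactor = 0.75, accessOrder = true
val entries = new LinkedHashMap[String, String](32, 0.75f, true)
entries.put("rdd_0_0", "a")
entries.put("rdd_0_1", "b")
entries.put("rdd_0_2", "c")
entries.get("rdd_0_0") // reading a block moves it to the most-recently-used end

// The first key in iteration order is the least-recently used one, i.e. the eviction candidate.
val lruFirst = entries.keySet().iterator().next()
```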