MemoryStore

MemoryStore manages blocks of data in memory for BlockManager.

MemoryStore and BlockManager

Creating Instance

MemoryStore takes the following to be created:

MemoryStore is created when:

Creating MemoryStore

Blocks

entries: LinkedHashMap[BlockId, MemoryEntry[_]]

MemoryStore creates a LinkedHashMap (Java) of blocks (as MemoryEntries per BlockId) when created.

entries uses access-order ordering mode where the order of iteration is the order in which the entries were last accessed (from least-recently accessed to most-recently). That gives LRU cache behaviour when MemoryStore is requested to evict blocks.
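The access-order mode can be seen in isolation with a plain java.util.LinkedHashMap. The following is a minimal sketch, not MemoryStore code: the object name, the String keys and the Long values are made up for illustration, and only the three-argument constructor (with accessOrder = true) is the point.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import scala.collection.mutable.ArrayBuffer

object AccessOrderSketch {
  // Returns the iteration order of the keys after "a" has been read again.
  def keysAfterTouchingA(): List[String] = {
    // The third constructor argument (accessOrder = true) switches the map
    // from insertion order to access order.
    val entries = new JLinkedHashMap[String, Long](32, 0.75f, true)
    entries.put("a", 10L)
    entries.put("b", 20L)
    entries.put("c", 30L)
    entries.get("a") // touching "a" moves it to the most-recently-accessed end

    val keys = ArrayBuffer.empty[String]
    val it = entries.keySet().iterator()
    while (it.hasNext) keys += it.next()
    keys.toList // List("b", "c", "a"): least-recently accessed first
  }
}
```

Iterating from the head of such a map therefore visits the best eviction candidates first.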

MemoryEntries are added in putBytes and putIterator.

MemoryEntries are removed in remove, clear, and while evicting blocks to free up memory.

DeserializedMemoryEntry

DeserializedMemoryEntry is a MemoryEntry for block values with the following:

  • Array[T] (for the values)
  • size
  • ON_HEAP memory mode

SerializedMemoryEntry

SerializedMemoryEntry is a MemoryEntry for block bytes with the following:

  • ChunkedByteBuffer (for the serialized values)
  • size
  • MemoryMode
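
The two kinds of entries can be modelled roughly as follows. This is a simplified sketch under stated assumptions, not Spark's own definitions: the names Entry, DeserializedEntry, SerializedEntry, OnHeap and OffHeap are hypothetical, and Array[Byte] stands in for ChunkedByteBuffer.

```scala
object MemoryEntrySketch {
  // Hypothetical stand-in for Spark's MemoryMode.
  sealed trait MemoryMode
  case object OnHeap extends MemoryMode
  case object OffHeap extends MemoryMode

  // Every entry knows its size (in bytes) and where its memory lives.
  sealed trait Entry {
    def size: Long
    def memoryMode: MemoryMode
  }

  // Deserialized values are regular JVM objects, so this entry is always on-heap.
  final case class DeserializedEntry[T](values: Array[T], size: Long) extends Entry {
    val memoryMode: MemoryMode = OnHeap
  }

  // Serialized bytes may be kept on- or off-heap; the size is that of the buffer.
  final case class SerializedEntry(bytes: Array[Byte], memoryMode: MemoryMode) extends Entry {
    def size: Long = bytes.length.toLong
  }
}
```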

spark.storage.unrollMemoryThreshold

MemoryStore uses spark.storage.unrollMemoryThreshold configuration property when requested for the following:

Evicting Blocks

evictBlocksToFreeSpace(
  blockId: Option[BlockId],
  space: Long,
  memoryMode: MemoryMode): Long

evictBlocksToFreeSpace finds blocks to evict in the entries registry (based on least-recently accessed order and until the required space to free up is met or there are no more blocks).

Once done, evictBlocksToFreeSpace returns the memory freed up.
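
The selection step can be sketched as follows. This is an illustration only: Block, selectForEviction and evict are hypothetical names, the input is assumed to be already ordered from least- to most-recently accessed, and any additional eligibility checks the real method may apply to a block are left out. Returning 0 when the selection does not free enough space is an assumption of this sketch.

```scala
import scala.collection.mutable

object EvictionSketch {
  // Hypothetical stand-in for a stored block: an id and its size in bytes.
  final case class Block(id: String, size: Long)

  // Walks the blocks from least- to most-recently accessed and selects them
  // until `space` bytes are covered or there are no more blocks.
  def selectForEviction(lruOrdered: Seq[Block], space: Long): (Seq[Block], Long) = {
    val selected = mutable.ArrayBuffer.empty[Block]
    var freed = 0L
    val it = lruOrdered.iterator
    while (freed < space && it.hasNext) {
      val block = it.next()
      selected += block
      freed += block.size
    }
    (selected.toSeq, freed)
  }

  // Assumption of this sketch: the selection is dropped only when it frees
  // enough memory; otherwise nothing is evicted and 0 is returned.
  def evict(lruOrdered: Seq[Block], space: Long): Long = {
    val (_, freed) = selectForEviction(lruOrdered, space)
    if (freed >= space) freed else 0L
  }
}
```

For example, with blocks of 10, 20 and 30 bytes and a request for 25 bytes, the first two blocks are selected and 30 bytes are freed.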

When there are enough blocks to drop to free up the requested memory, evictBlocksToFreeSpace prints out the following INFO message to the logs:

[n] blocks selected for dropping ([freedMemory] bytes)

evictBlocksToFreeSpace drops the blocks one by one.

evictBlocksToFreeSpace prints out the following INFO message to the logs:

After dropping [n] blocks, free memory is [memory]

When there are not enough blocks to drop to make room for the given block (if any), evictBlocksToFreeSpace prints out the following INFO message to the logs:

Will not store [blockId]

evictBlocksToFreeSpace is used when:

Dropping Block

dropBlock[T](
  blockId: BlockId,
  entry: MemoryEntry[T]): Unit

dropBlock requests the BlockEvictionHandler to drop the block from memory.

If the block is no longer available in any other store, dropBlock requests the BlockInfoManager to remove the block (info).

BlockInfoManager

MemoryStore is given a BlockInfoManager when created.

MemoryStore uses the BlockInfoManager when requested to evict blocks.

Accessing MemoryStore

MemoryStore is available to other Spark services using BlockManager.memoryStore.

import org.apache.spark.SparkEnv
SparkEnv.get.blockManager.memoryStore

Serialized Block Bytes

getBytes(
  blockId: BlockId): Option[ChunkedByteBuffer]

getBytes returns the bytes of the SerializedMemoryEntry of a block (if found in the entries registry).

getBytes is used (for blocks with a serialized and in-memory storage level) when:

Fetching Deserialized Block Values

getValues(
  blockId: BlockId): Option[Iterator[_]]

getValues returns the values of the DeserializedMemoryEntry of the given block (if available in the entries registry).

getValues is used (for blocks with a deserialized and in-memory storage level) when:

putIteratorAsBytes

putIteratorAsBytes[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T],
  memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long]

putIteratorAsBytes requires that the block is not already stored.

putIteratorAsBytes calls putIterator (with the given BlockId, the values, the MemoryMode and a new SerializedValuesHolder).

If successful, putIteratorAsBytes returns the estimated size of the block. Otherwise, it returns a PartiallySerializedBlock.


putIteratorAsBytes prints out the following WARN message to the logs when the initial memory threshold is too large:

Initial memory threshold of [initialMemoryThreshold] is too large to be set as chunk size.
Chunk size has been capped to "MAX_ROUNDED_ARRAY_LENGTH"

putIteratorAsBytes is used when:

putIteratorAsValues

putIteratorAsValues[T](
  blockId: BlockId,
  values: Iterator[T],
  memoryMode: MemoryMode,
  classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]

putIteratorAsValues calls putIterator (with the given BlockId, the values, the MemoryMode and a new DeserializedValuesHolder).

If successful, putIteratorAsValues returns the estimated size of the block. Otherwise, it returns a PartiallyUnrolledIterator.

putIteratorAsValues is used when:

putIterator

putIterator[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T],
  memoryMode: MemoryMode,
  valuesHolder: ValuesHolder[T]): Either[Long, Long]

putIterator returns the (estimated) size of the block (as Right) or the unrollMemoryUsedByThisBlock (as Left).

putIterator requires that the block is not already in the MemoryStore.

putIterator calls reserveUnrollMemoryForThisTask (with the spark.storage.unrollMemoryThreshold for the initial memory threshold).

If putIterator did not manage to reserve the memory for unrolling (computing block in memory), it prints out the following WARN message to the logs:

Failed to reserve initial memory threshold of [initialMemoryThreshold]
for computing block [blockId] in memory.

putIterator requests the ValuesHolder to storeValue for every value in the given values iterator. putIterator checks memory usage regularly (whether it may have exceeded the threshold) and calls reserveUnrollMemoryForThisTask to reserve more memory when needed.

putIterator requests the ValuesHolder for a MemoryEntryBuilder (getBuilder) that in turn is requested to build a MemoryEntry.

putIterator calls releaseUnrollMemoryForThisTask.

putIterator requests the MemoryManager to acquireStorageMemory and stores the block (in the entries registry).

In the end, putIterator prints out the following INFO message to the logs:

Block [blockId] stored as values in memory (estimated size [size], free [free])

If there is not enough memory to store the block, putIterator calls logUnrollFailureMessage and returns the unrollMemoryUsedByThisBlock.
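
The reserve-then-grow pattern described above can be sketched in a self-contained way. This is a simplified model, not the MemoryStore implementation: Pool, unroll, bytesPerElem, checkPeriod and growthFactor are hypothetical names and values chosen for illustration, every element is assumed to cost the same number of bytes, and the final steps (building the MemoryEntry and turning unroll memory into storage memory) are left out.

```scala
object UnrollSketch {
  // Hypothetical memory pool: `reserve` succeeds only while capacity remains.
  final class Pool(capacity: Long) {
    private var used = 0L
    def reserve(bytes: Long): Boolean =
      if (bytes <= capacity - used) { used += bytes; true } else false
  }

  // Unrolls `values`, checking memory every `checkPeriod` elements.
  // Right(size) when everything fit, Left(reserved unroll memory) otherwise.
  def unroll[T](
      values: Iterator[T],
      bytesPerElem: Long,
      pool: Pool,
      initialThreshold: Long,
      checkPeriod: Int = 16,
      growthFactor: Double = 1.5): Either[Long, Long] = {
    var threshold = initialThreshold
    var reserved = 0L
    // Reserve the initial threshold before touching any value.
    var keepUnrolling = pool.reserve(initialThreshold)
    if (keepUnrolling) reserved += initialThreshold
    var count = 0L
    while (values.hasNext && keepUnrolling) {
      values.next() // a real holder would store the value here
      count += 1
      if (count % checkPeriod == 0) {
        val currentSize = count * bytesPerElem
        if (currentSize >= threshold) {
          // Ask for enough extra memory to stay ahead of the current size.
          val request = (currentSize * growthFactor - threshold).toLong
          keepUnrolling = pool.reserve(request)
          if (keepUnrolling) reserved += request
          threshold += request
        }
      }
    }
    if (keepUnrolling) Right(count * bytesPerElem) else Left(reserved)
  }
}
```

Checking only every few elements keeps the cost of size estimation low, at the price of briefly overshooting the reserved amount between checks.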


putIterator is used when:

logUnrollFailureMessage

logUnrollFailureMessage(
  blockId: BlockId,
  finalVectorSize: Long): Unit

logUnrollFailureMessage prints out the following WARN message to the logs and then calls logMemoryUsage.

Not enough space to cache [blockId] in memory! (computed [size] so far)

logMemoryUsage

logMemoryUsage(): Unit

logMemoryUsage prints out the following INFO message to the logs (with the blocksMemoryUsed, currentUnrollMemory, numTasksUnrolling, memoryUsed, and maxMemory):

Memory use = [blocksMemoryUsed] (blocks) + [currentUnrollMemory]
(scratch space shared across [numTasksUnrolling] tasks(s)) = [memoryUsed].
Storage limit = [maxMemory].

Storing Block

putBytes[T: ClassTag](
  blockId: BlockId,
  size: Long,
  memoryMode: MemoryMode,
  _bytes: () => ChunkedByteBuffer): Boolean

putBytes returns true only when there was enough memory to store the block (BlockId) in the entries registry.


putBytes asserts that the block is not stored yet.

putBytes requests the MemoryManager for memory (to store the block) and, when successful, adds the block to the entries registry (as a SerializedMemoryEntry with the _bytes and the MemoryMode).
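
Taking the bytes as a function (_bytes) means the buffer only has to be produced once memory has been granted. A minimal sketch of that pattern follows, assuming a hypothetical Store class with String block IDs and Array[Byte] in place of ChunkedByteBuffer; it is not the MemoryStore implementation.

```scala
import scala.collection.mutable

object PutBytesSketch {
  // Hypothetical store with a fixed memory budget (in bytes).
  final class Store(maxMemory: Long) {
    private var used = 0L
    private val entries = mutable.LinkedHashMap.empty[String, Array[Byte]]

    def contains(blockId: String): Boolean = entries.contains(blockId)

    def putBytes(blockId: String, size: Long, bytes: () => Array[Byte]): Boolean = {
      require(!contains(blockId), s"Block $blockId is already present")
      if (size <= maxMemory - used) {
        used += size
        // The thunk runs only after the memory was granted.
        entries(blockId) = bytes()
        true
      } else {
        false // not enough memory: the bytes are never materialized
      }
    }
  }
}
```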

In the end, putBytes prints out the following INFO message to the logs:

Block [blockId] stored as bytes in memory (estimated size [size], free [free])

putBytes is used when:

Memory Used for Caching Blocks

blocksMemoryUsed: Long

blocksMemoryUsed is the total storage memory in use (memoryUsed) minus the memory currently used for unrolling (currentUnrollMemory).

blocksMemoryUsed is used for logging purposes (when MemoryStore is requested to putBytes, putIterator, remove, evictBlocksToFreeSpace and logMemoryUsage).

Total Storage Memory in Use

memoryUsed: Long

memoryUsed requests the MemoryManager for the total storage memory in use.

memoryUsed is used when:

Maximum Storage Memory

maxMemory: Long

maxMemory is the total amount of memory available for storage (in bytes) and is the sum of the maxOnHeapStorageMemory and maxOffHeapStorageMemory of the MemoryManager.
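
The relationships between these counters can be written down as simple arithmetic. The sketch below uses a hypothetical Usage case class with made-up field names; the free value assumes the "free" figure in the log messages is maxMemory minus blocksMemoryUsed.

```scala
object MemoryAccountingSketch {
  // Hypothetical snapshot of the counters a MemoryManager would report (bytes).
  final case class Usage(
      memoryUsed: Long,          // total storage memory in use
      currentUnrollMemory: Long, // part of memoryUsed that is held for unrolling
      maxOnHeap: Long,
      maxOffHeap: Long) {
    // Memory taken by stored blocks only.
    def blocksMemoryUsed: Long = memoryUsed - currentUnrollMemory
    // Total memory available for storage.
    def maxMemory: Long = maxOnHeap + maxOffHeap
    // Assumed meaning of "free" in the log messages.
    def free: Long = maxMemory - blocksMemoryUsed
  }
}
```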

Tip

Enable INFO logging for MemoryStore to print out the maxMemory to the logs when created:

MemoryStore started with capacity [maxMemory] MB

maxMemory is used when:

Dropping Block from Memory

remove(
  blockId: BlockId): Boolean

remove returns true when the given block (BlockId) was (found and) removed from the entries registry successfully and the memory released (from the MemoryManager).


remove removes (drops) the block (BlockId) from the entries registry.

If found and removed, remove requests the MemoryManager to releaseStorageMemory and prints out the following DEBUG message to the logs (with the maxMemory and blocksMemoryUsed):

Block [blockId] of size [size] dropped from memory (free [memory])

remove is used when:

Releasing Unroll Memory for Task

releaseUnrollMemoryForThisTask(
  memoryMode: MemoryMode,
  memory: Long = Long.MaxValue): Unit

releaseUnrollMemoryForThisTask finds the task attempt ID of the current task.

releaseUnrollMemoryForThisTask uses the onHeapUnrollMemoryMap or offHeapUnrollMemoryMap based on the given MemoryMode.

(Only when the unroll memory map contains the task attempt ID) releaseUnrollMemoryForThisTask decreases the memory registered in the unroll memory map by the given memory amount and requests the MemoryManager to releaseUnrollMemory. In the end, releaseUnrollMemoryForThisTask removes the task attempt ID (entry) from the unroll memory map if the memory used is 0.
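
The bookkeeping on the unroll memory map can be sketched as follows. This is an illustration under stated assumptions: the release function and its parameters are hypothetical, the map is a plain mutable.Map from task attempt ID to bytes held, the call to the MemoryManager is omitted, and the request is assumed to be capped at what the task actually holds (which is what makes the Long.MaxValue default mean "release everything").

```scala
import scala.collection.mutable

object UnrollMemoryMapSketch {
  // Releases up to `memory` bytes held by `taskAttemptId` and
  // returns the amount that was actually released.
  def release(
      unrollMap: mutable.Map[Long, Long],
      taskAttemptId: Long,
      memory: Long = Long.MaxValue): Long =
    unrollMap.get(taskAttemptId) match {
      case Some(held) =>
        // Never release more than the task holds.
        val toRelease = math.min(memory, held)
        if (toRelease > 0) {
          val remaining = held - toRelease
          // Drop the entry once the task holds no unroll memory at all.
          if (remaining == 0) unrollMap -= taskAttemptId
          else unrollMap(taskAttemptId) = remaining
        }
        toRelease
      case None =>
        0L // nothing registered for this task
    }
}
```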

releaseUnrollMemoryForThisTask is used when:

  • Task is requested to run (and is about to finish)
  • MemoryStore is requested to putIterator
  • PartiallyUnrolledIterator is requested to releaseUnrollMemory
  • PartiallySerializedBlock is requested to discard and finishWritingToStream

Logging

Enable ALL logging level for org.apache.spark.storage.memory.MemoryStore logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.memory.MemoryStore=ALL

Refer to Logging.