

MemoryStore manages blocks of data in memory for BlockManager.

Figure: MemoryStore and BlockManager

== Creating Instance

MemoryStore takes the following to be created:

  • SparkConf
  • BlockInfoManager
  • SerializerManager
  • MemoryManager
  • BlockEvictionHandler

MemoryStore is created for a BlockManager.

Figure: Creating MemoryStore

== [[blockInfoManager]] BlockInfoManager

MemoryStore is given a BlockInfoManager when created.

MemoryStore uses the BlockInfoManager when requested to evict blocks to free space (evictBlocksToFreeSpace).

== [[memoryStore]] Accessing MemoryStore

MemoryStore is available to other Spark services through the BlockManager.memoryStore reference.


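import org.apache.spark.SparkEnv
// MemoryStore of the BlockManager in this JVM (driver or executor).
// memoryStore is private[spark], so this compiles only in code under the org.apache.spark package.
SparkEnv.get.blockManager.memoryStore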

== [[unrollMemoryThreshold]] Unroll Memory Threshold Configuration Property

MemoryStore uses the unroll memory threshold configuration property for <> and <>.

== [[releaseUnrollMemoryForThisTask]] releaseUnrollMemoryForThisTask Method

[source, scala]

releaseUnrollMemoryForThisTask(
  memoryMode: MemoryMode,
  memory: Long = Long.MaxValue): Unit


releaseUnrollMemoryForThisTask is used when:

  • Task is requested to run (and cleans up after itself)

  • MemoryStore is requested to <>

  • PartiallyUnrolledIterator is requested to releaseUnrollMemory

  • PartiallySerializedBlock is requested to discard and finishWritingToStream

== [[getValues]] getValues Method

[source, scala]

getValues(blockId: BlockId): Option[Iterator[_]]


getValues is used when BlockManager is requested to doGetLocalBytes, getLocalValues and maybeCacheDiskBytesInMemory.

== [[getBytes]] getBytes Method

[source, scala]

getBytes(blockId: BlockId): Option[ChunkedByteBuffer]


getBytes is used when BlockManager is requested to doGetLocalBytes, getLocalValues and maybeCacheDiskBytesInMemory.
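getValues and getBytes differ only in the kind of in-memory entry they expect. The following is a minimal sketch, not Spark's code, assuming a registry that holds either deserialized values or serialized bytes per block. BlockId, MemoryEntry, DeserializedEntry, SerializedEntry and EntryLookup are hypothetical stand-ins, and treating a lookup of the wrong kind as an IllegalArgumentException is an assumption of the sketch.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

// Hypothetical stand-ins, not Spark's BlockId and MemoryEntry classes.
final case class BlockId(name: String)

sealed trait MemoryEntry { def size: Long }
// A block kept as deserialized objects.
final case class DeserializedEntry(values: Vector[Any], size: Long) extends MemoryEntry
// A block kept as serialized bytes.
final case class SerializedEntry(bytes: ByteBuffer, size: Long) extends MemoryEntry

class EntryLookup {
  val entries = mutable.LinkedHashMap.empty[BlockId, MemoryEntry]

  private def lookup(blockId: BlockId): Option[MemoryEntry] =
    entries.synchronized(entries.get(blockId))

  // Values of a deserialized block; None when the block is not in memory.
  def getValues(blockId: BlockId): Option[Iterator[Any]] = lookup(blockId) match {
    case None                               => None
    case Some(DeserializedEntry(values, _)) => Some(values.iterator)
    case Some(_: SerializedEntry) =>
      throw new IllegalArgumentException(s"$blockId is stored as bytes, not values")
  }

  // Bytes of a serialized block; None when the block is not in memory.
  def getBytes(blockId: BlockId): Option[ByteBuffer] = lookup(blockId) match {
    case None                            => None
    case Some(SerializedEntry(bytes, _)) => Some(bytes.duplicate()) // independent position/limit
    case Some(_: DeserializedEntry) =>
      throw new IllegalArgumentException(s"$blockId is stored as values, not bytes")
  }
}
```

Returning Option keeps "block not in memory" a normal outcome, while asking for the wrong representation is treated as a caller bug.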

== [[putIteratorAsBytes]] putIteratorAsBytes Method

[source, scala]

putIteratorAsBytes[T](...): Either[PartiallySerializedBlock[T], Long]


putIteratorAsBytes is used when BlockManager is requested to doPutIterator.

== [[remove]] Dropping Block from Memory

[source, scala]

remove(blockId: BlockId): Boolean

remove removes the given BlockId from the entries internal registry and branches off based on whether the block was found and removed or not.

=== [[remove-block-removed]] Block Removed

When found and removed, remove requests the MemoryManager to releaseStorageMemory and prints out the following DEBUG message to the logs:


Block [blockId] of size [size] dropped from memory (free [memory])

remove returns true.

=== [[remove-no-block]] No Block Removed

If no block was registered under the BlockId, remove returns false.
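Both branches of remove can be sketched as follows. This is a simplified model, not Spark's code: ToyMemoryManager is a hypothetical stand-in for MemoryManager with a single storage pool, Entry only records a block size, and println replaces the DEBUG log message.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for BlockId, a memory entry and MemoryManager.
final case class BlockId(name: String)
final case class Entry(size: Long)

class ToyMemoryManager(val maxStorageMemory: Long) {
  private var used = 0L
  def storageMemoryUsed: Long = synchronized(used)
  def acquireStorageMemory(numBytes: Long): Boolean = synchronized {
    if (used + numBytes <= maxStorageMemory) { used += numBytes; true } else false
  }
  def releaseStorageMemory(numBytes: Long): Unit = synchronized { used -= numBytes }
}

class RemovingStore(memoryManager: ToyMemoryManager) {
  val entries = mutable.LinkedHashMap.empty[BlockId, Entry]

  def remove(blockId: BlockId): Boolean =
    // Unregister the block under the registry lock.
    entries.synchronized(entries.remove(blockId)) match {
      case Some(entry) =>
        // Hand the block's storage memory back to the memory manager.
        memoryManager.releaseStorageMemory(entry.size)
        val free = memoryManager.maxStorageMemory - memoryManager.storageMemoryUsed
        println(s"Block $blockId of size ${entry.size} dropped from memory (free $free)")
        true
      case None =>
        false // nothing was registered under blockId
    }
}
```

Removing a block a second time simply returns false and releases nothing, so the memory accounting cannot be decremented twice for the same block.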

=== [[remove-usage]] Usage

remove is used when BlockManager is requested to dropFromMemory and removeBlockInternal.

== [[putBytes]] Acquiring Storage Memory for Blocks

[source, scala]

putBytes[T: ClassTag](...): Boolean

putBytes requests storage memory for the blockId block from the MemoryManager and registers the block in the entries internal registry.

Internally, putBytes first makes sure that the blockId block has not already been registered in the entries internal registry.

putBytes then requests the current MemoryManager for size bytes of storage memory for the blockId block in the given memoryMode.


memoryMode can be ON_HEAP or OFF_HEAP and is a property of a StorageLevel.

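scala> import org.apache.spark.storage.StorageLevel._
import org.apache.spark.storage.StorageLevel._

scala> MEMORY_AND_DISK.useOffHeap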
res0: Boolean = false

scala> OFF_HEAP.useOffHeap
res1: Boolean = true

If successful, putBytes "materializes" the _bytes byte buffer and makes sure that its size is exactly size. It then registers a SerializedMemoryEntry (for the bytes and memoryMode) for blockId in the entries internal registry.

You should see the following INFO message in the logs:

Block [blockId] stored as bytes in memory (estimated size [size], free [bytes])

putBytes returns true only after blockId was successfully registered in the entries internal registry.
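The putBytes steps (duplicate check, memory request, materializing the bytes, size check, registration) can be sketched like this. It is a minimal sketch, assuming a single toy storage pool inside PutBytesStore instead of a MemoryManager; memoryMode is only recorded, not used to pick a pool. BlockId, MemoryMode, OnHeap, OffHeap and SerializedEntry are hypothetical stand-ins. The bytes are passed as a thunk, so nothing is materialized when memory cannot be acquired.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

// Hypothetical stand-ins for Spark's BlockId, MemoryMode and SerializedMemoryEntry.
final case class BlockId(name: String)
sealed trait MemoryMode
case object OnHeap extends MemoryMode
case object OffHeap extends MemoryMode
final case class SerializedEntry(bytes: ByteBuffer, memoryMode: MemoryMode, size: Long)

class PutBytesStore(maxStorageMemory: Long) {
  private var used = 0L // storage memory granted so far (toy memory manager)
  val entries = mutable.LinkedHashMap.empty[BlockId, SerializedEntry]

  private def acquireStorageMemory(numBytes: Long): Boolean = synchronized {
    if (used + numBytes <= maxStorageMemory) { used += numBytes; true } else false
  }

  def putBytes(blockId: BlockId, size: Long, memoryMode: MemoryMode,
               _bytes: () => ByteBuffer): Boolean = {
    require(!entries.synchronized(entries.contains(blockId)),
      s"Block $blockId is already present in the MemoryStore")
    if (acquireStorageMemory(size)) {
      // Materialize the bytes only once memory has been granted.
      val bytes = _bytes()
      assert(bytes.remaining == size, s"expected $size bytes, got ${bytes.remaining}")
      entries.synchronized { entries(blockId) = SerializedEntry(bytes, memoryMode, size) }
      true
    } else {
      false // not enough storage memory; the block is not registered
    }
  }
}
```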

putBytes is used when BlockManager is requested to doPutBytes and maybeCacheDiskBytesInMemory.

== [[evictBlocksToFreeSpace]] Evicting Blocks

[source, scala]

evictBlocksToFreeSpace(
  blockId: Option[BlockId],
  space: Long,
  memoryMode: MemoryMode): Long


evictBlocksToFreeSpace is used when StorageMemoryPool is requested to acquireMemory and freeSpaceToShrinkPool.
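Given an access-ordered registry of entries, an LRU eviction pass can be sketched as below. This is a hypothetical simplification: it only skips the block being stored and blocks held in the other memory mode, it drops victims directly (the real MemoryStore goes through its BlockEvictionHandler), and it evicts nothing unless at least space bytes can be freed. The returned Long is the number of bytes freed. BlockId, MemoryMode, Entry and EvictionSketch are illustrative names.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins, not Spark's classes.
final case class BlockId(name: String)
sealed trait MemoryMode
case object OnHeap extends MemoryMode
case object OffHeap extends MemoryMode
final case class Entry(size: Long, memoryMode: MemoryMode)

object EvictionSketch {
  // Access-ordered: iteration goes from least- to most-recently accessed entry.
  val entries = new JLinkedHashMap[BlockId, Entry](32, 0.75f, true)

  // Returns the number of bytes freed, or 0 if `space` bytes cannot be freed.
  def evictBlocksToFreeSpace(blockId: Option[BlockId], space: Long, memoryMode: MemoryMode): Long =
    entries.synchronized {
      val selected = ArrayBuffer.empty[BlockId]
      var freed = 0L
      val it = entries.entrySet().iterator()
      while (freed < space && it.hasNext) {
        val e = it.next()
        // Never evict the block being stored, nor blocks held in the other memory mode.
        if (e.getValue.memoryMode == memoryMode && !blockId.contains(e.getKey)) {
          selected += e.getKey
          freed += e.getValue.size
        }
      }
      if (freed >= space) {
        // Enough space found: drop the selected (least-recently used) blocks.
        selected.foreach(id => entries.remove(id))
        freed
      } else 0L // not enough evictable memory: keep everything
    }
}
```

The all-or-nothing choice avoids throwing away cached blocks when the new block would not fit anyway.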

== [[contains]] Checking Whether Block Exists In MemoryStore

[source, scala]

contains(blockId: BlockId): Boolean

contains is true when the entries internal registry contains the blockId key.

contains is used when...FIXME

== [[putIteratorAsValues]] putIteratorAsValues Method

[source, scala]

putIteratorAsValues[T](...): Either[PartiallyUnrolledIterator[T], Long]

putIteratorAsValues makes sure that the block is not already in the MemoryStore or throws an IllegalArgumentException:

requirement failed: Block [blockId] is already present in the MemoryStore

putIteratorAsValues <> (with the <> and ON_HEAP memory mode).


putIteratorAsValues tries to put the blockId block in memory store as values.

putIteratorAsValues is used when BlockManager is requested to store bytes or values of a block or when attempting to cache spilled values read from disk.
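Putting an iterator in memory as values means unrolling it: elements are buffered while unroll memory is reserved step by step, starting from an initial threshold. The sketch below illustrates that idea only, under stated assumptions: reserve stands in for reserveUnrollMemoryForThisTask, sizeOf for a size estimator, and checkPeriod and growthFactor are illustrative values, not Spark configuration. On success it returns Right(estimated size); otherwise Left with an iterator over the unrolled prefix followed by the rest of the input, playing the role of a PartiallyUnrolledIterator. It does not transfer unroll memory to storage memory or release it.

```scala
import scala.collection.mutable.ArrayBuffer

object UnrollSketch {
  // reserve stands in for reserveUnrollMemoryForThisTask and sizeOf for a size estimator;
  // checkPeriod and growthFactor are illustrative, not Spark's configuration.
  def unroll[T](
      values: Iterator[T],
      initialThreshold: Long,
      reserve: Long => Boolean,
      sizeOf: T => Long,
      checkPeriod: Int = 16,
      growthFactor: Double = 1.5): Either[Iterator[T], Long] = {
    val buffer = ArrayBuffer.empty[T]
    var estimatedSize = 0L
    var reserved = 0L
    var keepUnrolling = reserve(initialThreshold)
    if (keepUnrolling) reserved = initialThreshold
    var count = 0
    while (keepUnrolling && values.hasNext) {
      val v = values.next()
      buffer += v
      estimatedSize += sizeOf(v)
      count += 1
      // Every checkPeriod elements, grow the reservation if the estimate caught up with it.
      if (count % checkPeriod == 0 && estimatedSize >= reserved) {
        val extra = (estimatedSize * growthFactor).toLong - reserved
        keepUnrolling = reserve(extra)
        if (keepUnrolling) reserved += extra
      }
    }
    // Cover whatever the last partial period added.
    if (keepUnrolling && estimatedSize > reserved) {
      keepUnrolling = reserve(estimatedSize - reserved)
      if (keepUnrolling) reserved = estimatedSize
    }
    if (keepUnrolling) Right(estimatedSize)
    else Left(buffer.iterator ++ values) // unrolled prefix followed by the rest of the input
  }
}
```

Even when memory runs out, the caller gets every element back in order, so it can fall back to another strategy (for example, writing to disk) without recomputing the input.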

== [[reserveUnrollMemoryForThisTask]] reserveUnrollMemoryForThisTask Method

[source, scala]

reserveUnrollMemoryForThisTask(
  blockId: BlockId,
  memory: Long,
  memoryMode: MemoryMode): Boolean

reserveUnrollMemoryForThisTask acquires a lock on the MemoryManager and requests it to acquireUnrollMemory.

NOTE: reserveUnrollMemoryForThisTask is used when MemoryStore is requested to <> and <>.
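Reserving and releasing unroll memory per task can be sketched as below, so that a task gives back exactly what it reserved. This is a minimal sketch, assuming a toy memory manager with a fixed unroll budget (ToyUnrollMemoryManager and UnrollBookkeeping are hypothetical). Unlike the real methods, which judging by their names act on the running task, it takes the task attempt id explicitly and ignores memoryMode. The Long.MaxValue default of releaseUnrollMemoryForThisTask is handled as "release everything this task holds" by capping the amount at what the task holds.

```scala
import scala.collection.mutable

// Hypothetical toy memory manager with a fixed unroll memory budget.
class ToyUnrollMemoryManager(limit: Long) {
  private var used = 0L
  def acquireUnrollMemory(numBytes: Long): Boolean =
    if (used + numBytes <= limit) { used += numBytes; true } else false
  def releaseUnrollMemory(numBytes: Long): Unit = used -= numBytes
  def unrollMemoryUsed: Long = used
}

class UnrollBookkeeping(memoryManager: ToyUnrollMemoryManager) {
  // Unroll memory held per task attempt id.
  private val unrollMemoryMap = mutable.HashMap.empty[Long, Long]

  def reserveUnrollMemoryForThisTask(taskAttemptId: Long, memory: Long): Boolean =
    memoryManager.synchronized { // lock the memory manager, as described above
      val success = memoryManager.acquireUnrollMemory(memory)
      if (success) unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
      success
    }

  // Releases at most what the task holds; the default releases all of it.
  def releaseUnrollMemoryForThisTask(taskAttemptId: Long, memory: Long = Long.MaxValue): Unit =
    memoryManager.synchronized {
      unrollMemoryMap.get(taskAttemptId).foreach { held =>
        val toRelease = math.min(memory, held)
        memoryManager.releaseUnrollMemory(toRelease)
        if (toRelease == held) {
          unrollMemoryMap.remove(taskAttemptId)
        } else {
          unrollMemoryMap(taskAttemptId) = held - toRelease
        }
      }
    }

  def heldBy(taskAttemptId: Long): Long =
    memoryManager.synchronized(unrollMemoryMap.getOrElse(taskAttemptId, 0L))
}
```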

== [[maxMemory]] Total Amount Of Memory Available For Storage

[source, scala]

maxMemory: Long

maxMemory requests the MemoryManager for the current maxOnHeapStorageMemory and maxOffHeapStorageMemory and returns their sum.


Enable INFO logging level to find maxMemory in the logs when MemoryStore is created:

MemoryStore started with capacity [maxMemory] MB

NOTE: maxMemory is used for logging purposes only.

== [[putIterator]] putIterator Internal Method

[source, scala]

putIterator[T](...): Either[Long, Long]


putIterator is used when MemoryStore is requested to <> and <>.

== [[logUnrollFailureMessage]] logUnrollFailureMessage Internal Method

[source, scala]

logUnrollFailureMessage(
  blockId: BlockId,
  finalVectorSize: Long): Unit


logUnrollFailureMessage is used when MemoryStore is requested to <>.

== [[logMemoryUsage]] logMemoryUsage Internal Method

[source, scala]

logMemoryUsage(): Unit


logMemoryUsage is used when MemoryStore is requested to <>.

== [[memoryUsed]] Total Memory Used

[source, scala]

memoryUsed: Long

memoryUsed requests the MemoryManager for the storageMemoryUsed.

memoryUsed is used when MemoryStore is requested for <> and to <>.

== [[blocksMemoryUsed]] Memory Used for Caching Blocks

[source, scala]

blocksMemoryUsed: Long

blocksMemoryUsed is the memoryUsed without the memory currently reserved for unrolling blocks.

blocksMemoryUsed is used for logging purposes when MemoryStore is requested to <>, <>, <>, <> and <>.
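maxMemory, memoryUsed and blocksMemoryUsed relate by simple arithmetic. The sketch uses hypothetical numbers and assumes, as the subtraction implies, that the storage memory used includes memory reserved for unrolling. StorageSnapshot and MemoryMath are illustrative names, and toMiB is not how Spark formats its log messages.

```scala
// Hypothetical snapshot of what a memory manager reports, in bytes.
final case class StorageSnapshot(
    maxOnHeapStorageMemory: Long,
    maxOffHeapStorageMemory: Long,
    storageMemoryUsed: Long,   // assumed to include memory reserved for unrolling
    currentUnrollMemory: Long) {
  // Total capacity: on-heap plus off-heap storage memory.
  def maxMemory: Long = maxOnHeapStorageMemory + maxOffHeapStorageMemory
  def memoryUsed: Long = storageMemoryUsed
  // Memory taken by cached blocks only.
  def blocksMemoryUsed: Long = memoryUsed - currentUnrollMemory
}

object MemoryMath {
  def toMiB(bytes: Long): Double = bytes.toDouble / (1024 * 1024)
}
```

For example, 100 MiB on-heap plus 28 MiB off-heap storage gives a maxMemory of 128 MiB; with 30 MiB used, of which 4 MiB is unroll memory, blocksMemoryUsed is 26 MiB.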

== [[logging]] Logging

Enable ALL logging level for the MemoryStore logger to see what happens inside.

Add the following line to conf/


Refer to Logging.

== [[internal-registries]] Internal Registries

=== [[entries]] MemoryEntries by BlockId

[source, scala]

entries: LinkedHashMap[BlockId, MemoryEntry[_]]

MemoryStore creates a Java LinkedHashMap of MemoryEntries per BlockId (with an initial capacity of 32 and a load factor of 0.75) when created.

entries uses the access-order mode, where the order of iteration is the order in which the entries were last accessed (from least-recently to most-recently accessed). That gives LRU cache behaviour when MemoryStore is requested to evict blocks to free space (evictBlocksToFreeSpace).
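The access-order mode can be observed with a plain java.util.LinkedHashMap created with the same parameters (initial capacity 32, load factor 0.75, accessOrder set to true). String keys stand in for BlockIds here, and LruOrderDemo is a hypothetical helper.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}

object LruOrderDemo {
  // Initial capacity 32, load factor 0.75, accessOrder = true (iteration follows last access).
  def newEntries(): JLinkedHashMap[String, Long] =
    new JLinkedHashMap[String, Long](32, 0.75f, true)

  // Keys from least-recently to most-recently accessed.
  def keysInIterationOrder[K, V](m: JLinkedHashMap[K, V]): List[K] = {
    val keys = List.newBuilder[K]
    val it = m.keySet().iterator()
    while (it.hasNext) keys += it.next()
    keys.result()
  }
}
```

After putting rdd_0_0, rdd_0_1 and rdd_0_2 and then calling get("rdd_0_0"), keysInIterationOrder returns List(rdd_0_1, rdd_0_2, rdd_0_0): the read moved rdd_0_0 to the most-recently-used end, so rdd_0_1 is the first entry an LRU pass would consider.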

Last update: 2020-10-14