
MemoryStore

MemoryStore manages blocks of data in memory for BlockManager.

MemoryStore and BlockManager

Creating Instance

MemoryStore takes the following to be created:

  • [[conf]] ROOT:SparkConf.md[]
  • [[blockInfoManager]] storage:BlockInfoManager.md[]
  • [[serializerManager]] serializer:SerializerManager.md[]
  • [[memoryManager]] memory:MemoryManager.md[]
  • [[blockEvictionHandler]] storage:BlockEvictionHandler.md[]

MemoryStore is created for storage:BlockManager.md#memoryStore[BlockManager].

.Creating MemoryStore
image::spark-MemoryStore.png[align="center"]

== [[blockInfoManager]] BlockInfoManager

MemoryStore is given a storage:BlockInfoManager.md[] when created.

MemoryStore uses the BlockInfoManager when requested to <>.

== [[memoryStore]] Accessing MemoryStore

MemoryStore is available to other Spark services through the storage:BlockManager.md#memoryStore[BlockManager.memoryStore] reference.

[source,scala]

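import org.apache.spark.SparkEnv
// BlockManager.memoryStore is private[spark], so this compiles only in code under org.apache.spark
SparkEnv.get.blockManager.memoryStore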


== [[unrollMemoryThreshold]][[spark.storage.unrollMemoryThreshold]] spark.storage.unrollMemoryThreshold Configuration Property

MemoryStore uses ROOT:configuration-properties.md#spark.storage.unrollMemoryThreshold[spark.storage.unrollMemoryThreshold] configuration property for <> and <>.

== [[releaseUnrollMemoryForThisTask]] releaseUnrollMemoryForThisTask Method

[source, scala]

releaseUnrollMemoryForThisTask(
  memoryMode: MemoryMode,
  memory: Long = Long.MaxValue): Unit


releaseUnrollMemoryForThisTask...FIXME

releaseUnrollMemoryForThisTask is used when:

  • Task is requested to scheduler:Task.md#run[run] (and cleans up after itself)

  • MemoryStore is requested to <>

  • PartiallyUnrolledIterator is requested to releaseUnrollMemory

  • PartiallySerializedBlock is requested to discard and finishWritingToStream
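To make the `memory: Long = Long.MaxValue` default concrete, here is a minimal sketch of per-task unroll memory bookkeeping. It assumes a single map from task attempt ID to the bytes that task holds; the class `UnrollMemoryBook` and its members are hypothetical, and the real MemoryStore additionally separates on-heap from off-heap memory and hands released memory back to the MemoryManager.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of per-task unroll memory bookkeeping.
// A plain counter (releasedToPool) stands in for returning memory to a MemoryManager.
class UnrollMemoryBook {
  // task attempt ID -> unroll memory (bytes) currently held by that task
  val unrollMemoryByTask = mutable.HashMap.empty[Long, Long]
  // total bytes handed back to the (hypothetical) memory pool so far
  var releasedToPool = 0L

  // Records a reservation only; acquiring memory from a pool is out of scope here
  def recordReservation(taskAttemptId: Long, memory: Long): Unit = synchronized {
    unrollMemoryByTask(taskAttemptId) = unrollMemoryByTask.getOrElse(taskAttemptId, 0L) + memory
  }

  // Releases at most `memory` bytes; the Long.MaxValue default means "everything this task holds"
  def release(taskAttemptId: Long, memory: Long = Long.MaxValue): Unit = synchronized {
    unrollMemoryByTask.get(taskAttemptId).foreach { held =>
      val toRelease = math.min(memory, held)
      if (toRelease > 0) {
        unrollMemoryByTask(taskAttemptId) = held - toRelease
        releasedToPool += toRelease
      }
      // Forget the task once it holds no unroll memory at all
      if (unrollMemoryByTask(taskAttemptId) == 0L) unrollMemoryByTask.remove(taskAttemptId)
    }
  }
}
```

Calling `release(taskAttemptId)` with the default releases whatever the task still holds in one call, which fits the task clean-up use listed above; unknown task IDs are simply ignored.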

== [[getValues]] getValues Method

[source, scala]

getValues(blockId: BlockId): Option[Iterator[_]]


getValues...FIXME

getValues is used when BlockManager is requested to storage:BlockManager.md#doGetLocalBytes[doGetLocalBytes], storage:BlockManager.md#getLocalValues[getLocalValues] and storage:BlockManager.md#maybeCacheDiskValuesInMemory[maybeCacheDiskValuesInMemory].

== [[getBytes]] getBytes Method

[source, scala]

getBytes(blockId: BlockId): Option[ChunkedByteBuffer]


getBytes...FIXME

getBytes is used when BlockManager is requested to storage:BlockManager.md#doGetLocalBytes[doGetLocalBytes], storage:BlockManager.md#getLocalValues[getLocalValues] and storage:BlockManager.md#maybeCacheDiskBytesInMemory[maybeCacheDiskBytesInMemory].
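getValues and getBytes are both lookups by BlockId that differ in which kind of memory entry they can serve. The following is a hypothetical, simplified sketch of that split: `SketchEntry`, `DeserializedEntry`, `SerializedEntry` and `EntryLookup` are made-up names, block IDs are strings, `Array[Byte]` stands in for ChunkedByteBuffer, and treating a request for the wrong representation as an `IllegalArgumentException` is a choice of this sketch.

```scala
// Hypothetical stand-ins for the two kinds of memory entries
sealed trait SketchEntry
final case class DeserializedEntry(values: Array[Any]) extends SketchEntry
final case class SerializedEntry(bytes: Array[Byte]) extends SketchEntry

class EntryLookup {
  // Same constructor arguments as the entries registry: capacity 32, load factor 0.75, access order
  private val entries = new java.util.LinkedHashMap[String, SketchEntry](32, 0.75f, true)

  def put(blockId: String, entry: SketchEntry): Unit =
    entries.synchronized { entries.put(blockId, entry) }

  // Only a deserialized entry can serve values
  def getValues(blockId: String): Option[Iterator[Any]] = {
    val entry = entries.synchronized { Option(entries.get(blockId)) }
    entry.map {
      case DeserializedEntry(values) => values.iterator
      case SerializedEntry(_) =>
        throw new IllegalArgumentException(s"$blockId is serialized; use getBytes")
    }
  }

  // Only a serialized entry can serve bytes
  def getBytes(blockId: String): Option[Array[Byte]] = {
    val entry = entries.synchronized { Option(entries.get(blockId)) }
    entry.map {
      case SerializedEntry(bytes) => bytes
      case DeserializedEntry(_) =>
        throw new IllegalArgumentException(s"$blockId is deserialized; use getValues")
    }
  }
}
```

A missing block yields `None` from either method, so callers can tell "not cached in memory" apart from "cached in the other representation".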

== [[putIteratorAsBytes]] putIteratorAsBytes Method

[source, scala]

putIteratorAsBytes[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T],
  memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long]


putIteratorAsBytes...FIXME

putIteratorAsBytes is used when BlockManager is requested to storage:BlockManager.md#doPutIterator[doPutIterator].

== [[remove]] Dropping Block from Memory

[source, scala]

remove(blockId: BlockId): Boolean


remove removes the given storage:BlockId.md[] from the entries internal registry and branches off based on whether the block was found and removed or not.

=== [[remove-block-removed]] Block Removed

When found and removed, remove requests the MemoryManager to memory:MemoryManager.md#releaseStorageMemory[releaseStorageMemory] and prints out the following DEBUG message to the logs:

[source,plaintext]

Block [blockId] of size [size] dropped from memory (free [memory])

remove returns true.

=== [[remove-no-block]] No Block Removed

If the BlockId was not registered (so nothing was removed), remove returns false.
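The two branches of remove can be modeled as follows. This is a simplified sketch under stated assumptions: `RemovableStore` is hypothetical, block IDs are strings, entries only record sizes, and a plain counter stands in for the MemoryManager's storage memory.

```scala
import scala.collection.mutable

// Hypothetical model of remove: drop the entry, then release its storage memory
class RemovableStore(val capacity: Long) {
  private val entries = mutable.HashMap.empty[String, Long] // block ID -> size in bytes
  private var used = 0L // stands in for the MemoryManager's storage memory

  def free: Long = synchronized { capacity - used }

  def put(blockId: String, size: Long): Boolean = synchronized {
    if (entries.contains(blockId) || size > capacity - used) false
    else { entries(blockId) = size; used += size; true }
  }

  def remove(blockId: String): Boolean = synchronized {
    entries.remove(blockId) match {
      case Some(size) =>
        used -= size // "release" the storage memory held by the block
        println(s"Block $blockId of size $size dropped from memory (free ${capacity - used})")
        true
      case None => false // nothing registered under this ID
    }
  }
}
```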

=== [[remove-usage]] Usage

remove is used when BlockManager is requested to storage:BlockManager.md#dropFromMemory[dropFromMemory] and storage:BlockManager.md#removeBlockInternal[removeBlockInternal].

== [[putBytes]] Acquiring Storage Memory for Blocks

[source, scala]

putBytes[T: ClassTag](
  blockId: BlockId,
  size: Long,
  memoryMode: MemoryMode,
  _bytes: () => ChunkedByteBuffer): Boolean


putBytes requests memory:MemoryManager.md#acquireStorageMemory[storage memory for blockId from MemoryManager] and registers the block in the entries internal registry.

Internally, putBytes first makes sure that the blockId block is not already registered in the entries internal registry (or throws an IllegalArgumentException).

putBytes then requests memory:MemoryManager.md#acquireStorageMemory[size bytes of storage memory for the blockId block in the given memoryMode from the MemoryManager].

[NOTE]

memoryMode can be ON_HEAP or OFF_HEAP and is a property of a storage:StorageLevel.md[StorageLevel]: OFF_HEAP when the storage level uses off-heap memory (useOffHeap), ON_HEAP otherwise.

[source,scala]

scala> import org.apache.spark.storage.StorageLevel._

scala> MEMORY_AND_DISK.useOffHeap
res0: Boolean = false

scala> OFF_HEAP.useOffHeap
res1: Boolean = true

If the storage memory was acquired, putBytes "materializes" the _bytes byte buffer (by calling the function) and makes sure that its size is exactly size. It then registers a SerializedMemoryEntry (for the bytes and memoryMode) for blockId in the entries internal registry.

putBytes prints out the following INFO message to the logs:

[source,plaintext]

Block [blockId] stored as bytes in memory (estimated size [size], free [bytes])

putBytes returns true once blockId is registered in the entries internal registry, and false when the MemoryManager could not provide the requested storage memory.

putBytes is used when BlockManager is requested to storage:BlockManager.md#doPutBytes[doPutBytes] and storage:BlockManager.md#maybeCacheDiskBytesInMemory[maybeCacheDiskBytesInMemory].
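The ordering described above (check, acquire, only then materialize) can be sketched as follows. This is a hypothetical model, not Spark code: `BytesStore` is made up, block IDs are strings, `() => Array[Byte]` stands in for the `_bytes` buffer-producing function, and the stand-in for `acquireStorageMemory` never evicts anything.

```scala
import scala.collection.mutable

// Hypothetical model of putBytes: the byte buffer is produced only after memory was granted
class BytesStore(maxStorageMemory: Long) {
  private val entries = mutable.LinkedHashMap.empty[String, Array[Byte]]
  private var storageMemoryUsed = 0L

  // Stand-in for MemoryManager.acquireStorageMemory (no eviction in this sketch)
  private def acquireStorageMemory(numBytes: Long): Boolean =
    if (storageMemoryUsed + numBytes <= maxStorageMemory) { storageMemoryUsed += numBytes; true }
    else false

  def putBytes(blockId: String, size: Long, bytes: () => Array[Byte]): Boolean = synchronized {
    // require throws IllegalArgumentException when the block is already present
    require(!entries.contains(blockId), s"Block $blockId is already present in the MemoryStore")
    if (acquireStorageMemory(size)) {
      val materialized = bytes() // materialize only now that the memory is reserved
      assert(materialized.length == size)
      entries(blockId) = materialized
      println(s"Block $blockId stored as bytes in memory " +
        s"(estimated size $size, free ${maxStorageMemory - storageMemoryUsed})")
      true
    } else false
  }

  def contains(blockId: String): Boolean = synchronized { entries.contains(blockId) }
}
```

Passing a function rather than a ready buffer means a refused request costs nothing: the bytes are never produced.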

== [[evictBlocksToFreeSpace]] Evicting Blocks

[source, scala]

evictBlocksToFreeSpace(
  blockId: Option[BlockId],
  space: Long,
  memoryMode: MemoryMode): Long


evictBlocksToFreeSpace...FIXME

evictBlocksToFreeSpace is used when StorageMemoryPool is requested to memory:StorageMemoryPool.md#acquireMemory[acquireMemory] and memory:StorageMemoryPool.md#freeSpaceToShrinkPool[freeSpaceToShrinkPool].
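evictBlocksToFreeSpace is not described here yet. As a hedged illustration only, the following sketch shows one LRU strategy over an access-ordered LinkedHashMap (like the entries registry described under Internal Registries): walk the blocks from least to most recently accessed, select those of the requested memory mode until their sizes cover the requested space, and drop them only if the total suffices (otherwise drop nothing and report 0). Block locking and any further eligibility rules are left out, and all names are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical memory modes and entries for the sketch
sealed trait EvictionMode
case object OnHeapMode extends EvictionMode
case object OffHeapMode extends EvictionMode
final case class SizedEntry(size: Long, mode: EvictionMode)

class EvictingStore {
  // accessOrder = true: iteration goes from least- to most-recently accessed
  val entries = new java.util.LinkedHashMap[String, SizedEntry](32, 0.75f, true)

  /** Frees at least `space` bytes of the given mode and returns the amount freed, or returns 0. */
  def evictBlocksToFreeSpace(space: Long, mode: EvictionMode): Long = entries.synchronized {
    require(space > 0)
    var freed = 0L
    val selected = ArrayBuffer.empty[String]
    val it = entries.entrySet().iterator()
    while (freed < space && it.hasNext) {
      val e = it.next()
      if (e.getValue.mode == mode) { // only blocks of the requested memory mode qualify
        selected += e.getKey
        freed += e.getValue.size
      }
    }
    if (freed >= space) {
      selected.foreach(id => entries.remove(id)) // all or nothing: drop every selected block
      freed
    } else 0L // not enough evictable memory: keep everything
  }
}
```

The all-or-nothing check avoids throwing away cached blocks when the space still could not be found.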

== [[contains]] Checking Whether Block Exists In MemoryStore

[source, scala]

contains(blockId: BlockId): Boolean


contains returns true when the entries internal registry contains the blockId key.

contains is used when...FIXME

== [[putIteratorAsValues]] putIteratorAsValues Method

[source, scala]

putIteratorAsValues[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]


putIteratorAsValues makes sure that the BlockId is not already registered or throws an IllegalArgumentException:

[source,plaintext]

requirement failed: Block [blockId] is already present in the MemoryStore

putIteratorAsValues <> (with the <> and ON_HEAP memory mode).

CAUTION: FIXME

putIteratorAsValues tries to put the blockId block in the memory store as values.

putIteratorAsValues is used when BlockManager is requested to store storage:BlockManager.md#doPutBytes[bytes] or storage:BlockManager.md#doPutIterator[values] of a block or when storage:BlockManager.md#maybeCacheDiskValuesInMemory[attempting to cache spilled values read from disk].

== [[reserveUnrollMemoryForThisTask]] reserveUnrollMemoryForThisTask Method

[source, scala]

reserveUnrollMemoryForThisTask(
  blockId: BlockId,
  memory: Long,
  memoryMode: MemoryMode): Boolean


reserveUnrollMemoryForThisTask acquires a lock on the MemoryManager and requests it to memory:MemoryManager.md#acquireUnrollMemory[acquireUnrollMemory].

NOTE: reserveUnrollMemoryForThisTask is used when MemoryStore is requested to <> and <>.
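A minimal sketch of "lock, acquire, then record" follows. It is a hypothetical model: `UnrollReservations` is made up, a fixed-size counter stands in for the MemoryManager's `acquireUnrollMemory`, one object lock guards both the pool and the per-task map, and memory modes are ignored.

```scala
import scala.collection.mutable

// Hypothetical model: a fixed-size pool standing in for the MemoryManager, plus a
// per-task map recording how much unroll memory each task attempt holds
class UnrollReservations(poolSize: Long) {
  private var poolUsed = 0L
  val unrollMemoryByTask = mutable.HashMap.empty[Long, Long]

  // Stand-in for MemoryManager.acquireUnrollMemory (no eviction here)
  private def acquireUnrollMemory(numBytes: Long): Boolean =
    if (poolUsed + numBytes <= poolSize) { poolUsed += numBytes; true } else false

  def reserveUnrollMemoryForThisTask(taskAttemptId: Long, memory: Long): Boolean =
    synchronized { // one lock keeps the pool and the per-task map consistent
      val success = acquireUnrollMemory(memory)
      if (success) {
        // Record the reservation only when the pool granted it
        unrollMemoryByTask(taskAttemptId) = unrollMemoryByTask.getOrElse(taskAttemptId, 0L) + memory
      }
      success
    }
}
```

Holding a single lock across both steps means no other thread can observe memory taken from the pool but not yet attributed to a task.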

== [[maxMemory]] Total Amount Of Memory Available For Storage

[source, scala]

maxMemory: Long

maxMemory requests the MemoryManager for the current memory:MemoryManager.md#maxOnHeapStorageMemory[maxOnHeapStorageMemory] and memory:MemoryManager.md#maxOffHeapStorageMemory[maxOffHeapStorageMemory], and returns their sum.

[TIP]

Enable INFO logging level for MemoryStore to find maxMemory in the logs when MemoryStore is created:

[source,plaintext]

MemoryStore started with capacity [maxMemory] MB

NOTE: maxMemory is used for logging purposes only.

== [[putIterator]] putIterator Internal Method

[source, scala]

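putIterator[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T],
  memoryMode: MemoryMode,
  valuesHolder: ValuesHolder[T]): Either[Long, Long]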


putIterator...FIXME

putIterator is used when MemoryStore is requested to <> and <>.
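putIterator is still a FIXME here. As a hedged illustration of the general "unroll with a growing reservation" technique behind unroll memory, the sketch below reserves an initial threshold (the role spark.storage.unrollMemoryThreshold is described as playing above), periodically estimates the size of the values buffered so far, and asks for more memory by a growth factor once the estimate reaches the current threshold. The object `UnrollSketch`, its parameters, and the fixed per-element size estimate are assumptions. The sketch stops after the unroll loop: its `Right` carries the bytes reserved, and it does not model moving the block into storage memory.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of unrolling an iterator while growing an unroll-memory reservation
object UnrollSketch {
  /** Right(bytesReserved) if every value was unrolled, Left(bytesReserved) if memory ran out. */
  def unroll[T](
      values: Iterator[T],
      bytesPerElement: Long,        // crude stand-in for a real size estimator
      initialThreshold: Long,       // first reservation, requested up front
      checkPeriod: Int,             // re-estimate the size every checkPeriod elements
      growthFactor: Double,         // new threshold = currentSize * growthFactor
      reserve: Long => Boolean): Either[Long, Long] = {
    val buffer = ArrayBuffer.empty[T]
    var keepUnrolling = reserve(initialThreshold)
    var reserved = if (keepUnrolling) initialThreshold else 0L
    var threshold = initialThreshold
    var elementsUnrolled = 0
    while (values.hasNext && keepUnrolling) {
      buffer += values.next()
      if (elementsUnrolled % checkPeriod == 0) {
        val currentSize = buffer.size * bytesPerElement
        if (currentSize >= threshold) {
          // Request enough to lift the threshold to currentSize * growthFactor
          val amountToRequest = (currentSize * growthFactor - threshold).toLong
          keepUnrolling = reserve(amountToRequest)
          if (keepUnrolling) reserved += amountToRequest
          threshold += amountToRequest
        }
      }
      elementsUnrolled += 1
    }
    if (keepUnrolling) Right(reserved) else Left(reserved)
  }
}
```

Growing the reservation geometrically keeps the number of memory requests small for large blocks, while the initial threshold bounds what a tiny block costs up front.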

== [[logUnrollFailureMessage]] logUnrollFailureMessage Internal Method

[source, scala]

logUnrollFailureMessage(
  blockId: BlockId,
  finalVectorSize: Long): Unit


logUnrollFailureMessage...FIXME

logUnrollFailureMessage is used when MemoryStore is requested to <>.

== [[logMemoryUsage]] logMemoryUsage Internal Method

[source, scala]

logMemoryUsage(): Unit

logMemoryUsage...FIXME

logMemoryUsage is used when MemoryStore is requested to <>.

== [[memoryUsed]] Total Memory Used

[source, scala]

memoryUsed: Long

memoryUsed requests the MemoryManager for the memory:MemoryManager.md#storageMemoryUsed[storageMemoryUsed].

memoryUsed is used when MemoryStore is requested for <> and to <>.

== [[blocksMemoryUsed]] Memory Used for Caching Blocks

[source, scala]

blocksMemoryUsed: Long

blocksMemoryUsed is the total memory used (memoryUsed) without the current unroll memory.

blocksMemoryUsed is used for logging purposes when MemoryStore is requested to <>, <>, <>, <> and <>.
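As a worked example of how maxMemory, memoryUsed and blocksMemoryUsed relate, here is a small hypothetical snapshot class. It assumes that the storage memory reported by the MemoryManager includes memory reserved for unrolling, so subtracting the current unroll memory leaves what cached blocks occupy. The class and field names are illustrative, and computing "free" as maxMemory minus blocksMemoryUsed is likewise an assumption of the sketch.

```scala
// Hypothetical snapshot of the numbers behind maxMemory, memoryUsed and blocksMemoryUsed
final case class StorageMemorySnapshot(
    maxOnHeapStorageMemory: Long,
    maxOffHeapStorageMemory: Long,
    storageMemoryUsed: Long,   // assumed to include unroll memory
    currentUnrollMemory: Long) {
  def maxMemory: Long = maxOnHeapStorageMemory + maxOffHeapStorageMemory
  def memoryUsed: Long = storageMemoryUsed
  def blocksMemoryUsed: Long = memoryUsed - currentUnrollMemory
  def free: Long = maxMemory - blocksMemoryUsed // assumed "free" figure for log messages
}
```

For example, with 400 bytes of on-heap and 100 bytes of off-heap storage memory, 150 bytes used of which 30 are unroll memory, maxMemory is 500, blocksMemoryUsed is 120 and free is 380.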

== [[logging]] Logging

Enable ALL logging level for org.apache.spark.storage.memory.MemoryStore logger to see what happens inside.

Add the following line to conf/log4j.properties:

[source]

log4j.logger.org.apache.spark.storage.memory.MemoryStore=ALL

Refer to ROOT:spark-logging.md[Logging].

== [[internal-registries]] Internal Registries

=== [[entries]] MemoryEntries by BlockId

[source, scala]

entries: LinkedHashMap[BlockId, MemoryEntry[_]]

MemoryStore creates a Java {java-javadoc-url}/java/util/LinkedHashMap.html[LinkedHashMap] of MemoryEntries per storage:BlockId.md[] (with the initial capacity of 32 and the load factor of 0.75) when created.

entries uses access-order ordering mode, where the order of iteration is the order in which the entries were last accessed (from least-recently accessed to most-recently). That gives LRU cache behaviour when MemoryStore is requested to evict blocks to free space.
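The access-order mode can be observed with a plain Java LinkedHashMap built with the same constructor arguments (capacity 32, load factor 0.75, accessOrder = true). In this minimal sketch the object name, keys and values are made up; it only shows that a read moves an entry to the most-recently-used end of the iteration order.

```scala
// Minimal demonstration of access-ordered LinkedHashMap iteration (hypothetical keys and values)
object AccessOrderDemo {
  /** Keys in iteration order, i.e. least recently accessed first. */
  def keysInIterationOrder(m: java.util.LinkedHashMap[String, Long]): List[String] = {
    val it = m.keySet().iterator()
    val buf = List.newBuilder[String]
    while (it.hasNext) buf += it.next()
    buf.result()
  }

  def demo(): List[String] = {
    // accessOrder = true: get and put count as accesses; iterating does not
    val entries = new java.util.LinkedHashMap[String, Long](32, 0.75f, true)
    entries.put("rdd_0_0", 10L)
    entries.put("rdd_0_1", 20L)
    entries.put("rdd_0_2", 30L)
    entries.get("rdd_0_0") // reading rdd_0_0 moves it to the most-recently-used end
    keysInIterationOrder(entries)
  }
}
```

Here `AccessOrderDemo.demo()` yields `List("rdd_0_1", "rdd_0_2", "rdd_0_0")`, so an iteration that starts from the front visits the least recently used block first.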


Last update: 2020-10-14