BlockManager

BlockManager manages the storage for blocks (chunks of data) that can be stored in memory and on disk.

Figure 1. BlockManager and Stores

BlockManager runs on the driver and executors.

BlockManager provides the interface for uploading and fetching blocks both locally and remotely using various stores, i.e. memory, disk, and off-heap.

BlockManager uses a Scala ExecutionContextExecutorService to execute FIXME asynchronously (on a thread pool with the block-manager-future prefix and a maximum of 128 threads).

Cached blocks are blocks with non-zero sum of memory and disk sizes.
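As a minimal sketch of that definition (the case class here is an illustrative stand-in, not Spark's actual BlockStatus):

```scala
// Hypothetical stand-in for a block's status: a block counts as
// "cached" when its memory and disk sizes do not both equal zero.
case class SketchBlockStatus(memSize: Long, diskSize: Long) {
  def isCached: Boolean = memSize + diskSize > 0
}

val inMemoryOnly = SketchBlockStatus(memSize = 1024, diskSize = 0)
val notStored    = SketchBlockStatus(memSize = 0, diskSize = 0)
```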

Use the web UI, especially the Storage and Executors tabs, to monitor the memory used.
Use spark-submit's command-line options (--driver-memory for the driver and --executor-memory for executors) or their equivalents as Spark properties (spark.driver.memory and spark.executor.memory) to control the memory for storage.

When External Shuffle Service is enabled, BlockManager uses ExternalShuffleClient to read other executors' shuffle files.

Creating Instance

BlockManager takes the following to be created:

When created, BlockManager sets externalShuffleServiceEnabled internal flag based on spark.shuffle.service.enabled configuration property.

BlockManager then creates an instance of DiskBlockManager (requesting deleteFilesOnStop when an external shuffle service is not in use).

BlockManager creates block-manager-future daemon cached thread pool with 128 threads maximum (as futureExecutionContext).

BlockManager calculates the maximum memory to use (as maxMemory) by requesting the maximum on-heap and off-heap storage memory from the assigned MemoryManager.

BlockManager calculates the port used by the external shuffle service (as externalShuffleServicePort).

It is computed specially in Spark on YARN.

BlockManager creates a client to read other executors' shuffle files (as shuffleClient). If the external shuffle service is used, an ExternalShuffleClient is created; otherwise the input BlockTransferService is used.

BlockManager sets the maximum number of failures before this block manager refreshes the block locations from the driver (as maxFailuresBeforeLocationRefresh).

BlockManager registers a BlockManagerSlaveEndpoint with the input RpcEnv, itself, and MapOutputTracker (as slaveEndpoint).

BlockManager is created when SparkEnv is created (for the driver and executors) when a Spark application starts.

Figure 2. BlockManager and SparkEnv

BlockEvictionHandler

BlockManager is a BlockEvictionHandler that can drop a block from memory (and store it on a disk when needed).

ShuffleClient and External Shuffle Service

BlockManager manages the lifecycle of a ShuffleClient:

The ShuffleClient can be an ExternalShuffleClient or the given BlockTransferService based on spark.shuffle.service.enabled configuration property. When enabled, BlockManager uses the ExternalShuffleClient.
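The selection can be sketched as follows (simplified stand-in types, assuming only the boolean flag drives the choice):

```scala
// Simplified stand-ins for Spark's types.
trait ShuffleClient
class ExternalShuffleClient extends ShuffleClient
class BlockTransferService extends ShuffleClient

// When spark.shuffle.service.enabled is true, an external client is
// created; otherwise the given BlockTransferService doubles as the client.
def pickShuffleClient(
    externalShuffleServiceEnabled: Boolean,
    blockTransferService: BlockTransferService): ShuffleClient =
  if (externalShuffleServiceEnabled) new ExternalShuffleClient
  else blockTransferService

val bts = new BlockTransferService
val withExternal    = pickShuffleClient(externalShuffleServiceEnabled = true, bts)
val withoutExternal = pickShuffleClient(externalShuffleServiceEnabled = false, bts)
```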

The ShuffleClient is available to other Spark services (using shuffleClient value) and is used when BlockStoreShuffleReader is requested to read combined key-value records for a reduce task.

When requested for shuffle metrics, BlockManager simply requests them from the ShuffleClient.

BlockManager and RpcEnv

BlockManager is given a RpcEnv when created.

The RpcEnv is used to set up a BlockManagerSlaveEndpoint.

BlockInfoManager

BlockManager creates a BlockInfoManager when created.

BlockManager requests the BlockInfoManager to clear when requested to stop.

BlockManager uses the BlockInfoManager to create a MemoryStore.

BlockManager uses the BlockInfoManager when requested for the following:

BlockManager and BlockManagerMaster

BlockManager is given a BlockManagerMaster when created.

BlockManager as BlockDataManager

BlockManager is a BlockDataManager.

BlockManager and MapOutputTracker

BlockManager is given a MapOutputTracker when created.

Executor ID

BlockManager is given an Executor ID when created.

The Executor ID is one of the following:

BlockManagerEndpoint RPC Endpoint

BlockManager requests the RpcEnv to register a BlockManagerSlaveEndpoint under the name BlockManagerEndpoint[ID].

The RPC endpoint is used when BlockManager is requested to initialize and reregister (to register the BlockManager on an executor with the BlockManagerMaster on the driver).

The endpoint is stopped (by requesting the RpcEnv to stop the reference) when BlockManager is requested to stop.

Accessing BlockManager Using SparkEnv

BlockManager is available using SparkEnv on the driver and executors.

import org.apache.spark.SparkEnv
val bm = SparkEnv.get.blockManager

scala> :type bm
org.apache.spark.storage.BlockManager

BlockTransferService

BlockManager is given a BlockTransferService when created.

BlockTransferService is used as the ShuffleClient when BlockManager is configured with no external shuffle service (based on spark.shuffle.service.enabled configuration property).

BlockTransferService is initialized when BlockManager is.

BlockTransferService is closed when BlockManager is requested to stop.

BlockTransferService is used when BlockManager is requested to fetch a block from or replicate a block to remote block managers.

MemoryManager

BlockManager is given a MemoryManager when created.

BlockManager uses the MemoryManager for the following:

ShuffleManager

BlockManager is given a ShuffleManager when created.

BlockManager uses the ShuffleManager for the following:

DiskBlockManager

BlockManager creates a DiskBlockManager when created.

Figure 3. DiskBlockManager and BlockManager

BlockManager uses the DiskBlockManager for the following:

The DiskBlockManager is available as the diskBlockManager reference to other Spark systems.

import org.apache.spark.SparkEnv
SparkEnv.get.blockManager.diskBlockManager

MemoryStore

BlockManager creates a MemoryStore when created (with the BlockInfoManager, the SerializerManager, the MemoryManager and itself as a BlockEvictionHandler).

Figure 4. MemoryStore and BlockManager

BlockManager requests the MemoryManager to use the MemoryStore.

BlockManager uses the MemoryStore for the following:

The MemoryStore is requested to clear when BlockManager is requested to stop.

The MemoryStore is available as memoryStore private reference to other Spark services.

import org.apache.spark.SparkEnv
SparkEnv.get.blockManager.memoryStore

The MemoryStore is used (via the SparkEnv.get.blockManager.memoryStore reference) when Task is requested to run (a task that has finished requests the MemoryStore to releaseUnrollMemoryForThisTask).

DiskStore

BlockManager creates a DiskStore (with the DiskBlockManager) when created.

Figure 5. DiskStore and BlockManager

Performance Metrics

BlockManager uses BlockManagerSource to report metrics under the name BlockManager.

getPeers Internal Method

getPeers(
  forceFetch: Boolean): Seq[BlockManagerId]

getPeers…​FIXME

getPeers is used when BlockManager is requested to replicateBlock and replicate.

Releasing All Locks For Task

releaseAllLocksForTask(
  taskAttemptId: Long): Seq[BlockId]

releaseAllLocksForTask…​FIXME

releaseAllLocksForTask is used when TaskRunner is requested to run (at the end of a task).

Stopping BlockManager

stop(): Unit

stop…​FIXME

stop is used when SparkEnv is requested to stop.

Getting IDs of Existing Blocks (For a Given Filter)

getMatchingBlockIds(
  filter: BlockId => Boolean): Seq[BlockId]

getMatchingBlockIds…​FIXME

getMatchingBlockIds is used when BlockManagerSlaveEndpoint is requested to handle a GetMatchingBlockIds message.

Getting Local Block

getLocalValues(
  blockId: BlockId): Option[BlockResult]

getLocalValues prints out the following DEBUG message to the logs:

Getting local block [blockId]

When no blockId block was found, you should see the following DEBUG message in the logs and getLocalValues returns None.

Block [blockId] was not found

When the blockId block was found, you should see the following DEBUG message in the logs:

Level for block [blockId] is [level]

If the blockId block has a memory storage level and is registered in the MemoryStore, getLocalValues returns a BlockResult (with the Memory read method) and a CompletionIterator over one of the following iterators:

  1. Values iterator from MemoryStore for blockId for "deserialized" persistence levels.

  2. Iterator from SerializerManager after the data stream (of the bytes of the blockId block) has been deserialized for "serialized" persistence levels.

getLocalValues is used when:

maybeCacheDiskValuesInMemory Internal Method

maybeCacheDiskValuesInMemory[T](
  blockInfo: BlockInfo,
  blockId: BlockId,
  level: StorageLevel,
  diskIterator: Iterator[T]): Iterator[T]

maybeCacheDiskValuesInMemory…​FIXME

maybeCacheDiskValuesInMemory is used when BlockManager is requested to getLocalValues.

Retrieving Block from Local or Remote Block Managers

get[T: ClassTag](blockId: BlockId): Option[BlockResult]

get attempts to get the blockId block from a local block manager first before requesting it from remote block managers.

Internally, get tries to get the block from the local BlockManager. If the block was found, you should see the following INFO message in the logs and get returns the local BlockResult.

INFO Found block [blockId] locally

If however the block was not found locally, get tries to get the block from remote block managers. If retrieved from a remote block manager, you should see the following INFO message in the logs and get returns the remote BlockResult.

INFO Found block [blockId] remotely

In the end, get returns None when the blockId block was not found either in the local BlockManager or any remote BlockManager.
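The local-first lookup can be condensed into plain Options (the function parameters here are illustrative stand-ins for getLocalValues and getRemoteValues, not Spark's signatures):

```scala
// Try the local block manager first; fall back to remote block managers.
def get[T](
    blockId: String,
    getLocalValues: String => Option[T],
    getRemoteValues: String => Option[T]): Option[T] =
  getLocalValues(blockId).orElse(getRemoteValues(blockId))

val localBlocks  = Map("rdd_0_0" -> "local-bytes")
val remoteBlocks = Map("rdd_0_1" -> "remote-bytes")

val foundLocally  = get("rdd_0_0", localBlocks.get, remoteBlocks.get)
val foundRemotely = get("rdd_0_1", localBlocks.get, remoteBlocks.get)
val notFound      = get("rdd_9_9", localBlocks.get, remoteBlocks.get)
```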

get is used when:

Retrieving Block Data

getBlockData(
  blockId: BlockId): ManagedBuffer

getBlockData is part of the BlockDataManager contract.

For a BlockId of a shuffle (a ShuffleBlockId), getBlockData requests the ShuffleManager for the ShuffleBlockResolver that is then requested for getBlockData.

Otherwise, getBlockData uses getLocalBytes for the given BlockId.

If found, getBlockData creates a new BlockManagerManagedBuffer (with the BlockInfoManager, the input BlockId, the retrieved BlockData and the dispose flag enabled).

If not found, getBlockData informs the BlockManagerMaster that the block could not be found (and that the master should no longer assume the block is available on this executor) and throws a BlockNotFoundException.

getBlockData is executed for shuffle blocks or local blocks that the BlockManagerMaster knows this executor really has (unless BlockManagerMaster is outdated).
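The dispatch on the block type can be sketched as follows (simplified BlockId hierarchy; the returned strings merely label which path would be taken):

```scala
sealed trait BlockId
case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId

// Shuffle blocks are resolved via the ShuffleBlockResolver; any other
// block is read as local bytes.
def getBlockDataPath(blockId: BlockId): String = blockId match {
  case s: ShuffleBlockId => s"shuffle-resolver:$s"
  case other             => s"local-bytes:$other"
}

val shufflePath = getBlockDataPath(ShuffleBlockId(0, 0L, 0))
val rddPath     = getBlockDataPath(RDDBlockId(0, 0))
```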

Retrieving Non-Shuffle Local Block Data

getLocalBytes(
  blockId: BlockId): Option[BlockData]

getLocalBytes…​FIXME

getLocalBytes is used when:

  • TorrentBroadcast is requested to readBlocks

  • BlockManager is requested for the block data (of a non-shuffle block)

removeBlockInternal Internal Method

removeBlockInternal(
  blockId: BlockId,
  tellMaster: Boolean): Unit

removeBlockInternal…​FIXME

removeBlockInternal is used when BlockManager is requested to doPut and removeBlock.

Stores

A Store is the place where blocks are held.

There are the following possible stores:

  • MemoryStore for memory storage level.

  • DiskStore for disk storage level.

  • ExternalBlockStore for OFF_HEAP storage level.

Storing Block Data Locally

putBlockData(
  blockId: BlockId,
  data: ManagedBuffer,
  level: StorageLevel,
  classTag: ClassTag[_]): Boolean

putBlockData simply stores the blockId block locally (at the given storage level).

putBlockData is part of the BlockDataManager Contract.

Internally, putBlockData wraps a ChunkedByteBuffer around the data buffer's NIO ByteBuffer and calls putBytes.

Storing Block Bytes Locally

putBytes(
  blockId: BlockId,
  bytes: ChunkedByteBuffer,
  level: StorageLevel,
  tellMaster: Boolean = true): Boolean

putBytes makes sure that the bytes are not null and then calls doPutBytes.

putBytes is used when:

doPutBytes Internal Method

doPutBytes[T](
  blockId: BlockId,
  bytes: ChunkedByteBuffer,
  level: StorageLevel,
  classTag: ClassTag[T],
  tellMaster: Boolean = true,
  keepReadLock: Boolean = false): Boolean

doPutBytes calls the internal helper doPut with a function that accepts a BlockInfo and does the uploading.

Inside the function, if the storage level's replication is greater than 1, it immediately starts replication of the blockId block on a separate thread (from futureExecutionContext thread pool). The replication uses the input bytes and level storage level.

For a memory storage level, the function checks whether the storage level is deserialized or not. For a deserialized storage level, BlockManager's SerializerManager deserializes bytes into an iterator of values that MemoryStore stores. If however the storage level is not deserialized, the function requests MemoryStore to store the bytes.

If the put did not succeed and the storage level is to use disk, you should see the following WARN message in the logs:

WARN BlockManager: Persisting block [blockId] to disk instead.

DiskStore is requested to store the bytes of a block with memory and disk storage level only when MemoryStore has failed.

If the storage level is to use disk only, DiskStore stores the bytes.

doPutBytes requests the current block status and, if the block was successfully stored and the driver should know about it (tellMaster), reports the current storage status of the block to the driver. The current TaskContext metrics are updated with the updated block status (only when executed inside a task, where a TaskContext is available).

You should see the following DEBUG message in the logs:

DEBUG BlockManager: Put block [blockId] locally took [time] ms

The function waits until the earlier asynchronous replication finishes for a block with a replication level greater than 1.

The final result of doPutBytes indicates whether the block was stored successfully (as computed earlier).

doPutBytes is used exclusively when BlockManager is requested to putBytes.

doPut Internal Method

doPut[T](
  blockId: BlockId,
  level: StorageLevel,
  classTag: ClassTag[_],
  tellMaster: Boolean,
  keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T]

doPut executes the input putBody function with a new BlockInfo object (with the level storage level) for which BlockInfoManager managed to create a write lock.

If the block has already been created (and BlockInfoManager did not manage to create a write lock for it), the following WARN message is printed out to the logs:

Block [blockId] already exists on this machine; not re-adding it

doPut releases the read lock for the block when keepReadLock flag is disabled and returns None immediately.

If however the write lock has been given, doPut executes putBody.

If the result of putBody is None, the block is considered saved successfully.

For successful save and keepReadLock disabled, BlockInfoManager is requested to release lock on blockId.

For unsuccessful save, the block is removed from memory and disk stores and the following WARN message is printed out to the logs:

Putting block [blockId] failed

In the end, doPut prints out the following DEBUG message to the logs:

Putting block [blockId] [withOrWithout] replication took [usedTime] ms
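The write-lock-then-putBody contract can be sketched with a plain Scala set standing in for BlockInfoManager (None from putBody means the block was saved successfully; all names are illustrative):

```scala
import scala.collection.mutable

// Stand-in for BlockInfoManager's write-lock registry.
val registeredBlocks = mutable.Set.empty[String]

// Acquire a "write lock" by registering the block; if the block already
// exists, skip putBody and return None immediately.
def doPut[T](blockId: String)(putBody: => Option[T]): Option[T] =
  if (!registeredBlocks.add(blockId)) None
  else putBody

var bodyRuns = 0
val firstPut  = doPut("rdd_0_0") { bodyRuns += 1; None }  // body executed
val secondPut = doPut("rdd_0_0") { bodyRuns += 1; None }  // block exists, skipped
```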

doPut is used when BlockManager is requested to doPutBytes and doPutIterator.

Removing Block From Memory and Disk

removeBlock(
  blockId: BlockId,
  tellMaster: Boolean = true): Unit

removeBlock removes the blockId block from the MemoryStore and DiskStore.

When executed, it prints out the following DEBUG message to the logs:

Removing block [blockId]

It requests BlockInfoManager for a write lock for the blockId block. If it receives none, it prints out the following WARN message to the logs and quits.

Asked to remove block [blockId], which does not exist

Otherwise, with a write lock for the block, the block is removed from MemoryStore and DiskStore (see Removing Block in MemoryStore and Removing Block in DiskStore).

If both removals fail, it prints out the following WARN message:

Block [blockId] could not be removed as it was not found in either the disk, memory, or external block store

The block is removed from BlockInfoManager.

removeBlock then calculates the current block status that is used to report the block status to the driver (if the input tellMaster and the info’s tellMaster are both enabled, i.e. true) and the current TaskContext metrics are updated with the change.

removeBlock is used when:

Removing RDD Blocks

removeRdd(rddId: Int): Int

removeRdd removes all the blocks that belong to the rddId RDD.

It prints out the following INFO message to the logs:

INFO Removing RDD [rddId]

It then requests RDD blocks from BlockInfoManager and removes them (from memory and disk) (without informing the driver).

The number of blocks removed is the final result.
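A sketch of the filtering (RDDBlockId here is a simplified stand-in carrying just the RDD id and partition index):

```scala
import scala.collection.mutable

case class RDDBlockId(rddId: Int, splitIndex: Int)

// Remove every block of the given RDD; the count removed is the result.
def removeRdd(rddId: Int, blocks: mutable.Set[RDDBlockId]): Int = {
  val toRemove = blocks.filter(_.rddId == rddId)
  blocks --= toRemove
  toRemove.size
}

val blocks  = mutable.Set(RDDBlockId(0, 0), RDDBlockId(0, 1), RDDBlockId(1, 0))
val removed = removeRdd(0, blocks)
```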

Removing All Blocks of Broadcast Variable

removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int

removeBroadcast removes all the blocks of the input broadcastId broadcast.

Internally, it starts by printing out the following DEBUG message to the logs:

Removing broadcast [broadcastId]

It then requests all the BroadcastBlockId objects that belong to the broadcastId broadcast from BlockInfoManager and removes them (from memory and disk).

The number of blocks removed is the final result.

BlockManagerId of Shuffle Server

BlockManager uses BlockManagerId for the location (address) of the server that serves shuffle files of this executor.

The BlockManagerId is either the BlockManagerId of the external shuffle service (when enabled) or the blockManagerId.

The BlockManagerId of the Shuffle Server is used for the location of a shuffle map output when:

getStatus Method

getStatus(
  blockId: BlockId): Option[BlockStatus]

getStatus…​FIXME

getStatus is used when BlockManagerSlaveEndpoint is requested to handle GetBlockStatus message.

Initializing BlockManager

initialize(
  appId: String): Unit

initialize initializes a BlockManager on the driver and executors (see Creating SparkContext Instance and Creating Executor Instance, respectively).

The method must be called before a BlockManager can be considered fully operable.

initialize does the following in order:

  1. Initializes BlockTransferService

  2. Initializes the internal shuffle client, be it ExternalShuffleClient or BlockTransferService.

  3. Registers itself with the driver’s BlockManagerMaster (using the id, maxMemory and its slaveEndpoint).

    The BlockManagerMaster reference is passed in when the BlockManager is created on the driver and executors.

  4. Sets shuffleServerId to an instance of BlockManagerId given an executor id, host name and port for BlockTransferService.

  5. It creates the address of the server that serves this executor's shuffle files (using shuffleServerId).

FIXME Review the initialize procedure again
FIXME Describe shuffleServerId. Where is it used?

If the External Shuffle Service is used, initialize prints out the following INFO message to the logs:

external shuffle service port = [externalShuffleServicePort]

It registers itself to the driver’s BlockManagerMaster passing the BlockManagerId, the maximum memory (as maxMemory), and the BlockManagerSlaveEndpoint.

Ultimately, if the initialization happens on an executor and the External Shuffle Service is used, it registers to the shuffle service.

Registering Executor’s BlockManager with External Shuffle Server

registerWithExternalShuffleServer(): Unit

registerWithExternalShuffleServer is an internal helper method to register the BlockManager for an executor with an external shuffle server.

When executed, you should see the following INFO message in the logs:

Registering executor with local external shuffle service.

It uses shuffleClient to register the block manager using shuffleServerId (i.e. the host, the port and the executorId) and a ExecutorShuffleInfo.

The ExecutorShuffleInfo uses localDirs and subDirsPerLocalDir from DiskBlockManager and the class name of the ShuffleManager.

It tries to register at most 3 times with 5-second sleeps in-between.

The maximum number of attempts and the sleep time in-between are hard-coded, i.e. they are not configured.

Any issues while connecting to the external shuffle service are reported as ERROR messages in the logs:

Failed to connect to external shuffle server, will retry [#attempts] more times after waiting 5 seconds...
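The bounded retry loop can be sketched as follows (the 3 attempts and 5-second sleep are hard-coded in the real method; the sleep is a parameter here only so the sketch runs fast):

```scala
// Retry op until it succeeds or the attempts are exhausted.
@annotation.tailrec
def retry[T](attemptsLeft: Int, sleepMs: Long)(op: () => Option[T]): Option[T] =
  op() match {
    case some @ Some(_) => some
    case None if attemptsLeft > 1 =>
      Thread.sleep(sleepMs)
      retry(attemptsLeft - 1, sleepMs)(op)
    case None => None
  }

var attempts = 0
val registered = retry(attemptsLeft = 3, sleepMs = 0L) { () =>
  attempts += 1
  if (attempts == 3) Some("registered") else None
}
```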

registerWithExternalShuffleServer is used when BlockManager is requested to initialize (when executed on an executor with externalShuffleServiceEnabled).

Re-registering BlockManager with Driver and Reporting Blocks

reregister(): Unit

When executed, reregister prints the following INFO message to the logs:

BlockManager [blockManagerId] re-registering with master

reregister then registers itself to the driver’s BlockManagerMaster (just as it was when BlockManager was initializing). It passes the BlockManagerId, the maximum memory (as maxMemory), and the BlockManagerSlaveEndpoint.

reregister will then report all the local blocks to the BlockManagerMaster.

You should see the following INFO message in the logs:

Reporting [blockInfoManager.size] blocks to the master.

If there is an issue communicating to the BlockManagerMaster, you should see the following ERROR message in the logs:

Failed to report [blockId] to master; giving up.

After the ERROR message, reregister stops reporting.

Calculate Current Block Status

getCurrentBlockStatus(
  blockId: BlockId,
  info: BlockInfo): BlockStatus

getCurrentBlockStatus gives the current BlockStatus of the BlockId block (with the block’s current StorageLevel, memory and disk sizes). It uses MemoryStore and DiskStore for size and other information.

Most of the information to build BlockStatus is already in BlockInfo except that it may not necessarily reflect the current state per MemoryStore and DiskStore.

Internally, it uses the input BlockInfo to know about the block’s storage level. If the storage level is not set (i.e. null), the returned BlockStatus assumes the default NONE storage level and the memory and disk sizes being 0.

If however the storage level is set, getCurrentBlockStatus uses MemoryStore and DiskStore to check whether the block is stored in the storages and requests their respective sizes (using getSize or assuming 0).

It is acceptable that the BlockInfo says to use memory or disk yet the block is not in the storages (yet or anymore); the method gives the current status.
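The store-driven status computation can be sketched with maps standing in for MemoryStore and DiskStore (simplified types, not Spark's):

```scala
case class BlockStatus(memSize: Long, diskSize: Long)

// The current status reflects what the stores actually hold, not what
// the declared storage level says.
def getCurrentBlockStatus(
    blockId: String,
    memoryStore: Map[String, Long],
    diskStore: Map[String, Long]): BlockStatus =
  BlockStatus(
    memSize = memoryStore.getOrElse(blockId, 0L),
    diskSize = diskStore.getOrElse(blockId, 0L))

val memoryStore = Map("rdd_0_0" -> 1024L)
val diskStore   = Map.empty[String, Long]

val present = getCurrentBlockStatus("rdd_0_0", memoryStore, diskStore)
val evicted = getCurrentBlockStatus("rdd_0_1", memoryStore, diskStore)
```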

reportAllBlocks Internal Method

reportAllBlocks(): Unit

reportAllBlocks…​FIXME

reportAllBlocks is used when BlockManager is requested to re-register all blocks to the driver.

Reporting Current Storage Status of Block to Driver

reportBlockStatus(
  blockId: BlockId,
  info: BlockInfo,
  status: BlockStatus,
  droppedMemorySize: Long = 0L): Unit

reportBlockStatus is an internal method for reporting a block status to the driver. If told to re-register, it prints out the following INFO message to the logs:

Got told to re-register updating block [blockId]

It does asynchronous reregistration (using asyncReregister).

In either case, it prints out the following DEBUG message to the logs:

Told master about block [blockId]

reportBlockStatus is used when BlockManager is requested to getBlockData, doPutBytes, doPutIterator, dropFromMemory and removeBlockInternal.

Reporting Block Status Update to Driver

tryToReportBlockStatus(
  blockId: BlockId,
  info: BlockInfo,
  status: BlockStatus,
  droppedMemorySize: Long = 0L): Boolean

tryToReportBlockStatus reports block status update to BlockManagerMaster and returns its response.

tryToReportBlockStatus is used when BlockManager is requested to reportAllBlocks or reportBlockStatus.

Execution Context

block-manager-future is the execution context for…​FIXME

ByteBuffer

The underlying abstraction for blocks in Spark is a ByteBuffer, which limits the size of a block to 2GB (Integer.MAX_VALUE; see Why does FileChannel.map take up to Integer.MAX_VALUE of data? and SPARK-1476 2GB limit in spark for blocks). This has implications not just for managed blocks in use, but also for shuffle blocks (memory-mapped blocks are limited to 2GB, even though the API allows for long) and for serialization/deserialization via byte array-backed output streams.

BlockResult

BlockResult is a description of a fetched block with the readMethod and bytes.

Registering Task

registerTask(
  taskAttemptId: Long): Unit

registerTask requests the BlockInfoManager to register a given task.

registerTask is used when Task is requested to run (at the start of a task).

Creating DiskBlockObjectWriter

getDiskWriter(
  blockId: BlockId,
  file: File,
  serializerInstance: SerializerInstance,
  bufferSize: Int,
  writeMetrics: ShuffleWriteMetrics): DiskBlockObjectWriter

getDiskWriter creates a DiskBlockObjectWriter (with spark.shuffle.sync configuration property for syncWrites argument).

getDiskWriter uses the SerializerManager of the BlockManager.

getDiskWriter is used when:

Recording Updated BlockStatus In Current Task’s TaskMetrics

addUpdatedBlockStatusToTaskMetrics(
  blockId: BlockId,
  status: BlockStatus): Unit

addUpdatedBlockStatusToTaskMetrics takes an active TaskContext (if available) and records updated BlockStatus for Block (in the task’s TaskMetrics).

addUpdatedBlockStatusToTaskMetrics is used when BlockManager doPutBytes (for a block that was successfully stored), doPut, doPutIterator, removes blocks from memory (possibly spilling it to disk) and removes block from memory and disk.

shuffleMetricsSource: Source

shuffleMetricsSource requests the ShuffleClient for the shuffle metrics and creates a ShuffleMetricsSource with the source name based on spark.shuffle.service.enabled configuration property:

shuffleMetricsSource is used when Executor is created (for non-local / cluster modes).

Replicating Block To Peers

replicate(
  blockId: BlockId,
  data: BlockData,
  level: StorageLevel,
  classTag: ClassTag[_],
  existingReplicas: Set[BlockManagerId] = Set.empty): Unit

replicate…​FIXME

replicate is used when BlockManager is requested to doPutBytes, doPutIterator and replicateBlock.

replicateBlock Method

replicateBlock(
  blockId: BlockId,
  existingReplicas: Set[BlockManagerId],
  maxReplicas: Int): Unit

replicateBlock…​FIXME

replicateBlock is used when BlockManagerSlaveEndpoint is requested to handle a ReplicateBlock message.

putIterator Method

putIterator[T: ClassTag](
  blockId: BlockId,
  values: Iterator[T],
  level: StorageLevel,
  tellMaster: Boolean = true): Boolean

putIterator…​FIXME

putIterator is used when:

  • BlockManager is requested to putSingle

  • Spark Streaming’s BlockManagerBasedBlockHandler is requested to storeBlock

putSingle Method

putSingle[T: ClassTag](
  blockId: BlockId,
  value: T,
  level: StorageLevel,
  tellMaster: Boolean = true): Boolean

putSingle…​FIXME

putSingle is used when TorrentBroadcast is requested to write the blocks and readBroadcastBlock.

Fetching Block From Remote Nodes

getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer]

getRemoteBytes…​FIXME

getRemoteBytes is used when:

getRemoteValues Internal Method

getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult]

getRemoteValues…​FIXME

getRemoteValues is used exclusively when BlockManager is requested to get a block by BlockId.

getSingle Method

getSingle[T: ClassTag](blockId: BlockId): Option[T]

getSingle…​FIXME

getSingle is used exclusively in Spark tests.

Getting Block From Block Managers Or Computing and Storing It Otherwise

getOrElseUpdate[T](
  blockId: BlockId,
  level: StorageLevel,
  classTag: ClassTag[T],
  makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]]

I think it is fair to say that getOrElseUpdate is like getOrElseUpdate of scala.collection.mutable.Map in Scala.

getOrElseUpdate(key: K, op: ⇒ V): V

Quoting the official scaladoc:

If given key K is already in this map, getOrElseUpdate returns the associated value V.

Otherwise, getOrElseUpdate computes a value V from given expression op, stores with the key K in the map and returns that value.

Since BlockManager is a key-value store of blocks of data identified by a block ID, that description works just fine.
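That Map analogy is runnable as-is with the standard library; note the value is computed only once:

```scala
import scala.collection.mutable

val cache = mutable.Map.empty[String, Int]
var computations = 0
def compute(): Int = { computations += 1; 42 }

val first  = cache.getOrElseUpdate("rdd_0_0", compute())  // computes and stores
val second = cache.getOrElseUpdate("rdd_0_0", compute())  // returns cached value
```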

getOrElseUpdate first attempts to get the block by the BlockId (from the local block manager first and, if unavailable, requesting remote peers).

Enable INFO logging level for org.apache.spark.storage.BlockManager logger to see what happens when BlockManager tries to get a block.

See logging in this document.

getOrElseUpdate gives the BlockResult of the block if found.

If however the block was not found (in any block manager in a Spark cluster), getOrElseUpdate calls doPutIterator (for the input BlockId, the makeIterator function and the StorageLevel).

getOrElseUpdate branches off per the result.

For None, getOrElseUpdate requests getLocalValues for the BlockId and eventually returns the BlockResult (unless terminated by a SparkException due to some internal error).

For Some(iter), getOrElseUpdate returns an iterator of T values.

getOrElseUpdate is used exclusively when RDD is requested to get or compute an RDD partition (for a RDDBlockId with a RDD ID and a partition index).

doPutIterator Internal Method

doPutIterator[T](
  blockId: BlockId,
  iterator: () => Iterator[T],
  level: StorageLevel,
  classTag: ClassTag[T],
  tellMaster: Boolean = true,
  keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]]

doPutIterator simply calls doPut with a putBody function that accepts a BlockInfo and does the following:

  1. putBody branches off per whether the StorageLevel indicates to use a memory or simply a disk, i.e.

    • When the input StorageLevel indicates to use a memory for storage in deserialized format, putBody requests MemoryStore to putIteratorAsValues (for the BlockId and with the iterator factory function).

      If the MemoryStore returned a correct value, the internal size is set to the value.

      If however the MemoryStore failed to give a correct value, FIXME

    • When the input StorageLevel indicates to use memory for storage in serialized format, putBody…​FIXME

    • When the input StorageLevel does not indicate to use memory for storage but disk instead, putBody…​FIXME

  2. putBody requests the current block status

  3. Only when the block was successfully stored in either the memory or disk store:

    • putBody reports the block status to the BlockManagerMaster when the input tellMaster flag (default: enabled) and the tellMaster flag of the block info are both enabled.

    • putBody addUpdatedBlockStatusToTaskMetrics (with the BlockId and BlockStatus)

    • putBody prints out the following DEBUG message to the logs:

      Put block [blockId] locally took [time] ms

    • When the input StorageLevel indicates to use replication, putBody calls doGetLocalBytes followed by replicate (with the input BlockId and the StorageLevel as well as the BlockData to replicate)

    • With a successful replication, putBody prints out the following DEBUG message to the logs:

      Put block [blockId] remotely took [time] ms

  4. In the end, putBody may or may not give a PartiallyUnrolledIterator if…​FIXME

doPutIterator is used when BlockManager is requested to get a block from block managers or computing and storing it otherwise and putIterator.

Dropping Block from Memory

dropFromMemory[T: ClassTag](
  blockId: BlockId,
  data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel

dropFromMemory prints out the following INFO message to the logs:

Dropping block [blockId] from memory

dropFromMemory then asserts that the given block is locked for writing.

If the block's StorageLevel uses disk and the internal DiskStore object (diskStore) does not contain the block, the block is saved to disk first. You should see the following INFO message in the logs:

Writing block [blockId] to disk

FIXME Describe the case with saving a block to disk.

The block’s memory size is fetched and recorded (using MemoryStore.getSize).

The block is removed from memory if exists. If not, you should see the following WARN message in the logs:

Block [blockId] could not be dropped from memory as it does not exist

It then calculates the current storage status of the block and reports it to the driver. This only happens when info.tellMaster is enabled.

FIXME When would info.tellMaster be true?

A block is considered updated when it was written to disk or removed from memory or both. If either happened, the current TaskContext metrics are updated with the change.

In the end, dropFromMemory returns the current storage level of the block.

dropFromMemory is part of the BlockEvictionHandler abstraction.

handleLocalReadFailure Internal Method

handleLocalReadFailure(blockId: BlockId): Nothing

handleLocalReadFailure…​FIXME

handleLocalReadFailure is used when…​FIXME

releaseLockAndDispose Method

releaseLockAndDispose(
  blockId: BlockId,
  data: BlockData,
  taskAttemptId: Option[Long] = None): Unit

releaseLockAndDispose…​FIXME

releaseLockAndDispose is used when…​FIXME

releaseLock Method

releaseLock(
  blockId: BlockId,
  taskAttemptId: Option[Long] = None): Unit

releaseLock requests the BlockInfoManager to unlock the given block.

releaseLock is part of the BlockDataManager abstraction.

putBlockDataAsStream Method

putBlockDataAsStream(
  blockId: BlockId,
  level: StorageLevel,
  classTag: ClassTag[_]): StreamCallbackWithID

putBlockDataAsStream…​FIXME

putBlockDataAsStream is part of the BlockDataManager abstraction.

downgradeLock Method

downgradeLock(
  blockId: BlockId): Unit

downgradeLock requests the BlockInfoManager to downgradeLock for the given block.

downgradeLock seems not to be used.

blockIdsToLocations Utility

blockIdsToLocations(
  blockIds: Array[BlockId],
  env: SparkEnv,
  blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[String]]

blockIdsToLocations…​FIXME

blockIdsToLocations is used in the now defunct Spark Streaming (when BlockRDD is requested for _locations).

getLocationBlockIds Internal Method

getLocationBlockIds(
  blockIds: Array[BlockId]): Array[Seq[BlockManagerId]]

getLocationBlockIds…​FIXME

getLocationBlockIds is used when BlockManager utility is requested to blockIdsToLocations (for the now defunct Spark Streaming).

Logging

Enable ALL logging level for org.apache.spark.storage.BlockManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.BlockManager=ALL

Refer to Logging.

Internal Properties

Maximum Memory

The total maximum memory that BlockManager can ever use (it depends on the MemoryManager and may vary over time).

Total available on-heap and off-heap memory for storage (in bytes)

Maximum Off-Heap Memory

Maximum On-Heap Memory