
DiskBlockManager

DiskBlockManager manages the mapping between logical blocks and their physical on-disk locations for a BlockManager.

(Figure: DiskBlockManager and BlockManager)

By default, one block is mapped to one file whose name is given by the BlockId. However, a block can also be mapped to only a segment of a file.

Block files are distributed among the local directories by hashing their file names.

DiskBlockManager is used to create a DiskStore.

Creating Instance

To be created, DiskBlockManager takes arguments including a SparkConf and a deleteFilesOnStop flag.

When created, DiskBlockManager creates the local directories for block storage and initializes the internal subDirs collection of locks for every local directory.

DiskBlockManager then calls createLocalDirsForMergedShuffleBlocks.

In the end, DiskBlockManager registers a shutdown hook to clean up the local directories for blocks.

DiskBlockManager is created when a BlockManager is created.

createLocalDirsForMergedShuffleBlocks

createLocalDirsForMergedShuffleBlocks(): Unit

createLocalDirsForMergedShuffleBlocks does nothing (is a noop) unless push-based shuffle is enabled (isPushBasedShuffleEnabled), which is supported in YARN mode only.

createLocalDirsForMergedShuffleBlocks...FIXME

Accessing DiskBlockManager

DiskBlockManager is available using SparkEnv.

org.apache.spark.SparkEnv.get.blockManager.diskBlockManager

Local Directories for Block Storage

When created, DiskBlockManager creates a blockmgr directory in every local root directory.

DiskBlockManager uses the localDirs internal registry to track all the blockmgr directories.

DiskBlockManager expects at least one local directory to be created. Otherwise, it prints out the following ERROR message to the logs and exits the JVM (with exit code 53):

Failed to create any local dir.

localDirs is used when:

localDirsString

DiskBlockManager uses the localDirsString internal registry to track the paths of the local blockmgr directories.

localDirsString is used by BlockManager when requested for getLocalDiskDirs.

Creating blockmgr Directory in Every Local Root Directory

createLocalDirs(
  conf: SparkConf): Array[File]

createLocalDirs creates blockmgr local directories for storing block data.


createLocalDirs creates a blockmgr-[randomUUID] directory under every root directory for local storage and prints out the following INFO message to the logs:

Created local directory at [localDir]

In case of an exception, createLocalDirs prints out the following ERROR message to the logs and ignores the directory:

Failed to create local dir in [rootDir]. Ignoring this directory.
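The per-root-directory logic of createLocalDirs can be sketched as follows. This is a minimal, self-contained approximation, not Spark's code: the root directories are passed in directly instead of being resolved from a SparkConf, LocalDirsSketch is a hypothetical name, and println stands in for the INFO and ERROR log messages.

```scala
import java.io.{File, IOException}
import java.nio.file.Files
import java.util.UUID

// Hypothetical sketch of createLocalDirs: rootDirs replaces the SparkConf lookup
object LocalDirsSketch {
  def createLocalDirs(rootDirs: Seq[String]): Array[File] = {
    rootDirs.flatMap { rootDir =>
      try {
        // One blockmgr-[randomUUID] directory per root directory
        val localDir = new File(rootDir, s"blockmgr-${UUID.randomUUID()}")
        Files.createDirectories(localDir.toPath)
        println(s"Created local directory at $localDir") // INFO in DiskBlockManager
        Some(localDir)
      } catch {
        case _: IOException =>
          // A failing root directory is skipped, not fatal
          println(s"Failed to create local dir in $rootDir. Ignoring this directory.") // ERROR
          None
      }
    }.toArray
  }
}
```

Since a failing root directory is only skipped, the result can be empty, which is exactly the case the "Failed to create any local dir." check guards against.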

File Locks for Local Block Store Directories

subDirs: Array[Array[File]]

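subDirs is a lookup table of the subdirectories of every local block directory (the first dimension is the local directory, the second the subdirectory). Every row (the array of a local directory's subdirectories) is also used as the lock for that local directory.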

The number of block subdirectories per local directory is controlled by the spark.diskStore.subDirectories configuration property (default: 64).

subDirs(dirId)(subDirId) is used to access the subDirId-th subdirectory of the dirId-th local directory.

subDirs is used when:

Finding Block File (and Creating Parent Directories)

getFile(
  blockId: BlockId): File
getFile(
  filename: String): File

getFile computes a hash of the file name (of the input BlockId) and uses it to select the local directory and the subdirectory (whose name is derived from the hash) for the file.

getFile creates the subdirectory unless it already exists.
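The hashing and lazy subdirectory creation can be sketched as below. This is a simplified model of the scheme, assuming a non-negative String hash code, dirId = hash % numberOfLocalDirs, subDirId = (hash / numberOfLocalDirs) % subDirsPerLocalDir, and two-digit hex subdirectory names; BlockFileLocator is a hypothetical class, not a Spark API.

```scala
import java.io.{File, IOException}

// Hypothetical model of getFile; localDirs and subDirsPerLocalDir are plain constructor params
class BlockFileLocator(localDirs: Array[File], subDirsPerLocalDir: Int) {
  // subDirs(dirId)(subDirId); null until the subdirectory is first needed.
  // Each row doubles as the lock for its local directory.
  private val subDirs: Array[Array[File]] =
    Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

  // Non-negative hash (Int.MinValue maps to 0 since math.abs would overflow)
  private def nonNegativeHash(s: String): Int = {
    val h = s.hashCode
    if (h == Int.MinValue) 0 else math.abs(h)
  }

  def getFile(filename: String): File = {
    val hash = nonNegativeHash(filename)
    val dirId = hash % localDirs.length
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

    // Create the subdirectory on first use while holding the per-directory lock
    val subDir = subDirs(dirId).synchronized {
      val old = subDirs(dirId)(subDirId)
      if (old != null) old
      else {
        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
        if (!newDir.exists() && !newDir.mkdir()) {
          throw new IOException(s"Failed to create local dir in $newDir.")
        }
        subDirs(dirId)(subDirId) = newDir
        newDir
      }
    }
    new File(subDir, filename)
  }
}
```

Because the location depends only on the file name, repeated calls for the same block resolve to the same File without any extra index.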

getFile is used when:

createTempShuffleBlock

createTempShuffleBlock(): (TempShuffleBlockId, File)

createTempShuffleBlock creates a temporary TempShuffleBlockId block.

createTempShuffleBlock...FIXME
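While the details are still to be documented, one plausible approach is sketched below: generate a random UUID-based block ID and retry until no file with that name exists. This is a sketch under stated assumptions: the temp_shuffle_ name prefix is assumed, TempShuffleId is a hypothetical stand-in for TempShuffleBlockId, and getFile is passed in as a function.

```scala
import java.io.File
import java.util.UUID

// Hypothetical stand-in for TempShuffleBlockId; the "temp_shuffle_" prefix is assumed
final case class TempShuffleId(id: UUID) {
  def name: String = s"temp_shuffle_$id"
}

object TempShuffleSketch {
  // getFile is injected so the sketch does not depend on the hashing scheme
  def createTempShuffleBlock(getFile: String => File): (TempShuffleId, File) = {
    var blockId = TempShuffleId(UUID.randomUUID())
    // Retry with a fresh UUID in the (unlikely) case a file already uses the name
    while (getFile(blockId.name).exists()) {
      blockId = TempShuffleId(UUID.randomUUID())
    }
    // The file itself is not created here, only its location is returned
    (blockId, getFile(blockId.name))
  }
}
```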

Registering Shutdown Hook

addShutdownHook(): AnyRef

addShutdownHook registers a shutdown hook to execute doStop at shutdown.

When executed, addShutdownHook prints out the following DEBUG message to the logs:

Adding shutdown hook

When the shutdown hook runs, it prints out the following INFO message to the logs and executes doStop:

Shutdown hook called
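The hook's behavior can be sketched with a plain JVM shutdown hook. Note the swapped-in mechanism: this sketch uses Runtime.addShutdownHook, whereas Spark routes hooks through its own shutdown hook manager with priorities. ShutdownHookSketch is a hypothetical name and println stands in for logging.

```scala
// Hypothetical sketch using a plain JVM shutdown hook instead of Spark's hook manager
object ShutdownHookSketch {
  def addShutdownHook(doStop: () => Unit): Thread = {
    println("Adding shutdown hook") // DEBUG in DiskBlockManager
    val hook = new Thread(() => {
      println("Shutdown hook called") // INFO in DiskBlockManager
      doStop()
    })
    Runtime.getRuntime.addShutdownHook(hook)
    hook // returned so the caller can unregister it later
  }
}
```

Returning the hook makes it possible to remove it (for example, after an explicit stop) so that doStop is not run twice.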

Getting Writable Directories in YARN

getYarnLocalDirs(
  conf: SparkConf): String

getYarnLocalDirs uses the given SparkConf to read the LOCAL_DIRS environment variable with the comma-separated local directories (that YARN has already created and secured so that only the user has access to them).

getYarnLocalDirs throws an Exception when the LOCAL_DIRS environment variable is not set (or is empty):

Yarn Local dirs can't be empty

Checking Whether Spark Runs on YARN

isRunningInYarnContainer(
  conf: SparkConf): Boolean

isRunningInYarnContainer uses the given SparkConf to read Hadoop YARN's CONTAINER_ID environment variable (exported by the YARN NodeManager) to find out whether Spark runs in a YARN container.
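The two YARN checks can be sketched together. In this minimal sketch, an explicit env map stands in for the environment that Spark reads through SparkConf. YarnDirsSketch and its yarnRootDirs helper, which shows how the two methods compose into a list of root directories, are hypothetical.

```scala
// Hypothetical sketch: env stands in for environment variables read via SparkConf
object YarnDirsSketch {
  // CONTAINER_ID is exported by the YARN NodeManager into every container
  def isRunningInYarnContainer(env: Map[String, String]): Boolean =
    env.contains("CONTAINER_ID")

  def getYarnLocalDirs(env: Map[String, String]): String = {
    val localDirs = env.getOrElse("LOCAL_DIRS", "")
    if (localDirs.isEmpty) {
      throw new Exception("Yarn Local dirs can't be empty")
    }
    localDirs
  }

  // Hypothetical helper: inside a YARN container, split LOCAL_DIRS into root directories
  def yarnRootDirs(env: Map[String, String]): Option[Seq[String]] =
    if (isRunningInYarnContainer(env)) Some(getYarnLocalDirs(env).split(",").toSeq)
    else None
}
```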

Getting All Blocks (From Files Stored On Disk)

getAllBlocks(): Seq[BlockId]

getAllBlocks gets all the blocks stored on disk.

Internally, getAllBlocks takes all the block files (using getAllFiles) and converts their file names to BlockIds.

getAllBlocks is used when BlockManager is requested to find IDs of existing blocks for a given filter.

All Block Files

getAllFiles(): Seq[File]

getAllFiles uses the subDirs registry to list all the files (in all the directories) that are currently stored on disk by this disk manager.
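A sketch of getAllFiles and getAllBlocks over the subDirs table follows. Each row is copied under its lock, subdirectories that were never created (null slots) are skipped, and every remaining subdirectory is listed. AllFilesSketch is a hypothetical name, and the sketch keeps block names as plain strings instead of parsing them into BlockIds.

```scala
import java.io.File

// Hypothetical sketch; subDirs is passed in instead of being a DiskBlockManager field
object AllFilesSketch {
  def getAllFiles(subDirs: Array[Array[File]]): Seq[File] =
    subDirs.toSeq.flatMap { dir =>
      // Copy the row under its lock so a concurrent getFile update is not seen half-done
      dir.synchronized(dir.clone()).toSeq
    }.filter(_ != null).flatMap { subDir =>
      // listFiles returns null on I/O errors
      val files = subDir.listFiles()
      if (files != null) files.toSeq else Seq.empty[File]
    }

  // getAllBlocks counterpart: file names are the block names
  def getAllBlockNames(subDirs: Array[Array[File]]): Seq[String] =
    getAllFiles(subDirs).map(_.getName)
}
```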

Stopping

stop(): Unit

stop...FIXME

stop is used when BlockManager is requested to stop.

Stopping DiskBlockManager (Removing Local Directories for Blocks)

doStop(): Unit

doStop deletes the local directories recursively, but only when the deleteFilesOnStop constructor argument is enabled and neither a local directory nor any of its parent directories is already registered for removal at shutdown.
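The conditions above can be sketched as follows. This is a simplified model: registeredForShutdownDeletion is a hypothetical predicate standing in for Spark's check of directories already scheduled for deletion at shutdown, and the recursive delete does not special-case symbolic links.

```scala
import java.io.File

// Hypothetical sketch of doStop's deletion rules
object DoStopSketch {
  // Recursively delete a file or directory tree (symlinks are not special-cased)
  def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) {
      val children = f.listFiles()
      if (children != null) children.foreach(deleteRecursively)
    }
    f.delete()
  }

  def doStop(localDirs: Seq[File], deleteFilesOnStop: Boolean,
             registeredForShutdownDeletion: File => Boolean): Unit = {
    if (deleteFilesOnStop) {
      localDirs.foreach { localDir =>
        // Skip directories that a shutdown hook will delete anyway
        if (localDir.isDirectory() && localDir.exists() && !registeredForShutdownDeletion(localDir)) {
          deleteRecursively(localDir)
        }
      }
    }
  }
}
```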

doStop is used when:

Demo

Demo: DiskBlockManager and Block Data

Logging

Enable the ALL logging level for the org.apache.spark.storage.DiskBlockManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.DiskBlockManager=ALL

Refer to Logging.