ExternalShuffleBlockResolver

ExternalShuffleBlockResolver manages converting shuffle BlockIds into physical segments of local files (from a process outside of Executors).

Creating Instance

ExternalShuffleBlockResolver takes the following to be created:

ExternalShuffleBlockResolver is created when:

Executors

ExternalShuffleBlockResolver uses a mapping of ExecutorShuffleInfos by AppExecId.

ExternalShuffleBlockResolver can (re)load this mapping from a registeredExecutor file or simply start from scratch.

A new mapping is added when registering an executor.
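The registry described above can be pictured as a concurrent map keyed by an (application ID, executor ID) pair. The following is a minimal sketch under assumptions, not the actual Spark code: the class name ExecutorRegistrySketch, the lookup method, and the use of a plain String[] of local directories in place of ExecutorShuffleInfo are all hypothetical.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ExecutorRegistrySketch {
  /** Hypothetical stand-in for AppExecId: an (appId, execId) pair usable as a map key. */
  public static final class AppExecId {
    public final String appId;
    public final String execId;

    public AppExecId(String appId, String execId) {
      this.appId = appId;
      this.execId = execId;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof AppExecId)) return false;
      AppExecId other = (AppExecId) o;
      return appId.equals(other.appId) && execId.equals(other.execId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(appId, execId);
    }
  }

  // Assumption: only the local directories are stored, instead of a full ExecutorShuffleInfo.
  private final ConcurrentMap<AppExecId, String[]> executors = new ConcurrentHashMap<>();

  /** Adds (or replaces) the mapping for the given executor. */
  public void registerExecutor(String appId, String execId, String[] localDirs) {
    executors.put(new AppExecId(appId, execId), localDirs);
  }

  /** Returns the registered local directories, or null when the executor is unknown. */
  public String[] lookup(String appId, String execId) {
    return executors.get(new AppExecId(appId, execId));
  }

  public static void main(String[] args) {
    ExecutorRegistrySketch registry = new ExecutorRegistrySketch();
    registry.registerExecutor("app-1", "0", new String[] {"/tmp/blockmgr-a"});
    System.out.println(registry.lookup("app-1", "0").length);
    System.out.println(registry.lookup("app-1", "1"));
  }
}
```

The key class overrides equals and hashCode so that two separately constructed keys with the same IDs resolve to the same entry.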

Directory Cleaner Executor

ExternalShuffleBlockResolver can be given a Java Executor or use a single worker thread executor (with spark-shuffle-directory-cleaner thread prefix).

The Executor is used to schedule a thread that cleans up an executor's local directories, or only the non-shuffle and non-RDD files in them.
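A single worker thread executor with a thread-name prefix could be built with plain JDK classes as sketched below. This is an illustration only: the class DirectoryCleanerSketch, its methods, and the numeric suffix on the thread name are assumptions, and the helper Spark itself uses to create the thread factory is not shown here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DirectoryCleanerSketch {
  /** Creates a single-threaded executor whose worker thread carries the cleaner prefix. */
  public static ExecutorService newDirectoryCleaner() {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory = runnable -> {
      // Assumption: a numeric suffix is appended to the prefix to name the thread.
      Thread t = new Thread(
          runnable, "spark-shuffle-directory-cleaner-" + counter.incrementAndGet());
      t.setDaemon(true); // do not keep the JVM alive just for cleanup work
      return t;
    };
    return Executors.newSingleThreadExecutor(factory);
  }

  /** Runs a trivial task on the cleaner and reports the name of the thread that ran it. */
  public static String cleanerThreadName() {
    ExecutorService cleaner = newDirectoryCleaner();
    try {
      Callable<String> task = () -> Thread.currentThread().getName();
      return cleaner.submit(task).get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      cleaner.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(cleanerThreadName());
  }
}
```

With a single worker thread, cleanup tasks run one at a time in submission order, so slow directory deletions queue up rather than run concurrently.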

spark.shuffle.service.fetch.rdd.enabled

ExternalShuffleBlockResolver uses the spark.shuffle.service.fetch.rdd.enabled configuration property to control whether cached RDD files are removed or kept (alongside shuffle output files) when cleaning up an executor's local directories.

Registering Executor

void registerExecutor(
  String appId,
  String execId,
  ExecutorShuffleInfo executorInfo)

registerExecutor...FIXME

registerExecutor is used when:

Cleaning Up Local Directories for Removed Executor

void executorRemoved(
  String executorId,
  String appId)

executorRemoved prints out the following INFO message to the logs:

Clean up non-shuffle and non-RDD files associated with the finished executor [executorId]

executorRemoved looks up the executor in the executors internal registry.

When found, executorRemoved prints out the following INFO message to the logs and requests the Directory Cleaner Executor to execute asynchronous deletion of the executor's local directories (on a separate thread).

Cleaning up non-shuffle and non-RDD files in executor [AppExecId]'s [localDirs] local dirs

When not found, executorRemoved prints out the following INFO message to the logs:

Executor is not registered (appId=[appId], execId=[executorId])
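The lookup-then-schedule flow above can be sketched as follows. This is a simplified illustration, not the Spark implementation: the class ExecutorRemovedSketch, the register method, the string map key, the boolean return value, and the deleter callback are hypothetical, and System.out stands in for the logger.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

public class ExecutorRemovedSketch {
  // Assumption: a "appId/execId" string key replaces AppExecId, and a String[] of
  // local directories replaces ExecutorShuffleInfo.
  private final Map<String, String[]> executors = new ConcurrentHashMap<>();
  private final Executor directoryCleaner;

  public ExecutorRemovedSketch(Executor directoryCleaner) {
    this.directoryCleaner = directoryCleaner;
  }

  public void register(String appId, String execId, String[] localDirs) {
    executors.put(appId + "/" + execId, localDirs);
  }

  /**
   * Looks up the executor and, when found, hands the deletion to the cleaner executor.
   * Returns true when a cleanup task was submitted (a hypothetical convenience).
   */
  public boolean executorRemoved(String executorId, String appId, Consumer<String[]> deleter) {
    String[] localDirs = executors.get(appId + "/" + executorId);
    if (localDirs == null) {
      System.out.println(
          "Executor is not registered (appId=" + appId + ", execId=" + executorId + ")");
      return false;
    }
    // Deletion may be slow, so it runs on the cleaner rather than on the caller's thread.
    directoryCleaner.execute(() -> deleter.accept(localDirs));
    return true;
  }

  public static void main(String[] args) {
    // A same-thread Executor keeps this demo deterministic.
    ExecutorRemovedSketch resolver = new ExecutorRemovedSketch(Runnable::run);
    resolver.register("app-1", "0", new String[] {"/tmp/a", "/tmp/b"});
    resolver.executorRemoved("0", "app-1", dirs -> System.out.println(dirs.length + " dirs"));
    resolver.executorRemoved("9", "app-1", dirs -> { });
  }
}
```

Passing the Executor in through the constructor mirrors the option of supplying a custom Java Executor, which also makes the flow easy to exercise with a same-thread executor.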

executorRemoved is used when:

deleteNonShuffleServiceServedFiles

void deleteNonShuffleServiceServedFiles(
  String[] dirs)

deleteNonShuffleServiceServedFiles creates a Java FilenameFilter for files that meet all of the following:

  1. A file name does not end with .index or .data
  2. When rddFetchEnabled is enabled, a file name does not start with the rdd_ prefix

deleteNonShuffleServiceServedFiles deletes files and directories (based on the FilenameFilter) in every directory (in the input dirs).

deleteNonShuffleServiceServedFiles prints out the following DEBUG message to the logs:

Successfully cleaned up files not served by shuffle service in directory: [localDir]

In case of any exceptions, deleteNonShuffleServiceServedFiles prints out the following ERROR message to the logs:

Failed to delete files not served by shuffle service in directory: [localDir]
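The two filter conditions listed above can be expressed as a Java FilenameFilter lambda. The sketch below only illustrates the predicate: the class CleanupFilterSketch, the method nonServedFilesFilter, and the sample file names are hypothetical, and no files are actually deleted.

```java
import java.io.FilenameFilter;

public class CleanupFilterSketch {
  /** Builds a filter that accepts only files the shuffle service does not serve. */
  public static FilenameFilter nonServedFilesFilter(boolean rddFetchEnabled) {
    return (dir, name) ->
        // Keep shuffle index and data files.
        !name.endsWith(".index")
            && !name.endsWith(".data")
            // Keep cached RDD files only when RDD fetching is enabled.
            && (!rddFetchEnabled || !name.startsWith("rdd_"));
  }

  public static void main(String[] args) {
    FilenameFilter filter = nonServedFilesFilter(true);
    String[] names = {"shuffle_0_1_0.index", "shuffle_0_1_0.data", "rdd_3_7", "temp_local_abc"};
    for (String name : names) {
      // The directory argument is unused by this filter, so null is passed for the demo.
      System.out.println(name + " -> delete? " + filter.accept(null, name));
    }
  }
}
```

A file accepted by the filter is a deletion candidate, so with RDD fetching disabled a file with the rdd_ prefix is treated like any other non-shuffle file.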

Application Removed Notification

void applicationRemoved(
  String appId,
  boolean cleanupLocalDirs)

applicationRemoved...FIXME

applicationRemoved is used when:

deleteExecutorDirs

void deleteExecutorDirs(
  String[] dirs)

deleteExecutorDirs...FIXME

Fetching Block Data

ManagedBuffer getBlockData(
  String appId,
  String execId,
  int shuffleId,
  long mapId,
  int reduceId)

getBlockData...FIXME

getBlockData is used when:

  • ManagedBufferIterator is created
  • ShuffleManagedBufferIterator is requested for next ManagedBuffer

Logging

Enable the ALL logging level for the org.apache.spark.network.shuffle.ExternalShuffleBlockResolver logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.network.shuffle.ExternalShuffleBlockResolver=ALL

Refer to Logging.