ExternalShuffleBlockResolver¶
ExternalShuffleBlockResolver manages converting shuffle BlockIds into physical segments of local files (from a process outside of Executors).
Creating Instance¶
ExternalShuffleBlockResolver takes the following to be created:
- TransportConf
- registeredExecutorFile (Java's File)
- Directory Cleaner
ExternalShuffleBlockResolver is created when:
- ExternalBlockHandler is created
Executors¶
ExternalShuffleBlockResolver uses a mapping of ExecutorShuffleInfos by AppExecId.
ExternalShuffleBlockResolver can (re)load this mapping from a registeredExecutor file or simply start from scratch.
A new mapping is added when registering an executor.
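The registry described above can be pictured as a concurrent map keyed by application and executor ID. The following is a minimal sketch only: the class ExecutorRegistrySketch, the lookup method, and the simplified AppExecId and ExecutorShuffleInfo stand-ins are hypothetical and are not Spark's own classes. Loading from and saving to the registeredExecutor file is left out.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ExecutorRegistrySketch {

  // Simplified stand-in for AppExecId: an executor is identified by (appId, execId).
  static final class AppExecId {
    final String appId;
    final String execId;

    AppExecId(String appId, String execId) {
      this.appId = appId;
      this.execId = execId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof AppExecId)) return false;
      AppExecId other = (AppExecId) o;
      return appId.equals(other.appId) && execId.equals(other.execId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(appId, execId);
    }
  }

  // Simplified stand-in for ExecutorShuffleInfo: only the local directories are modelled.
  static final class ExecutorShuffleInfo {
    final String[] localDirs;

    ExecutorShuffleInfo(String[] localDirs) {
      this.localDirs = localDirs;
    }
  }

  // The registry: one entry per registered executor.
  private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors =
      new ConcurrentHashMap<>();

  // Registering an executor adds (or overwrites) its entry.
  void registerExecutor(String appId, String execId, ExecutorShuffleInfo info) {
    executors.put(new AppExecId(appId, execId), info);
  }

  // Hypothetical helper: returns null when the executor is not registered.
  ExecutorShuffleInfo lookup(String appId, String execId) {
    return executors.get(new AppExecId(appId, execId));
  }

  public static void main(String[] args) {
    ExecutorRegistrySketch registry = new ExecutorRegistrySketch();
    registry.registerExecutor("app-1", "0",
        new ExecutorShuffleInfo(new String[] {"/tmp/blockmgr-a"}));
    System.out.println(registry.lookup("app-1", "0") != null);
  }
}
```

A value-based key (with equals and hashCode) is what lets a later lookup with a freshly built AppExecId find the entry added at registration time.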
Directory Cleaner Executor¶
ExternalShuffleBlockResolver can be given a Java Executor or use a single worker thread executor (with spark-shuffle-directory-cleaner thread prefix).
The Executor is used to schedule asynchronous cleanup of an executor's local directories, and of non-shuffle and non-RDD files in those directories, on a separate thread.
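As an illustration of a single worker thread executor with a thread-name prefix, here is a minimal sketch built on the standard java.util.concurrent API. The class DirectoryCleanerExecutorSketch, its methods, the "prefix-N" naming scheme, and the daemon flag are assumptions made for this example; how Spark actually builds its thread factory is not shown here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class DirectoryCleanerExecutorSketch {

  // Builds a single-threaded executor whose worker thread name starts with the given prefix.
  static ExecutorService newCleanerExecutor(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory = runnable -> {
      // Assumption: threads are named "<prefix>-<n>" and marked as daemon threads.
      Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    };
    return Executors.newSingleThreadExecutor(factory);
  }

  // Runs a task on the executor and returns the name of the thread that executed it.
  static String workerThreadName(ExecutorService executor) {
    Callable<String> task = () -> Thread.currentThread().getName();
    try {
      return executor.submit(task).get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    ExecutorService cleaner = newCleanerExecutor("spark-shuffle-directory-cleaner");
    System.out.println(workerThreadName(cleaner));
    cleaner.shutdown();
  }
}
```

With a single worker thread, cleanup tasks run one at a time in submission order, so file deletion never competes with itself for disk I/O.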
spark.shuffle.service.fetch.rdd.enabled¶
ExternalShuffleBlockResolver uses spark.shuffle.service.fetch.rdd.enabled configuration property to control whether or not to remove cached RDD files (alongside shuffle output files).
Registering Executor¶
void registerExecutor(
String appId,
String execId,
ExecutorShuffleInfo executorInfo)
registerExecutor...FIXME
registerExecutor is used when:
- ExternalBlockHandler is requested to handle a RegisterExecutor message and reregisterExecutor
Cleaning Up Local Directories for Removed Executor¶
void executorRemoved(
String executorId,
String appId)
executorRemoved prints out the following INFO message to the logs:
Clean up non-shuffle and non-RDD files associated with the finished executor [executorId]
executorRemoved looks up the executor in the executors internal registry.
When found, executorRemoved prints out the following INFO message to the logs and requests the Directory Cleaner Executor to execute asynchronous deletion of the executor's local directories (on a separate thread).
Cleaning up non-shuffle and non-RDD files in executor [AppExecId]'s [localDirs] local dirs
When not found, executorRemoved prints out the following INFO message to the logs:
Executor is not registered (appId=[appId], execId=[executorId])
executorRemoved is used when:
- ExternalBlockHandler is requested to executorRemoved
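The lookup-then-schedule flow of executorRemoved can be sketched as follows. This is an illustration under stated assumptions, not Spark's code: the class ExecutorRemovedSketch is hypothetical, the registry is reduced to a map from an "appId/execId" string to local directories, log messages go to standard output, and the scheduled task only records the directories it would clean instead of deleting anything.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ExecutorRemovedSketch {

  // Simplified registry: "appId/execId" -> local directories.
  private final ConcurrentMap<String, String[]> executors = new ConcurrentHashMap<>();

  // Stand-in for the Directory Cleaner Executor.
  private final ExecutorService directoryCleaner = Executors.newSingleThreadExecutor();

  // Directories visited by the cleanup task (nothing is actually deleted in this sketch).
  final List<String> visitedDirs = new CopyOnWriteArrayList<>();

  void registerExecutor(String appId, String execId, String[] localDirs) {
    executors.put(appId + "/" + execId, localDirs);
  }

  // Returns true when a cleanup task was scheduled, false when the executor is unknown.
  boolean executorRemoved(String executorId, String appId) {
    System.out.println("Clean up non-shuffle and non-RDD files associated with"
        + " the finished executor " + executorId);
    String[] localDirs = executors.get(appId + "/" + executorId);
    if (localDirs == null) {
      System.out.println("Executor is not registered (appId=" + appId
          + ", execId=" + executorId + ")");
      return false;
    }
    System.out.println("Cleaning up non-shuffle and non-RDD files in executor "
        + appId + "/" + executorId + "'s " + localDirs.length + " local dirs");
    // Hand the work to the cleaner thread so the caller is not blocked by file I/O.
    directoryCleaner.execute(() -> visitedDirs.addAll(Arrays.asList(localDirs)));
    return true;
  }

  // Stops accepting tasks and waits briefly for scheduled cleanups to finish.
  void shutdownAndWait() {
    directoryCleaner.shutdown();
    try {
      directoryCleaner.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    ExecutorRemovedSketch resolver = new ExecutorRemovedSketch();
    resolver.registerExecutor("app-1", "0", new String[] {"/tmp/dir-a", "/tmp/dir-b"});
    resolver.executorRemoved("0", "app-1");
    resolver.executorRemoved("7", "app-1");
    resolver.shutdownAndWait();
    System.out.println(resolver.visitedDirs);
  }
}
```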
deleteNonShuffleServiceServedFiles¶
void deleteNonShuffleServiceServedFiles(
String[] dirs)
deleteNonShuffleServiceServedFiles creates a Java FilenameFilter for files that meet all of the following:
- A file name does not end with .index or .data
- With rddFetchEnabled enabled, a file name does not start with rdd_ prefix
deleteNonShuffleServiceServedFiles deletes files and directories (based on the FilenameFilter) in every directory (in the input dirs).
deleteNonShuffleServiceServedFiles prints out the following DEBUG message to the logs:
Successfully cleaned up files not served by shuffle service in directory: [localDir]
In case of any exceptions, deleteNonShuffleServiceServedFiles prints out the following ERROR message to the logs:
Failed to delete files not served by shuffle service in directory: [localDir]
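The file-selection rule above can be written as a Java FilenameFilter. The following is a minimal sketch of that predicate only: the class NonShuffleFileFilterSketch and its method name are hypothetical, the sample file names are made up for illustration, and no files are deleted.

```java
import java.io.FilenameFilter;

final class NonShuffleFileFilterSketch {

  // Accepts (i.e. selects for deletion) files that the shuffle service does not serve.
  static FilenameFilter nonShuffleServiceServedFiles(boolean rddFetchEnabled) {
    return (dir, name) -> {
      // Shuffle index and data files are served by the shuffle service, so they are kept.
      boolean shuffleFile = name.endsWith(".index") || name.endsWith(".data");
      // Cached RDD files are kept only when RDD fetching is enabled.
      boolean servedRddFile = rddFetchEnabled && name.startsWith("rdd_");
      return !shuffleFile && !servedRddFile;
    };
  }

  public static void main(String[] args) {
    FilenameFilter filter = nonShuffleServiceServedFiles(true);
    // The directory argument is unused by this filter, so null is passed here.
    for (String name : new String[] {"shuffle_0_0_0.index", "rdd_0_0", "temp_local_1"}) {
      System.out.println(name + " selected for deletion: " + filter.accept(null, name));
    }
  }
}
```

Note that with rddFetchEnabled disabled, files starting with rdd_ pass the filter and are therefore treated like any other non-shuffle file.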
Application Removed Notification¶
void applicationRemoved(
String appId,
boolean cleanupLocalDirs)
applicationRemoved...FIXME
applicationRemoved is used when:
- ExternalBlockHandler is requested to applicationRemoved
deleteExecutorDirs¶
void deleteExecutorDirs(
String[] dirs)
deleteExecutorDirs...FIXME
Fetching Block Data¶
ManagedBuffer getBlockData(
String appId,
String execId,
int shuffleId,
long mapId,
int reduceId)
getBlockData...FIXME
getBlockData is used when:
- ManagedBufferIterator is created
- ShuffleManagedBufferIterator is requested for nextManagedBuffer
Logging¶
Enable ALL logging level for org.apache.spark.network.shuffle.ExternalShuffleBlockResolver logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.network.shuffle.ExternalShuffleBlockResolver=ALL
Refer to Logging.