IndexShuffleBlockResolver is a ShuffleBlockResolver that manages shuffle block data and uses shuffle index files for faster shuffle data access.
writeIndexFileAndCommit( shuffleId: Int, mapId: Int, lengths: Array[Long], dataTmp: File): Unit
writeIndexFileAndCommit first finds the index file for the input shuffleId and mapId.
writeIndexFileAndCommit creates a temporary file for the index file (in the same directory) and writes offsets to it: the running sum of the input lengths, starting from 0 and ending with the final offset, i.e. the end of the output data file.
The offsets are computed from the block sizes in the input lengths, so for n blocks the index file holds n + 1 offsets of 8 bytes (one Long) each.
writeIndexFileAndCommit requests a shuffle block data file for the input shuffleId and mapId.
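The offset computation can be sketched in Scala as follows. writeOffsets and readOffsets are hypothetical helper names, not Spark API; the sketch assumes each offset is stored as an 8-byte big-endian long, which is what DataOutputStream.writeLong produces.

```scala
import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream, File, FileInputStream, FileOutputStream}

// Hypothetical helper: writes the running sum of `lengths` as 8-byte longs,
// starting with 0 and ending with the total size of the data file.
def writeOffsets(indexTmp: File, lengths: Array[Long]): Unit = {
  val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
  try {
    var offset = 0L
    out.writeLong(offset) // start offset of the first block
    for (length <- lengths) {
      offset += length
      out.writeLong(offset) // end of this block, i.e. start of the next one
    }
  } finally {
    out.close()
  }
}

// Hypothetical helper: reads every offset back from an index file.
def readOffsets(index: File): Array[Long] = {
  val in = new DataInputStream(new FileInputStream(index))
  try Array.fill((index.length() / 8).toInt)(in.readLong())
  finally in.close()
}
```

For lengths of 10, 0 and 5 bytes, the index file holds the offsets 0, 10, 10 and 15 (32 bytes in total), so an empty block simply repeats the previous offset.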
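writeIndexFileAndCommit then checks whether the existing index and data files match each other (aka consistency check) using checkIndexAndDataFile. Since there is only one IndexShuffleBlockResolver per executor, the check and the renames that follow are done in a synchronized block so that they are atomic.
If the consistency check succeeds, i.e. checkIndexAndDataFile returns block lengths, another attempt for the same task has already written the map outputs successfully. The existing block lengths are copied into the input lengths, and the input dataTmp and the temporary index file are deleted (as no longer needed).
If the consistency check fails, i.e. checkIndexAndDataFile returns null, the existing index and data files are deleted (if they exist) and the temporary index and data files become "official", i.e. they are renamed to their final names.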
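A minimal sketch of the commit decision, assuming the result of the consistency check is passed in as an Option rather than the null that Spark uses. The names commit and commitLock are hypothetical; the IOException messages match the ones documented for writeIndexFileAndCommit.

```scala
import java.io.{File, IOException}

// Hypothetical lock standing in for the resolver instance Spark synchronizes on.
val commitLock = new Object

// `existingLengths` is Some(lengths) when the existing index and data files already
// match (another attempt committed first), and None otherwise.
def commit(
    indexFile: File,
    dataFile: File,
    indexTmp: File,
    dataTmp: File,
    lengths: Array[Long],
    existingLengths: Option[Array[Long]]): Unit = commitLock.synchronized {
  existingLengths match {
    case Some(existing) =>
      // Reuse the committed lengths and discard our temporary files.
      System.arraycopy(existing, 0, lengths, 0, lengths.length)
      if (dataTmp != null && dataTmp.exists()) dataTmp.delete()
      indexTmp.delete()
    case None =>
      // First successful attempt: replace any stale files with our temporary ones.
      if (indexFile.exists()) indexFile.delete()
      if (dataFile.exists()) dataFile.delete()
      if (!indexTmp.renameTo(indexFile)) {
        throw new IOException(s"fail to rename file $indexTmp to $indexFile")
      }
      if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
        throw new IOException(s"fail to rename file $dataTmp to $dataFile")
      }
  }
}
```

Copying the existing lengths into the caller's array matters because the caller reports those lengths as the map output sizes, and they must describe the files that actually stay on disk.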
If renaming the temporary index file or the temporary data file fails, writeIndexFileAndCommit throws an IOException with one of the following messages:
fail to rename file [indexTmp] to [indexFile]
fail to rename file [dataTmp] to [dataFile]
getBlockData( blockId: ShuffleBlockId): ManagedBuffer
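getBlockData finds the index file for the input shuffle blockId (using its shuffleId and mapId), skips blockId.reduceId times 8 bytes of data from the index file, and reads the start and end offsets of the block.
The start offset and the difference between the end and start offsets are the offset and the length of the file segment for the block data, which getBlockData returns as a FileSegmentManagedBuffer over the shuffle data file.
In the end, getBlockData closes the index file.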
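The offset lookup in getBlockData can be sketched as below. blockSegment is a hypothetical helper, and it swaps in RandomAccessFile.seek for the stream skipping Spark does; the result is the same pair of offset and length into the data file.

```scala
import java.io.{File, RandomAccessFile}

// Hypothetical helper: returns (offset, length) of block `reduceId` in the data file.
def blockSegment(indexFile: File, reduceId: Int): (Long, Long) = {
  val raf = new RandomAccessFile(indexFile, "r")
  try {
    raf.seek(reduceId * 8L)         // skip the offsets of the preceding blocks
    val offset = raf.readLong()     // where the block starts in the data file
    val nextOffset = raf.readLong() // where the next block starts
    (offset, nextOffset - offset)
  } finally {
    raf.close()                     // always close the index file
  }
}
```

With offsets 0, 10, 10 and 15 in the index file, block 1 is the empty segment (10, 0) and block 2 is (10, 5).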
Checking Consistency of Shuffle Index and Data Files and Returning Block Lengths (checkIndexAndDataFile Internal Method)
checkIndexAndDataFile( index: File, data: File, blocks: Int): Array[Long]
checkIndexAndDataFile first checks if the size of the input index file is exactly the input blocks plus 1, multiplied by 8 bytes, i.e. one 8-byte offset per block plus the final end offset.
checkIndexAndDataFile reads the shuffle index file and converts the offsets into lengths of each block (the first offset must be 0).
checkIndexAndDataFile makes sure that the size of the input shuffle data file is exactly the sum of the block lengths.
checkIndexAndDataFile returns the block lengths if the numbers match, and null when the numbers, and hence the shuffle index and data files, don't match (or the index file cannot be read).
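The three checks can be sketched in Scala as follows. checkIndexAndData is a hypothetical name, and the sketch returns an Option where Spark returns null.

```scala
import java.io.{BufferedInputStream, DataInputStream, File, FileInputStream, IOException}

// Returns Some(block lengths) when the index and data files are consistent, None otherwise.
def checkIndexAndData(index: File, data: File, blocks: Int): Option[Array[Long]] = {
  // Check 1: the index file must hold exactly blocks + 1 offsets of 8 bytes each.
  if (index.length() != (blocks + 1) * 8L) return None
  val lengths = new Array[Long](blocks)
  try {
    val in = new DataInputStream(new BufferedInputStream(new FileInputStream(index)))
    try {
      // Check 2: convert offsets into lengths; the first offset must be 0.
      var offset = in.readLong()
      if (offset != 0L) return None
      var i = 0
      while (i < blocks) {
        val next = in.readLong()
        lengths(i) = next - offset
        offset = next
        i += 1
      }
    } finally in.close()
  } catch {
    case _: IOException => return None
  }
  // Check 3: the data file size must equal the sum of the block lengths.
  if (data.length() == lengths.sum) Some(lengths) else None
}
```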
removeDataByMap(shuffleId: Int, mapId: Int): Unit
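removeDataByMap deletes the shuffle data file and then the shuffle index file for the input shuffleId and mapId (if they exist).
If removeDataByMap fails deleting a file, it prints out one of the following WARN messages to the logs: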
Error deleting data [path]
Error deleting index [path]
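The deletion logic can be sketched as below. removeFiles is a hypothetical helper that takes the two files directly and returns the WARN messages instead of logging them, so the example needs no logging setup.

```scala
import java.io.File

// Deletes the data file, then the index file, collecting a warning for each failed delete.
def removeFiles(dataFile: File, indexFile: File): Seq[String] = {
  val warnings = Seq.newBuilder[String]
  if (dataFile.exists() && !dataFile.delete()) {
    warnings += s"Error deleting data ${dataFile.getPath}"
  }
  if (indexFile.exists() && !indexFile.delete()) {
    warnings += s"Error deleting index ${indexFile.getPath}"
  }
  warnings.result()
}
```

Missing files are skipped silently, so calling it twice for the same map output produces no warnings.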
stop is a no-op, i.e. it does nothing when called.
getIndexFile( shuffleId: Int, mapId: Int): File
getDataFile( shuffleId: Int, mapId: Int): File
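As far as we know from the Spark 2.x sources, getIndexFile and getDataFile ask the DiskBlockManager for the file of a ShuffleIndexBlockId or ShuffleDataBlockId whose reduce ID is fixed to NOOP_REDUCE_ID (0), since one data file and one index file cover all reduce partitions of a map output. The sketch below only reproduces the resulting file names; indexFileName, dataFileName and NoopReduceId are hypothetical names, and the directory the DiskBlockManager picks is left out.

```scala
// Assumed counterpart of NOOP_REDUCE_ID: one pair of files per (shuffleId, mapId).
val NoopReduceId = 0

def indexFileName(shuffleId: Int, mapId: Int): String =
  s"shuffle_${shuffleId}_${mapId}_$NoopReduceId.index"

def dataFileName(shuffleId: Int, mapId: Int): String =
  s"shuffle_${shuffleId}_${mapId}_$NoopReduceId.data"
```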
Enable ALL logging level for org.apache.spark.shuffle.IndexShuffleBlockResolver logger to see what happens inside.
Add the following line to
Refer to Logging.