IndexShuffleBlockResolver

IndexShuffleBlockResolver is a ShuffleBlockResolver that manages shuffle block data and uses shuffle index files for faster shuffle data access.

Creating Instance

IndexShuffleBlockResolver takes the following to be created:

IndexShuffleBlockResolver is created when:

IndexShuffleBlockResolver and SortShuffleManager

getStoredShuffles

getStoredShuffles(): Seq[ShuffleBlockInfo]

getStoredShuffles is part of the MigratableResolver abstraction.

getStoredShuffles...FIXME

putShuffleBlockAsStream

putShuffleBlockAsStream(
  blockId: BlockId,
  serializerManager: SerializerManager): StreamCallbackWithID

putShuffleBlockAsStream is part of the MigratableResolver abstraction.

putShuffleBlockAsStream...FIXME

getMigrationBlocks

getMigrationBlocks(
  shuffleBlockInfo: ShuffleBlockInfo): List[(BlockId, ManagedBuffer)]

getMigrationBlocks is part of the MigratableResolver abstraction.

getMigrationBlocks...FIXME

Writing Shuffle Index and Data Files

writeIndexFileAndCommit(
  shuffleId: Int,
  mapId: Long,
  lengths: Array[Long],
  dataTmp: File): Unit

writeIndexFileAndCommit finds the index and data files for the input shuffleId and mapId.

writeIndexFileAndCommit creates a temporary file for the index file (in the same directory) and writes the offsets to it. The offsets are the running sum of the input lengths, starting at 0 and ending with the final offset that marks the end of the output file.

Note

The differences between consecutive offsets are exactly the sizes in the input lengths.
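The conversion from block lengths to offsets can be sketched as follows. This is a minimal illustration rather than Spark's own code, and lengthsToOffsets is a hypothetical helper name introduced only for this example.

```scala
// Hypothetical helper (not part of Spark): turns per-block lengths into
// index-file offsets, i.e. a running sum that starts at 0.
def lengthsToOffsets(lengths: Array[Long]): Array[Long] =
  lengths.scanLeft(0L)(_ + _)

// Three blocks of 10, 0 and 25 bytes give four offsets: 0, 10, 10, 35.
val offsets = lengthsToOffsets(Array(10L, 0L, 25L))
```

Note that an index for N blocks holds N + 1 offsets, so that block i spans the bytes between offset i and offset i + 1.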

writeIndexFileAndCommit and offsets in a shuffle index file

writeIndexFileAndCommit...FIXME (Review me)

writeIndexFileAndCommit finds the shuffle data file for the input shuffleId and mapId.

writeIndexFileAndCommit checks whether the existing index and data files match each other (aka consistency check).

If the consistency check succeeds, it means that another attempt for the same task has already written the map outputs successfully, and so the input dataTmp and the temporary index file are deleted (as no longer needed).

If the consistency check fails, the existing index and data files are deleted (if they exist) and the temporary index and data files become "official", i.e. are renamed to their final names.

When a rename fails, writeIndexFileAndCommit throws an IOException with one of the following messages:

fail to rename file [indexTmp] to [indexFile]

or

fail to rename file [dataTmp] to [dataFile]
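The rename step and its error message can be sketched as below. This is a simplified illustration under stated assumptions, not the actual Spark implementation: promote is a hypothetical helper name, and the sketch handles a single file where the real method handles both the index and the data file.

```scala
import java.io.{File, IOException}

// Hypothetical helper (not part of Spark): makes a temporary file "official"
// by renaming it to its final name, or fails with an IOException.
def promote(tmp: File, target: File): Unit = {
  // Remove a stale target first so that the rename cannot be blocked by it.
  if (target.exists()) target.delete()
  if (!tmp.renameTo(target)) {
    throw new IOException(s"fail to rename file $tmp to $target")
  }
}
```

Keeping the temporary file in the same directory as the final file, as described above, means the rename stays within one file system.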

writeIndexFileAndCommit is used when:

Removing Shuffle Index and Data Files

removeDataByMap(
  shuffleId: Int,
  mapId: Long): Unit

removeDataByMap finds and deletes the shuffle data file (for the input shuffleId and mapId), and then finds and deletes the shuffle index file.

removeDataByMap is used when:

  • SortShuffleManager is requested to unregister a shuffle (and remove a shuffle from a shuffle system)

Creating Shuffle Block Index File

getIndexFile(
  shuffleId: Int,
  mapId: Long,
  dirs: Option[Array[String]] = None): File

getIndexFile creates a ShuffleIndexBlockId.

With dirs (local directories) defined, getIndexFile places the index file (named after the ShuffleIndexBlockId) in the local directories (with the spark.diskStore.subDirectories).

Otherwise, with no local directories, getIndexFile requests the DiskBlockManager (of the BlockManager) to get the index file.
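How a file name could be mapped to one of the local directories is sketched below. Everything in this example is an assumption made for illustration: indexFileName and placeFile are hypothetical helpers, the name pattern (with a fixed trailing reduce id of 0) and the two-digit hexadecimal sub-directory names are assumed, and the hash function is a simplified stand-in for whatever Spark uses internally.

```scala
import java.io.File

// Assumed naming scheme: shuffle_<shuffleId>_<mapId>_0.index
def indexFileName(shuffleId: Int, mapId: Long): String =
  s"shuffle_${shuffleId}_${mapId}_0.index"

// Hypothetical placement: hash the file name to pick a local directory,
// then a sub-directory inside it.
def placeFile(
    localDirs: Array[String],
    subDirsPerLocalDir: Int,
    fileName: String): File = {
  val hash = fileName.hashCode & Int.MaxValue // force a non-negative hash
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
  new File(new File(localDirs(dirId), f"$subDirId%02x"), fileName)
}
```

Because the location depends only on the file name and the list of directories, any process that knows both can find the file without a lookup table.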

getIndexFile is used when:

Creating Shuffle Block Data File

getDataFile(
  shuffleId: Int,
  mapId: Long): File // dirs is None (undefined)
getDataFile(
  shuffleId: Int,
  mapId: Long,
  dirs: Option[Array[String]]): File

getDataFile creates a ShuffleDataBlockId.

With dirs (local directories) defined, getDataFile places the data file (named after the ShuffleDataBlockId) in the local directories (with the spark.diskStore.subDirectories).

Otherwise, with no local directories, getDataFile requests the DiskBlockManager (of the BlockManager) to get the data file.

getDataFile is used when:

Creating ManagedBuffer to Read Shuffle Block Data File

getBlockData(
  blockId: BlockId,
  dirs: Option[Array[String]]): ManagedBuffer

getBlockData is part of the ShuffleBlockResolver abstraction.

getBlockData...FIXME

Checking Consistency of Shuffle Index and Data Files

checkIndexAndDataFile(
  index: File,
  data: File,
  blocks: Int): Array[Long]

Danger

Review Me

checkIndexAndDataFile first checks if the size of the input index file is exactly the input blocks plus 1, multiplied by 8 (one 8-byte offset per block plus the leading 0 offset).

checkIndexAndDataFile returns null when the sizes, and hence the shuffle index and data files, don't match.

checkIndexAndDataFile reads the shuffle index file and converts the offsets into lengths of each block.

checkIndexAndDataFile makes sure that the size of the input shuffle data file is exactly the sum of the block lengths.

checkIndexAndDataFile returns the block lengths if the numbers match, and null otherwise.
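The check can be sketched as follows. This is a minimal sketch, not Spark's implementation: checkIndexAndData is a hypothetical name, it returns an Option where the method described above returns null, it assumes the index holds blocks + 1 offsets (the leading 0 plus one end offset per block), and the test on the first offset is an assumed sanity check.

```scala
import java.io.{DataInputStream, File}
import java.nio.file.Files

// Hypothetical re-implementation for illustration only.
// Returns Some(block lengths) when index and data agree, None otherwise.
def checkIndexAndData(index: File, data: File, blocks: Int): Option[Array[Long]] = {
  // The index must hold exactly blocks + 1 offsets of 8 bytes each.
  if (index.length() != (blocks + 1) * 8L) None
  else {
    val in = new DataInputStream(Files.newInputStream(index.toPath))
    try {
      val offsets = Array.fill(blocks + 1)(in.readLong())
      // A block's length is the difference between neighbouring offsets.
      val lengths = offsets.tail.zip(offsets).map { case (next, prev) => next - prev }
      // Assumed sanity check: the first offset is 0.
      // The data file must be exactly as long as all blocks together.
      if (offsets.head == 0L && data.length() == lengths.sum) Some(lengths) else None
    } finally in.close()
  }
}
```

For an index holding the offsets 0, 10, 10, 35 and a 35-byte data file, a call with blocks = 3 would yield the lengths 10, 0, 25, while a call with any other block count would yield None.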

TransportConf

IndexShuffleBlockResolver creates a TransportConf (for shuffle module) when created.

transportConf is used in getMigrationBlocks and getBlockData.

Logging

Enable ALL logging level for org.apache.spark.shuffle.IndexShuffleBlockResolver logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.IndexShuffleBlockResolver=ALL

Refer to Logging.