BypassMergeSortShuffleWriter

Figure 1. BypassMergeSortShuffleWriter and DiskBlockObjectWriters

Creating Instance

BypassMergeSortShuffleWriter takes a BlockManager, an IndexShuffleBlockResolver, a BypassMergeSortShuffleHandle, a map ID, a TaskContext and a SparkConf to be created.

BypassMergeSortShuffleWriter is created when SortShuffleManager is requested for a ShuffleWriter (for a BypassMergeSortShuffleHandle).
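A BypassMergeSortShuffleHandle (and hence this writer) is only used when the shuffle can bypass merge sort. A minimal sketch of that condition, modeled on SortShuffleWriter.shouldBypassMergeSort:

def shouldBypassMergeSort(
    mapSideCombine: Boolean,
    numPartitions: Int,
    bypassMergeThreshold: Int = 200): Boolean = {
  // No map-side combine and few enough reduce partitions
  // (at most spark.shuffle.sort.bypassMergeThreshold, default: 200)
  !mapSideCombine && numPartitions <= bypassMergeThreshold
}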

Per-Partition DiskBlockObjectWriters

BypassMergeSortShuffleWriter uses one DiskBlockObjectWriter per partition (the partitionWriters internal registry). The writers are created while writing records, and every writer writes out the records of its partition to a separate shuffle block file.

IndexShuffleBlockResolver

BypassMergeSortShuffleWriter is given an IndexShuffleBlockResolver when created.

BypassMergeSortShuffleWriter uses the IndexShuffleBlockResolver for writing records (to writeIndexFileAndCommit and getDataFile).
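The following stub sketches just the two resolver operations used here, with signatures modeled on (and simplified from) IndexShuffleBlockResolver:

import java.io.File

// A simplified stand-in for IndexShuffleBlockResolver (a sketch, not the real API)
trait ShuffleBlockResolverOps {
  // The single shuffle data file for a shuffle and map ID
  def getDataFile(shuffleId: Int, mapId: Int): File
  // Writes the index file and commits the data file
  def writeIndexFileAndCommit(
      shuffleId: Int,
      mapId: Int,
      lengths: Array[Long],
      dataTmp: File): Unit
}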

Serializer

When created, BypassMergeSortShuffleWriter requests the ShuffleDependency (of the given BypassMergeSortShuffleHandle) for the Serializer.

BypassMergeSortShuffleWriter creates a new instance of the Serializer for writing records.
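A minimal sketch of that step, assuming the Serializer of the shuffle dependency is in scope:

import org.apache.spark.serializer.{Serializer, SerializerInstance}

// A new SerializerInstance is created for writing records
def newWriteSerializer(serializer: Serializer): SerializerInstance =
  serializer.newInstance()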

Configuration Properties

spark.shuffle.file.buffer

BypassMergeSortShuffleWriter uses the spark.shuffle.file.buffer configuration property for the buffer size of the per-partition DiskBlockObjectWriters.

spark.file.transferTo

BypassMergeSortShuffleWriter uses the spark.file.transferTo configuration property to control whether to use Java New I/O (NIO) while concatenating the per-partition files into the single shuffle data file.
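A usage sketch of both properties on a SparkConf (the values shown are illustrative, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.file.buffer", "64k") // buffer size of the disk writers
  .set("spark.file.transferTo", "false")   // disable the NIO transferTo fast path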

Writing Records to Shuffle File

void write(
  Iterator<Product2<K, V>> records)

write creates a new instance of the Serializer.

write initializes the partitionWriters and partitionWriterSegments internal registries (for DiskBlockObjectWriters and FileSegments for every partition, respectively).

write requests the BlockManager for the DiskBlockManager and, for every partition, requests the DiskBlockManager for a shuffle block ID and its file. write creates a DiskBlockObjectWriter for every shuffle block (using the BlockManager) and stores the references to the DiskBlockObjectWriters in the partitionWriters internal registry.

After all DiskBlockObjectWriters are created, write requests the ShuffleWriteMetrics to increment shuffle write time metric.

For every record (a key-value pair), write requests the Partitioner for the partition ID for the key. The partition ID is then used as an index of the partition writer (among the DiskBlockObjectWriters) to write the current record out to a block file.
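The setup-and-route phase can be sketched as follows, with a hypothetical DiskWriter standing in for DiskBlockObjectWriter and newWriter standing in for the BlockManager's writer factory:

// Hypothetical stand-in for DiskBlockObjectWriter
trait DiskWriter {
  def write(key: Any, value: Any): Unit
}

def writeRecords[K, V](
    records: Iterator[(K, V)],
    numPartitions: Int,
    getPartition: K => Int, // the Partitioner
    newWriter: () => DiskWriter): Array[DiskWriter] = {
  // One disk writer per reduce partition, each backed by its own block file
  val partitionWriters = Array.fill(numPartitions)(newWriter())
  // The partition ID indexes directly into the array of writers
  records.foreach { case (k, v) =>
    partitionWriters(getPartition(k)).write(k, v)
  }
  partitionWriters
}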

Once all records have been written out to their respective block files, write does the following for every DiskBlockObjectWriter:

  1. Requests the DiskBlockObjectWriter to commit and return a corresponding FileSegment of the shuffle block

  2. Saves the (reference to) FileSegments in the partitionWriterSegments internal registry

  3. Requests the DiskBlockObjectWriter to close

At this point, all the records are in shuffle block files on a local disk, split across the block files by partition.
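The commit-and-close pass above can be sketched like this (Writer and FileSegment are simplified stand-ins, modeled on DiskBlockObjectWriter and the FileSegment its commitAndGet returns):

import java.io.File

// Simplified stand-ins for DiskBlockObjectWriter and its FileSegment
case class FileSegment(file: File, offset: Long, length: Long)

trait Writer {
  def commitAndGet(): FileSegment
  def close(): Unit
}

def commitAll(partitionWriters: Array[Writer]): Array[FileSegment] =
  partitionWriters.map { writer =>
    val segment = writer.commitAndGet() // flush and get the committed segment
    writer.close()
    segment
  }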

write requests the IndexShuffleBlockResolver for the shuffle data file for the shuffle and map IDs.

write creates a temporary file (based on the name of the shuffle data file) and writes all the per-partition shuffle files to it. The sizes of the per-partition shuffle files are saved in the partitionLengths internal registry.

At this point, all the per-partition shuffle block files have been concatenated into a single map shuffle data file.

write requests the IndexShuffleBlockResolver to write shuffle index and data files for the shuffle and the map IDs (with the partitionLengths and the temporary shuffle output file).
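The index file stores cumulative offsets into the data file, one more entry than there are partitions. A sketch of how those offsets follow from partitionLengths:

// The index file holds numPartitions + 1 cumulative offsets into the data file
def indexOffsets(partitionLengths: Array[Long]): Array[Long] =
  partitionLengths.scanLeft(0L)(_ + _)

// e.g. indexOffsets(Array(10L, 0L, 5L)) == Array(0L, 10L, 10L, 15L)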

write returns a shuffle map output status (with the shuffle server ID and the partitionLengths).

write is part of ShuffleWriter abstraction.

No Records

When there are no records to write out, write initializes the partitionLengths internal array (of numPartitions size) with all elements being 0.

write requests the IndexShuffleBlockResolver to write shuffle index and data files, but the difference (compared to when there are records to write) is that the dataTmp argument is simply null.

write sets the internal mapStatus (with the address of BlockManager in use and partitionLengths).
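A sketch of this shortcut, assuming a resolver, shuffleId and mapId are in scope (the resolver call is shown commented out):

// All partition lengths stay zero and there is no temporary data file
val numPartitions = 8 // illustrative
val partitionLengths = new Array[Long](numPartitions) // all elements are 0L
// resolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null)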

Requirements

write requires that no DiskBlockObjectWriters have been created yet, i.e. the partitionWriters internal registry is uninitialized (null).

Concatenating Per-Partition Files Into Single File

long[] writePartitionedFile(
  File outputFile)

writePartitionedFile creates a file output stream for the input outputFile in append mode.

writePartitionedFile starts tracking write time (as writeStartTime).

For every partition (out of numPartitions), writePartitionedFile takes the file from the FileSegment (from partitionWriterSegments) and creates a file input stream to read raw bytes.

writePartitionedFile then copies the raw bytes from each partition segment input stream to outputFile (possibly using Java New I/O per the transferToEnabled flag set when BypassMergeSortShuffleWriter was created) and records the length of every partition segment (in the lengths internal array).

In the end, writePartitionedFile increments shuffle write time, clears partitionWriters array and returns the lengths of the shuffle data files per partition.

writePartitionedFile is used when BypassMergeSortShuffleWriter is requested to write records.
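A simplified, hypothetical re-implementation of the concatenation step (buffered copy only; the transferTo fast path, write-time metrics and error handling are omitted):

import java.io.{File, FileInputStream, FileOutputStream}

def concatenate(segmentFiles: Array[File], outputFile: File): Array[Long] = {
  val out = new FileOutputStream(outputFile, true) // append mode
  try {
    segmentFiles.map { segment =>
      val in = new FileInputStream(segment)
      try {
        // Copy the raw bytes of one partition segment and record its length
        val buffer = new Array[Byte](8 * 1024)
        var length = 0L
        var read = in.read(buffer)
        while (read != -1) {
          out.write(buffer, 0, read)
          length += read
          read = in.read(buffer)
        }
        length
      } finally in.close()
    }
  } finally out.close()
}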

Copying Raw Bytes Between Input Streams

copyStream(
  in: InputStream,
  out: OutputStream,
  closeStreams: Boolean = false,
  transferToEnabled: Boolean = false): Long

copyStream branches off depending on the types of the in and out streams, i.e. whether in is a FileInputStream and out is a FileOutputStream, and whether the transferToEnabled input flag is enabled.

If they are both file streams and transferToEnabled is enabled, copyStream gets their FileChannels and transfers bytes from the input file to the output file, counting the number of bytes (possibly zero) that were actually transferred.

copyStream uses Java’s java.nio.channels.FileChannel to manage file channels.

If either of the in and out streams is not a file stream, or the transferToEnabled flag is disabled (default), copyStream reads data from in, writes it to out and counts the number of bytes written.

copyStream can optionally close in and out streams (depending on the input closeStreams — disabled by default).
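A sketch of the NIO fast path, assuming two file streams (transferTo may move fewer bytes than requested, hence the loop):

import java.io.{FileInputStream, FileOutputStream}

def copyFileStreams(in: FileInputStream, out: FileOutputStream): Long = {
  // Channel-to-channel transfer avoids copying through a user-space buffer
  val inChannel = in.getChannel
  val outChannel = out.getChannel
  val size = inChannel.size()
  var copied = 0L
  while (copied < size) {
    copied += inChannel.transferTo(copied, size - copied, outChannel)
  }
  copied
}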

Visit the official web site of JSR 51: New I/O APIs for the Java Platform and read up on the java.nio package.

Logging

Enable ALL logging level for org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter=ALL

Refer to Logging.

Internal Properties

numPartitions

Number of partitions (of the Partitioner of the ShuffleDependency).

partitionWriterSegments

FileSegments of the per-partition DiskBlockObjectWriters (as committed while writing records).

mapStatus

MapStatus that BypassMergeSortShuffleWriter returns when stopped.

Initialized every time BypassMergeSortShuffleWriter writes records.

Used when BypassMergeSortShuffleWriter stops (with success enabled) as a marker of whether any records were written, and returned when they were.

partitionLengths

Temporary array of partition lengths after records are written to the shuffle system.

Initialized every time BypassMergeSortShuffleWriter writes records (before being passed in to the IndexShuffleBlockResolver). After the IndexShuffleBlockResolver finishes, it is used to initialize the mapStatus internal property.