BypassMergeSortShuffleWriter — ShuffleWriter for BypassMergeSortShuffleHandle

spark BypassMergeSortShuffleWriter write
Figure 1. BypassMergeSortShuffleWriter writing records (for ShuffleMapTask) using DiskBlockObjectWriters
Table 1. BypassMergeSortShuffleWriter’s Internal Registries and Counters
Name Description









Initialized when BypassMergeSortShuffleWriter is created.

Used when BypassMergeSortShuffleWriter writes records.


MapStatus that BypassMergeSortShuffleWriter returns when stopped

Initialized every time BypassMergeSortShuffleWriter writes records.

Used when BypassMergeSortShuffleWriter stops (with success enabled) as a marker if any records were written and returned if they did.


Temporary array of partition lengths after records are written to a shuffle system.

Initialized every time BypassMergeSortShuffleWriter writes records before passing it in to IndexShuffleBlockResolver). After IndexShuffleBlockResolver finishes, it is used to initialize mapStatus internal property.


Internal flag that controls the use of Java New I/O when BypassMergeSortShuffleWriter concatenates per-partition shuffle files into a single shuffle block data file.

Specified when BypassMergeSortShuffleWriter is created and controlled by spark.file.transferTo Spark property. Enabled by default.

Enable ERROR logging level for org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter logger to see what happens in BypassMergeSortShuffleWriter.

Add the following line to conf/

Refer to Logging.

Creating BypassMergeSortShuffleWriter Instance

BypassMergeSortShuffleWriter takes the following when created:

BypassMergeSortShuffleWriter uses spark.shuffle.file.buffer (for fileBufferSize as 32k by default) and spark.file.transferTo (for transferToEnabled internal flag which is enabled by default) Spark properties.

BypassMergeSortShuffleWriter initializes the internal registries and counters.

Writing Records (Into One Single Shuffle Block Data File) — write Method

void write(Iterator<Product2<K, V>> records)
write is part of ShuffleWriter contract to write a sequence of records to a shuffle system.

Internally, when the input records iterator has no more records, write creates an empty partitionLengths internal array of numPartitions size.

write then requests the internal IndexShuffleBlockResolver to write shuffle index and data files (with dataTmp as null) and sets the internal mapStatus (with the address of BlockManager in use and partitionLengths).

However, when there are records to write, write creates a new Serializer.

Serializer was specified when BypassMergeSortShuffleWriter was created and is exactly the Serializer of the ShuffleDependency.

write initializes partitionWriters internal array of DiskBlockObjectWriters for every partition.

write requests BlockManager for a DiskBlockObjectWriter (for the temporary blockId and file, SerializerInstance, fileBufferSize and writeMetrics).

write initializes partitionWriterSegments with FileSegment for every partition.

write takes records serially, i.e. record by record, and, after computing the partition for a key, requests the corresponding DiskBlockObjectWriter to write them.

write uses partitionWriters internal array of DiskBlockObjectWriter indexed by partition number.
write initializes partitionWriters with numPartitions number of DiskBlockObjectWriters.

After all the records have been written, write requests every DiskBlockObjectWriter to commitAndGet and saves the commit results in partitionWriterSegments. write closes every DiskBlockObjectWriter.

IndexShuffleBlockResolver was defined when BypassMergeSortShuffleWriter was created.

write creates a temporary shuffle block data file and writes the per-partition shuffle files to it.

This is the moment when BypassMergeSortShuffleWriter concatenates per-partition shuffle file segments into one single map shuffle data file.

In the end, write requests IndexShuffleBlockResolver to write shuffle index and data files for the shuffleId and mapId (with partitionLengths and the temporary file) and creates a new mapStatus (with the location of the BlockManager and partitionLengths).

Concatenating Per-Partition Files Into Single File (and Tracking Write Time) — writePartitionedFile Internal Method

long[] writePartitionedFile(File outputFile) throws IOException

writePartitionedFile creates a file output stream for the input outputFile in append mode.

writePartitionedFile uses Java’s to create a file output stream.

writePartitionedFile starts tracking write time (as writeStartTime).

For every numPartitions partition, writePartitionedFile takes the file from the FileSegment (from partitionWriterSegments) and creates a file input stream to read raw bytes.

writePartitionedFile uses Java’s to create a file input stream.

writePartitionedFile then copies the raw bytes from each partition segment input stream to outputFile (possibly using Java New I/O per transferToEnabled flag set when BypassMergeSortShuffleWriter was created) and records the length of the shuffle data file (in lengths internal array).

transferToEnabled is controlled by spark.file.transferTo Spark property and is enabled (i.e. true) by default.

In the end, writePartitionedFile increments shuffle write time, clears partitionWriters array and returns the lengths of the shuffle data files per partition.

writePartitionedFile uses ShuffleWriteMetrics to track shuffle write time that was created when BypassMergeSortShuffleWriter was created.
writePartitionedFile is used exclusively when BypassMergeSortShuffleWriter writes records.

Copying Raw Bytes Between Input Streams (Possibly Using Java New I/O) — Utils.copyStream Method

  in: InputStream,
  out: OutputStream,
  closeStreams: Boolean = false,
  transferToEnabled: Boolean = false): Long

copyStream branches off depending on the type of in and out streams, i.e. whether they are both FileInputStream with transferToEnabled input flag is enabled.

If they are both FileInputStream with transferToEnabled enabled, copyStream gets their FileChannels and transfers bytes from the input file to the output file and counts the number of bytes, possibly zero, that were actually transferred.

copyStream uses Java’s java.nio.channels.FileChannel to manage file channels.

If either in and out input streams are not FileInputStream or transferToEnabled flag is disabled (default), copyStream reads data from in to write to out and counts the number of bytes written.

copyStream can optionally close in and out streams (depending on the input closeStreams — disabled by default).

Utils.copyStream is here temporarily (until I find a better place).
Visit the official web site of JSR 51: New I/O APIs for the Java Platform and read up on java.nio package.