Skip to content

BypassMergeSortShuffleWriter

BypassMergeSortShuffleWriter<K, V> is a ShuffleWriter for ShuffleMapTasks to write records into one single shuffle block data file.

BypassMergeSortShuffleWriter and DiskBlockObjectWriters

Creating Instance

BypassMergeSortShuffleWriter takes the following to be created:

BypassMergeSortShuffleWriter is created when:

DiskBlockObjectWriters

DiskBlockObjectWriter[] partitionWriters

BypassMergeSortShuffleWriter uses a DiskBlockObjectWriter per partition (based on the Partitioner).

BypassMergeSortShuffleWriter asserts that no partitionWriters are created while writing out records to a shuffle file.

While writing, BypassMergeSortShuffleWriter requests the BlockManager for as many DiskBlockObjectWriters as there are partitions (in the Partitioner).

While writing, BypassMergeSortShuffleWriter requests the Partitioner for a partition for records (using keys) and finds the per-partition DiskBlockObjectWriter that is requested to write out the partition records. After all records are written out to their shuffle files, the DiskBlockObjectWriters are requested to commitAndGet.

BypassMergeSortShuffleWriter uses the partition writers while writing out partition data and removes references to them (nullify them) in the end.

In other words, after writing out partition data partitionWriters internal registry is null.

partitionWriters internal registry becomes null after BypassMergeSortShuffleWriter has finished:

IndexShuffleBlockResolver

BypassMergeSortShuffleWriter is given a IndexShuffleBlockResolver when created.

BypassMergeSortShuffleWriter uses the IndexShuffleBlockResolver for writing out records (to writeIndexFileAndCommit and getDataFile).

Serializer

When created, BypassMergeSortShuffleWriter requests the ShuffleDependency (of the given BypassMergeSortShuffleHandle) for the Serializer.

BypassMergeSortShuffleWriter creates a new instance of the Serializer for writing out records.

Configuration Properties

spark.shuffle.file.buffer

BypassMergeSortShuffleWriter uses spark.shuffle.file.buffer configuration property for...FIXME

spark.file.transferTo

BypassMergeSortShuffleWriter uses spark.file.transferTo configuration property to control whether to use Java New I/O while writing to a partitioned file.

Writing Out Records to Shuffle File

void write(
  Iterator<Product2<K, V>> records)

write is part of the ShuffleWriter abstraction.

write creates a new instance of the Serializer.

write initializes the partitionWriters and partitionWriterSegments internal registries (for DiskBlockObjectWriters and FileSegments for every partition, respectively).

write requests the BlockManager for the DiskBlockManager and for every partition write requests it for a shuffle block ID and the file. write creates a DiskBlockObjectWriter for the shuffle block (using the BlockManager). write stores the reference to DiskBlockObjectWriters in the partitionWriters internal registry.

After all DiskBlockObjectWriters are created, write requests the ShuffleWriteMetrics to increment shuffle write time metric.

For every record (a key-value pair), write requests the Partitioner for the partition ID for the key. The partition ID is then used as an index of the partition writer (among the DiskBlockObjectWriters) to write the current record out to a block file.

Once all records have been writted out to their respective block files, write does the following for every DiskBlockObjectWriter:

  1. Requests the DiskBlockObjectWriter to commit and return a corresponding FileSegment of the shuffle block

  2. Saves the (reference to) FileSegments in the partitionWriterSegments internal registry

  3. Requests the DiskBlockObjectWriter to close

Note

At this point, all the records are in shuffle block files on a local disk. The records are split across block files by key.

write requests the IndexShuffleBlockResolver for the shuffle file for the shuffle and the mapDs>>.

write creates a temporary file (based on the name of the shuffle file) and writes all the per-partition shuffle files to it. The size of every per-partition shuffle files is saved as the partitionLengths internal registry.

Note

At this point, all the per-partition shuffle block files are one single map shuffle data file.

write requests the IndexShuffleBlockResolver to write shuffle index and data files for the shuffle and the map IDs (with the partitionLengths and the temporary shuffle output file).

write returns a shuffle map output status (with the shuffle server ID and the partitionLengths).

No Records

When there is no records to write out, write initializes the partitionLengths internal array (of numPartitions size) with all elements being 0.

write requests the IndexShuffleBlockResolver to write shuffle index and data files, but the difference (compared to when there are records to write) is that the dataTmp argument is simply null.

write sets the internal mapStatus (with the address of BlockManager in use and partitionLengths).

Requirements

write requires that there are no DiskBlockObjectWriters.

Writing Out Partitioned Data

long[] writePartitionedData(
  ShuffleMapOutputWriter mapOutputWriter)

writePartitionedData makes sure that DiskBlockObjectWriters are available (partitionWriters != null).

For every partition, writePartitionedData takes the partition file (from the FileSegments). Only when the partition file exists, writePartitionedData requests the given ShuffleMapOutputWriter for a ShufflePartitionWriter and writes out the partitioned data. At the end, writePartitionedData deletes the file.

writePartitionedData requests the ShuffleWriteMetricsReporter to increment the write time.

In the end, writePartitionedData requests the ShuffleMapOutputWriter to commitAllPartitions and returns the size of each partition of the output map file.

Copying Raw Bytes Between Input Streams

copyStream(
  in: InputStream,
  out: OutputStream,
  closeStreams: Boolean = false,
  transferToEnabled: Boolean = false): Long

copyStream branches off depending on the type of in and out streams, i.e. whether they are both FileInputStream with transferToEnabled input flag is enabled.

If they are both FileInputStream with transferToEnabled enabled, copyStream gets their FileChannels and transfers bytes from the input file to the output file and counts the number of bytes, possibly zero, that were actually transferred.

NOTE: copyStream uses Java's {java-javadoc-url}/java/nio/channels/FileChannel.html[java.nio.channels.FileChannel] to manage file channels.

If either in and out input streams are not FileInputStream or transferToEnabled flag is disabled (default), copyStream reads data from in to write to out and counts the number of bytes written.

copyStream can optionally close in and out streams (depending on the input closeStreams -- disabled by default).

NOTE: Utils.copyStream is used when <> (among other places).

Tip

Visit the official web site of JSR 51: New I/O APIs for the Java Platform and read up on java.nio package.

Stopping ShuffleWriter

Option<MapStatus> stop(
  boolean success)

stop...FIXME

stop is part of the ShuffleWriter abstraction.

Temporary Array of Partition Lengths

long[] partitionLengths

Temporary array of partition lengths after records are written to a shuffle system.

Initialized every time BypassMergeSortShuffleWriter writes out records (before passing it in to IndexShuffleBlockResolver). After IndexShuffleBlockResolver finishes, it is used to initialize mapStatus internal property.

Logging

Enable ALL logging level for org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter=ALL

Refer to Logging.

Internal Properties

numPartitions

partitionWriterSegments

mapStatus

MapStatus that BypassMergeSortShuffleWriter returns when stopped

Initialized every time BypassMergeSortShuffleWriter writes out records.

Used when BypassMergeSortShuffleWriter stops (with success enabled) as a marker if any records were written and returned if they did.