

= BypassMergeSortShuffleWriter

BypassMergeSortShuffleWriter is a [ShuffleWriter] for [ShuffleMapTasks] to <>.

.BypassMergeSortShuffleWriter and DiskBlockObjectWriters
image::BypassMergeSortShuffleWriter-write.png[align="center"]

== [[creating-instance]] Creating Instance

BypassMergeSortShuffleWriter takes the following to be created:

* [[blockManager]] [BlockManager]
* <>
* [[handle]] [BypassMergeSortShuffleHandle]
* [[mapId]] Map ID
* [[taskContext]] [TaskContext]
* [[conf]] [SparkConf]

BypassMergeSortShuffleWriter is created when SortShuffleManager is requested for a [ShuffleWriter] (for a <>).

== [[partitionWriters]] Per-Partition DiskBlockObjectWriters

BypassMergeSortShuffleWriter uses [DiskBlockObjectWriter] when...FIXME

== [[shuffleBlockResolver]] IndexShuffleBlockResolver

BypassMergeSortShuffleWriter is given an [IndexShuffleBlockResolver] when created.

BypassMergeSortShuffleWriter uses the IndexShuffleBlockResolver for <> (to [writeIndexFileAndCommit] and [getDataFile]).

== [[serializer]] Serializer

When created, BypassMergeSortShuffleWriter requests the [ShuffleDependency] (of the given <>) for the Serializer.

BypassMergeSortShuffleWriter creates a new instance of the Serializer for <>.

== [[configuration-properties]] Configuration Properties

=== [[fileBufferSize]][[spark.shuffle.file.buffer]] spark.shuffle.file.buffer

BypassMergeSortShuffleWriter uses the [spark.shuffle.file.buffer] configuration property for...FIXME

=== [[transferToEnabled]][[spark.file.transferTo]] spark.file.transferTo

BypassMergeSortShuffleWriter uses the [spark.file.transferTo] configuration property to control whether to use Java New I/O while <>.

== [[write]] Writing Records to Shuffle File

[source, java]
----
void write(
  Iterator<Product2<K, V>> records)
----

write creates a new instance of the <>.

write initializes the <> and <> internal registries (for [DiskBlockObjectWriters] and FileSegments for <>, respectively).

write requests the <> for the [DiskBlockManager]. For every <>, write requests it for a [shuffle block ID and the file], creates a [DiskBlockObjectWriter] for the shuffle block (using the <>), and stores the reference to the DiskBlockObjectWriter in the <> internal registry.

After all DiskBlockObjectWriters are created, write requests the <> to [increment the shuffle write time metric].

For every record (a key-value pair), write requests the <> for the [partition ID] for the key. The partition ID is then used as an index of the partition writer (among the <>) to [write the current record out to a block file].
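The per-record routing described above can be sketched without Spark: a hash partitioner turns each key into a partition index, and that index selects the per-partition writer. The class and method names below are illustrative stand-ins (plain lists stand in for DiskBlockObjectWriters), not Spark's own classes.

```java
import java.util.*;

// Illustrative sketch (not Spark code): route key-value records to one
// buffer per partition, the way write picks a DiskBlockObjectWriter by
// the partitioner's partition ID for every key.
public class PartitionRouting {
    // Mimics a hash partitioner: non-negative modulo of the key's hashCode.
    static int getPartition(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    // Each inner list stands in for one per-partition writer.
    static List<List<Map.Entry<String, Integer>>> route(
            List<Map.Entry<String, Integer>> records, int numPartitions) {
        List<List<Map.Entry<String, Integer>>> writers = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) writers.add(new ArrayList<>());
        for (Map.Entry<String, Integer> record : records) {
            writers.get(getPartition(record.getKey(), numPartitions)).add(record);
        }
        return writers;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = List.of(
            Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3));
        // Records with the same key always land in the same partition.
        System.out.println(route(records, 4));
    }
}
```

Because the partition index depends only on the key, all records for one key end up in the same block file, which is what makes the later per-partition concatenation correct.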

Once all records have been written out to their respective block files, write does the following for every <>:

. Requests the DiskBlockObjectWriter to [commit and return a corresponding FileSegment of the shuffle block]

. Saves the (reference to the) FileSegment in the <> internal registry

. Requests the DiskBlockObjectWriter to [close]

NOTE: At this point, all the records are in shuffle block files on a local disk. The records are split across block files by key.

write requests the <> for the [shuffle file] for the <> and the <>.

write creates a temporary file (based on the name of the shuffle file) and <>. The size of every per-partition shuffle file is saved in the <> internal registry.
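The temporary-file step can be illustrated with a small sketch modeled on Spark's Utils.tempFileWith, which (as an assumption about the exact naming) appends a random UUID to the target path. The idea is that data is written to a sibling temp file first so the resolver can later commit it with a rename.

```java
import java.io.File;
import java.io.IOException;
import java.util.UUID;

// Sketch of deriving a temporary output file next to the final shuffle
// file (modeled on Spark's Utils.tempFileWith; the exact naming scheme
// here is an assumption). Writing to a temp file first allows an
// atomic-rename commit later.
public class TempShuffleFile {
    static File tempFileWith(File target) {
        return new File(target.getAbsolutePath() + "." + UUID.randomUUID());
    }

    public static void main(String[] args) throws IOException {
        File shuffleFile = File.createTempFile("shuffle_0_0_0", ".data");
        File tmp = tempFileWith(shuffleFile);
        System.out.println(tmp.getName()); // e.g. shuffle_0_0_0<rand>.data.<uuid>
        shuffleFile.delete();
    }
}
```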

NOTE: At this point, all the per-partition shuffle block files have been merged into one single map shuffle data file.

write requests the <> to [write shuffle index and data files] for the <> and the <> (with the <> and the temporary shuffle output file).
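A shuffle index file stores cumulative offsets derived from the per-partition lengths, so a reducer can locate partition i's bytes between offsets[i] and offsets[i + 1]. The sketch below shows that translation in isolation (illustrative; the real work happens inside IndexShuffleBlockResolver.writeIndexFileAndCommit).

```java
// Sketch of how partition lengths become the offsets recorded in a
// shuffle index file: offsets[0] = 0 and each subsequent entry is the
// running total of the preceding partition lengths.
public class IndexOffsets {
    static long[] toOffsets(long[] partitionLengths) {
        long[] offsets = new long[partitionLengths.length + 1];
        for (int i = 0; i < partitionLengths.length; i++) {
            offsets[i + 1] = offsets[i] + partitionLengths[i];
        }
        return offsets;
    }

    public static void main(String[] args) {
        long[] offsets = toOffsets(new long[] {10, 0, 5});
        System.out.println(java.util.Arrays.toString(offsets)); // [0, 10, 10, 15]
    }
}
```

Note that an empty partition (length 0) simply repeats the previous offset, which is why the no-records case below can reuse the same index-writing path with an all-zero lengths array.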

write returns a [shuffle map output status] (with the [shuffle server ID] and the <>).

write is part of the [ShuffleWriter] abstraction.

=== [[write-no-records]] No Records

When there are no records to write out, write initializes the <> internal array (of <> size) with all elements set to 0.

write requests the <> to [write shuffle index and data files], but the difference (compared to when there are records to write) is that the dataTmp argument is simply null.

write sets the internal mapStatus (with the address of the [BlockManager] in use and the <>).

=== [[write-requirements]] Requirements

write requires that there are no <>.

== [[writePartitionedFile]] Concatenating Per-Partition Files Into Single File

[source, java]
----
long[] writePartitionedFile(
  File outputFile)
----

writePartitionedFile creates a file output stream for the input outputFile in append mode.

writePartitionedFile starts tracking write time (as writeStartTime).

For every <> partition, writePartitionedFile takes the file from the FileSegment (from <>) and creates a file input stream to read raw bytes.

writePartitionedFile then copies the raw bytes to outputFile (possibly using Java New I/O per the <> flag set when <>) and records the length of the shuffle data file (in the lengths internal array).

In the end, writePartitionedFile [increments the shuffle write time], clears the <> array and returns the lengths of the shuffle data files per partition.
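A minimal, Spark-free sketch of the concatenation step: append each per-partition file to one output file and record the byte counts, which correspond to the lengths array returned above (file names and layout here are assumptions for illustration).

```java
import java.io.*;
import java.nio.file.*;

// Illustrative sketch of writePartitionedFile: concatenate the
// per-partition block files into a single shuffle data file, recording
// how many bytes each partition contributed.
public class ConcatPartitions {
    static long[] concat(File[] partitionFiles, File outputFile) throws IOException {
        long[] lengths = new long[partitionFiles.length];
        try (OutputStream out = new FileOutputStream(outputFile, true)) { // append mode
            for (int i = 0; i < partitionFiles.length; i++) {
                // Files.copy returns the number of bytes copied.
                lengths[i] = Files.copy(partitionFiles[i].toPath(), out);
            }
        }
        return lengths;
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("shuffle").toFile();
        File p0 = new File(dir, "part0"); Files.write(p0.toPath(), "aaa".getBytes());
        File p1 = new File(dir, "part1"); Files.write(p1.toPath(), "bb".getBytes());
        File out = new File(dir, "data");
        long[] lengths = concat(new File[] {p0, p1}, out);
        System.out.println(java.util.Arrays.toString(lengths)); // [3, 2]
        System.out.println(out.length()); // 5
    }
}
```

Because the files are appended in partition order, the lengths array is exactly what the index-file step needs to compute per-partition offsets.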

writePartitionedFile is used when BypassMergeSortShuffleWriter is requested to <>.

== [[copyStream]] Copying Raw Bytes Between Input Streams

[source, scala]
----
copyStream(
  in: InputStream,
  out: OutputStream,
  closeStreams: Boolean = false,
  transferToEnabled: Boolean = false): Long
----

copyStream branches off depending on the types of the in and out streams, i.e. whether they are both file streams (FileInputStream and FileOutputStream) and the transferToEnabled input flag is enabled.

If they are both file streams and transferToEnabled is enabled, copyStream gets their FileChannels and transfers bytes from the input file to the output file, counting the number of bytes, possibly zero, that were actually transferred.

NOTE: copyStream uses Java's {java-javadoc-url}/java/nio/channels/FileChannel.html[java.nio.channels.FileChannel] to manage file channels.

If either of the in and out streams is not a file stream, or the transferToEnabled flag is disabled (the default), copyStream reads data from in, writes it to out, and counts the number of bytes written.

copyStream can optionally close in and out streams (depending on the input closeStreams -- disabled by default).
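The two branches can be sketched in plain Java as follows. This is a simplified stand-in for Utils.copyStream, not the real method: the names are illustrative, and the closeStreams handling is omitted.

```java
import java.io.*;
import java.nio.channels.FileChannel;
import java.nio.file.Files;

// Sketch of the two copy paths: FileChannel.transferTo for
// file-to-file copies, and a plain buffered read/write loop otherwise.
public class CopyStreams {
    static long copy(InputStream in, OutputStream out, boolean transferToEnabled)
            throws IOException {
        if (transferToEnabled && in instanceof FileInputStream
                && out instanceof FileOutputStream) {
            FileChannel inCh = ((FileInputStream) in).getChannel();
            FileChannel outCh = ((FileOutputStream) out).getChannel();
            long size = inCh.size();
            long count = 0;
            // transferTo may move fewer bytes than requested, so loop.
            while (count < size) {
                count += inCh.transferTo(count, size - count, outCh);
            }
            return count;
        }
        long count = 0;
        byte[] buf = new byte[8192];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
            count += n;
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        File src = File.createTempFile("src", ".bin");
        Files.write(src.toPath(), new byte[] {1, 2, 3, 4});
        File dst = File.createTempFile("dst", ".bin");
        try (FileInputStream in = new FileInputStream(src);
             FileOutputStream out = new FileOutputStream(dst)) {
            System.out.println(copy(in, out, true)); // 4
        }
    }
}
```

The transferTo path lets the kernel move bytes between files without copying them through user space, which is why spark.file.transferTo can speed up the concatenation of per-partition files.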

NOTE: Utils.copyStream is used when <> (among other places).

TIP: Visit the official web site of [JSR 51: New I/O APIs for the Java Platform] and read up on the {java-javadoc-url}/java/nio/package-summary.html[java.nio package].

== [[logging]] Logging

Enable ALL logging level for org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter logger to see what happens inside.

Add the following line to conf/


Refer to [Logging].

== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| numPartitions
| [[numPartitions]]

| partitionWriterSegments
| [[partitionWriterSegments]]

| mapStatus
| [[mapStatus]] [MapStatus] that <>

Initialized every time BypassMergeSortShuffleWriter <>.

Used when <> (with success enabled) as a marker if <> and <>.

| partitionLengths
| [[partitionLengths]] Temporary array of partition lengths after records are <>.

Initialized every time BypassMergeSortShuffleWriter <> (before passing it on to the [IndexShuffleBlockResolver]). After IndexShuffleBlockResolver finishes, it is used to initialize the <> internal property.

|===


Last update: 2020-10-09