UnsafeShuffleWriter

UnsafeShuffleWriter is a ShuffleWriter for SerializedShuffleHandles.

UnsafeShuffleWriter is created when SortShuffleManager is requested for a ShuffleWriter for a SerializedShuffleHandle.

Figure 1. UnsafeShuffleWriter, SortShuffleManager and SerializedShuffleHandle

UnsafeShuffleWriter opens resources (a ShuffleExternalSorter and the buffers) immediately while being created.

Figure 2. UnsafeShuffleWriter and ShuffleExternalSorter

When requested to write the key-value records of a partition, UnsafeShuffleWriter simply inserts every record into the ShuffleExternalSorter and then closes internal resources and merges spill files (which, among other things, creates the MapStatus).

When requested to stop, UnsafeShuffleWriter records the peak execution memory metric and returns the mapStatus (that was created when requested to write).

Creating Instance

UnsafeShuffleWriter takes the following to be created:

UnsafeShuffleWriter requests the SerializedShuffleHandle for the ShuffleDependency, which is then requested for the Partitioner and, in the end, for the number of partitions. UnsafeShuffleWriter makes sure that the number of shuffle output partitions is below the (1 << 24) partition identifiers that can be encoded, and throws an IllegalArgumentException otherwise:

UnsafeShuffleWriter can only be used for shuffles with at most 16777215 reduce partitions
The number of shuffle output partitions is first enforced when SortShuffleManager checks whether a SerializedShuffleHandle can be used as the ShuffleHandle (which eventually leads to UnsafeShuffleWriter).
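The limit comes from how the sorter encodes records. The following is an illustrative sketch of 24-bit partition-ID packing in the spirit of Spark's PackedRecordPointer; the class and method names are hypothetical and the 40-bit address layout is simplified:

```java
// Hypothetical sketch of why the (1 << 24) limit exists: the in-memory sorter
// packs the partition ID into the upper 24 bits of a 64-bit record pointer,
// so the highest encodable partition ID is (1 << 24) - 1 = 16777215.
public class PackedPointerSketch {
    static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1; // 16777215

    // Pack a partition ID and a 40-bit record address into one long.
    static long pack(int partitionId, long recordAddress) {
        if (partitionId > MAXIMUM_PARTITION_ID) {
            throw new IllegalArgumentException("partition ID does not fit in 24 bits");
        }
        return (((long) partitionId) << 40) | (recordAddress & 0xFFFFFFFFFFL);
    }

    // Recover the partition ID from the upper 24 bits.
    static int partitionId(long packed) {
        return (int) (packed >>> 40);
    }

    public static void main(String[] args) {
        long p = pack(MAXIMUM_PARTITION_ID, 12345L);
        System.out.println(partitionId(p)); // 16777215
    }
}
```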

IndexShuffleBlockResolver

UnsafeShuffleWriter is given an IndexShuffleBlockResolver when created.

UnsafeShuffleWriter uses the IndexShuffleBlockResolver for…FIXME

DEFAULT_INITIAL_SER_BUFFER_SIZE

UnsafeShuffleWriter uses a fixed buffer size for the output stream of serialized data written into a byte array (default: 1024 * 1024).
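The buffer itself is a ByteArrayOutputStream subclass that exposes the underlying byte array (Spark calls its variant MyByteArrayOutputStream). A minimal sketch of the trick, assuming only java.io:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Illustrative sketch: subclassing ByteArrayOutputStream to expose the
// protected buf field lets a writer hand the serialized bytes to a consumer
// without the extra copy that toByteArray() would make.
public class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
    public ExposedByteArrayOutputStream(int size) {
        super(size);
    }

    // Direct access to the internal buffer; only bytes [0, size()) are valid.
    public byte[] getBuf() {
        return buf;
    }

    public static void main(String[] args) throws Exception {
        ExposedByteArrayOutputStream out =
            new ExposedByteArrayOutputStream(1024 * 1024); // 1 MB initial size
        out.write("record".getBytes(StandardCharsets.UTF_8));
        System.out.println(out.size());          // 6 (valid bytes)
        System.out.println(out.getBuf().length); // 1048576 (full backing array, no copy)
    }
}
```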

inputBufferSizeInBytes

UnsafeShuffleWriter uses the spark.shuffle.file.buffer configuration property for…FIXME

outputBufferSizeInBytes

UnsafeShuffleWriter uses the spark.shuffle.unsafe.file.output.buffer configuration property (default: 32k) for…FIXME

transferToEnabled

UnsafeShuffleWriter can use a specialized NIO-based fast merge procedure that avoids extra serialization/deserialization when spark.file.transferTo configuration property is enabled.
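A hedged sketch of what a transferTo-based merge looks like at the java.nio level: bytes move channel-to-channel, which can avoid copying them through user-space buffers. The single-file concatenation below is illustrative only and ignores Spark's per-partition spill layout:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

// Illustrative NIO-based concatenation of spill files into one output file.
public class TransferToMergeSketch {
    // Append each input file to the output using FileChannel.transferTo.
    static long concatenate(File[] spills, File output) throws IOException {
        long bytesWritten = 0;
        try (FileChannel out = new FileOutputStream(output, true).getChannel()) {
            for (File spill : spills) {
                try (FileChannel in = new FileInputStream(spill).getChannel()) {
                    long size = in.size();
                    long pos = 0;
                    while (pos < size) { // transferTo may copy fewer bytes than requested
                        pos += in.transferTo(pos, size - pos, out);
                    }
                    bytesWritten += size;
                }
            }
        }
        return bytesWritten;
    }

    public static void main(String[] args) throws IOException {
        File a = File.createTempFile("spill", ".tmp");
        File b = File.createTempFile("spill", ".tmp");
        File out = File.createTempFile("merged", ".tmp");
        try (FileOutputStream fa = new FileOutputStream(a)) { fa.write("hello ".getBytes()); }
        try (FileOutputStream fb = new FileOutputStream(b)) { fb.write("world".getBytes()); }
        System.out.println(concatenate(new File[]{a, b}, out)); // 11
        a.delete(); b.delete(); out.delete();
    }
}
```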

initialSortBufferSize

UnsafeShuffleWriter uses the initial buffer size for sorting (default: 4096) when creating a ShuffleExternalSorter (when requested to open).

Use spark.shuffle.sort.initialBufferSize configuration property to change the default buffer size.

mergeSpillsWithTransferTo Internal Method

long[] mergeSpillsWithTransferTo(
  SpillInfo[] spills,
  File outputFile)

mergeSpillsWithTransferTo…FIXME

mergeSpillsWithTransferTo is used when UnsafeShuffleWriter is requested to mergeSpills (with the transferToEnabled flag enabled and no encryption).

Merging Spills

long[] mergeSpills(
  SpillInfo[] spills,
  File outputFile)

Many Spills

With multiple SpillInfos to merge, mergeSpills selects between fast and slow merge strategies. The fast merge strategy can be transferTo- or fileStream-based.

mergeSpills uses the spark.shuffle.unsafe.fastMergeEnabled configuration property to consider one of the fast merge strategies.

A fast merge strategy is supported when spark.shuffle.compress configuration property is disabled or the IO compression codec supports decompression of concatenated compressed streams.

With spark.shuffle.compress configuration property enabled and an IO compression codec that does not support concatenated compressed streams, mergeSpills can only use the slow merge strategy.

With the fast merge strategy enabled and supported, transferToEnabled enabled and encryption disabled, mergeSpills prints out the following DEBUG message to the logs and uses mergeSpillsWithTransferTo.

Using transferTo-based fast merge

With the fast merge strategy enabled and supported, and either transferToEnabled disabled or encryption enabled, mergeSpills prints out the following DEBUG message to the logs and uses mergeSpillsWithFileStream (with no compression codec).

Using fileStream-based fast merge

For the slow merge strategy, mergeSpills prints out the following DEBUG message to the logs and uses mergeSpillsWithFileStream (with the compression codec).

Using slow merge

In the end, mergeSpills requests the ShuffleWriteMetrics to decBytesWritten and incBytesWritten, and returns the partition length array.
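The strategy selection above can be sketched as plain boolean checks. The Strategy enum and choose method below are hypothetical (the real mergeSpills returns partition lengths, not a strategy name), but the conditions mirror the description:

```java
// Illustrative decision logic for the merge strategy of many spills.
public class MergeStrategySketch {
    enum Strategy { TRANSFER_TO_FAST, FILE_STREAM_FAST, FILE_STREAM_SLOW }

    static Strategy choose(
            boolean fastMergeEnabled,    // spark.shuffle.unsafe.fastMergeEnabled
            boolean compressionEnabled,  // spark.shuffle.compress
            boolean codecSupportsConcat, // codec decompresses concatenated streams
            boolean transferToEnabled,   // spark.file.transferTo
            boolean encryptionEnabled) {
        // Fast merge is supported with compression off, or a concat-friendly codec.
        boolean fastMergeSupported = !compressionEnabled || codecSupportsConcat;
        if (fastMergeEnabled && fastMergeSupported) {
            if (transferToEnabled && !encryptionEnabled) {
                return Strategy.TRANSFER_TO_FAST; // "Using transferTo-based fast merge"
            }
            return Strategy.FILE_STREAM_FAST;     // "Using fileStream-based fast merge"
        }
        return Strategy.FILE_STREAM_SLOW;         // "Using slow merge"
    }

    public static void main(String[] args) {
        System.out.println(choose(true, false, false, true, false)); // TRANSFER_TO_FAST
        System.out.println(choose(true, true, false, true, false));  // FILE_STREAM_SLOW
    }
}
```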

One Spill

With one SpillInfo to merge, mergeSpills simply renames the spill file to be the output file and returns the partition length array of the one spill.

No Spills

With no SpillInfos to merge, mergeSpills creates an empty output file and returns an array of zeros with one element per partition (the numPartitions of the Partitioner).
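A minimal sketch of the zero-spill case, assuming nothing beyond java.io (the method name is illustrative):

```java
import java.io.File;
import java.io.IOException;

// Illustrative zero-spill handling: the map output file is created empty
// and every partition length is reported as 0.
public class NoSpillsSketch {
    static long[] writeEmptyOutput(File outputFile, int numPartitions) throws IOException {
        outputFile.createNewFile();     // empty data file, so readers still find it
        return new long[numPartitions]; // Java zero-initializes: all lengths are 0
    }

    public static void main(String[] args) throws IOException {
        File out = File.createTempFile("shuffle", ".data");
        long[] lengths = writeEmptyOutput(out, 4);
        System.out.println(lengths.length); // 4
        System.out.println(out.length());   // 0
        out.delete();
    }
}
```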

Usage

mergeSpills is used when UnsafeShuffleWriter is requested to close internal resources and merge spill files.

updatePeakMemoryUsed Internal Method

void updatePeakMemoryUsed()

updatePeakMemoryUsed…FIXME

updatePeakMemoryUsed is used when UnsafeShuffleWriter is requested for the peak memory used and to close internal resources and merge spill files.

Writing Key-Value Records of Partition

void write(
  Iterator<Product2<K, V>> records)

write traverses the input records (of an RDD partition) and inserts them into the ShuffleExternalSorter one by one (insertRecordIntoSorter). When all the records have been processed, write closes internal resources and merges spill files.

FIXME

write is part of the ShuffleWriter abstraction.

Stopping ShuffleWriter

Option<MapStatus> stop(
  boolean success)

stop…FIXME

stop is part of the ShuffleWriter abstraction.

Inserting Record Into ShuffleExternalSorter

void insertRecordIntoSorter(
  Product2<K, V> record)

insertRecordIntoSorter requires that the ShuffleExternalSorter is available.

insertRecordIntoSorter requests the MyByteArrayOutputStream to reset (discarding all output accumulated so far while reusing the already-allocated buffer space).

insertRecordIntoSorter requests the SerializationStream to write out the record (write the key and the value) and to flush.

insertRecordIntoSorter requests the MyByteArrayOutputStream for the length of the buffer.

insertRecordIntoSorter requests the Partitioner for the partition for the given record (by the key).

In the end, insertRecordIntoSorter requests the ShuffleExternalSorter to insert the MyByteArrayOutputStream as a byte array (with the length and the partition).
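The per-record flow above can be sketched with java.io stand-ins: the buffer is reset rather than reallocated, the key and value are serialized into it, and its size plus the record's partition are what the sorter would receive. The serializer and the hashPartition helper below are illustrative, not Spark's SerializationStream or Partitioner:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative per-record serialization into a reused buffer.
public class InsertRecordSketch {
    static final ByteArrayOutputStream serBuffer = new ByteArrayOutputStream(1024 * 1024);
    static final DataOutputStream serStream = new DataOutputStream(serBuffer);

    // Hypothetical stand-in for a hash partitioner.
    static int hashPartition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Returns {serializedLength, partitionId} for the record.
    static int[] serializeRecord(String key, String value, int numPartitions)
            throws IOException {
        serBuffer.reset();       // discard the previous record, keep the allocated buffer
        serStream.writeUTF(key); // "write out the record" (key, then value)
        serStream.writeUTF(value);
        serStream.flush();       // make sure every byte reached the buffer
        return new int[] { serBuffer.size(), hashPartition(key, numPartitions) };
    }

    public static void main(String[] args) throws IOException {
        int[] first = serializeRecord("a-key", "a-value", 8);
        int[] second = serializeRecord("b", "c", 8);
        System.out.println(first[0] > second[0]); // true: reset() made size() start over
    }
}
```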

insertRecordIntoSorter is used when UnsafeShuffleWriter is requested to write records.

Closing Internal Resources and Merging Spill Files

void closeAndWriteOutput()

closeAndWriteOutput asserts that the ShuffleExternalSorter is available (non-null).

closeAndWriteOutput updates peak memory used.

closeAndWriteOutput removes the references to the ByteArrayOutputStream and SerializationStream output streams (nulls them).

closeAndWriteOutput requests the ShuffleExternalSorter to close and return spill metadata.

closeAndWriteOutput removes the reference to the ShuffleExternalSorter (nulls it).

closeAndWriteOutput requests the IndexShuffleBlockResolver for the output data file for the shuffle and map IDs.

closeAndWriteOutput creates a temporary file (alongside the output data file) and uses it to merge spill files (which gives a partition length array). All spill files are then deleted.

In the end, closeAndWriteOutput creates a MapStatus with the location of the local BlockManager and the partition length array.

closeAndWriteOutput prints out the following ERROR message to the logs if there is an issue with deleting spill files:

Error while deleting spill file [path]

closeAndWriteOutput prints out the following ERROR message to the logs if there is an issue with deleting the temporary output data file:

Error while deleting temp file [path]

closeAndWriteOutput is used when UnsafeShuffleWriter is requested to write records.

mergeSpillsWithFileStream Method

long[] mergeSpillsWithFileStream(
  SpillInfo[] spills,
  File outputFile,
  CompressionCodec compressionCodec)

mergeSpillsWithFileStream will be given an IO compression codec when shuffle compression is enabled.

mergeSpillsWithFileStream…FIXME

mergeSpillsWithFileStream requires that there are at least two spills to merge.
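A hedged sketch of a fileStream-based merge: each spill is read through a buffered InputStream and copied into one output stream, and at least two spills are required. The real method also wraps these streams per partition (and with the compression codec on the slow path); that wrapping is omitted here and the names are illustrative:

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Illustrative stream-based concatenation of spill files.
public class FileStreamMergeSketch {
    static long merge(File[] spills, File output) throws IOException {
        if (spills.length < 2) {
            throw new IllegalArgumentException("expected at least two spills to merge");
        }
        long total = 0;
        byte[] buffer = new byte[32 * 1024]; // cf. spark.shuffle.unsafe.file.output.buffer
        try (OutputStream out = new BufferedOutputStream(new FileOutputStream(output))) {
            for (File spill : spills) {
                try (InputStream in = new BufferedInputStream(new FileInputStream(spill))) {
                    int n;
                    while ((n = in.read(buffer)) != -1) {
                        out.write(buffer, 0, n);
                        total += n;
                    }
                }
            }
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        File a = File.createTempFile("spill", ".tmp");
        File b = File.createTempFile("spill", ".tmp");
        File out = File.createTempFile("merged", ".tmp");
        try (FileOutputStream fa = new FileOutputStream(a)) { fa.write("hello ".getBytes()); }
        try (FileOutputStream fb = new FileOutputStream(b)) { fb.write("world".getBytes()); }
        System.out.println(merge(new File[]{a, b}, out)); // 11
        a.delete(); b.delete(); out.delete();
    }
}
```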

mergeSpillsWithFileStream is used when UnsafeShuffleWriter is requested to merge spills.

Getting Peak Memory Used

long getPeakMemoryUsedBytes()

getPeakMemoryUsedBytes simply updates the peak memory used (updatePeakMemoryUsed) and returns the internal peakMemoryUsedBytes registry.

getPeakMemoryUsedBytes is used when UnsafeShuffleWriter is requested to stop.

Opening UnsafeShuffleWriter and Buffers

void open()

open requires that there is no ShuffleExternalSorter available.

open creates a ShuffleExternalSorter.

open creates a serialized buffer with the capacity of 1M.

open requests the SerializerInstance for a SerializationStream to the serBuffer (available internally as the serOutputStream reference).

open is used when UnsafeShuffleWriter is created.

Logging

Enable ALL logging level for org.apache.spark.shuffle.sort.UnsafeShuffleWriter logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.UnsafeShuffleWriter=ALL

Refer to Logging.

Internal Properties

mapStatus

Created when UnsafeShuffleWriter is requested to close internal resources and merge spill files (with the BlockManagerId of the BlockManager and partitionLengths)

Returned when UnsafeShuffleWriter is requested to stop

partitioner

Used when UnsafeShuffleWriter is requested for the following:

peakMemoryUsedBytes

Peak memory used (in bytes) that is updated exclusively in updatePeakMemoryUsed (after requesting the ShuffleExternalSorter for getPeakMemoryUsedBytes)

Use getPeakMemoryUsedBytes to access the current value

ByteArrayOutputStream for Serialized Data

java.io.ByteArrayOutputStream of serialized data (written into a byte array of 1MB initial size)

Used when UnsafeShuffleWriter is requested for the following:

Destroyed (null) when requested to close internal resources and merge spill files.

serializer

SerializerInstance (that is a new instance of the Serializer of the ShuffleDependency of the SerializedShuffleHandle)

Used exclusively when UnsafeShuffleWriter is requested to open (and creates the SerializationStream)

serOutputStream

SerializationStream (that is created when the SerializerInstance is requested to serializeStream with the ByteArrayOutputStream)

Used when UnsafeShuffleWriter is requested to insertRecordIntoSorter

Destroyed (null) when requested to close internal resources and merge spill files.

shuffleId

Used exclusively when requested to close internal resources and merge spill files

ShuffleExternalSorter

UnsafeShuffleWriter uses a ShuffleExternalSorter.

ShuffleExternalSorter is created when UnsafeShuffleWriter is requested to open (while being created) and dereferenced (nulled) when requested to close internal resources and merge spill files.

Used when UnsafeShuffleWriter is requested for the following:

writeMetrics

Used when UnsafeShuffleWriter is requested for the following: