UnsafeShuffleWriter

UnsafeShuffleWriter<K, V> is a ShuffleWriter for SerializedShuffleHandles.

UnsafeShuffleWriter, SortShuffleManager and SerializedShuffleHandle

UnsafeShuffleWriter opens resources (a ShuffleExternalSorter and the buffers) while being created.

UnsafeShuffleWriter and ShuffleExternalSorter

Creating Instance

UnsafeShuffleWriter takes the following to be created:

UnsafeShuffleWriter is created when SortShuffleManager is requested for a ShuffleWriter for a SerializedShuffleHandle.

UnsafeShuffleWriter makes sure that the number of reduce partitions does not exceed the limit of 1 << 24 (the upper bound of the partition identifiers that can be encoded) or throws an IllegalArgumentException:

UnsafeShuffleWriter can only be used for shuffles with at most 16777215 reduce partitions

UnsafeShuffleWriter uses the number of partitions of the Partitioner that is used for the ShuffleDependency of the SerializedShuffleHandle.

Note

The number of shuffle output partitions is first checked when SortShuffleManager is requested to determine whether a SerializedShuffleHandle can be used for a shuffle (which eventually leads to UnsafeShuffleWriter).

In the end, UnsafeShuffleWriter creates a ShuffleExternalSorter and a SerializationStream.

ShuffleExternalSorter

UnsafeShuffleWriter uses a ShuffleExternalSorter.

ShuffleExternalSorter is created when UnsafeShuffleWriter is requested to open (while being created) and dereferenced (nulled) when requested to close internal resources and merge spill files.

Used when UnsafeShuffleWriter is requested for the following:

IndexShuffleBlockResolver

UnsafeShuffleWriter is given an IndexShuffleBlockResolver when created.

UnsafeShuffleWriter uses the IndexShuffleBlockResolver for...FIXME

Initial Serialized Buffer Size

UnsafeShuffleWriter uses a fixed initial buffer size of 1024 * 1024 bytes (1 MB) for the output stream of serialized data written into a byte array.

inputBufferSizeInBytes

UnsafeShuffleWriter uses the spark.shuffle.file.buffer configuration property for...FIXME

outputBufferSizeInBytes

UnsafeShuffleWriter uses the spark.shuffle.unsafe.file.output.buffer configuration property for...FIXME

transferToEnabled

UnsafeShuffleWriter can use a specialized NIO-based fast merge procedure that avoids extra serialization/deserialization when spark.file.transferTo configuration property is enabled.

initialSortBufferSize

UnsafeShuffleWriter uses the initial buffer size for sorting (default: 4096) when creating a ShuffleExternalSorter (when requested to open).

Tip

Use spark.shuffle.sort.initialBufferSize configuration property to change the buffer size.

Merging Spills

long[] mergeSpills(
  SpillInfo[] spills,
  File outputFile)

Many Spills

With multiple SpillInfos to merge, mergeSpills selects between fast and slow merge strategies. The fast merge strategy can be transferTo- or fileStream-based.

mergeSpills uses the spark.shuffle.unsafe.fastMergeEnabled configuration property to consider one of the fast merge strategies.

A fast merge strategy is supported when spark.shuffle.compress configuration property is disabled or the IO compression codec supports decompression of concatenated compressed streams.

With spark.shuffle.compress configuration property enabled and an IO compression codec that does not support decompression of concatenated compressed streams, mergeSpills always uses the slow merge strategy.

With the fast merge strategy enabled and supported, transferToEnabled enabled and encryption disabled, mergeSpills prints out the following DEBUG message to the logs and uses mergeSpillsWithTransferTo.

Using transferTo-based fast merge

With the fast merge strategy enabled and supported, but transferToEnabled disabled or encryption enabled, mergeSpills prints out the following DEBUG message to the logs and uses mergeSpillsWithFileStream (with no compression codec).

Using fileStream-based fast merge

For slow merge, mergeSpills prints out the following DEBUG message to the logs and uses mergeSpillsWithFileStream (with the compression codec).

Using slow merge

In the end, mergeSpills requests the ShuffleWriteMetrics to decBytesWritten and incBytesWritten, and returns the partition length array.
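The selection rules above can be summarised as a small decision function. The following is a minimal sketch, not the actual Spark code: the class, enum and parameter names are hypothetical, and the "supported" condition follows the rule that fast merge requires compression to be disabled or a codec that can decompress concatenated streams.

```java
final class MergeStrategySketch {

  // Hypothetical labels for the three code paths described in the text.
  enum Strategy { TRANSFER_TO_FAST_MERGE, FILE_STREAM_FAST_MERGE, SLOW_MERGE }

  static Strategy select(
      boolean fastMergeEnabled,            // spark.shuffle.unsafe.fastMergeEnabled
      boolean compressionEnabled,          // spark.shuffle.compress
      boolean codecSupportsConcatenation,  // codec can decompress concatenated streams
      boolean transferToEnabled,           // spark.file.transferTo
      boolean encryptionEnabled) {
    // Fast merge is supported without compression, or with a concatenation-friendly codec.
    boolean fastMergeSupported = !compressionEnabled || codecSupportsConcatenation;
    if (fastMergeEnabled && fastMergeSupported) {
      if (transferToEnabled && !encryptionEnabled) {
        return Strategy.TRANSFER_TO_FAST_MERGE;   // "Using transferTo-based fast merge"
      }
      return Strategy.FILE_STREAM_FAST_MERGE;     // "Using fileStream-based fast merge"
    }
    return Strategy.SLOW_MERGE;                   // "Using slow merge"
  }
}
```

Keeping the decision in a pure function like this makes every combination of the five flags easy to check in isolation.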

One Spill

With one SpillInfo to merge, mergeSpills simply renames the spill file to be the output file and returns the partition length array of the one spill.

No Spills

With no SpillInfos to merge, mergeSpills creates an empty output file and returns an array of zeros whose size is the number of partitions of the Partitioner.
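The three cases (many, one and no spills) can be illustrated with a simplified, file-based sketch. It is not the Spark implementation: the Spill class is a hypothetical stand-in for SpillInfo, numPartitions is passed in explicitly, and the many-spills branch assumes that a merge concatenates the bytes of each partition across all spills. It also reads whole spill files into memory for brevity, which a real merge would avoid.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class MergeSpillsSketch {

  /** Hypothetical stand-in for SpillInfo: a spill file and the byte length of each partition in it. */
  static final class Spill {
    final Path file;
    final long[] partitionLengths;

    Spill(Path file, long[] partitionLengths) {
      this.file = file;
      this.partitionLengths = partitionLengths;
    }
  }

  static long[] mergeSpills(Spill[] spills, Path outputFile, int numPartitions)
      throws IOException {
    if (spills.length == 0) {
      // No spills: create an empty output file; every partition has length 0.
      Files.write(outputFile, new byte[0]);
      return new long[numPartitions];
    }
    if (spills.length == 1) {
      // One spill: the spill file becomes the output file.
      Files.move(spills[0].file, outputFile, StandardCopyOption.REPLACE_EXISTING);
      return spills[0].partitionLengths;
    }
    // Many spills (assumed layout): partition 0 of every spill, then partition 1, and so on.
    byte[][] data = new byte[spills.length][];
    for (int i = 0; i < spills.length; i++) {
      data[i] = Files.readAllBytes(spills[i].file);
    }
    int[] offsets = new int[spills.length];   // read position within each spill
    long[] lengths = new long[numPartitions]; // merged length of each partition
    try (OutputStream out = Files.newOutputStream(outputFile)) {
      for (int p = 0; p < numPartitions; p++) {
        for (int i = 0; i < spills.length; i++) {
          int len = (int) spills[i].partitionLengths[p];
          out.write(data[i], offsets[i], len);
          offsets[i] += len;
          lengths[p] += len;
        }
      }
    }
    return lengths;
  }
}
```

The returned array plays the role of the partition length array mentioned above: entry p is the number of bytes that belong to partition p in the merged output file.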

Usage

mergeSpills is used when UnsafeShuffleWriter is requested to close internal resources and merge spill files.

mergeSpillsWithTransferTo

long[] mergeSpillsWithTransferTo(
  SpillInfo[] spills,
  File outputFile)

mergeSpillsWithTransferTo...FIXME

mergeSpillsWithTransferTo is used when UnsafeShuffleWriter is requested to mergeSpills (with the transferToEnabled flag enabled and no encryption).

updatePeakMemoryUsed Internal Method

void updatePeakMemoryUsed()

updatePeakMemoryUsed...FIXME

updatePeakMemoryUsed is used when UnsafeShuffleWriter is requested for the <> and to <>.

Writing Key-Value Records of Partition

void write(
  Iterator<Product2<K, V>> records)

write traverses the input sequence of records (for an RDD partition) and inserts them into the sorter one by one (insertRecordIntoSorter). When all the records have been processed, write closes internal resources and merges spill files.

In the end, write requests ShuffleExternalSorter to clean up.
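The control flow of write can be sketched as follows. This is an illustration only: RecordingSorter is a hypothetical stand-in that just records the calls it receives, and running the cleanup in a finally block is an assumption about how "in the end" is guaranteed even when inserting a record fails.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class WriteLoopSketch {

  /** Hypothetical stand-in for the writer's collaborators; it only records the calls made. */
  static final class RecordingSorter {
    final List<String> calls = new ArrayList<>();

    void insertRecordIntoSorter(String record) { calls.add("insert:" + record); }
    void closeAndWriteOutput() { calls.add("closeAndWriteOutput"); }
    void cleanupResources() { calls.add("cleanupResources"); }
  }

  static void write(Iterator<String> records, RecordingSorter sorter) {
    try {
      // Insert every record of the partition, one by one.
      while (records.hasNext()) {
        sorter.insertRecordIntoSorter(records.next());
      }
      // All records processed: close internal resources and merge spill files.
      sorter.closeAndWriteOutput();
    } finally {
      // Assumed: cleanup happens whether or not the steps above succeeded.
      sorter.cleanupResources();
    }
  }
}
```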

CAUTION: FIXME

When requested to <>, UnsafeShuffleWriter simply <> followed by <> (that, among other things, creates the <>).

write is part of the ShuffleWriter abstraction.

Stopping ShuffleWriter

Option<MapStatus> stop(
  boolean success)

stop...FIXME

When requested to <>, UnsafeShuffleWriter records the peak execution memory metric and returns the <> (that was created when requested to <>).

stop is part of the ShuffleWriter abstraction.

Inserting Record Into ShuffleExternalSorter

void insertRecordIntoSorter(
  Product2<K, V> record)

insertRecordIntoSorter requires that the <> is available.

insertRecordIntoSorter requests the <> to reset (so that all currently accumulated output in the output stream is discarded and the already allocated buffer space is reused).

insertRecordIntoSorter requests the <> to write out the record (write the key and the value) and to flush.

insertRecordIntoSorter requests the <> for the length of the buffer.

insertRecordIntoSorter requests the <> for the partition for the given record (by the key).

In the end, insertRecordIntoSorter requests the <> to insert the <> as a byte array (with the <> and the <>).
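The reset, serialize, flush, measure and insert sequence can be mimicked with plain JDK classes. In this hedged sketch a DataOutputStream stands in for the SerializationStream, a hash of the key stands in for the Partitioner, and an in-memory list stands in for the sorter; all class and field names are hypothetical. Unlike a real writer, the sketch copies the buffer for every record to keep the example short.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class InsertRecordSketch {

  /** Hypothetical holder for what the sorter receives: partition ID and serialized bytes. */
  static final class SerializedRecord {
    final int partitionId;
    final byte[] bytes;

    SerializedRecord(int partitionId, byte[] bytes) {
      this.partitionId = partitionId;
      this.bytes = bytes;
    }
  }

  // One buffer is reused for every record (1024 * 1024 initial size, as in the text).
  private final ByteArrayOutputStream serBuffer = new ByteArrayOutputStream(1024 * 1024);
  private final DataOutputStream serOutputStream = new DataOutputStream(serBuffer);
  private final int numPartitions;
  final List<SerializedRecord> sorter = new ArrayList<>();

  InsertRecordSketch(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  void insertRecordIntoSorter(String key, int value) throws IOException {
    serBuffer.reset();                 // discard the previous record, keep the allocated space
    serOutputStream.writeUTF(key);     // write the key
    serOutputStream.writeInt(value);   // write the value
    serOutputStream.flush();
    int serializedRecordSize = serBuffer.size();
    int partitionId = Math.floorMod(key.hashCode(), numPartitions); // assumed hash partitioning
    byte[] bytes = serBuffer.toByteArray();                         // copy of the record bytes
    assert bytes.length == serializedRecordSize;
    sorter.add(new SerializedRecord(partitionId, bytes));
  }
}
```

Because the buffer is reset before each record, the bytes handed to the sorter always contain exactly one serialized key-value pair.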

insertRecordIntoSorter is used when UnsafeShuffleWriter is requested to <>.

Closing and Writing Output (Merging Spill Files)

void closeAndWriteOutput()

closeAndWriteOutput asserts that the ShuffleExternalSorter is created (non-null).

closeAndWriteOutput updates peak memory used.

closeAndWriteOutput removes the references to the <> and <> output streams (nulls them).

closeAndWriteOutput requests the <> to close and return spill metadata.

closeAndWriteOutput removes the reference to the <> (nulls it).

closeAndWriteOutput requests the <> for the output data file for the <> and <> IDs.

closeAndWriteOutput creates a temporary file (along the data output file) and uses it to <> (that gives a partition length array). All spill files are then deleted.

closeAndWriteOutput requests the <> to write shuffle index and data files (for the <> and <> IDs, the <> and the <>).

In the end, closeAndWriteOutput creates a MapStatus with the location of the local BlockManager and the <>.

closeAndWriteOutput prints out the following ERROR message to the logs if there is an issue with deleting spill files:

Error while deleting spill file [path]

closeAndWriteOutput prints out the following ERROR message to the logs if there is an issue with deleting the <>:

Error while deleting temp file [path]

closeAndWriteOutput is used when UnsafeShuffleWriter is requested to write records.

Getting Peak Memory Used

long getPeakMemoryUsedBytes()

getPeakMemoryUsedBytes simply <> and returns the internal <> registry.

getPeakMemoryUsedBytes is used when UnsafeShuffleWriter is requested to <>.
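A minimal sketch of the peak-memory bookkeeping follows. It assumes that getPeakMemoryUsedBytes first refreshes the internal value and that the refresh keeps a running maximum of what the sorter reports. The Sorter interface and the releaseSorter method are hypothetical helpers for the example, not Spark APIs.

```java
final class PeakMemorySketch {

  /** Hypothetical stand-in for the sorter's memory reporting. */
  interface Sorter {
    long getPeakMemoryUsedBytes();
  }

  private Sorter sorter;                 // null once internal resources have been closed
  private long peakMemoryUsedBytes = 0;

  PeakMemorySketch(Sorter sorter) {
    this.sorter = sorter;
  }

  /** Hypothetical helper that mimics the sorter reference being nulled. */
  void releaseSorter() {
    sorter = null;
  }

  private void updatePeakMemoryUsed() {
    // Without a sorter, the last recorded peak is kept as is.
    if (sorter != null) {
      long mem = sorter.getPeakMemoryUsedBytes();
      if (mem > peakMemoryUsedBytes) {
        peakMemoryUsedBytes = mem;
      }
    }
  }

  long getPeakMemoryUsedBytes() {
    updatePeakMemoryUsed();
    return peakMemoryUsedBytes;
  }
}
```

With this shape, the value never decreases and it stays readable after the sorter has been released.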

Opening UnsafeShuffleWriter and Buffers

void open()

open requires that there is no <> available.

open creates a ShuffleExternalSorter.

open creates a <> with the capacity of <>.

open requests the <> for a SerializationStream to the <> (available internally as the <> reference).

open is used when UnsafeShuffleWriter is <>.

Logging

Enable ALL logging level for org.apache.spark.shuffle.sort.UnsafeShuffleWriter logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.UnsafeShuffleWriter=ALL

Refer to Logging.

Internal Properties

MapStatus

MapStatus

Created when UnsafeShuffleWriter is requested to <> (with the BlockManagerId of the <> and partitionLengths)

Returned when UnsafeShuffleWriter is requested to <>

Partitioner

Partitioner (as used by the ShuffleDependency of the <>)

Used when UnsafeShuffleWriter is requested for the following:

  • <> (and create a ShuffleExternalSorter with the given number of partitions)

  • <> (and request the partition for the key)

  • <>, <> and <> (for the number of partitions to create partition lengths)

Peak Memory Used

Peak memory used (in bytes) that is updated exclusively in <> (after requesting the <> for getPeakMemoryUsedBytes)

Use <> to access the current value

ByteArrayOutputStream for Serialized Data

java.io.ByteArrayOutputStream of serialized data (written into a byte array of <> initial size)

Used when UnsafeShuffleWriter is requested for the following:

  • <> (and create the internal <>)

  • <>

Destroyed (null) when requested to <>.

serializer

SerializerInstance (that is a new instance of the Serializer of the ShuffleDependency of the <>)

Used exclusively when UnsafeShuffleWriter is requested to <> (and creates the <>)

serOutputStream

SerializationStream (that is created when the <> is requested to serializeStream with the <>)

Used when UnsafeShuffleWriter is requested to <>

Destroyed (null) when requested to <>.

Shuffle ID

Shuffle ID (of the ShuffleDependency of the SerializedShuffleHandle)

Used exclusively when requested to <>

writeMetrics

ShuffleWriteMetrics (of the TaskMetrics of the <>)

Used when UnsafeShuffleWriter is requested for the following:

  • <> (and creates the <>)

  • <>

  • <>

  • <>