UnsafeShuffleWriter¶
UnsafeShuffleWriter<K, V> is a ShuffleWriter for SerializedShuffleHandles.
UnsafeShuffleWriter opens resources (a ShuffleExternalSorter and the buffers) while being created.
Creating Instance¶
UnsafeShuffleWriter takes the following to be created:
- BlockManager
- TaskMemoryManager
- SerializedShuffleHandle
- Map ID
- TaskContext
- SparkConf
- ShuffleWriteMetricsReporter
- ShuffleExecutorComponents
UnsafeShuffleWriter is created when SortShuffleManager is requested for a ShuffleWriter for a SerializedShuffleHandle.
UnsafeShuffleWriter makes sure that the number of reduce partitions does not exceed the limit of the serialized mode (1 << 24, the upper bound of the partition identifiers that can be encoded) or throws an IllegalArgumentException:
UnsafeShuffleWriter can only be used for shuffles with at most 16777215 reduce partitions
UnsafeShuffleWriter uses the number of partitions of the Partitioner that is used for the ShuffleDependency of the SerializedShuffleHandle.
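The partition-count guard can be sketched in plain Java as follows. This is an illustration only, not Spark's code: the class name, the helper name and the message text are hypothetical, and whether the bound itself is accepted is an assumption based on the "at most" wording above.

```java
final class PartitionLimitSketch {
    // Upper bound taken from the text: partition identifiers are encoded in 24 bits.
    static final int MAX_PARTITIONS = 1 << 24;

    // Hypothetical helper (not a Spark API): rejects partition counts above the bound.
    static void requireSupported(int numPartitions) {
        if (numPartitions > MAX_PARTITIONS) {
            // Illustrative message only; it is not the message Spark prints.
            throw new IllegalArgumentException(
                "too many reduce partitions for serialized mode: " + numPartitions);
        }
    }

    public static void main(String[] args) {
        requireSupported(200); // a typical partition count passes silently
        try {
            requireSupported(MAX_PARTITIONS + 1);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast at construction time means a task never starts sorting records whose partition identifiers could not be encoded.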
Note
The number of shuffle output partitions is first enforced when SortShuffleManager is requested to check whether a SerializedShuffleHandle can be used for a ShuffleHandle (which eventually leads to UnsafeShuffleWriter).
In the end, UnsafeShuffleWriter creates a ShuffleExternalSorter and a SerializationStream.
ShuffleExternalSorter¶
UnsafeShuffleWriter uses a ShuffleExternalSorter.
ShuffleExternalSorter is created when UnsafeShuffleWriter is requested to open (while being created) and dereferenced (nulled) when requested to close internal resources and merge spill files.
Used when UnsafeShuffleWriter is requested for the following:
- Updating peak memory used
- Writing records
- Closing internal resources and merging spill files
- Inserting a record
- Stopping
IndexShuffleBlockResolver¶
UnsafeShuffleWriter is given an IndexShuffleBlockResolver when created.
UnsafeShuffleWriter uses the IndexShuffleBlockResolver for...FIXME
Initial Serialized Buffer Size¶
UnsafeShuffleWriter uses a fixed buffer size for the output stream of serialized data written into a byte array (default: 1024 * 1024).
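To illustrate how a byte-array buffer of that initial size can be reused for one record at a time, here is a minimal sketch in plain Java. It is an assumption-laden stand-in: DataOutputStream replaces Spark's SerializationStream, the String key and long value are arbitrary example types, and the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class SerializedBufferSketch {
    // Initial capacity from the text (1024 * 1024 bytes); the buffer grows if a record needs more.
    static final int INITIAL_SER_BUFFER_SIZE = 1024 * 1024;

    private final ByteArrayOutputStream serBuffer =
        new ByteArrayOutputStream(INITIAL_SER_BUFFER_SIZE);
    // Stand-in for a serialization stream that writes into the byte array.
    private final DataOutputStream serStream = new DataOutputStream(serBuffer);

    /** Serializes one key-value pair and returns the number of bytes it occupies. */
    int serialize(String key, long value) throws IOException {
        serBuffer.reset();          // drop the previous record, keep the allocated array
        serStream.writeUTF(key);    // 2-byte length prefix followed by the encoded key
        serStream.writeLong(value); // 8 bytes
        serStream.flush();
        return serBuffer.size();
    }
}
```

Resetting the buffer instead of allocating a new one per record keeps the per-record cost down to the serialization itself.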
inputBufferSizeInBytes¶
UnsafeShuffleWriter uses the spark.shuffle.file.buffer configuration property for...FIXME
outputBufferSizeInBytes¶
UnsafeShuffleWriter uses the spark.shuffle.unsafe.file.output.buffer configuration property for...FIXME
transferToEnabled¶
UnsafeShuffleWriter can use a specialized NIO-based fast merge procedure that avoids extra serialization/deserialization when the spark.file.transferTo configuration property is enabled.
initialSortBufferSize¶
UnsafeShuffleWriter uses the initial buffer size for sorting (default: 4096) when creating a ShuffleExternalSorter (when requested to open).
Tip
Use spark.shuffle.sort.initialBufferSize configuration property to change the buffer size.
Merging Spills¶
long[] mergeSpills(
SpillInfo[] spills,
File outputFile)
Many Spills¶
With multiple SpillInfos to merge, mergeSpills selects between fast and slow merge strategies. The fast merge strategy can be transferTo- or fileStream-based.
mergeSpills uses the spark.shuffle.unsafe.fastMergeEnabled configuration property to consider one of the fast merge strategies.
A fast merge strategy is supported when spark.shuffle.compress configuration property is disabled or the IO compression codec supports decompression of concatenated compressed streams.
With the spark.shuffle.compress configuration property enabled and an IO compression codec that does not support decompression of concatenated compressed streams, mergeSpills always uses the slow merge strategy.
With the fast merge strategy enabled and supported, transferToEnabled enabled and encryption disabled, mergeSpills prints out the following DEBUG message to the logs and calls mergeSpillsWithTransferTo.
Using transferTo-based fast merge
With the fast merge strategy enabled and supported, but transferToEnabled disabled or encryption enabled, mergeSpills prints out the following DEBUG message to the logs and calls mergeSpillsWithFileStream (with no compression codec).
Using fileStream-based fast merge
For slow merge, mergeSpills prints out the following DEBUG message to the logs and calls mergeSpillsWithFileStream (with the compression codec).
Using slow merge
In the end, mergeSpills requests the ShuffleWriteMetrics to decBytesWritten and incBytesWritten, and returns the partition length array.
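The strategy selection described in this section boils down to a handful of boolean flags. The following sketch restates it in plain Java; the class, enum and parameter names are hypothetical and stand for the configuration properties and conditions named above.

```java
final class MergeStrategyChooser {
    enum Strategy { TRANSFER_TO_FAST, FILE_STREAM_FAST, SLOW }

    // Hypothetical helper (not a Spark API) mirroring the rules described in the text.
    static Strategy choose(
            boolean fastMergeEnabled,           // spark.shuffle.unsafe.fastMergeEnabled
            boolean compressionEnabled,         // spark.shuffle.compress
            boolean codecSupportsConcatenation, // codec can decompress concatenated streams
            boolean transferToEnabled,          // spark.file.transferTo
            boolean encryptionEnabled) {
        // Fast merge is supported without compression or with a concatenation-friendly codec.
        boolean fastMergeSupported = !compressionEnabled || codecSupportsConcatenation;
        if (fastMergeEnabled && fastMergeSupported) {
            return (transferToEnabled && !encryptionEnabled)
                ? Strategy.TRANSFER_TO_FAST
                : Strategy.FILE_STREAM_FAST;
        }
        return Strategy.SLOW;
    }

    public static void main(String[] args) {
        // Compression with a codec that cannot be concatenated forces the slow merge.
        System.out.println(choose(true, true, false, true, false));
    }
}
```

Keeping the decision in one pure function makes it easy to see that encryption only rules out the transferTo variant, not fast merging as such.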
One Spill¶
With one SpillInfo to merge, mergeSpills simply renames the spill file to be the output file and returns the partition length array of the one spill.
No Spills¶
With no SpillInfos to merge, mergeSpills creates an empty output file and returns an array of 0s whose size is the numPartitions of the Partitioner.
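The two trivial cases (no spills and exactly one spill) can be sketched with java.nio.file as follows. The Spill class is a hypothetical stand-in for SpillInfo (a file plus its partition lengths), and the method is an illustration under that assumption, not Spark's implementation.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

final class TrivialSpillMergeSketch {
    /** Hypothetical stand-in for SpillInfo: a spill file and the byte length of each partition. */
    static final class Spill {
        final Path file;
        final long[] partitionLengths;

        Spill(Path file, long[] partitionLengths) {
            this.file = file;
            this.partitionLengths = partitionLengths;
        }
    }

    static long[] merge(List<Spill> spills, Path outputFile, int numPartitions)
            throws IOException {
        if (spills.isEmpty()) {
            // No spills: create (or truncate to) an empty output file, all lengths are 0.
            Files.write(outputFile, new byte[0]);
            return new long[numPartitions];
        }
        if (spills.size() == 1) {
            // One spill: the spill file already has the final layout, so renaming is enough.
            Spill only = spills.get(0);
            Files.move(only.file, outputFile, StandardCopyOption.REPLACE_EXISTING);
            return only.partitionLengths;
        }
        // Many spills need one of the merge strategies, which this sketch leaves out.
        throw new UnsupportedOperationException("multiple spills require a merge strategy");
    }
}
```

Renaming avoids copying any bytes, which is why the single-spill case is the cheapest path.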
Usage¶
mergeSpills is used when UnsafeShuffleWriter is requested to close internal resources and merge spill files.
mergeSpillsWithTransferTo¶
long[] mergeSpillsWithTransferTo(
SpillInfo[] spills,
File outputFile)
mergeSpillsWithTransferTo...FIXME
mergeSpillsWithTransferTo is used when UnsafeShuffleWriter is requested to mergeSpills (with the transferToEnabled flag enabled and no encryption).
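The details of mergeSpillsWithTransferTo are not covered here. As a general illustration of what a transferTo-based concatenation looks like with Java NIO (and not a description of Spark's actual code), the sketch below copies the segment of every partition from each spill file into one output file. The Spill class and all names are hypothetical, and it assumes every spill stores its partitions back to back in partition order.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

final class TransferToMergeSketch {
    /** Hypothetical stand-in for SpillInfo: a spill file and the byte length of each partition. */
    static final class Spill {
        final Path file;
        final long[] partitionLengths;

        Spill(Path file, long[] partitionLengths) {
            this.file = file;
            this.partitionLengths = partitionLengths;
        }
    }

    static long[] merge(List<Spill> spills, Path outputFile, int numPartitions)
            throws IOException {
        long[] mergedLengths = new long[numPartitions];
        FileChannel[] inputs = new FileChannel[spills.size()];
        long[] positions = new long[spills.size()]; // read position per spill file
        try (FileChannel out = FileChannel.open(outputFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            for (int i = 0; i < inputs.length; i++) {
                inputs[i] = FileChannel.open(spills.get(i).file, StandardOpenOption.READ);
            }
            // Partition by partition, append the matching segment of every spill.
            for (int p = 0; p < numPartitions; p++) {
                for (int i = 0; i < inputs.length; i++) {
                    long remaining = spills.get(i).partitionLengths[p];
                    mergedLengths[p] += remaining;
                    while (remaining > 0) { // transferTo may copy fewer bytes than requested
                        long copied = inputs[i].transferTo(positions[i], remaining, out);
                        if (copied <= 0) {
                            throw new IOException("spill file shorter than its partition lengths");
                        }
                        positions[i] += copied;
                        remaining -= copied;
                    }
                }
            }
        } finally {
            for (FileChannel in : inputs) {
                if (in != null) {
                    in.close();
                }
            }
        }
        return mergedLengths;
    }
}
```

Because the bytes are moved channel to channel, the records are never deserialized, which matches the motivation given for transferToEnabled.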
updatePeakMemoryUsed Internal Method¶
void updatePeakMemoryUsed()
updatePeakMemoryUsed...FIXME
updatePeakMemoryUsed is used when UnsafeShuffleWriter is requested for the <
Writing Key-Value Records of Partition¶
void write(
Iterator<Product2<K, V>> records)
write traverses the input sequence of records (for an RDD partition) and inserts them into the ShuffleExternalSorter one by one (insertRecordIntoSorter). When all the records have been processed, write closes internal resources and merges spill files.
In the end, write requests ShuffleExternalSorter to clean up.
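The control flow of write can be sketched as follows. The sketch only records the order of the steps; the method bodies are placeholders, the record type is simplified to String, and running the cleanup in a finally block is an assumption about how "in the end" is enforced.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class WriteFlowSketch {
    // Records the order in which the steps run (for illustration only).
    final List<String> events = new ArrayList<>();

    void insertRecordIntoSorter(String record) { events.add("insert:" + record); }

    void closeAndWriteOutput() { events.add("closeAndWriteOutput"); }

    // Placeholder for asking the sorter to release its resources.
    void cleanupSorter() { events.add("cleanup"); }

    void write(Iterator<String> records) {
        try {
            while (records.hasNext()) {
                insertRecordIntoSorter(records.next());
            }
            closeAndWriteOutput();
        } finally {
            // Assumption: cleanup happens whether or not the steps above succeeded.
            cleanupSorter();
        }
    }

    public static void main(String[] args) {
        WriteFlowSketch writer = new WriteFlowSketch();
        writer.write(Arrays.asList("a", "b").iterator());
        System.out.println(writer.events);
    }
}
```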
CAUTION: FIXME
When requested to <
write is part of the ShuffleWriter abstraction.
Stopping ShuffleWriter¶
Option
stop...FIXME
When requested to <
stop is part of the ShuffleWriter abstraction.
Inserting Record Into ShuffleExternalSorter¶
void insertRecordIntoSorter( Product2
insertRecordIntoSorter requires that the <
insertRecordIntoSorter requests the <
insertRecordIntoSorter requests the <
[[insertRecordIntoSorter-serializedRecordSize]] insertRecordIntoSorter requests the <
[[insertRecordIntoSorter-partitionId]] insertRecordIntoSorter requests the <
In the end, insertRecordIntoSorter requests the <
insertRecordIntoSorter is used when UnsafeShuffleWriter is requested to <
Closing and Writing Output (Merging Spill Files)¶
void closeAndWriteOutput()
closeAndWriteOutput asserts that the ShuffleExternalSorter is created (non-null).
closeAndWriteOutput updates peak memory used.
closeAndWriteOutput
removes the references to the <null
s them).
closeAndWriteOutput
requests the <
closeAndWriteOutput
removes the reference to the <null
s it).
closeAndWriteOutput
requests the <
[[closeAndWriteOutput-partitionLengths]][[closeAndWriteOutput-tmp]] closeAndWriteOutput creates a temporary file (along the data output file) and uses it to <
closeAndWriteOutput requests the <
In the end, closeAndWriteOutput creates a MapStatus with the location of the local BlockManager and the <
closeAndWriteOutput prints out the following ERROR message to the logs if there is an issue with deleting spill files:
Error while deleting spill file [path]
closeAndWriteOutput prints out the following ERROR message to the logs if there is an issue with deleting the <
Error while deleting temp file [path]
closeAndWriteOutput is used when UnsafeShuffleWriter is requested to write records.
Getting Peak Memory Used¶
long getPeakMemoryUsedBytes()
getPeakMemoryUsedBytes simply <
getPeakMemoryUsedBytes is used when UnsafeShuffleWriter is requested to <
Opening UnsafeShuffleWriter and Buffers¶
void open()
open requires that there is no <
open creates a ShuffleExternalSorter.
open creates a <
open requests the <
open is used when UnsafeShuffleWriter is <
Logging¶
Enable ALL logging level for org.apache.spark.shuffle.sort.UnsafeShuffleWriter logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.shuffle.sort.UnsafeShuffleWriter=ALL
Refer to Logging.
Internal Properties¶
MapStatus¶
Created when UnsafeShuffleWriter is requested to <partitionLengths
)
Returned when UnsafeShuffleWriter is requested to <
Partitioner¶
Partitioner (as used by the ShuffleDependency of the <
Used when UnsafeShuffleWriter is requested for the following:
-
<
> (and create a ShuffleExternalSorter.md[ShuffleExternalSorter] with the given ../rdd/Partitioner.md#numPartitions[number of partitions]) -
<
> (and request the ../rdd/Partitioner.md#getPartition[partition for the key]) -
<
>, < > and < > (for the ../rdd/Partitioner.md#numPartitions[number of partitions] to create partition lengths)
Peak Memory Used¶
Peak memory used (in bytes) that is updated exclusively in <
Use <
ByteArrayOutputStream for Serialized Data¶
java.io.ByteArrayOutputStream of serialized data (written into a byte array of <
Used when UnsafeShuffleWriter is requested for the following:
-
<
> (and create the internal < >) -
<
>
Destroyed (null
) when requested to <
serializer¶
SerializerInstance (that is a new instance of the Serializer of the ShuffleDependency of the <
Used exclusively when UnsafeShuffleWriter is requested to <
serOutputStream¶
SerializationStream (that is created when the <
Used when UnsafeShuffleWriter is requested to <
Destroyed (null
) when requested to <
Shuffle ID¶
Shuffle ID (of the ShuffleDependency of the SerializedShuffleHandle)
Used exclusively when requested to <
writeMetrics¶
ShuffleWriteMetrics (of the TaskMetrics of the <
Used when UnsafeShuffleWriter is requested for the following:
-
<
> (and creates the < >) -
<
> -
<
> -
<
>