SortShuffleWriter is a concrete ShuffleWriter that is used when SortShuffleManager.md#getWriter[SortShuffleManager returns a ShuffleWriter for a ShuffleHandle] (and the more specialized BypassMergeSortShuffleWriter.md[BypassMergeSortShuffleWriter] and UnsafeShuffleWriter.md[UnsafeShuffleWriter] could not be used).
SortShuffleWriter is created when SortShuffleManager.md#getWriter[SortShuffleManager returns a ShuffleWriter] for the fallback spark-shuffle-BaseShuffleHandle.md[BaseShuffleHandle].
SortShuffleWriter[K, V, C] is a parameterized type with K keys, V values, and C combiner values.
== [[creating-instance]] Creating Instance
SortShuffleWriter takes the following to be created:
- [[shuffleBlockResolver]] IndexShuffleBlockResolver.md[IndexShuffleBlockResolver]
- [[handle]] spark-shuffle-BaseShuffleHandle.md[BaseShuffleHandle]
- [[mapId]] Map ID
- [[context]] scheduler:spark-TaskContext.md[TaskContext]
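The constructor shape can be sketched as follows. This is a simplified, self-contained Scala sketch with hypothetical stand-in types (the real Spark classes have more members), meant only to show the type parameters and the four constructor arguments listed above:

```scala
// Hypothetical stand-in types for illustration only (not Spark's own classes)
trait IndexShuffleBlockResolver
trait TaskContext
class BaseShuffleHandle[K, V, C]

// K keys, V values, C combiner values
class SortShuffleWriter[K, V, C](
    shuffleBlockResolver: IndexShuffleBlockResolver,
    handle: BaseShuffleHandle[K, V, C],
    mapId: Int,
    context: TaskContext)
```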
== [[mapStatus]] MapStatus
SortShuffleWriter uses an internal variable for a scheduler:MapStatus.md[MapStatus] after <<write, writing records>>.
== [[write]] Writing Records (Into Shuffle Partitioned File In Disk Store)
write(records: Iterator[Product2[K, V]]): Unit
write creates an ExternalSorter.md[ExternalSorter] based on the spark-shuffle-BaseShuffleHandle.md#dependency[ShuffleDependency] (of the <<handle, BaseShuffleHandle>>), with the map-side aggregator and key ordering only when map-side combine is enabled.
write requests the ExternalSorter to insert all the records.
write requests the <<shuffleBlockResolver, IndexShuffleBlockResolver>> for the shuffle output data file (for the shuffle and the <<mapId, map ID>>) and creates a temporary file alongside it.
write creates a storage:BlockId.md#ShuffleBlockId[ShuffleBlockId] (for the shuffle and the <<mapId, map ID>>).
write requests ExternalSorter to ExternalSorter.md#writePartitionedFile[write all the records (previously inserted in) into the temporary partitioned file in the disk store]. ExternalSorter returns the length of every partition.
write requests the <<shuffleBlockResolver, IndexShuffleBlockResolver>> to IndexShuffleBlockResolver.md#writeIndexFileAndCommit[write an index file and commit] (with the partition lengths and the temporary partitioned file).
write creates a scheduler:MapStatus.md[MapStatus] (with the storage:BlockManager.md#shuffleServerId[location of the shuffle server] that serves the shuffle files and the sizes of the shuffle partitions). The MapStatus is later available as the <<mapStatus, mapStatus>> internal attribute.
write does not handle exceptions so when they occur, they will break the processing.
In the end, write deletes the temporary shuffle output file. write prints out the following ERROR message to the logs if the file could not be deleted:
Error while deleting temp file [path]
write is part of the ShuffleWriter.md#write[ShuffleWriter] abstraction.
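The insert-write-commit sequence above can be condensed into the following self-contained Scala sketch. All types here (SimpleSorter, the MapStatus case class, the partitioning function) are hypothetical simplifications that mimic the control flow, not Spark's actual classes:

```scala
import scala.collection.mutable

// Hypothetical stand-in for ExternalSorter: buffers records per partition
class SimpleSorter[K, V](numPartitions: Int, partition: K => Int) {
  private val buffers =
    Array.fill(numPartitions)(mutable.ArrayBuffer.empty[(K, V)])

  def insertAll(records: Iterator[(K, V)]): Unit =
    records.foreach { case (k, v) => buffers(partition(k)) += ((k, v)) }

  // "writes" every partition and returns the length of each one
  def writePartitionedOutput(): Array[Long] = buffers.map(_.size.toLong)
}

// Hypothetical stand-in for MapStatus: only the per-partition sizes
case class MapStatus(partitionLengths: Array[Long])

// Mirrors the write flow: insert all records, write the partitioned
// output, then build a MapStatus from the per-partition lengths
def write[K, V](records: Iterator[(K, V)], numPartitions: Int,
                partition: K => Int): MapStatus = {
  val sorter = new SimpleSorter[K, V](numPartitions, partition)
  sorter.insertAll(records)
  MapStatus(sorter.writePartitionedOutput())
}
```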
== [[stop]] Closing SortShuffleWriter (and Calculating MapStatus)
stop(success: Boolean): Option[MapStatus]
stop turns the <<stopping, stopping>> internal flag on and returns the internal <<mapStatus, MapStatus>> if the input success is enabled.

Otherwise, when the <<stopping, stopping>> internal flag is already enabled or success is disabled, stop returns no MapStatus (i.e. None).
In the end, stop requests the ExternalSorter to ExternalSorter.md#stop[stop] and increments the shuffle write time task metrics.
stop is part of the ShuffleWriter.md#contract[ShuffleWriter] abstraction.
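The stop control flow can be sketched with the following self-contained Scala snippet. StoppableWriter is a hypothetical simplification (the MapStatus is stood in for by a plain String) that captures only the stopping-flag and success logic described above:

```scala
// Hypothetical simplified sketch of the stop control flow
class StoppableWriter(mapStatus: String) {
  private var stopping = false

  def stop(success: Boolean): Option[String] =
    if (stopping) None                 // already stopped: nothing to report
    else {
      stopping = true                  // mark the writer as stopped
      if (success) Option(mapStatus)   // report the MapStatus only on success
      else None
    }
}
```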
== [[shouldBypassMergeSort]] Requirements of BypassMergeSortShuffleHandle (as ShuffleHandle)
shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean
shouldBypassMergeSort is true when all of the following hold:

- No map-side aggregation (the mapSideCombine flag of the ShuffleDependency is disabled)
- The number of partitions (of the Partitioner of the ShuffleDependency) is at most the spark.shuffle.sort.bypassMergeThreshold configuration property (default: 200)

Otherwise, shouldBypassMergeSort does not hold (i.e. false).
shouldBypassMergeSort is used when SortShuffleManager is requested to register a shuffle (and creates a ShuffleHandle).
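The two conditions can be sketched as a small self-contained Scala function. Dep is a hypothetical stand-in holding only the two ShuffleDependency fields the check reads, and the threshold is passed directly instead of being read from a SparkConf:

```scala
// Hypothetical stand-in for the two ShuffleDependency fields the check reads
case class Dep(mapSideCombine: Boolean, numPartitions: Int)

// Bypass merge-sort only without map-side combine and with at most
// bypassMergeThreshold partitions (spark.shuffle.sort.bypassMergeThreshold)
def shouldBypassMergeSort(bypassMergeThreshold: Int, dep: Dep): Boolean =
  !dep.mapSideCombine && dep.numPartitions <= bypassMergeThreshold
```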
== [[logging]] Logging
Enable ALL logging level for the org.apache.spark.shuffle.sort.SortShuffleWriter logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.SortShuffleWriter=ALL
Refer to ROOT:spark-logging.md[Logging].
== [[internal-properties]] Internal Properties
[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| stopping
| Internal flag to mark that SortShuffleWriter is <<stop, stopped>> (closed)
|===