SortShuffleWriter

SortShuffleWriter is a concrete ShuffleWriter that is used when SortShuffleManager returns a ShuffleWriter for ShuffleHandle (and the more specialized BypassMergeSortShuffleWriter and UnsafeShuffleWriter could not be used).

SortShuffleWriter[K, V, C] is a parameterized type with K keys, V values, and C combiner values.

Creating Instance

SortShuffleWriter takes the following to be created:

MapStatus

SortShuffleWriter uses an internal variable for a MapStatus after writing records.

Writing records itself does not return a value, SortShuffleWriter uses the variable when requested to stop (which allows returning a MapStatus).

Writing Records (Into Shuffle Partitioned File In Disk Store)

write(
  records: Iterator[Product2[K, V]]): Unit

write creates an ExternalSorter based on the ShuffleDependency (of the BaseShuffleHandle), namely the Map-Size Partial Aggregation flag. The ExternalSorter uses the aggregator and keyOrdering when the flag is enabled.

write requests the ExternalSorter to inserts all the records.

write requests the IndexShuffleBlockResolver for a shuffle output file (for the shuffle and the map IDs) and creates a temporary file alongside the shuffle output file (in the same directory).

write creates a ShuffleBlockId (for the shuffle and the map IDs).

write requests ExternalSorter to write all the records (previously inserted in) into the temporary partitioned file in the disk store. ExternalSorter returns the length of every partition.

write requests IndexShuffleBlockResolver to write an index file and commit (with the shuffle and the map IDs, the temporary shuffle output file).

write creates a MapStatus (with the location of the shuffle server that serves the shuffle files and the sizes of the shuffle partitions). The MapStatus is later available as the mapStatus internal attribute.

write does not handle exceptions so when they occur, they will break the processing.

In the end, write deletes the temporary shuffle output file. write prints out the following ERROR message to the logs if the file count not be deleted:

Error while deleting temp file [path]

write is part of the ShuffleWriter abstraction.

Closing SortShuffleWriter (and Calculating MapStatus)

stop(
  success: Boolean): Option[MapStatus]

stop turns stopping flag on and returns the internal mapStatus if the input success is enabled.

Otherwise, when stopping flag is already enabled or the input success is disabled, stop returns no MapStatus (i.e. None).

In the end, stop requests the ExternalSorter to stop and increments the shuffle write time task metrics.

stop is part of the ShuffleWriter abstraction.

Requirements of BypassMergeSortShuffleHandle (as ShuffleHandle)

shouldBypassMergeSort(
  conf: SparkConf,
  dep: ShuffleDependency[_, _, _]): Boolean

shouldBypassMergeSort returns true when all of the following hold:

  1. No map-side aggregation (the mapSideCombine flag of the given ShuffleDependency is off)

  2. Number of partitions (of the Partitioner of the given ShuffleDependency) is not greater than spark.shuffle.sort.bypassMergeThreshold configuration property

Otherwise, shouldBypassMergeSort does not hold (i.e. false).

shouldBypassMergeSort is used when SortShuffleManager is requested to register a shuffle (and creates a ShuffleHandle).

Logging

Enable ALL logging level for org.apache.spark.shuffle.sort.SortShuffleWriter logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.SortShuffleWriter=ALL

Refer to Logging.

Internal Properties

Name Description

stopping

Internal flag to mark that SortShuffleWriter is closed.