
SortShuffleWriter — Fallback ShuffleWriter

SortShuffleWriter is the "fallback" ShuffleWriter: it is used when SortShuffleManager is requested for a ShuffleWriter and neither of the more specialized BypassMergeSortShuffleWriter and UnsafeShuffleWriter can be used.

SortShuffleWriter[K, V, C] is a parameterized type with the types of keys (K), values (V), and combiner values (C).

Creating Instance

SortShuffleWriter takes the following to be created:

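SortShuffleWriter is created when SortShuffleManager is requested for a ShuffleWriter (and neither of the more specialized writers can be used).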

MapStatus

SortShuffleWriter uses the mapStatus internal registry to hold the MapStatus that is available after writing records.

Writing records does not return a value (write is Unit), so SortShuffleWriter keeps the MapStatus in the registry and hands it over when requested to stop (which is what allows returning a MapStatus).

Writing Records (Into Shuffle Partitioned File In Disk Store)

write(
  records: Iterator[Product2[K, V]]): Unit

write is part of the ShuffleWriter abstraction.

write creates an ExternalSorter based on the ShuffleDependency (of the BaseShuffleHandle), namely the Map-Side Partial Aggregation flag. The ExternalSorter uses the aggregator and key ordering of the ShuffleDependency only when the flag is enabled.

write requests the ExternalSorter to insert all the given records.
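The choice described above can be sketched without any Spark dependency. This is a minimal illustration only: SketchAggregator, SketchDependency, SorterSettings and SorterSelection are hypothetical stand-ins (not Spark classes) that model just the flag, the aggregator and the key ordering, and the sketch assumes the sorter gets neither an aggregator nor an ordering when the flag is off.

```scala
// Hypothetical stand-ins for the parts of Spark's Aggregator and
// ShuffleDependency that matter here (not the real classes).
final case class SketchAggregator[K, V, C](
    createCombiner: V => C,
    mergeValue: (C, V) => C)

final case class SketchDependency[K, V, C](
    mapSideCombine: Boolean,
    aggregator: Option[SketchAggregator[K, V, C]],
    keyOrdering: Option[Ordering[K]])

// What the sorter would be configured with.
final case class SorterSettings[K, V, C](
    aggregator: Option[SketchAggregator[K, V, C]],
    ordering: Option[Ordering[K]])

object SorterSelection {
  // With map-side partial aggregation on, pass the aggregator and the
  // key ordering along; with it off, pass neither (assumption of this sketch).
  def settingsFor[K, V, C](dep: SketchDependency[K, V, C]): SorterSettings[K, V, C] =
    if (dep.mapSideCombine) SorterSettings[K, V, C](dep.aggregator, dep.keyOrdering)
    else SorterSettings[K, V, C](None, None)
}
```

With the flag off, the settings carry no aggregator and no ordering even if the dependency defines both, so any combining or sorting would be left to the reduce side.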

write...FIXME

Stopping SortShuffleWriter (and Calculating MapStatus)

stop(
  success: Boolean): Option[MapStatus]

stop is part of the ShuffleWriter abstraction.

When the stopping flag is already enabled, stop returns no MapStatus (i.e. None).

Otherwise, stop turns the stopping flag on and returns the internal mapStatus if the input success is enabled, or no MapStatus (i.e. None) if the input success is disabled.

In the end (regardless of the value returned), stop requests the ExternalSorter to stop and increments the shuffle write time task metric by the time that took.
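The interplay between write, the mapStatus registry and stop can be sketched as follows. This is a simplified model under stated assumptions, not the Spark source: SketchMapStatus and SketchWriter are hypothetical names, the "sorter" is reduced to a boolean flag, and the write-time metric update is left out.

```scala
// Hypothetical stand-in for Spark's MapStatus.
final case class SketchMapStatus(mapId: Long)

class SketchWriter {
  private var stopping = false
  // Filled in by write, handed out by stop.
  private var mapStatus: SketchMapStatus = null
  // Stands in for "the ExternalSorter was requested to stop".
  var sorterStopped = false

  // Returns Unit, so the result has to be kept in the registry.
  def write(records: Iterator[(String, Int)]): Unit = {
    records.foreach(_ => ()) // pretend to write every record
    mapStatus = SketchMapStatus(mapId = 0L)
  }

  def stop(success: Boolean): Option[SketchMapStatus] =
    try {
      if (stopping) None // already stopped earlier
      else {
        stopping = true
        if (success) Option(mapStatus) else None
      }
    } finally {
      // Runs on every path: release the sorter
      // (the real writer also updates the write-time metric here).
      sorterStopped = true
    }
}
```

Calling stop(success = true) once after write yields the stored status; any further call, or a call with success = false, yields None.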

Requirements of BypassMergeSortShuffleHandle (as ShuffleHandle)

shouldBypassMergeSort(
  conf: SparkConf,
  dep: ShuffleDependency[_, _, _]): Boolean

shouldBypassMergeSort returns true when all of the following hold:

  1. No map-side aggregation (the mapSideCombine flag of the given ShuffleDependency is off)

  2. The number of partitions (of the Partitioner of the given ShuffleDependency) is not greater than the value of the spark.shuffle.sort.bypassMergeThreshold configuration property

Otherwise, shouldBypassMergeSort does not hold (false).
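The two conditions above amount to a small predicate. The sketch below is an illustration with hypothetical stand-ins: DepInfo replaces ShuffleDependency (keeping only the mapSideCombine flag and the partition count), and the threshold is passed in directly instead of being read from a SparkConf.

```scala
object BypassSketch {
  // Hypothetical stand-in for the two things the check reads
  // from a ShuffleDependency.
  final case class DepInfo(mapSideCombine: Boolean, numPartitions: Int)

  // bypassMergeThreshold plays the role of the value of
  // spark.shuffle.sort.bypassMergeThreshold.
  def shouldBypassMergeSort(bypassMergeThreshold: Int, dep: DepInfo): Boolean =
    !dep.mapSideCombine && dep.numPartitions <= bypassMergeThreshold
}
```

For example, with a threshold of 200, a dependency with 200 partitions and no map-side aggregation qualifies, while 201 partitions or any map-side aggregation does not.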

shouldBypassMergeSort is used when:

stopping Flag

SortShuffleWriter uses the stopping internal flag to indicate whether or not this SortShuffleWriter has already been requested to stop.

Logging

Enable ALL logging level for org.apache.spark.shuffle.sort.SortShuffleWriter logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.SortShuffleWriter=ALL

Refer to Logging.