SortShuffleWriter: Fallback ShuffleWriter
SortShuffleWriter is the "fallback" ShuffleWriter. SortShuffleManager uses it when requested for a ShuffleWriter and neither of the more specialized BypassMergeSortShuffleWriter and UnsafeShuffleWriter can be used.
SortShuffleWriter[K, V, C] is a parameterized type with K keys, V values, and C combiner values.
Creating Instance
SortShuffleWriter takes the following to be created:
SortShuffleWriter is created when:
- SortShuffleManager is requested for a ShuffleWriter for a BaseShuffleHandle (i.e. a ShuffleHandle that is neither a BypassMergeSortShuffleHandle nor a SerializedShuffleHandle)
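To make the fallback role concrete, here is a minimal sketch of the handle-based dispatch, assuming toy classes that mirror Spark's handle hierarchy (in Spark, BypassMergeSortShuffleHandle and SerializedShuffleHandle both extend BaseShuffleHandle). The classes and the WriterSelection.writerFor helper are hypothetical; the real SortShuffleManager creates writer instances rather than returning names.

```scala
// Toy stand-ins for Spark's handle hierarchy (not Spark's classes).
// As in Spark, the specialized handles are subclasses of BaseShuffleHandle.
class BaseShuffleHandle(val shuffleId: Int)
class BypassMergeSortShuffleHandle(shuffleId: Int) extends BaseShuffleHandle(shuffleId)
class SerializedShuffleHandle(shuffleId: Int) extends BaseShuffleHandle(shuffleId)

object WriterSelection {
  // Hypothetical helper: names the ShuffleWriter that would handle the given handle.
  def writerFor(handle: BaseShuffleHandle): String = handle match {
    case _: BypassMergeSortShuffleHandle => "BypassMergeSortShuffleWriter"
    case _: SerializedShuffleHandle      => "UnsafeShuffleWriter"
    case _                               => "SortShuffleWriter" // fallback for a plain BaseShuffleHandle
  }
}
```

Because the specialized handles are subclasses of BaseShuffleHandle, the catch-all case has to come last; placed first, it would match every handle.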
MapStatus
SortShuffleWriter uses the mapStatus internal registry to hold the MapStatus it creates after writing records.
Since writing records does not return a value (write returns Unit), SortShuffleWriter keeps the MapStatus in the registry and hands it out when requested to stop (stop returns an Option[MapStatus]).
Writing Records (Into Shuffle Partitioned File In Disk Store)
write(
  records: Iterator[Product2[K, V]]): Unit
write is part of the ShuffleWriter abstraction.
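write creates an ExternalSorter based on the Map-Side Partial Aggregation flag (mapSideCombine) of the ShuffleDependency (of the BaseShuffleHandle). When the flag is enabled, the ExternalSorter is given the aggregator and the key ordering of the ShuffleDependency. Otherwise, the ExternalSorter gets neither, since keys do not have to be sorted within a partition on the map side (sorting for operators like sortByKey happens on the reduce side). In both cases, the ExternalSorter uses the Partitioner of the ShuffleDependency.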
write requests the ExternalSorter to insert all the given records.
write...FIXME
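The effect of the map-side combine flag can be illustrated with a toy, in-memory model, assuming hypothetical stand-ins: Aggregator, Dep, partitionOf and write below are not Spark's API (the Aggregator only mimics the shape of Spark's). With the flag on, values are folded per key by the aggregator and keys follow the key ordering; with the flag off, records are only grouped by partition in arrival order. Spark's ExternalSorter additionally spills to disk and writes a single partitioned file, which this sketch leaves out.

```scala
import scala.collection.mutable

object ToySortShuffleWrite {
  // Hypothetical stand-in shaped like Spark's Aggregator (same three functions).
  final case class Aggregator[V, C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C) // unused here: only needed to merge already-combined values

  // Hypothetical stand-in for the parts of a ShuffleDependency that write looks at.
  final case class Dep[K, V, C](
      numPartitions: Int,
      mapSideCombine: Boolean,
      aggregator: Option[Aggregator[V, C]],
      keyOrdering: Option[Ordering[K]])

  // Non-negative hash partitioning, in the spirit of Spark's HashPartitioner.
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  // Returns each partition's records rendered as "key=value", in output order.
  def write[K, V, C](dep: Dep[K, V, C], records: Iterator[(K, V)]): Vector[Vector[String]] = {
    val parts = Vector.fill(dep.numPartitions)(mutable.ArrayBuffer.empty[String])
    if (dep.mapSideCombine) {
      // Flag on: fold values per key with the aggregator, then apply the key ordering (if any).
      val agg = dep.aggregator.getOrElse(
        throw new IllegalArgumentException("map-side combine requires an aggregator"))
      val combined = mutable.LinkedHashMap.empty[K, C]
      records.foreach { case (k, v) =>
        combined(k) = combined.get(k) match {
          case Some(c) => agg.mergeValue(c, v)
          case None    => agg.createCombiner(v)
        }
      }
      val keys = dep.keyOrdering match {
        case Some(ord) => combined.keys.toVector.sorted(ord)
        case None      => combined.keys.toVector
      }
      keys.foreach(k => parts(partitionOf(k, dep.numPartitions)) += s"$k=${combined(k)}")
    } else {
      // Flag off: no aggregator and no ordering; records are only grouped by partition.
      records.foreach { case (k, v) => parts(partitionOf(k, dep.numPartitions)) += s"$k=$v" }
    }
    parts.map(_.toVector)
  }
}
```

With a summing aggregator over Int keys and two partitions, the input (1,10), (2,20), (1,5), (3,1) yields "2=20" in partition 0 and "1=15", "3=1" in partition 1; with the flag off, partition 1 keeps all three raw records for keys 1 and 3.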
Stopping SortShuffleWriter (and Calculating MapStatus)
stop(
  success: Boolean): Option[MapStatus]
stop is part of the ShuffleWriter abstraction.
If the stopping flag is already on, stop returns no MapStatus (i.e. None).
Otherwise, stop turns the stopping flag on and returns the internal mapStatus if the input success is enabled, or None if it is disabled.
In the end (regardless of the result), stop requests the ExternalSorter (if there is one) to stop and adds the time it took to the shuffle write time task metric.
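The interplay of the stopping flag, the mapStatus registry and the sorter cleanup can be sketched as follows, assuming a hypothetical ToyWriter (not Spark's class) whose write takes precomputed partition lengths and whose "sorter" is just a boolean. The cleanup sits in a finally block so that it runs whatever stop returns.

```scala
// Hypothetical stand-in for Spark's MapStatus: only the per-partition sizes.
final case class MapStatus(partitionLengths: Vector[Long])

// Toy model of SortShuffleWriter's stop bookkeeping (not Spark's class).
final class ToyWriter {
  private var mapStatus: MapStatus = null // the mapStatus registry, set by write
  private var stopping = false            // the stopping flag
  private var sorterOpen = false          // stands in for a live ExternalSorter
  private var writeTimeNanos = 0L         // stands in for the shuffle write time metric

  def sorterIsOpen: Boolean = sorterOpen
  def writeTime: Long = writeTimeNanos

  // write returns Unit, so its result is kept in the registry for stop.
  def write(partitionLengths: Vector[Long]): Unit = {
    sorterOpen = true
    mapStatus = MapStatus(partitionLengths)
  }

  def stop(success: Boolean): Option[MapStatus] = {
    try {
      if (stopping) {
        None // already stopped: no MapStatus the second time
      } else {
        stopping = true
        if (success) Option(mapStatus) else None // Option(null) is None
      }
    } finally {
      // Clean up the "sorter" once and add the time taken to the write time metric.
      if (sorterOpen) {
        val start = System.nanoTime()
        sorterOpen = false
        writeTimeNanos += System.nanoTime() - start
      }
    }
  }
}
```

Calling stop(success = true) twice on the same writer returns the MapStatus the first time and None the second time, which is what the stopping flag is for.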
Requirements of BypassMergeSortShuffleHandle (as ShuffleHandle)
shouldBypassMergeSort(
  conf: SparkConf,
  dep: ShuffleDependency[_, _, _]): Boolean
shouldBypassMergeSort returns true when all of the following hold:
- No map-side aggregation (the mapSideCombine flag of the given ShuffleDependency is off)
- The number of partitions (of the Partitioner of the given ShuffleDependency) is not greater than the spark.shuffle.sort.bypassMergeThreshold configuration property (default: 200)
Otherwise, shouldBypassMergeSort returns false.
shouldBypassMergeSort is used when:
- SortShuffleManager is requested to register a shuffle (and decides which ShuffleHandle to create: a BypassMergeSortShuffleHandle when shouldBypassMergeSort holds)
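The two conditions translate directly into code. A minimal sketch, assuming a plain Map[String, String] in place of SparkConf and a hypothetical ToyDependency with just the two fields the check needs; 200 is Spark's documented default for spark.shuffle.sort.bypassMergeThreshold.

```scala
// Hypothetical stand-in for the two ShuffleDependency properties the check reads.
final case class ToyDependency(mapSideCombine: Boolean, numPartitions: Int)

object BypassCheck {
  // Spark's default for spark.shuffle.sort.bypassMergeThreshold.
  val DefaultBypassMergeThreshold = 200

  def shouldBypassMergeSort(conf: Map[String, String], dep: ToyDependency): Boolean = {
    if (dep.mapSideCombine) {
      // Map-side aggregation needs the sort-based path, so bypassing is not possible.
      false
    } else {
      val threshold = conf
        .get("spark.shuffle.sort.bypassMergeThreshold")
        .map(_.toInt)
        .getOrElse(DefaultBypassMergeThreshold)
      // Bypass only when there are few enough partitions.
      dep.numPartitions <= threshold
    }
  }
}
```

With the default threshold, a shuffle with 200 partitions and no map-side aggregation qualifies, while one with 201 partitions does not.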
stopping Flag
SortShuffleWriter uses the stopping internal flag to track whether it has already been stopped.
Logging
Enable ALL logging level for the org.apache.spark.shuffle.sort.SortShuffleWriter logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.shuffle.sort.SortShuffleWriter=ALL
Refer to Logging.