
SortShuffleManager

SortShuffleManager is the default and only ShuffleManager in Apache Spark. It is available under the short names sort and tungsten-sort.

Figure: SortShuffleManager and SparkEnv (Driver and Executors)

Creating Instance

SortShuffleManager takes a SparkConf to be created.

SortShuffleManager is created when SparkEnv is created (on the driver and executors at the very beginning of a Spark application's lifecycle).

Getting ShuffleWriter For Partition and ShuffleHandle

getWriter[K, V](
  handle: ShuffleHandle,
  mapId: Int,
  context: TaskContext): ShuffleWriter[K, V]

getWriter registers the given ShuffleHandle (by the shuffleId and numMaps) in the numMapsForShuffle internal registry unless already done.

Note

getWriter expects that the input ShuffleHandle is a BaseShuffleHandle. Moreover, in two out of the three cases getWriter expects it to be a more specialized handle (a SerializedShuffleHandle or a BypassMergeSortShuffleHandle).

getWriter then creates a new ShuffleWriter based on the type of the given ShuffleHandle.

| ShuffleHandle | ShuffleWriter |
|---|---|
| SerializedShuffleHandle | UnsafeShuffleWriter |
| BypassMergeSortShuffleHandle | BypassMergeSortShuffleWriter |
| BaseShuffleHandle | SortShuffleWriter |

getWriter is part of the ShuffleManager abstraction.
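
The dispatch in the table above can be pictured as a pattern match on the handle type. The following is a minimal sketch, not Spark's implementation: the three classes are hypothetical stand-ins that only mirror the names and the "specialized handle is also a BaseShuffleHandle" relationship described in the note, and `writerNameFor` is an invented helper that returns the writer's name instead of creating a writer.

```scala
object WriterDispatch {
  // Hypothetical stand-ins for the handle classes (not Spark's definitions).
  class BaseShuffleHandle(val shuffleId: Int)
  class SerializedShuffleHandle(shuffleId: Int) extends BaseShuffleHandle(shuffleId)
  class BypassMergeSortShuffleHandle(shuffleId: Int) extends BaseShuffleHandle(shuffleId)

  // Returns the name of the writer that the table pairs with the handle type.
  // The specialized handles are matched first, because each of them is also
  // a BaseShuffleHandle and would otherwise fall into the last case.
  def writerNameFor(handle: BaseShuffleHandle): String = handle match {
    case _: SerializedShuffleHandle      => "UnsafeShuffleWriter"
    case _: BypassMergeSortShuffleHandle => "BypassMergeSortShuffleWriter"
    case _                               => "SortShuffleWriter"
  }
}
```

The order of the cases matters in this sketch: moving the catch-all case to the top would map every handle to SortShuffleWriter.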

== [[MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE]] Maximum Number of Partition Identifiers

SortShuffleManager allows for (1 << 24) partition identifiers that can be encoded (i.e. 16777216).
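
The arithmetic behind that number can be checked directly. This is a small sketch that assumes partition identifiers are encoded in 24 bits and start at 0; the object and value names are made up for illustration.

```scala
object PartitionIdLimit {
  // Assumption: a partition identifier occupies 24 bits.
  val PartitionIdBits: Int = 24

  // Number of distinct identifiers that fit into 24 bits.
  val MaxPartitions: Int = 1 << PartitionIdBits

  // Largest identifier, given that identifiers start at 0.
  val MaxPartitionId: Int = MaxPartitions - 1
}
```

Under these assumptions, MaxPartitions is 16777216 and the largest encodable identifier is 16777215.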

== [[numMapsForShuffle]] numMapsForShuffle

Lookup table with the number of mappers producing the output for a shuffle (as a java.util.concurrent.ConcurrentHashMap)
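
A registry of this kind could look as follows. This is a sketch only: `NumMapsRegistry`, `register` and `lookup` are hypothetical names, and the only thing taken from the text is the idea of a ConcurrentHashMap keyed by shuffleId that is written to "unless already done".

```scala
import java.util.concurrent.ConcurrentHashMap

object NumMapsRegistry {
  // shuffleId -> number of mappers (a stand-in for numMapsForShuffle)
  private val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()

  // Records numMaps for the shuffle unless an entry already exists.
  def register(shuffleId: Int, numMaps: Int): Unit = {
    numMapsForShuffle.putIfAbsent(shuffleId, numMaps)
    ()
  }

  // Number of mappers registered for the shuffle, if any.
  def lookup(shuffleId: Int): Option[Int] =
    if (numMapsForShuffle.containsKey(shuffleId)) Some(numMapsForShuffle.get(shuffleId))
    else None
}
```

putIfAbsent keeps the first value that was registered, so registering the same shuffleId twice leaves the original entry untouched.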

== [[shuffleBlockResolver]] IndexShuffleBlockResolver

[source, scala]

shuffleBlockResolver: ShuffleBlockResolver

shuffleBlockResolver is an shuffle:IndexShuffleBlockResolver.md[IndexShuffleBlockResolver] that is created immediately when SortShuffleManager is.

shuffleBlockResolver is used when SortShuffleManager is requested for a ShuffleWriter (getWriter), to unregister a shuffle (unregisterShuffle) and to stop (stop).

shuffleBlockResolver is part of the shuffle:ShuffleManager.md#shuffleBlockResolver[ShuffleManager] abstraction.

== [[unregisterShuffle]] Unregistering Shuffle

[source, scala]

unregisterShuffle(shuffleId: Int): Boolean


unregisterShuffle tries to remove the given shuffleId from the numMapsForShuffle internal registry.

If the given shuffleId was registered, unregisterShuffle requests the IndexShuffleBlockResolver to remove the shuffle data of every map task, one by one (up to the number of mappers producing the output for the shuffle).
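
The flow described above can be sketched without Spark on the classpath. Everything here is a simplified stand-in: `removed` only records which map outputs would be deleted, `removeDataByMap` and `register` are hypothetical helpers for the illustration, and returning true in every case is an assumption of the sketch.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer

object UnregisterSketch {
  // shuffleId -> number of mappers; Integer values so that a missing key yields null
  val numMapsForShuffle = new ConcurrentHashMap[Int, java.lang.Integer]()

  // Stand-in for the block resolver: records the (shuffleId, mapId) pairs it was asked to remove.
  val removed = ArrayBuffer.empty[(Int, Int)]

  def removeDataByMap(shuffleId: Int, mapId: Int): Unit = {
    removed += ((shuffleId, mapId))
    ()
  }

  def register(shuffleId: Int, numMaps: Int): Unit = {
    numMapsForShuffle.putIfAbsent(shuffleId, Integer.valueOf(numMaps))
    ()
  }

  def unregisterShuffle(shuffleId: Int): Boolean = {
    // remove returns null for an unknown shuffleId, which Option turns into None
    Option(numMapsForShuffle.remove(shuffleId)).foreach { numMaps =>
      (0 until numMaps.intValue).foreach(mapId => removeDataByMap(shuffleId, mapId))
    }
    true // assumption of this sketch: success is reported either way
  }
}
```

Removing the registry entry and iterating over the returned count in one step means an unknown shuffleId results in no removal requests at all.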

unregisterShuffle is part of the shuffle:ShuffleManager.md#unregisterShuffle[ShuffleManager] abstraction.

== [[registerShuffle]] Creating ShuffleHandle (For ShuffleDependency)

[source, scala]

registerShuffle[K, V, C](...): ShuffleHandle


CAUTION: FIXME Copy the conditions

registerShuffle returns a new ShuffleHandle that can be one of the following:

  1. shuffle:BypassMergeSortShuffleHandle.md[BypassMergeSortShuffleHandle] (with ShuffleDependency[K, V, V]) when the shuffle:SortShuffleWriter.md#shouldBypassMergeSort[shouldBypassMergeSort] condition holds.

  2. shuffle:SerializedShuffleHandle.md[SerializedShuffleHandle] (with ShuffleDependency[K, V, V]) when the canUseSerializedShuffle condition holds.

  3. shuffle:spark-shuffle-BaseShuffleHandle.md[BaseShuffleHandle] otherwise.

registerShuffle is part of the shuffle:ShuffleManager.md#registerShuffle[ShuffleManager] abstraction.
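
The choice between the three handles can be summarized as a small decision function. This sketch assumes that the numbered list reflects the order in which the conditions are checked; the two Boolean parameters stand for the outcomes of shouldBypassMergeSort and canUseSerializedShuffle, and `handleNameFor` is a hypothetical helper that returns the handle's name rather than a handle.

```scala
object HandleChoice {
  // The flags stand for the results of the two conditions;
  // how they are computed is outside this sketch.
  def handleNameFor(shouldBypassMergeSort: Boolean, canUseSerializedShuffle: Boolean): String =
    if (shouldBypassMergeSort) "BypassMergeSortShuffleHandle"
    else if (canUseSerializedShuffle) "SerializedShuffleHandle"
    else "BaseShuffleHandle"
}
```

With this ordering, a dependency that satisfies both conditions gets a BypassMergeSortShuffleHandle, and BaseShuffleHandle is the fallback when neither holds.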

== [[getReader]] Creating BlockStoreShuffleReader For ShuffleHandle And Reduce Partitions

[source, scala]

getReader[K, C](...): ShuffleReader[K, C]


getReader returns a new shuffle:BlockStoreShuffleReader.md[BlockStoreShuffleReader] passing all the input parameters on to it.

getReader assumes that the input ShuffleHandle is of type shuffle:spark-shuffle-BaseShuffleHandle.md[BaseShuffleHandle].

getReader is part of the shuffle:ShuffleManager.md#getReader[ShuffleManager] abstraction.

== [[stop]] Stopping SortShuffleManager

[source, scala]

stop(): Unit

stop simply requests the IndexShuffleBlockResolver to shuffle:IndexShuffleBlockResolver.md#stop[stop] (which actually does nothing).

stop is part of the shuffle:ShuffleManager.md#stop[ShuffleManager] abstraction.

== [[canUseSerializedShuffle]] Requirements of SerializedShuffleHandle (as ShuffleHandle)

[source, scala]

canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean


canUseSerializedShuffle returns true when all of the following hold:

. Serializer (of the given ShuffleDependency) serializer:Serializer.md#supportsRelocationOfSerializedObjects[supports relocation of serialized objects]

. No map-side aggregation (the mapSideCombine flag of the given ShuffleDependency is off)

. Number of partitions (of the Partitioner of the given ShuffleDependency) is not greater than the maximum number of partition identifiers (i.e. (1 << 24), i.e. 16777216)

When they do, canUseSerializedShuffle prints out the following DEBUG message to the logs:

[source,plaintext]

Can use serialized shuffle for shuffle [shuffleId]

Otherwise, canUseSerializedShuffle returns false and prints out one of the following DEBUG messages:

[source,plaintext]

Can't use serialized shuffle for shuffle [id] because the serializer, [name], does not support object relocation

Can't use serialized shuffle for shuffle [id] because an aggregator is defined

Can't use serialized shuffle for shuffle [id] because it has more than [number] partitions

canUseSerializedShuffle is used when SortShuffleManager is requested to shuffle:SortShuffleManager.md#registerShuffle[register a shuffle (and creates a ShuffleHandle)].
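
The three requirements and their DEBUG messages can be put together in one sketch. It is an illustration under assumptions, not Spark's code: `DependencyInfo` is a hypothetical, flattened view of the pieces of a ShuffleDependency that the check reads, the limit is taken to be the (1 << 24) constant from the section on partition identifiers, and the message is returned alongside the result instead of being logged.

```scala
object SerializedShuffleCheck {
  val MaxPartitions: Int = 1 << 24

  // Hypothetical view of the inputs to the check (not a Spark class).
  final case class DependencyInfo(
      shuffleId: Int,
      serializerName: String,
      supportsRelocation: Boolean,
      mapSideCombine: Boolean,
      numPartitions: Int)

  // Returns the decision together with the DEBUG message that goes with it.
  def canUseSerializedShuffle(dep: DependencyInfo): (Boolean, String) = {
    val id = dep.shuffleId
    if (!dep.supportsRelocation)
      (false, s"Can't use serialized shuffle for shuffle $id because the serializer, " +
        s"${dep.serializerName}, does not support object relocation")
    else if (dep.mapSideCombine)
      (false, s"Can't use serialized shuffle for shuffle $id because an aggregator is defined")
    else if (dep.numPartitions > MaxPartitions)
      (false, s"Can't use serialized shuffle for shuffle $id because it has more than " +
        s"$MaxPartitions partitions")
    else
      (true, s"Can use serialized shuffle for shuffle $id")
  }
}
```

The checks run in the order of the list, so a dependency that fails several requirements reports only the first one it fails.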

== [[logging]] Logging

Enable ALL logging level for org.apache.spark.shuffle.sort.SortShuffleManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

[source,plaintext]

log4j.logger.org.apache.spark.shuffle.sort.SortShuffleManager=ALL

Refer to ROOT:spark-logging.md[Logging].


Last update: 2020-10-09