SortShuffleManager

SortShuffleManager is the default (and the only built-in) ShuffleManager in Apache Spark, registered under the short names sort and tungsten-sort.

Use the spark.shuffle.manager configuration property to activate a custom ShuffleManager.
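The lookup below is a minimal sketch of how a spark.shuffle.manager value could be turned into a class name. It assumes the value is first matched case-insensitively against the short names sort and tungsten-sort (to our knowledge this mirrors what SparkEnv does) and is otherwise taken as the fully-qualified class name of a custom ShuffleManager. ShuffleManagerNames and com.example.MyShuffleManager are hypothetical names.

```scala
import java.util.Locale

// Hypothetical sketch: resolve a spark.shuffle.manager value to a class name.
object ShuffleManagerNames {
  val SortShuffleManagerClass = "org.apache.spark.shuffle.sort.SortShuffleManager"

  // Both short names point at SortShuffleManager (assumption stated in the lead-in).
  private val shortNames: Map[String, String] = Map(
    "sort" -> SortShuffleManagerClass,
    "tungsten-sort" -> SortShuffleManagerClass
  )

  // Short names are matched case-insensitively; any other value is returned
  // unchanged and treated as a fully-qualified class name.
  def resolve(configValue: String): String =
    shortNames.getOrElse(configValue.toLowerCase(Locale.ROOT), configValue)

  def main(args: Array[String]): Unit = {
    println(resolve("sort"))                         // org.apache.spark.shuffle.sort.SortShuffleManager
    println(resolve("Tungsten-Sort"))                // org.apache.spark.shuffle.sort.SortShuffleManager
    println(resolve("com.example.MyShuffleManager")) // com.example.MyShuffleManager
  }
}
```

With a real SparkConf, a custom implementation would be selected like any other property, e.g. new SparkConf().set("spark.shuffle.manager", "com.example.MyShuffleManager").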

SortShuffleManager is created when SparkEnv is created (on the driver and executors at the very beginning of a Spark application’s lifecycle).

Figure 1. SortShuffleManager and SparkEnv (Driver and Executors)

Creating Instance

SortShuffleManager takes a single SparkConf to be created.

Maximum Number of Partition Identifiers

SortShuffleManager allows for (1 << 24) partition identifiers (i.e. 16777216) that can be encoded, so the largest partition identifier is (1 << 24) - 1 (i.e. 16777215).
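To see where a 24-bit limit comes from, the sketch below packs a partition ID into the upper 24 bits of a 64-bit word and leaves the lower 40 bits for a record pointer. The layout is modeled on Spark's PackedRecordPointer, but treat it as an assumption; PartitionIdBits, pack and partitionIdOf are hypothetical names.

```scala
// Hypothetical sketch of 24-bit partition-ID encoding in a 64-bit word
// (layout assumed: [24-bit partition ID][40-bit record pointer]).
object PartitionIdBits {
  val PartitionIdBitCount = 24
  val MaxPartitionId: Int = (1 << PartitionIdBitCount) - 1 // 16777215
  val MaxPartitions: Int = 1 << PartitionIdBitCount        // 16777216 identifiers: 0 to 16777215

  private val PointerBits = 64 - PartitionIdBitCount        // 40
  private val PointerMask = (1L << PointerBits) - 1

  // Packs a partition ID and a 40-bit pointer into a single Long.
  def pack(partitionId: Int, pointer: Long): Long = {
    require(partitionId >= 0 && partitionId <= MaxPartitionId, s"partition ID out of range: $partitionId")
    require(pointer >= 0 && pointer <= PointerMask, s"pointer out of range: $pointer")
    (partitionId.toLong << PointerBits) | pointer
  }

  // Recovers the partition ID from the upper 24 bits (unsigned shift).
  def partitionIdOf(packed: Long): Int = (packed >>> PointerBits).toInt
}
```

Any partition ID above 16777215 would need a 25th bit, which is why the number of partitions is capped at 1 << 24.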

numMapsForShuffle

numMapsForShuffle is a lookup table of the number of mappers producing the output for a shuffle (a java.util.concurrent.ConcurrentHashMap keyed by shuffle ID).
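A minimal sketch of such a lookup table, assuming it maps a shuffle ID to its number of mappers and is filled with putIfAbsent so that only the first registration of a shuffle sticks. NumMapsRegistry, registerIfAbsent and numMaps are hypothetical names; java.lang.Integer is used so that a missing entry comes back as null rather than 0.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical model of the numMapsForShuffle lookup table:
// shuffle ID -> number of mappers producing output for that shuffle.
class NumMapsRegistry {
  private val numMapsForShuffle =
    new ConcurrentHashMap[java.lang.Integer, java.lang.Integer]()

  // Records numMaps for shuffleId unless the shuffle is already registered.
  // Returns true only for the call that actually inserted the entry.
  def registerIfAbsent(shuffleId: Int, numMaps: Int): Boolean =
    numMapsForShuffle.putIfAbsent(Integer.valueOf(shuffleId), Integer.valueOf(numMaps)) == null

  // Number of mappers recorded for the shuffle, if any.
  def numMaps(shuffleId: Int): Option[Int] =
    Option(numMapsForShuffle.get(Integer.valueOf(shuffleId))).map(_.intValue)
}
```

putIfAbsent makes the check-and-insert atomic, so concurrent tasks of the same shuffle cannot overwrite each other's registration.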

IndexShuffleBlockResolver

shuffleBlockResolver: ShuffleBlockResolver

shuffleBlockResolver is an IndexShuffleBlockResolver that is created immediately when SortShuffleManager is.

shuffleBlockResolver is used when SortShuffleManager is requested for a ShuffleWriter for a given partition, to unregister a shuffle (and remove its metadata), and to stop.

shuffleBlockResolver is part of the ShuffleManager abstraction.

Getting ShuffleWriter For Partition and ShuffleHandle

getWriter[K, V](
  handle: ShuffleHandle,
  mapId: Int,
  context: TaskContext): ShuffleWriter[K, V]

getWriter registers the given ShuffleHandle (by the shuffleId and numMaps) in the numMapsForShuffle internal registry unless already done.

getWriter expects that the input ShuffleHandle is of type BaseShuffleHandle. Moreover, in two (out of three) cases getWriter further expects the ShuffleBlockResolver to be the more specialized IndexShuffleBlockResolver.

getWriter then creates a new ShuffleWriter based on the type of the given ShuffleHandle.

ShuffleHandle                    ShuffleWriter
-------------------------------  ----------------------------
SerializedShuffleHandle          UnsafeShuffleWriter
BypassMergeSortShuffleHandle     BypassMergeSortShuffleWriter
BaseShuffleHandle                SortShuffleWriter
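The mapping from handle type to writer can be read as a pattern match. In the sketch below the three handle classes are empty stand-ins (not Spark's classes), and writerFor returns the writer's name instead of creating a writer. It assumes, as in Spark, that both specialized handles extend BaseShuffleHandle, which is why the catch-all case has to come last.

```scala
// Hypothetical model of getWriter's dispatch on the ShuffleHandle type.
object WriterSelection {
  class BaseShuffleHandle(val shuffleId: Int, val numMaps: Int)
  class SerializedShuffleHandle(shuffleId: Int, numMaps: Int)
      extends BaseShuffleHandle(shuffleId, numMaps)
  class BypassMergeSortShuffleHandle(shuffleId: Int, numMaps: Int)
      extends BaseShuffleHandle(shuffleId, numMaps)

  // Returns the name of the ShuffleWriter that would be created for the handle.
  // The specialized subclasses are matched first; a plain BaseShuffleHandle
  // falls through to the last case.
  def writerFor(handle: BaseShuffleHandle): String = handle match {
    case _: SerializedShuffleHandle      => "UnsafeShuffleWriter"
    case _: BypassMergeSortShuffleHandle => "BypassMergeSortShuffleWriter"
    case _                               => "SortShuffleWriter"
  }
}
```

If the catch-all case were listed first, every handle would be matched by it and the specialized writers would never be chosen.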

getWriter is part of the ShuffleManager abstraction.

Unregistering Shuffle

unregisterShuffle(
  shuffleId: Int): Boolean

unregisterShuffle tries to remove the given shuffleId from the numMapsForShuffle internal registry.

If the given shuffleId was registered, unregisterShuffle requests the IndexShuffleBlockResolver to remove the shuffle index and data files one by one (up to the number of mappers producing the output for the shuffle).
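The removal loop can be sketched as follows, assuming one removal request per map task (mapId from 0 to numMaps - 1). RecordingResolver is a hypothetical stand-in for IndexShuffleBlockResolver that only records the requests; its removeDataByMap method name mirrors the resolver's but should be treated as illustrative. UnregisterSketch is likewise hypothetical, and it always returns true.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for IndexShuffleBlockResolver: it records which
// (shuffleId, mapId) pairs it was asked to remove and touches no files.
class RecordingResolver {
  val removed: ArrayBuffer[(Int, Int)] = ArrayBuffer.empty[(Int, Int)]
  def removeDataByMap(shuffleId: Int, mapId: Int): Unit = removed += ((shuffleId, mapId))
}

// Hypothetical model of unregisterShuffle.
class UnregisterSketch(val resolver: RecordingResolver) {
  // shuffle ID -> number of mappers; java.lang.Integer so that "absent" is null.
  private val numMapsForShuffle =
    new ConcurrentHashMap[java.lang.Integer, java.lang.Integer]()

  def register(shuffleId: Int, numMaps: Int): Unit =
    numMapsForShuffle.putIfAbsent(Integer.valueOf(shuffleId), Integer.valueOf(numMaps))

  def unregisterShuffle(shuffleId: Int): Boolean = {
    // remove returns null when the shuffle was never registered (or already removed).
    Option(numMapsForShuffle.remove(Integer.valueOf(shuffleId))).foreach { numMaps =>
      // One removal request per map task: mapId = 0, 1, ..., numMaps - 1.
      (0 until numMaps.intValue).foreach(mapId => resolver.removeDataByMap(shuffleId, mapId))
    }
    true // this sketch always reports success
  }
}
```

Because the entry is removed atomically before any files are touched, a second unregisterShuffle call for the same shuffle finds nothing and issues no further removals.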

unregisterShuffle is part of the ShuffleManager abstraction.

Creating ShuffleHandle (For ShuffleDependency)

registerShuffle[K, V, C](
  shuffleId: Int,
  numMaps: Int,
  dependency: ShuffleDependency[K, V, C]): ShuffleHandle

registerShuffle returns a new ShuffleHandle that can be one of the following:

  1. BypassMergeSortShuffleHandle (with ShuffleDependency[K, V, V]) when the shouldBypassMergeSort condition holds.

  2. SerializedShuffleHandle (with ShuffleDependency[K, V, V]) when the canUseSerializedShuffle condition holds.

  3. BaseShuffleHandle (when neither of the above holds)
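The order of the three cases matters: bypass is checked first, then serialized shuffle, and BaseShuffleHandle is the fallback. The shouldBypassMergeSort condition is not spelled out in this section, so the sketch below assumes it is "no map-side combine and at most spark.shuffle.sort.bypassMergeThreshold partitions (200 by default)". HandleSelection and Dep are hypothetical, and the handle is returned as a name only.

```scala
// Hypothetical model of registerShuffle's choice of ShuffleHandle.
object HandleSelection {
  final case class Dep(numPartitions: Int, mapSideCombine: Boolean, serializerSupportsRelocation: Boolean)

  val MaxSerializedPartitions: Int = 1 << 24 // 16777216

  // Assumed bypass condition (see lead-in).
  def shouldBypassMergeSort(dep: Dep, bypassMergeThreshold: Int = 200): Boolean =
    !dep.mapSideCombine && dep.numPartitions <= bypassMergeThreshold

  // The three serialized-shuffle requirements described later in this page.
  def canUseSerializedShuffle(dep: Dep): Boolean =
    dep.serializerSupportsRelocation && !dep.mapSideCombine &&
      dep.numPartitions <= MaxSerializedPartitions

  // Bypass first, then serialized, then the BaseShuffleHandle fallback.
  def registerShuffle(dep: Dep): String =
    if (shouldBypassMergeSort(dep)) "BypassMergeSortShuffleHandle"
    else if (canUseSerializedShuffle(dep)) "SerializedShuffleHandle"
    else "BaseShuffleHandle"
}
```

For example, a dependency with 10 partitions and no map-side combine would get a BypassMergeSortShuffleHandle even if its serializer supports relocation, since the bypass check runs first.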

registerShuffle is part of the ShuffleManager abstraction.

Creating BlockStoreShuffleReader For ShuffleHandle And Reduce Partitions

getReader[K, C](
  handle: ShuffleHandle,
  startPartition: Int,
  endPartition: Int,
  context: TaskContext): ShuffleReader[K, C]

getReader returns a new BlockStoreShuffleReader passing all the input parameters on to it.

getReader assumes that the input ShuffleHandle is of type BaseShuffleHandle.
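The reduce partitions handed to getReader form a range. The sketch below assumes, as the ShuffleManager contract documents, that startPartition is inclusive and endPartition is exclusive, so a reduce task reading a single partition i would pass i and i + 1. ReduceRange and partitionsRead are hypothetical names.

```scala
// Hypothetical illustration of the half-open reduce-partition range
// [startPartition, endPartition) passed to getReader.
object ReduceRange {
  def partitionsRead(startPartition: Int, endPartition: Int): Seq[Int] = {
    require(0 <= startPartition && startPartition <= endPartition, "invalid partition range")
    startPartition until endPartition // endPartition itself is not read
  }
}
```

With a half-open range, adjacent readers such as (0, 4) and (4, 8) cover partitions 0 to 7 without overlap.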

getReader is part of the ShuffleManager abstraction.

Stopping SortShuffleManager

stop(): Unit

stop simply requests the IndexShuffleBlockResolver to stop (which actually does nothing).

stop is part of the ShuffleManager abstraction.

Requirements of SerializedShuffleHandle (as ShuffleHandle)

canUseSerializedShuffle(
  dependency: ShuffleDependency[_, _, _]): Boolean

canUseSerializedShuffle returns true when all of the following hold:

  1. The Serializer (of the given ShuffleDependency) supports relocation of serialized objects

  2. No map-side aggregation (the mapSideCombine flag of the given ShuffleDependency is off)

  3. The number of partitions (of the Partitioner of the given ShuffleDependency) is not greater than the maximum number of partition identifiers (i.e. (1 << 24), i.e. 16777216)

When all of them hold, canUseSerializedShuffle prints out the following DEBUG message to the logs:

Can use serialized shuffle for shuffle [shuffleId]

Otherwise, canUseSerializedShuffle returns false and prints out one of the following DEBUG messages (for the first requirement that does not hold):

Can't use serialized shuffle for shuffle [id] because the serializer, [name], does not support object relocation

Can't use serialized shuffle for shuffle [id] because an aggregator is defined

Can't use serialized shuffle for shuffle [id] because it has more than [number] partitions
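The checks and messages above can be sketched as a function that returns the decision together with the DEBUG message text. It assumes the requirements are checked in the order listed, so the first failing one determines the message; SerializedShuffleCheck and Dep are hypothetical, and serializerName is just a label (no real serializer is involved).

```scala
// Hypothetical model of canUseSerializedShuffle returning (decision, DEBUG message).
object SerializedShuffleCheck {
  final case class Dep(
      shuffleId: Int,
      serializerName: String,
      serializerSupportsRelocation: Boolean,
      mapSideCombine: Boolean,
      numPartitions: Int)

  val MaxPartitions: Int = 1 << 24 // 16777216

  def canUseSerializedShuffle(dep: Dep): (Boolean, String) = {
    val id = dep.shuffleId
    if (!dep.serializerSupportsRelocation)
      (false, s"Can't use serialized shuffle for shuffle $id because the serializer, ${dep.serializerName}, does not support object relocation")
    else if (dep.mapSideCombine)
      (false, s"Can't use serialized shuffle for shuffle $id because an aggregator is defined")
    else if (dep.numPartitions > MaxPartitions)
      (false, s"Can't use serialized shuffle for shuffle $id because it has more than $MaxPartitions partitions")
    else
      (true, s"Can use serialized shuffle for shuffle $id")
  }
}
```

Note that exactly 16777216 partitions still passes the third check; only a count above it fails.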

canUseSerializedShuffle is used when SortShuffleManager is requested to register a shuffle (and create a ShuffleHandle).

Logging

Enable the ALL logging level for the org.apache.spark.shuffle.sort.SortShuffleManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.SortShuffleManager=ALL

Refer to Logging.