SortShuffleManager

SortShuffleManager is the default and only ShuffleManager in Apache Spark (registered under the short names sort and tungsten-sort).
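
Since both short names resolve to this class and sort is the default, setting the spark.shuffle.manager configuration property explicitly is optional. A minimal sketch (the application name is illustrative):

import org.apache.spark.SparkConf

// "sort" (the default) and "tungsten-sort" both resolve to SortShuffleManager,
// so this setting is shown only for illustration.
val conf = new SparkConf()
  .setAppName("sort-shuffle-demo")
  .set("spark.shuffle.manager", "sort")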

Figure: SortShuffleManager and SparkEnv (Driver and Executors)

Creating Instance

SortShuffleManager takes the following to be created:

* SparkConf

SortShuffleManager is created when SparkEnv is created (on the driver and executors at the very beginning of a Spark application's lifecycle).

taskIdMapsForShuffle Registry

taskIdMapsForShuffle: ConcurrentHashMap[Int, OpenHashSet[Long]]

SortShuffleManager uses the taskIdMapsForShuffle internal registry to track task (attempt) IDs by shuffle.

A new shuffle ID and a task (attempt) ID are added when SortShuffleManager is requested for a ShuffleWriter (for a partition and a ShuffleHandle).

A shuffle ID (and the associated task IDs) is removed when SortShuffleManager is requested to unregister a shuffle.
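
The following is a minimal sketch of how the registry is populated, with scala.collection.mutable.HashSet[Long] as a stand-in for Spark's internal OpenHashSet[Long] and recordMapTask as a hypothetical name for what getWriter does with the registry:

import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

// Stand-in for the registry (mutable.HashSet[Long] instead of OpenHashSet[Long]).
val taskIdMapsForShuffle = new ConcurrentHashMap[Int, mutable.HashSet[Long]]()

// Conceptually what getWriter does: remember the task attempt ID under its shuffle ID.
def recordMapTask(shuffleId: Int, taskAttemptId: Long): Unit = {
  val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(shuffleId, _ => mutable.HashSet.empty[Long])
  mapTaskIds.synchronized { mapTaskIds += taskAttemptId }
}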

Getting ShuffleWriter for Partition and ShuffleHandle

getWriter[K, V](
  handle: ShuffleHandle,
  mapId: Long,
  context: TaskContext,
  metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]

getWriter registers the shuffleId (of the given ShuffleHandle) in the taskIdMapsForShuffle internal registry (unless registered already) and adds the task attempt ID (of the given TaskContext) to it.

Note

getWriter expects that the input ShuffleHandle is a BaseShuffleHandle. Moreover, in two out of the three cases, getWriter expects it to be one of the more specialized subclasses, SerializedShuffleHandle or BypassMergeSortShuffleHandle.

getWriter then creates a new ShuffleWriter based on the type of the given ShuffleHandle.

ShuffleHandle                    ShuffleWriter
SerializedShuffleHandle          UnsafeShuffleWriter
BypassMergeSortShuffleHandle     BypassMergeSortShuffleWriter
BaseShuffleHandle                SortShuffleWriter

getWriter is part of the ShuffleManager abstraction.
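
Below is a self-contained sketch of this type-based dispatch. The classes are simplified stand-ins for the Spark types in the table above, but they mirror the real hierarchy (both specialized handles extend BaseShuffleHandle), which is why the most specific cases are matched first:

// Simplified stand-ins for the shuffle handle hierarchy (not the Spark classes).
class BaseShuffleHandle(val shuffleId: Int)
class BypassMergeSortShuffleHandle(shuffleId: Int) extends BaseShuffleHandle(shuffleId)
class SerializedShuffleHandle(shuffleId: Int) extends BaseShuffleHandle(shuffleId)

// Most specific handle types first; BaseShuffleHandle is the fallback.
def writerFor(handle: BaseShuffleHandle): String = handle match {
  case _: SerializedShuffleHandle      => "UnsafeShuffleWriter"
  case _: BypassMergeSortShuffleHandle => "BypassMergeSortShuffleWriter"
  case _: BaseShuffleHandle            => "SortShuffleWriter"
}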

ShuffleExecutorComponents

shuffleExecutorComponents: ShuffleExecutorComponents

SortShuffleManager defines the shuffleExecutorComponents internal registry for a ShuffleExecutorComponents.

shuffleExecutorComponents is used when:

* SortShuffleManager is requested for a ShuffleWriter

loadShuffleExecutorComponents

loadShuffleExecutorComponents(
  conf: SparkConf): ShuffleExecutorComponents

loadShuffleExecutorComponents loads the ShuffleDataIO that is then requested for the ShuffleExecutorComponents.

loadShuffleExecutorComponents requests the ShuffleExecutorComponents to initialize before returning it.
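
A hypothetical, simplified model of this load-then-initialize flow (the trait and method names below are illustrative stand-ins, not the org.apache.spark.shuffle.api interfaces):

// Illustrative stand-ins for ShuffleDataIO and ShuffleExecutorComponents.
trait ExecutorComponents { def initialize(appId: String, execId: String): Unit }
trait DataIO { def executor(): ExecutorComponents }

// Ask the ShuffleDataIO for its executor-side components and initialize them
// before handing them out.
def loadExecutorComponents(dataIO: DataIO, appId: String, execId: String): ExecutorComponents = {
  val components = dataIO.executor()
  components.initialize(appId, execId)
  components
}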

Creating ShuffleHandle for ShuffleDependency

registerShuffle[K, V, C](
  shuffleId: Int,
  dependency: ShuffleDependency[K, V, C]): ShuffleHandle

registerShuffle is part of the ShuffleManager abstraction.

registerShuffle creates a new ShuffleHandle (for the given ShuffleDependency) that is one of the following (the selection order is sketched after this list):

  1. BypassMergeSortShuffleHandle (with ShuffleDependency[K, V, V]) when shouldBypassMergeSort condition holds

  2. SerializedShuffleHandle (with ShuffleDependency[K, V, V]) when canUseSerializedShuffle condition holds

  3. BaseShuffleHandle
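
A minimal sketch of this selection order, with the two booleans standing in for the shouldBypassMergeSort and canUseSerializedShuffle checks and the result reduced to the handle's name:

// The checks are evaluated in the order listed above.
def chooseHandle(shouldBypassMergeSort: Boolean, canUseSerializedShuffle: Boolean): String =
  if (shouldBypassMergeSort) "BypassMergeSortShuffleHandle"    // checked first
  else if (canUseSerializedShuffle) "SerializedShuffleHandle"  // only if bypass is not possible
  else "BaseShuffleHandle"                                     // general-purpose fallback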

SerializedShuffleHandle Requirements

canUseSerializedShuffle(
  dependency: ShuffleDependency[_, _, _]): Boolean

canUseSerializedShuffle is true when all of the following hold for the given ShuffleDependency:

  1. Serializer (of the given ShuffleDependency) supports relocation of serialized objects

  2. mapSideCombine flag (of the given ShuffleDependency) is false

  3. Number of partitions (of the Partitioner of the given ShuffleDependency) is not greater than the supported maximum number

When all of the above requirements hold, canUseSerializedShuffle prints out the following DEBUG message to the logs:

Can use serialized shuffle for shuffle [shuffleId]

Otherwise, canUseSerializedShuffle is false and prints out one of the following DEBUG messages based on the failed requirement:

Can't use serialized shuffle for shuffle [id] because the serializer, [name], does not support object relocation
Can't use serialized shuffle for shuffle [id] because we need to do map-side aggregation
Can't use serialized shuffle for shuffle [id] because it has more than [number] partitions
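
The three requirements amount to a simple conjunction; a minimal sketch with illustrative parameter names:

// Sketch of canUseSerializedShuffle as a predicate over the three requirements.
def canUseSerializedShuffle(
    serializerSupportsRelocation: Boolean,  // requirement 1
    mapSideCombine: Boolean,                // requirement 2 (must be false)
    numPartitions: Int): Boolean = {        // requirement 3
  val maxPartitions = 1 << 24  // MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE (16777216)
  serializerSupportsRelocation && !mapSideCombine && numPartitions <= maxPartitions
}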

canUseSerializedShuffle is used when:

* SortShuffleManager is requested to register a shuffle (and create a ShuffleHandle for a ShuffleDependency)

Maximum Number of Partition Identifiers for Serialized Mode

SortShuffleManager defines MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE internal constant to be (1 << 24) (16777216) for the maximum number of shuffle output partitions.

MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE is used when:

* canUseSerializedShuffle is requested to check the number of partitions of a ShuffleDependency

Creating ShuffleBlockResolver

shuffleBlockResolver: IndexShuffleBlockResolver

shuffleBlockResolver is part of the ShuffleManager abstraction.

shuffleBlockResolver is an IndexShuffleBlockResolver (created immediately alongside this SortShuffleManager).

Unregistering Shuffle

unregisterShuffle(
  shuffleId: Int): Boolean

unregisterShuffle is part of the ShuffleManager abstraction.

unregisterShuffle removes the given shuffleId from the taskIdMapsForShuffle internal registry.

If the shuffleId was registered (and removed), unregisterShuffle requests the IndexShuffleBlockResolver to remove the shuffle index and data files for every mapTaskId (the map tasks that produced output for the shuffle).

unregisterShuffle always returns true.
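
A minimal sketch of the unregistration flow, assuming a removeDataByMap(shuffleId, mapTaskId) callback that stands in for asking the IndexShuffleBlockResolver to delete one map task's index and data files:

import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

// Drop the shuffle from the registry and clean up per-map-task shuffle files.
def unregisterShuffle(
    taskIdMapsForShuffle: ConcurrentHashMap[Int, mutable.HashSet[Long]],
    removeDataByMap: (Int, Long) => Unit)(
    shuffleId: Int): Boolean = {
  Option(taskIdMapsForShuffle.remove(shuffleId)).foreach { mapTaskIds =>
    mapTaskIds.foreach(mapTaskId => removeDataByMap(shuffleId, mapTaskId))
  }
  true  // always true, even if the shuffle ID was not registered
}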

Getting ShuffleReader for ShuffleHandle

getReader[K, C](
  handle: ShuffleHandle,
  startMapIndex: Int,
  endMapIndex: Int,
  startPartition: Int,
  endPartition: Int,
  context: TaskContext,
  metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]

getReader is part of the ShuffleManager abstraction.

getReader requests the MapOutputTracker (via SparkEnv) for the shuffle block locations and sizes (getMapSizesByExecutorId) of the shuffleId (of the given ShuffleHandle).

In the end, getReader creates a new BlockStoreShuffleReader.

Stopping ShuffleManager

stop(): Unit

stop is part of the ShuffleManager abstraction.

stop requests the IndexShuffleBlockResolver to stop.

Logging

Enable ALL logging level for org.apache.spark.shuffle.sort.SortShuffleManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.SortShuffleManager=ALL

Refer to Logging.