SortShuffleManager¶
SortShuffleManager is the default and only ShuffleManager in Apache Spark (registered under the short names sort and tungsten-sort).

Creating Instance¶
SortShuffleManager takes a SparkConf to be created.
SortShuffleManager is created when SparkEnv is created (on the driver and executors at the very beginning of a Spark application's lifecycle).
taskIdMapsForShuffle Registry¶
taskIdMapsForShuffle: ConcurrentHashMap[Int, OpenHashSet[Long]]
SortShuffleManager uses the taskIdMapsForShuffle internal registry to track task (attempt) IDs by shuffle.
A new shuffle and its task IDs are added when SortShuffleManager is requested for a ShuffleWriter (for a partition and a ShuffleHandle).
A shuffle ID (and the associated task IDs) is removed when SortShuffleManager is requested to unregister a shuffle.
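The lifecycle of the registry can be illustrated with a minimal sketch. It is not Spark code: a plain mutable Set stands in for OpenHashSet, and the names TaskIdRegistrySketch, recordWriter and forget are made up for illustration.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

// Simplified stand-in for the bookkeeping described above (not Spark code).
object TaskIdRegistrySketch {
  // shuffleId -> IDs of the map tasks that requested a writer for that shuffle
  private val taskIdMapsForShuffle =
    new ConcurrentHashMap[Int, mutable.Set[Long]]()

  // Models "a ShuffleWriter is requested": the per-shuffle set is created on first use.
  def recordWriter(shuffleId: Int, mapTaskId: Long): Unit = {
    val ids = taskIdMapsForShuffle.computeIfAbsent(
      shuffleId, (_: Int) => mutable.Set.empty[Long])
    ids.synchronized { ids += mapTaskId }
  }

  // Models "a shuffle is unregistered": the entry is dropped and its task IDs returned.
  def forget(shuffleId: Int): Set[Long] = {
    val removed = taskIdMapsForShuffle.remove(shuffleId)
    if (removed == null) Set.empty[Long] else removed.toSet
  }
}
```

Recording the same task ID twice leaves a single entry, and forgetting an unknown shuffle simply yields an empty set.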
Getting ShuffleWriter for Partition and ShuffleHandle¶
getWriter[K, V](
handle: ShuffleHandle,
mapId: Int,
context: TaskContext): ShuffleWriter[K, V]
getWriter registers the given ShuffleHandle (by the shuffleId and numMaps) in the taskIdMapsForShuffle internal registry unless already done.
Note
getWriter expects that the input ShuffleHandle is a BaseShuffleHandle. Moreover, in two of the three cases getWriter expects a more specialized handle (SerializedShuffleHandle or BypassMergeSortShuffleHandle).
getWriter then creates a new ShuffleWriter based on the type of the given ShuffleHandle.
| ShuffleHandle | ShuffleWriter |
|---|---|
| SerializedShuffleHandle | UnsafeShuffleWriter |
| BypassMergeSortShuffleHandle | BypassMergeSortShuffleWriter |
| BaseShuffleHandle | SortShuffleWriter |
getWriter is part of the ShuffleManager abstraction.
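The table above amounts to a pattern match on the handle type. The sketch below models it with a hypothetical, simplified class hierarchy (BaseHandle, SerializedHandle, BypassMergeSortHandle) and returns writer names as strings instead of creating real writers.

```scala
object WriterSelectionSketch {
  // Hypothetical, simplified hierarchy: the specialized handles extend the base one.
  class BaseHandle(val shuffleId: Int)
  class SerializedHandle(shuffleId: Int) extends BaseHandle(shuffleId)
  class BypassMergeSortHandle(shuffleId: Int) extends BaseHandle(shuffleId)

  // Returns the name of the writer that the table pairs with each handle type.
  def writerFor(handle: BaseHandle): String = handle match {
    case _: SerializedHandle      => "UnsafeShuffleWriter"
    case _: BypassMergeSortHandle => "BypassMergeSortShuffleWriter"
    case _                        => "SortShuffleWriter" // plain base handle
  }
}
```

Because every specialized handle is also a base handle, the specialized cases have to be matched first; with the base case on top, the other two would never be reached.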
ShuffleExecutorComponents¶
shuffleExecutorComponents: ShuffleExecutorComponents
SortShuffleManager defines the shuffleExecutorComponents internal registry for a ShuffleExecutorComponents.
shuffleExecutorComponents is used when:
- SortShuffleManager is requested for the ShuffleWriter
loadShuffleExecutorComponents¶
loadShuffleExecutorComponents(
conf: SparkConf): ShuffleExecutorComponents
loadShuffleExecutorComponents loads the ShuffleDataIO that is then requested for the ShuffleExecutorComponents.
loadShuffleExecutorComponents requests the ShuffleExecutorComponents to initialize before returning it.
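The load-then-initialize order can be sketched as follows. The traits DataIO and ExecutorComponents and their method names are hypothetical placeholders, not the Spark plugin API.

```scala
object LoadComponentsSketch {
  // Hypothetical stand-ins for ShuffleExecutorComponents and ShuffleDataIO.
  trait ExecutorComponents { def initialize(): Unit }
  trait DataIO { def executor(): ExecutorComponents }

  // Asks the data IO for its executor components and initializes them before returning.
  def load(dataIO: DataIO): ExecutorComponents = {
    val components = dataIO.executor()
    components.initialize()
    components
  }
}
```

The point of the ordering is that callers only ever see executor components that have already been initialized.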
Creating ShuffleHandle for ShuffleDependency¶
registerShuffle[K, V, C](
shuffleId: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle
registerShuffle is part of the ShuffleManager abstraction.
registerShuffle creates a new ShuffleHandle (for the given ShuffleDependency) that is one of the following:
- BypassMergeSortShuffleHandle (with ShuffleDependency[K, V, V]) when shouldBypassMergeSort condition holds
- SerializedShuffleHandle (with ShuffleDependency[K, V, V]) when canUseSerializedShuffle condition holds
- BaseShuffleHandle otherwise
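A minimal sketch of the selection, assuming the conditions are checked in the order listed and that a plain BaseShuffleHandle is the fallback when neither holds. The two conditions are passed in as booleans, and HandleKind is a made-up type for illustration.

```scala
object HandleSelectionSketch {
  sealed trait HandleKind
  case object BypassMergeSort extends HandleKind
  case object Serialized extends HandleKind
  case object Base extends HandleKind

  // The first condition that holds wins; Base is the assumed fallback.
  def chooseHandle(
      shouldBypassMergeSort: Boolean,
      canUseSerializedShuffle: Boolean): HandleKind =
    if (shouldBypassMergeSort) BypassMergeSort
    else if (canUseSerializedShuffle) Serialized
    else Base
}
```

Under this ordering a dependency that satisfies both conditions gets the bypass-merge-sort handle.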
SerializedShuffleHandle Requirements¶
canUseSerializedShuffle(
dependency: ShuffleDependency[_, _, _]): Boolean
canUseSerializedShuffle is true when all of the following hold for the given ShuffleDependency:
- Serializer (of the given ShuffleDependency) supports relocation of serialized objects
- mapSideCombine flag (of the given ShuffleDependency) is false
- Number of partitions (of the Partitioner of the given ShuffleDependency) is not greater than the supported maximum number
With all of the above positive, canUseSerializedShuffle prints out the following DEBUG message to the logs:
Can use serialized shuffle for shuffle [shuffleId]
Otherwise, canUseSerializedShuffle is false and prints out one of the following DEBUG messages based on the failed requirement:
Can't use serialized shuffle for shuffle [id] because the serializer, [name], does not support object relocation
Can't use serialized shuffle for shuffle [id] because we need to do map-side aggregation
Can't use serialized shuffle for shuffle [id] because it has more than [number] partitions
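The three requirements and their messages can be modeled as below. This is a sketch, not the Spark implementation: DependencyView is a hypothetical flattened view of a ShuffleDependency, the checks are assumed to run in the order listed, and the message is returned rather than logged.

```scala
object SerializedShuffleCheckSketch {
  // Same value as MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE (1 << 24).
  val MaxPartitions: Int = 1 << 24

  // Hypothetical view of the properties of a ShuffleDependency that matter here.
  final case class DependencyView(
      shuffleId: Int,
      serializerName: String,
      supportsRelocation: Boolean,
      mapSideCombine: Boolean,
      numPartitions: Int)

  // Returns the decision together with the DEBUG message that would be logged.
  def check(dep: DependencyView): (Boolean, String) = {
    val prefix = s"Can't use serialized shuffle for shuffle ${dep.shuffleId} because"
    if (!dep.supportsRelocation)
      (false, s"$prefix the serializer, ${dep.serializerName}, " +
        "does not support object relocation")
    else if (dep.mapSideCombine)
      (false, s"$prefix we need to do map-side aggregation")
    else if (dep.numPartitions > MaxPartitions)
      (false, s"$prefix it has more than $MaxPartitions partitions")
    else
      (true, s"Can use serialized shuffle for shuffle ${dep.shuffleId}")
  }
}
```

A dependency with exactly 16777216 partitions still passes, since the requirement is "not greater than" the maximum.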
canUseSerializedShuffle is used when:
- SortShuffleManager is requested to register a shuffle (and creates a ShuffleHandle)
Maximum Number of Partition Identifiers for Serialized Mode¶
SortShuffleManager defines MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE internal constant to be (1 << 24) (16777216) for the maximum number of shuffle output partitions.
MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE is used when:
- UnsafeShuffleWriter is created
- SortShuffleManager utility is used to check out SerializedShuffleHandle requirements
- ShuffleExchangeExec (Spark SQL) utility is used to needToCopyObjectsBeforeShuffle
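The value of the constant is the number of distinct values that fit in 24 bits. The short sketch below spells out the arithmetic; the names PartitionLimitSketch and fitsIn24Bits are made up, and reading the limit as a 24-bit partition ID field is an assumption about the motivation rather than something stated above.

```scala
object PartitionLimitSketch {
  // 1 shifted left by 24 bits: 2^24 = 16777216 possible partition IDs.
  val MaxPartitions: Int = 1 << 24

  // Largest zero-based partition ID under that limit: 16777215 (0xFFFFFF).
  val MaxPartitionId: Int = MaxPartitions - 1

  // True when a partition ID can be represented with 24 bits.
  def fitsIn24Bits(partitionId: Int): Boolean =
    partitionId >= 0 && partitionId <= MaxPartitionId
}
```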
Creating ShuffleBlockResolver¶
shuffleBlockResolver: IndexShuffleBlockResolver
shuffleBlockResolver is part of the ShuffleManager abstraction.
shuffleBlockResolver is an IndexShuffleBlockResolver (and is created immediately alongside this SortShuffleManager).
Unregistering Shuffle¶
unregisterShuffle(
shuffleId: Int): Boolean
unregisterShuffle is part of the ShuffleManager abstraction.
unregisterShuffle removes the given shuffleId from the taskIdMapsForShuffle internal registry.
If the shuffleId was found and removed successfully, unregisterShuffle requests the IndexShuffleBlockResolver to remove the shuffle index and data files for every mapTaskId (mappers producing the output for the shuffle).
unregisterShuffle always returns true.
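The steps above can be sketched as follows. BlockResolver is a hypothetical stand-in for the part of IndexShuffleBlockResolver that deletes the files of one map task, and Manager models only the registry and the unregister step.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

object UnregisterSketch {
  // Hypothetical stand-in for the file-removal side of IndexShuffleBlockResolver.
  trait BlockResolver {
    def removeDataByMap(shuffleId: Int, mapTaskId: Long): Unit
  }

  class Manager(resolver: BlockResolver) {
    // shuffleId -> map task IDs, as in the taskIdMapsForShuffle registry
    val taskIds = new ConcurrentHashMap[Int, mutable.Set[Long]]()

    def unregisterShuffle(shuffleId: Int): Boolean = {
      val removed = taskIds.remove(shuffleId)
      // Files are cleaned up only when the shuffle was actually registered.
      if (removed != null) {
        removed.foreach(id => resolver.removeDataByMap(shuffleId, id))
      }
      true // the result does not depend on whether anything was removed
    }
  }
}
```

Unregistering an unknown shuffle is therefore a no-op that still reports success.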
Getting ShuffleReader for ShuffleHandle¶
getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
getReader is part of the ShuffleManager abstraction.
getReader requests the MapOutputTracker (via SparkEnv) to getMapSizesByExecutorId for the shuffleId (of the given ShuffleHandle).
In the end, getReader creates a new BlockStoreShuffleReader.
Stopping ShuffleManager¶
stop(): Unit
stop is part of the ShuffleManager abstraction.
stop requests the IndexShuffleBlockResolver to stop.
Logging¶
Enable ALL logging level for org.apache.spark.shuffle.sort.SortShuffleManager logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.shuffle.sort.SortShuffleManager=ALL
Refer to Logging.