SortShuffleManager is the default and only ShuffleManager in Apache Spark (with the short name
SortShuffleManager takes the following to be created:
SortShuffleManager is created when:
SparkEnv is created (on the driver and executors at the very beginning of a Spark application's lifecycle).
taskIdMapsForShuffle: ConcurrentHashMap[Int, OpenHashSet[Long]]
SortShuffleManager uses the taskIdMapsForShuffle internal registry to track task (attempt) IDs by shuffle.
A new shuffle and its task IDs are added when:
SortShuffleManager is requested for a ShuffleWriter (for a partition and a ShuffleHandle).
A shuffle ID (and its associated task IDs) is removed when:
SortShuffleManager is requested to unregister a shuffle.
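The bookkeeping described above can be sketched in Scala as follows. This is a minimal model, not Spark's code: TaskIdRegistry and its methods are hypothetical names, and a synchronized scala.collection.mutable.HashSet[Long] stands in for Spark's internal OpenHashSet[Long]. computeIfAbsent makes registering a shuffle idempotent, so only the first writer for a shuffle creates its entry.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

// Hypothetical stand-in for the taskIdMapsForShuffle registry (shuffle ID -> task attempt IDs).
class TaskIdRegistry {
  private val taskIdMapsForShuffle = new ConcurrentHashMap[Int, mutable.HashSet[Long]]()

  // Called when a ShuffleWriter is requested: creates the shuffle entry once,
  // then records the task attempt ID under it.
  def register(shuffleId: Int, taskAttemptId: Long): Unit = {
    val taskIds = taskIdMapsForShuffle.computeIfAbsent(shuffleId, _ => mutable.HashSet.empty[Long])
    // The set itself is not thread-safe, so mutations are guarded by its monitor.
    taskIds.synchronized { taskIds += taskAttemptId }
    ()
  }

  // Called when the shuffle is unregistered: drops the shuffle and its task IDs,
  // returning the removed IDs if the shuffle was registered.
  def unregister(shuffleId: Int): Option[Set[Long]] =
    Option(taskIdMapsForShuffle.remove(shuffleId)).map(ids => ids.synchronized(ids.toSet))

  // Snapshot of the task IDs currently tracked for a shuffle.
  def taskIds(shuffleId: Int): Set[Long] =
    Option(taskIdMapsForShuffle.get(shuffleId))
      .map(ids => ids.synchronized(ids.toSet))
      .getOrElse(Set.empty[Long])
}
```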
Getting ShuffleWriter for Partition and ShuffleHandle
getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]
getWriter registers the given ShuffleHandle (by the shuffleId and numMaps) in the taskIdMapsForShuffle internal registry unless already done.
getWriter expects that the input ShuffleHandle is a BaseShuffleHandle. Moreover, in two (out of three) cases, getWriter expects that the shuffleBlockResolver is a more specialized IndexShuffleBlockResolver.
getWriter then creates a new ShuffleWriter based on the type of the given ShuffleHandle.
getWriter is part of the ShuffleManager abstraction.
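A minimal sketch of the handle-to-writer dispatch, assuming one writer per handle type. All names are hypothetical stand-ins (BypassHandle and the three writer tags do not come from this page), and real writers are reduced to tags so the example stays self-contained.

```scala
// Hypothetical stand-ins: both specialized handles extend the base handle,
// matching the text's description of BaseShuffleHandle as the general case.
class BaseShuffleHandle(val shuffleId: Int)
class SerializedShuffleHandle(shuffleId: Int) extends BaseShuffleHandle(shuffleId)
class BypassHandle(shuffleId: Int) extends BaseShuffleHandle(shuffleId) // hypothetical name

// Hypothetical tags standing in for the three ShuffleWriter implementations.
sealed trait WriterKind
case object SerializedWriter extends WriterKind
case object BypassWriter extends WriterKind
case object SortWriter extends WriterKind

object WriterSelection {
  // Specialized handles are matched first, because each of them is also a BaseShuffleHandle.
  def writerFor(handle: BaseShuffleHandle): WriterKind = handle match {
    case _: SerializedShuffleHandle => SerializedWriter
    case _: BypassHandle            => BypassWriter
    case _                          => SortWriter
  }
}
```

The case order is the design point: because both specialized handles extend BaseShuffleHandle, the general case has to come last or it would shadow them.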
ShuffleExecutorComponents
SortShuffleManager defines the shuffleExecutorComponents internal registry for a ShuffleExecutorComponents.
shuffleExecutorComponents is used when:
SortShuffleManager is requested for the ShuffleWriter
Loading ShuffleExecutorComponents
loadShuffleExecutorComponents(conf: SparkConf): ShuffleExecutorComponents
loadShuffleExecutorComponents loads the ShuffleDataIO that is then requested for the ShuffleExecutorComponents.
loadShuffleExecutorComponents requests the ShuffleExecutorComponents to initialize before returning it.
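The load-then-initialize order can be sketched as below. DataIO, ExecutorComponents and the initializeExecutor parameters are hypothetical simplifications of ShuffleDataIO and ShuffleExecutorComponents, and the loadDataIO factory function stands in for however the plugin is actually loaded.

```scala
// Hypothetical, simplified versions of the plugin interfaces described above.
trait ExecutorComponents {
  def initializeExecutor(appId: String, execId: String, extraConfigs: Map[String, String]): Unit
}

trait DataIO {
  def executor(): ExecutorComponents
}

// A trivial implementation that only remembers whether it was initialized.
class LocalExecutorComponents extends ExecutorComponents {
  @volatile var initialized = false
  def initializeExecutor(appId: String, execId: String, extraConfigs: Map[String, String]): Unit =
    initialized = true
}

object ShuffleComponentsLoader {
  // Load the data IO plugin, ask it for the executor-side components,
  // initialize them, and only then hand them out.
  def loadExecutorComponents(
      loadDataIO: () => DataIO,
      appId: String,
      execId: String,
      extraConfigs: Map[String, String]): ExecutorComponents = {
    val components = loadDataIO().executor()
    components.initializeExecutor(appId, execId, extraConfigs)
    components
  }
}
```

Initializing before returning means that callers (such as getWriter) never receive components that have not been initialized yet.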
Creating ShuffleHandle for ShuffleDependency
registerShuffle[K, V, C](shuffleId: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle
registerShuffle is part of the ShuffleManager abstraction.
registerShuffle creates a new ShuffleHandle (for the given ShuffleDependency) that is one of the following:
ShuffleDependency[K, V, V]) when shouldBypassMergeSort condition holds
ShuffleDependency[K, V, V]) when canUseSerializedShuffle condition holds
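The order in which registerShuffle tries the conditions can be sketched as follows. Dep and the HandleKind tags are hypothetical. The shouldBypassMergeSort rule used here (no map-side combine and at most spark.shuffle.sort.bypassMergeThreshold partitions, 200 by default) and the fallback to a plain base handle when neither condition holds are assumptions based on Spark's defaults, not statements from this page; canUseSerializedShuffle is passed in as a function.

```scala
// Hypothetical, simplified model of a ShuffleDependency.
final case class Dep(shuffleId: Int, numPartitions: Int, mapSideCombine: Boolean)

// Hypothetical tags for the kinds of ShuffleHandle.
sealed trait HandleKind
case object BypassMergeSort extends HandleKind
case object Serialized extends HandleKind
case object Base extends HandleKind

object HandleSelection {
  // Assumption: bypass requires no map-side combine and few enough partitions.
  def shouldBypassMergeSort(dep: Dep, bypassMergeThreshold: Int = 200): Boolean =
    !dep.mapSideCombine && dep.numPartitions <= bypassMergeThreshold

  // Conditions are tried in order; the first one that holds decides the handle.
  def registerShuffle(dep: Dep, canUseSerializedShuffle: Dep => Boolean): HandleKind =
    if (shouldBypassMergeSort(dep)) BypassMergeSort
    else if (canUseSerializedShuffle(dep)) Serialized
    else Base
}
```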
Checking Out SerializedShuffleHandle Requirements
canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean
canUseSerializedShuffle is true when all of the following hold for the given ShuffleDependency:
The Serializer (of the given ShuffleDependency) supports relocation of serialized objects
The mapSideCombine flag (of the given ShuffleDependency) is off
The number of partitions (of the Partitioner of the given ShuffleDependency) is not greater than the supported maximum number (MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE)
If all of the above hold, canUseSerializedShuffle prints out the following DEBUG message to the logs:
Can use serialized shuffle for shuffle [shuffleId]
Otherwise, canUseSerializedShuffle is false and prints out one of the following DEBUG messages based on the failed requirement:
Can't use serialized shuffle for shuffle [id] because the serializer, [name], does not support object relocation
Can't use serialized shuffle for shuffle [id] because we need to do map-side aggregation
Can't use serialized shuffle for shuffle [id] because it has more than [number] partitions
canUseSerializedShuffle is used when:
SortShuffleManager is requested to register a shuffle (and creates a ShuffleHandle)
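A sketch of canUseSerializedShuffle that returns the decision together with the DEBUG message it would log. DepInfo is a hypothetical flattening of the ShuffleDependency properties listed above; the checks run in the listed order, and the first failing one determines the message.

```scala
// Hypothetical, simplified view of the ShuffleDependency properties that matter here.
final case class DepInfo(
    shuffleId: Int,
    serializerName: String,
    supportsRelocation: Boolean,
    mapSideCombine: Boolean,
    numPartitions: Int)

object SerializedShuffleCheck {
  // Same value as MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE.
  val MaxPartitions: Int = 1 << 24

  // Returns (decision, DEBUG message that would be logged).
  def canUseSerializedShuffle(dep: DepInfo): (Boolean, String) = {
    val id = dep.shuffleId
    if (!dep.supportsRelocation)
      (false, s"Can't use serialized shuffle for shuffle $id because the serializer, " +
        s"${dep.serializerName}, does not support object relocation")
    else if (dep.mapSideCombine)
      (false, s"Can't use serialized shuffle for shuffle $id because we need to do map-side aggregation")
    else if (dep.numPartitions > MaxPartitions)
      (false, s"Can't use serialized shuffle for shuffle $id because it has more than $MaxPartitions partitions")
    else
      (true, s"Can use serialized shuffle for shuffle $id")
  }
}
```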
Maximum Number of Partition Identifiers for Serialized Mode
SortShuffleManager defines the MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE internal constant as the maximum number of shuffle output partitions: (1 << 24), i.e. 16777216.
MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE is used when:
SortShuffleManager utility is used to check out SerializedShuffleHandle requirements
ShuffleExchangeExec(Spark SQL) utility is used to
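A worked check of the constant: 1 << 24 = 2^24 = 16777216. Since the number of partitions must not be greater than this value, partition IDs range over 0 to 2^24 - 1, which is exactly the range of a 24-bit field. That 24-bit reading is our interpretation (the serialized mode packs each record's partition ID into a fixed number of bits), and SerializedModeLimit is a hypothetical name.

```scala
object SerializedModeLimit {
  // 1 << 24 = 2^24 = 16777216, the value of MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE.
  val MaxPartitions: Int = 1 << 24

  // A 24-bit mask: the largest value that fits in 24 bits is 2^24 - 1.
  val PartitionIdMask: Int = MaxPartitions - 1

  // With numPartitions <= 2^24, every partition ID (0 .. 2^24 - 1)
  // survives masking to 24 bits unchanged.
  def fitsIn24Bits(partitionId: Int): Boolean =
    partitionId >= 0 && (partitionId & PartitionIdMask) == partitionId
}
```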
ShuffleBlockResolver
shuffleBlockResolver is part of the ShuffleManager abstraction.
shuffleBlockResolver is an IndexShuffleBlockResolver (and is created immediately alongside this SortShuffleManager).
Unregistering Shuffle
unregisterShuffle(shuffleId: Int): Boolean
unregisterShuffle is part of the ShuffleManager abstraction.
unregisterShuffle removes the given shuffleId from the taskIdMapsForShuffle internal registry.
If the shuffleId was found and removed successfully, unregisterShuffle requests the IndexShuffleBlockResolver to remove the shuffle index and data files for every mapTaskId (the mappers producing the output for the shuffle).
unregisterShuffle always returns true.
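The unregister flow can be sketched as below. RecordingResolver and its removeMapOutput method are hypothetical stand-ins for the IndexShuffleBlockResolver and its file removal, and immutable Set[Long] values replace the mutable task ID sets to avoid extra locking.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ListBuffer

// Hypothetical resolver that records which map outputs it was asked to remove.
class RecordingResolver {
  val removed: ListBuffer[(Int, Long)] = ListBuffer.empty[(Int, Long)]
  def removeMapOutput(shuffleId: Int, mapTaskId: Long): Unit =
    removed.synchronized { removed += ((shuffleId, mapTaskId)); () }
}

class ShuffleUnregistration(resolver: RecordingResolver) {
  // Shuffle ID -> map task IDs.
  val taskIdMapsForShuffle = new ConcurrentHashMap[Int, Set[Long]]()

  def unregisterShuffle(shuffleId: Int): Boolean = {
    // remove returns null when the shuffle is not (or no longer) registered.
    Option(taskIdMapsForShuffle.remove(shuffleId)).foreach { mapTaskIds =>
      mapTaskIds.foreach(mapTaskId => resolver.removeMapOutput(shuffleId, mapTaskId))
    }
    true // the result does not depend on whether anything was removed
  }
}
```

Cleanup only happens for a shuffle that is still registered, so a repeated call is harmless, while the return value stays true either way.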
Getting ShuffleReader for ShuffleHandle
getReader[K, C](handle: ShuffleHandle, startMapIndex: Int, endMapIndex: Int, startPartition: Int, endPartition: Int, context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
getReader is part of the ShuffleManager abstraction.
getReader requests the MapOutputTracker (via SparkEnv) to getMapSizesByExecutorId for the shuffleId (of the given ShuffleHandle), passing along the given map index and partition ranges.
In the end, getReader creates a new BlockStoreShuffleReader.
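A sketch of what the map index and partition ranges of getReader select, using an in-memory table instead of the MapOutputTracker. MapOutput, BlockRef and mapSizesByExecutorId here are hypothetical, and treating endMapIndex and endPartition as exclusive bounds is an assumption.

```scala
// Hypothetical map output metadata: for each map task (by index), the executor
// that wrote it and the size of each reduce partition's block.
final case class MapOutput(mapIndex: Int, executorId: String, partitionSizes: Vector[Long])
final case class BlockRef(mapIndex: Int, reduceId: Int, size: Long)

object ReaderRanges {
  // Assumption: the reader covers map outputs [startMapIndex, endMapIndex)
  // and reduce partitions [startPartition, endPartition), grouped by executor.
  def mapSizesByExecutorId(
      outputs: Seq[MapOutput],
      startMapIndex: Int,
      endMapIndex: Int,
      startPartition: Int,
      endPartition: Int): Map[String, Seq[BlockRef]] =
    outputs
      .filter(o => o.mapIndex >= startMapIndex && o.mapIndex < endMapIndex)
      .flatMap { o =>
        (startPartition until endPartition)
          .map(p => o.executorId -> BlockRef(o.mapIndex, p, o.partitionSizes(p)))
      }
      .groupBy(_._1)
      .map { case (exec, pairs) => exec -> pairs.map(_._2) }
}
```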
Stopping
stop is part of the ShuffleManager abstraction.
stop requests the IndexShuffleBlockResolver to stop.
Logging
Enable ALL logging level for org.apache.spark.shuffle.sort.SortShuffleManager logger to see what happens inside.
Add the following line to
Refer to Logging.