SortShuffleManager
SortShuffleManager is the default and only built-in ShuffleManager in Apache Spark (with the short names sort and tungsten-sort).
Creating Instance
SortShuffleManager takes the following to be created:
SortShuffleManager is created when SparkEnv is created (on the driver and executors at the very beginning of a Spark application's lifecycle).
taskIdMapsForShuffle Registry
taskIdMapsForShuffle: ConcurrentHashMap[Int, OpenHashSet[Long]]
SortShuffleManager uses the taskIdMapsForShuffle internal registry to track task (attempt) IDs by shuffle.
A shuffle ID and its task IDs are added when SortShuffleManager is requested for a ShuffleWriter (for a partition and a ShuffleHandle).
A shuffle ID (and its associated task IDs) is removed when SortShuffleManager is requested to unregister a shuffle.
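The registry bookkeeping described above can be sketched in plain Scala. This is a minimal, hypothetical model (the TaskIdRegistry object and its methods are illustrative names, not Spark API), and it swaps Spark's internal OpenHashSet[Long] for a synchronized scala.collection.mutable.HashSet[Long] so that it runs without Spark on the classpath.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

// Hypothetical, simplified stand-in for the taskIdMapsForShuffle registry.
// Spark uses its internal OpenHashSet[Long]; mutable.HashSet[Long] is used here instead.
object TaskIdRegistry {
  private val taskIdMapsForShuffle = new ConcurrentHashMap[Int, mutable.HashSet[Long]]()

  // On getWriter: record the task attempt ID under its shuffle (creating the entry if needed).
  def register(shuffleId: Int, taskAttemptId: Long): Unit = {
    val ids = taskIdMapsForShuffle.computeIfAbsent(shuffleId, _ => mutable.HashSet.empty[Long])
    // Map tasks of the same shuffle may run concurrently in one executor.
    ids.synchronized { ids += taskAttemptId }
  }

  // On unregisterShuffle: drop the shuffle and return its task IDs, if it was registered.
  def unregister(shuffleId: Int): Option[Set[Long]] =
    Option(taskIdMapsForShuffle.remove(shuffleId)).map(ids => ids.synchronized(ids.toSet))

  // Read-only view of the task IDs currently tracked for a shuffle.
  def taskIds(shuffleId: Int): Set[Long] =
    Option(taskIdMapsForShuffle.get(shuffleId))
      .map(ids => ids.synchronized(ids.toSet))
      .getOrElse(Set.empty)
}
```

computeIfAbsent keeps the "add the shuffle unless already done" step atomic, while the per-shuffle set is guarded separately because it is mutated after being fetched.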
Getting ShuffleWriter for Partition and ShuffleHandle
getWriter[K, V](
handle: ShuffleHandle,
mapId: Long,
context: TaskContext,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]
getWriter registers the task attempt ID (of the given TaskContext) under the shuffleId of the given ShuffleHandle in the taskIdMapsForShuffle internal registry (adding the shuffle first unless already done).
Note
getWriter expects that the input ShuffleHandle is a BaseShuffleHandle. Moreover, in two (out of three) cases getWriter expects a more specialized subtype of BaseShuffleHandle (a SerializedShuffleHandle or a BypassMergeSortShuffleHandle).
getWriter then creates a new ShuffleWriter based on the type of the given ShuffleHandle.
| ShuffleHandle | ShuffleWriter |
|---|---|
| SerializedShuffleHandle | UnsafeShuffleWriter |
| BypassMergeSortShuffleHandle | BypassMergeSortShuffleWriter |
| BaseShuffleHandle | SortShuffleWriter |
getWriter is part of the ShuffleManager abstraction.
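The dispatch in the table above amounts to a type match. The sketch below uses hypothetical, stripped-down handle classes (no ShuffleDependency, just a shuffleId) and returns writer names instead of creating writers. Since both specialized handles extend BaseShuffleHandle, the plain BaseShuffleHandle case has to be matched last.

```scala
// Hypothetical, simplified mirror of the ShuffleHandle hierarchy: both specialized
// handles extend BaseShuffleHandle, so they must be matched before the general case.
class BaseShuffleHandle(val shuffleId: Int)
class SerializedShuffleHandle(shuffleId: Int) extends BaseShuffleHandle(shuffleId)
class BypassMergeSortShuffleHandle(shuffleId: Int) extends BaseShuffleHandle(shuffleId)

object WriterSelection {
  // Name of the ShuffleWriter that getWriter creates for the given handle type.
  def writerFor(handle: BaseShuffleHandle): String = handle match {
    case _: SerializedShuffleHandle      => "UnsafeShuffleWriter"
    case _: BypassMergeSortShuffleHandle => "BypassMergeSortShuffleWriter"
    case _                               => "SortShuffleWriter"
  }
}
```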
ShuffleExecutorComponents
shuffleExecutorComponents: ShuffleExecutorComponents
SortShuffleManager defines the shuffleExecutorComponents internal property that holds the ShuffleExecutorComponents.
shuffleExecutorComponents is used when:
- SortShuffleManager is requested for the ShuffleWriter
loadShuffleExecutorComponents
loadShuffleExecutorComponents(
conf: SparkConf): ShuffleExecutorComponents
loadShuffleExecutorComponents loads the ShuffleDataIO that is then requested for the ShuffleExecutorComponents.
loadShuffleExecutorComponents requests the ShuffleExecutorComponents to initialize before returning it.
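A minimal sketch of the load, ask, initialize sequence, assuming simplified versions of the ShuffleDataIO and ShuffleExecutorComponents interfaces. The Toy* classes, the ExecutorComponentsLoader object and its name-to-factory map are hypothetical; Spark itself instantiates the class named by spark.shuffle.sort.io.plugin.class reflectively and passes the real application and executor IDs.

```scala
// Hypothetical, simplified versions of the shuffle plugin interfaces.
trait ShuffleExecutorComponents {
  def initializeExecutor(appId: String, execId: String, extraConfigs: Map[String, String]): Unit
}
trait ShuffleDataIO {
  def executor(): ShuffleExecutorComponents
}

// Toy plugin standing in for a real ShuffleDataIO implementation.
class ToyExecutorComponents extends ShuffleExecutorComponents {
  var initializedFor: Option[(String, String)] = None
  def initializeExecutor(appId: String, execId: String, extraConfigs: Map[String, String]): Unit =
    initializedFor = Some((appId, execId))
}
class ToyShuffleDataIO extends ShuffleDataIO {
  def executor(): ShuffleExecutorComponents = new ToyExecutorComponents
}

object ExecutorComponentsLoader {
  // Stand-in for reflective loading: real Spark expects a fully-qualified class name here.
  private val plugins: Map[String, () => ShuffleDataIO] = Map("toy" -> (() => new ToyShuffleDataIO))

  def load(conf: Map[String, String], appId: String, execId: String): ShuffleExecutorComponents = {
    val dataIO = plugins(conf("spark.shuffle.sort.io.plugin.class"))() // 1. load the ShuffleDataIO
    val components = dataIO.executor()                                 // 2. ask for executor components
    components.initializeExecutor(appId, execId, Map.empty)            // 3. initialize before returning
    components
  }
}
```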
Creating ShuffleHandle for ShuffleDependency
registerShuffle[K, V, C](
shuffleId: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle
registerShuffle is part of the ShuffleManager abstraction.
registerShuffle creates a new ShuffleHandle (for the given ShuffleDependency) that is one of the following (checked in this order):
- BypassMergeSortShuffleHandle (with ShuffleDependency[K, V, V]) when the shouldBypassMergeSort condition holds
- SerializedShuffleHandle (with ShuffleDependency[K, V, V]) when the canUseSerializedShuffle condition holds
- BaseShuffleHandle (with ShuffleDependency[K, V, C]) otherwise
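The order of the checks matters: a dependency that qualifies for both the bypass and the serialized path gets a BypassMergeSortShuffleHandle. The sketch below models the decision with a hypothetical Dep case class holding only the properties involved. The shouldBypassMergeSort rule it uses (no map-side combine and at most spark.shuffle.sort.bypassMergeThreshold partitions, 200 by default) is how Spark's SortShuffleWriter defines it, restated here as an assumption since this page does not describe it.

```scala
// Hypothetical, simplified view of a ShuffleDependency: only what the decision needs.
final case class Dep(
    mapSideCombine: Boolean,
    numPartitions: Int,
    serializerSupportsRelocation: Boolean)

object HandleChoice {
  // Assumption: default value of spark.shuffle.sort.bypassMergeThreshold.
  val BypassMergeThreshold = 200
  val MaxPartitionsForSerializedMode: Int = 1 << 24

  def shouldBypassMergeSort(dep: Dep): Boolean =
    !dep.mapSideCombine && dep.numPartitions <= BypassMergeThreshold

  def canUseSerializedShuffle(dep: Dep): Boolean =
    dep.serializerSupportsRelocation &&
      !dep.mapSideCombine &&
      dep.numPartitions <= MaxPartitionsForSerializedMode

  // Bypass is checked first, then serialized mode, else the general sort-based handle.
  def handleFor(dep: Dep): String =
    if (shouldBypassMergeSort(dep)) "BypassMergeSortShuffleHandle"
    else if (canUseSerializedShuffle(dep)) "SerializedShuffleHandle"
    else "BaseShuffleHandle"
}
```

With few partitions and no map-side combine the bypass path wins even when the serializer supports relocation; map-side combine rules out both specialized handles.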
SerializedShuffleHandle Requirements
canUseSerializedShuffle(
dependency: ShuffleDependency[_, _, _]): Boolean
canUseSerializedShuffle is true when all of the following hold for the given ShuffleDependency:
- The Serializer (of the given ShuffleDependency) supports relocation of serialized objects
- The mapSideCombine flag (of the given ShuffleDependency) is false
- The number of partitions (of the Partitioner of the given ShuffleDependency) is not greater than the supported maximum number (MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE)
When all of the above hold, canUseSerializedShuffle prints out the following DEBUG message to the logs:
Can use serialized shuffle for shuffle [shuffleId]
Otherwise, canUseSerializedShuffle is false and prints out one of the following DEBUG messages based on the failed requirement:
Can't use serialized shuffle for shuffle [id] because the serializer, [name], does not support object relocation
Can't use serialized shuffle for shuffle [id] because we need to do map-side aggregation
Can't use serialized shuffle for shuffle [id] because it has more than [number] partitions
canUseSerializedShuffle is used when:
- SortShuffleManager is requested to register a shuffle (and creates a ShuffleHandle)
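The requirement checks and their DEBUG messages can be sketched as a function that reports the first requirement that fails. SerializedModeCheck and check are hypothetical names; the message texts mirror the ones listed above, with the serializer passed in as a plain name rather than a Serializer instance.

```scala
object SerializedModeCheck {
  // Same value as MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE.
  val MaxPartitions: Int = 1 << 24

  // Hypothetical helper: Right(message) when serialized shuffle can be used,
  // Left(message) naming the first failed requirement, checked in the order listed above.
  def check(
      shuffleId: Int,
      serializerName: String,
      supportsRelocation: Boolean,
      mapSideCombine: Boolean,
      numPartitions: Int): Either[String, String] =
    if (!supportsRelocation)
      Left(s"Can't use serialized shuffle for shuffle $shuffleId because the serializer, " +
        s"$serializerName, does not support object relocation")
    else if (mapSideCombine)
      Left(s"Can't use serialized shuffle for shuffle $shuffleId because we need to do map-side aggregation")
    else if (numPartitions > MaxPartitions)
      Left(s"Can't use serialized shuffle for shuffle $shuffleId because it has more than $MaxPartitions partitions")
    else
      Right(s"Can use serialized shuffle for shuffle $shuffleId")
}
```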
Maximum Number of Partition Identifiers for Serialized Mode
SortShuffleManager defines the MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE internal constant as (1 << 24) (16777216), the maximum number of shuffle output partitions in serialized mode.
MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE is used when:
- UnsafeShuffleWriter is created
- SortShuffleManager utility is used to check the SerializedShuffleHandle requirements
- ShuffleExchangeExec (Spark SQL) utility is used for needToCopyObjectsBeforeShuffle
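Why 1 << 24? The serialized shuffle path (UnsafeShuffleWriter) sorts compact record pointers instead of the records themselves, and Spark's PackedRecordPointer packs each one into a single 64-bit Long laid out as a 24-bit partition ID, a 13-bit page number and a 27-bit offset within the page. The sketch below is a hypothetical re-implementation of that layout (PackedPointerSketch is not Spark's code) showing that 24 bits leave room for exactly 16777216 partition IDs.

```scala
// Hypothetical sketch of the packed pointer layout (24 + 13 + 27 = 64 bits):
//   [24-bit partition ID][13-bit page number][27-bit offset in page]
object PackedPointerSketch {
  val PartitionBits = 24
  val PageBits = 13
  val OffsetBits = 27
  val MaxPartitions: Int = 1 << PartitionBits // 16777216 partition IDs: 0 to 16777215

  def pack(partitionId: Int, page: Int, offset: Int): Long = {
    require(partitionId >= 0 && partitionId < MaxPartitions, s"partition ID $partitionId out of range")
    require(page >= 0 && page < (1 << PageBits), s"page $page out of range")
    require(offset >= 0 && offset < (1 << OffsetBits), s"offset $offset out of range")
    (partitionId.toLong << (PageBits + OffsetBits)) | (page.toLong << OffsetBits) | offset.toLong
  }

  // Unsigned shift: the top bit may be set for large partition IDs.
  def partitionId(packed: Long): Int = (packed >>> (PageBits + OffsetBits)).toInt
}
```

Sorting these Longs groups records by partition ID because the partition ID occupies the most significant bits, which is why a partition count beyond 1 << 24 cannot use this path.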
Creating ShuffleBlockResolver
shuffleBlockResolver: IndexShuffleBlockResolver
shuffleBlockResolver is part of the ShuffleManager abstraction.
shuffleBlockResolver is an IndexShuffleBlockResolver (and is created immediately alongside this SortShuffleManager).
Unregistering Shuffle
unregisterShuffle(
shuffleId: Int): Boolean
unregisterShuffle is part of the ShuffleManager abstraction.
unregisterShuffle removes the given shuffleId from the taskIdMapsForShuffle internal registry.
If the shuffleId was found and removed, unregisterShuffle requests the IndexShuffleBlockResolver to remove the shuffle index and data files for every mapTaskId (the map tasks that produced output for the shuffle).
unregisterShuffle always returns true.
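The unregister flow can be modeled as below. UnregisterSketch and deleteMapOutput are hypothetical names, and deletion is simulated by recording file names. The names assume the shuffle_<shuffleId>_<mapId>_0.data and .index pattern of IndexShuffleBlockResolver; newer Spark versions may remove further files (such as checksums), which the sketch ignores.

```scala
import scala.collection.mutable

// Hypothetical model of unregisterShuffle: remove the shuffle from the registry and,
// only if it was registered, "delete" the data and index files of every map task.
object UnregisterSketch {
  val taskIdMapsForShuffle = mutable.Map.empty[Int, Set[Long]]
  val deletedFiles = mutable.ArrayBuffer.empty[String]

  // Simulated file removal for one map task's output.
  private def deleteMapOutput(shuffleId: Int, mapTaskId: Long): Unit = {
    deletedFiles += s"shuffle_${shuffleId}_${mapTaskId}_0.data"
    deletedFiles += s"shuffle_${shuffleId}_${mapTaskId}_0.index"
  }

  def unregisterShuffle(shuffleId: Int): Boolean = {
    taskIdMapsForShuffle.remove(shuffleId).foreach { mapTaskIds =>
      mapTaskIds.foreach(deleteMapOutput(shuffleId, _))
    }
    true // always true, whether or not the shuffle was registered
  }
}
```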
Getting ShuffleReader for ShuffleHandle
getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
getReader is part of the ShuffleManager abstraction.
getReader requests the MapOutputTracker (via SparkEnv) for the map output locations and sizes by executor (getMapSizesByExecutorId) for the shuffleId of the given ShuffleHandle and the given ranges of map indices and partitions.
In the end, getReader creates a new BlockStoreShuffleReader.
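Both ranges passed to getReader are half-open: the reader covers map outputs startMapIndex until endMapIndex and reduce partitions startPartition until endPartition. The sketch below (ReaderRangeSketch and BlockRef are hypothetical names) enumerates the (map, partition) pairs such a call spans. Real shuffle blocks are identified by map task ID rather than map index, and the MapOutputTracker additionally reports where each block lives and how large it is.

```scala
// Hypothetical sketch of the slice of shuffle output a single getReader call covers.
object ReaderRangeSketch {
  final case class BlockRef(shuffleId: Int, mapIndex: Int, reduceId: Int)

  // Half-open ranges: [startMapIndex, endMapIndex) x [startPartition, endPartition).
  def blocksToRead(
      shuffleId: Int,
      startMapIndex: Int,
      endMapIndex: Int,
      startPartition: Int,
      endPartition: Int): Seq[BlockRef] =
    for {
      mapIndex <- startMapIndex until endMapIndex
      reduceId <- startPartition until endPartition
    } yield BlockRef(shuffleId, mapIndex, reduceId)
}
```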
Stopping ShuffleManager
stop(): Unit
stop is part of the ShuffleManager abstraction.
stop requests the IndexShuffleBlockResolver to stop.
Logging
Enable the ALL logging level for the org.apache.spark.shuffle.sort.SortShuffleManager logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.shuffle.sort.SortShuffleManager=ALL
Refer to Logging.