# SortShuffleManager

`SortShuffleManager` is the default and only ShuffleManager in Apache Spark (with the short name `sort` or `tungsten-sort`).
## Creating Instance

`SortShuffleManager` takes the following to be created:

* SparkConf

`SortShuffleManager` is created when `SparkEnv` is created (on the driver and executors at the very beginning of a Spark application's lifecycle).
## Getting ShuffleWriter For Partition and ShuffleHandle

```scala
getWriter[K, V](
  handle: ShuffleHandle,
  mapId: Int,
  context: TaskContext): ShuffleWriter[K, V]
```

`getWriter` registers the given ShuffleHandle (by its shuffleId and numMaps) in the numMapsForShuffle internal registry, unless it is already registered.
!!! note
    `getWriter` expects the given ShuffleHandle to be a BaseShuffleHandle. Moreover, in two out of the three cases, `getWriter` expects the ShuffleBlockResolver to be the more specialized IndexShuffleBlockResolver.
`getWriter` then creates a new ShuffleWriter based on the type of the given ShuffleHandle.
| ShuffleHandle | ShuffleWriter |
|---------------|---------------|
| SerializedShuffleHandle | UnsafeShuffleWriter |
| BypassMergeSortShuffleHandle | BypassMergeSortShuffleWriter |
| BaseShuffleHandle | SortShuffleWriter |
`getWriter` is part of the ShuffleManager abstraction.
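The dispatch in the table above can be sketched as a pattern match over the handle's runtime type. The class names are real, but the classes below are simplified stand-ins (in Spark, SerializedShuffleHandle and BypassMergeSortShuffleHandle both extend BaseShuffleHandle, so the match order matters):

```scala
// Hypothetical, simplified stand-ins for Spark's shuffle handle hierarchy.
class BaseShuffleHandle(val shuffleId: Int)
class SerializedShuffleHandle(shuffleId: Int) extends BaseShuffleHandle(shuffleId)
class BypassMergeSortShuffleHandle(shuffleId: Int) extends BaseShuffleHandle(shuffleId)

// getWriter-style dispatch: the most specific handle types are matched first,
// with BaseShuffleHandle as the fallback (SortShuffleWriter).
def writerFor(handle: BaseShuffleHandle): String = handle match {
  case _: SerializedShuffleHandle      => "UnsafeShuffleWriter"
  case _: BypassMergeSortShuffleHandle => "BypassMergeSortShuffleWriter"
  case _: BaseShuffleHandle            => "SortShuffleWriter"
}
```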
== [[MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE]] Maximum Number of Partition Identifiers

SortShuffleManager allows for (1 << 24) partition identifiers that can be encoded (i.e. 16777216).
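A quick check of the arithmetic (the comment about the bit layout is an assumption about the packed record pointer encoding, not stated above):

```scala
// With 24 bits assumed available for the partition identifier,
// (1 << 24) distinct identifiers (0 to 16777215) can be encoded.
val maxIds = 1 << 24
assert(maxIds == 16777216)
assert(maxIds - 1 == 16777215)  // highest encodable partition identifier
```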
== [[numMapsForShuffle]] numMapsForShuffle
Lookup table with the number of mappers producing the output for a shuffle (as {java-javadoc-url}/java/util/concurrent/ConcurrentHashMap.html[java.util.concurrent.ConcurrentHashMap])
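A minimal sketch of how such a registry behaves (a hypothetical mirror of numMapsForShuffle, mapping a shuffleId to its number of mappers; `register` is an illustrative name, not a Spark method):

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical mirror of the numMapsForShuffle registry: shuffleId -> number
// of mappers. putIfAbsent keeps the first registration and ignores repeats,
// which matches how getWriter registers a shuffle unless already done.
val numMapsForShuffle = new ConcurrentHashMap[Int, Integer]()

def register(shuffleId: Int, numMaps: Int): Unit = {
  numMapsForShuffle.putIfAbsent(shuffleId, numMaps)
  ()
}

register(0, 4)
register(0, 8)  // no effect: shuffle 0 is already registered
```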
== [[shuffleBlockResolver]] IndexShuffleBlockResolver

[source, scala]
----
shuffleBlockResolver: ShuffleBlockResolver
----

shuffleBlockResolver is a shuffle:IndexShuffleBlockResolver.md[IndexShuffleBlockResolver] that is created immediately when SortShuffleManager is created.

shuffleBlockResolver is used when SortShuffleManager is requested for a <<getWriter, ShuffleWriter>>, to <<unregisterShuffle, unregister a shuffle>> and to <<stop, stop>>.

shuffleBlockResolver is part of the shuffle:ShuffleManager.md#shuffleBlockResolver[ShuffleManager] abstraction.
== [[unregisterShuffle]] Unregistering Shuffle

[source, scala]
----
unregisterShuffle(
  shuffleId: Int): Boolean
----

unregisterShuffle tries to remove the given shuffleId from the <<numMapsForShuffle, numMapsForShuffle>> internal registry.

If the given shuffleId was registered, unregisterShuffle requests the <<shuffleBlockResolver, IndexShuffleBlockResolver>> to remove the shuffle data and index files for every map output of the shuffle.

unregisterShuffle is part of the shuffle:ShuffleManager.md#unregisterShuffle[ShuffleManager] abstraction.
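The flow above can be sketched as follows. This is a hypothetical simplification: `removeDataByMap` stands in for the IndexShuffleBlockResolver call that deletes a map output's data and index files, and here it just records what would be deleted:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical registry with one registered shuffle (id 0, 2 mappers).
val numMapsForShuffle = new ConcurrentHashMap[Int, Integer]()
numMapsForShuffle.put(0, 2)

// Records (shuffleId, mapId) pairs instead of deleting files.
var removed = List.empty[(Int, Int)]
def removeDataByMap(shuffleId: Int, mapId: Int): Unit =
  removed = (shuffleId, mapId) :: removed

def unregisterShuffle(shuffleId: Int): Boolean = {
  // Remove the entry; if the shuffle was registered, clean up every map output.
  Option(numMapsForShuffle.remove(shuffleId)).foreach { numMaps =>
    (0 until numMaps).foreach(mapId => removeDataByMap(shuffleId, mapId))
  }
  true  // always reports success
}
```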
== [[registerShuffle]] Creating ShuffleHandle (For ShuffleDependency)

[source, scala]
----
registerShuffle[K, V, C](
  shuffleId: Int,
  numMaps: Int,
  dependency: ShuffleDependency[K, V, C]): ShuffleHandle
----

CAUTION: FIXME Copy the conditions

registerShuffle returns a new ShuffleHandle that can be one of the following:

* shuffle:BypassMergeSortShuffleHandle.md[BypassMergeSortShuffleHandle] (with ShuffleDependency[K, V, V]) when the shuffle:SortShuffleWriter.md#shouldBypassMergeSort[shouldBypassMergeSort] condition holds

* shuffle:SerializedShuffleHandle.md[SerializedShuffleHandle] (with ShuffleDependency[K, V, V]) when the <<canUseSerializedShuffle, canUseSerializedShuffle>> condition holds

* shuffle:spark-shuffle-BaseShuffleHandle.md[BaseShuffleHandle] otherwise

registerShuffle is part of the shuffle:ShuffleManager.md#registerShuffle[ShuffleManager] abstraction.
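The decision order above can be sketched as a simple conditional. This is a hypothetical simplification: the two boolean parameters stand in for the real shouldBypassMergeSort and canUseSerializedShuffle checks on the ShuffleDependency:

```scala
// Hypothetical, simplified decision mirroring registerShuffle:
// bypass-merge-sort is checked first, then serialized shuffle,
// and BaseShuffleHandle is the fallback.
def chooseHandle(bypassMergeSort: Boolean, serializedShuffle: Boolean): String =
  if (bypassMergeSort) "BypassMergeSortShuffleHandle"
  else if (serializedShuffle) "SerializedShuffleHandle"
  else "BaseShuffleHandle"
```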
== [[getReader]] Creating BlockStoreShuffleReader For ShuffleHandle And Reduce Partitions

[source, scala]
----
getReader[K, C](
  handle: ShuffleHandle,
  startPartition: Int,
  endPartition: Int,
  context: TaskContext): ShuffleReader[K, C]
----

getReader returns a new shuffle:BlockStoreShuffleReader.md[BlockStoreShuffleReader] passing all the input parameters on to it.

getReader assumes that the input ShuffleHandle is of type shuffle:spark-shuffle-BaseShuffleHandle.md[BaseShuffleHandle].

getReader is part of the shuffle:ShuffleManager.md#getReader[ShuffleManager] abstraction.
== [[stop]] Stopping SortShuffleManager

[source, scala]
----
stop(): Unit
----

stop simply requests the <<shuffleBlockResolver, IndexShuffleBlockResolver>> to stop.

stop is part of the shuffle:ShuffleManager.md#stop[ShuffleManager] abstraction.
== [[canUseSerializedShuffle]] Requirements of SerializedShuffleHandle (as ShuffleHandle)

[source, scala]
----
canUseSerializedShuffle(
  dependency: ShuffleDependency[_, _, _]): Boolean
----

canUseSerializedShuffle returns true when all of the following hold:

. The Serializer (of the given ShuffleDependency) serializer:Serializer.md#supportsRelocationOfSerializedObjects[supports relocation of serialized objects]

. No map-side aggregation (the mapSideCombine flag of the given ShuffleDependency is off)

. The number of partitions (of the Partitioner of the given ShuffleDependency) is not greater than the <<MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE, maximum number of partition identifiers>> ((1 << 24), i.e. 16777216)
canUseSerializedShuffle prints out the following DEBUG message to the logs:

[source,plaintext]
----
Can use serialized shuffle for shuffle [shuffleId]
----
Otherwise, canUseSerializedShuffle returns false and prints out one of the following DEBUG messages:

[source,plaintext]
----
Can't use serialized shuffle for shuffle [id] because the serializer, [name], does not support object relocation
Can't use serialized shuffle for shuffle [id] because an aggregator is defined
Can't use serialized shuffle for shuffle [id] because it has more than [number] partitions
----
canUseSerializedShuffle is used when SortShuffleManager is requested to shuffle:SortShuffleManager.md#registerShuffle[register a shuffle (and create a ShuffleHandle)].
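The three requirements can be sketched as a single boolean expression. This is a hypothetical simplification: the parameters stand in for the corresponding properties of the ShuffleDependency:

```scala
// Hypothetical, simplified version of the canUseSerializedShuffle checks.
val maxShuffleOutputPartitions = 1 << 24  // 16777216

def canUseSerializedShuffle(
    supportsRelocation: Boolean,  // serializer supports relocation of serialized objects
    mapSideCombine: Boolean,      // map-side aggregation requested?
    numPartitions: Int): Boolean =
  supportsRelocation && !mapSideCombine && numPartitions <= maxShuffleOutputPartitions
```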
== [[logging]] Logging

Enable ALL logging level for org.apache.spark.shuffle.sort.SortShuffleManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

[source,plaintext]
----
log4j.logger.org.apache.spark.shuffle.sort.SortShuffleManager=ALL
----

Refer to spark-logging.md[Logging].