= SortShuffleWriter
SortShuffleWriter is a concrete ShuffleWriter.md[ShuffleWriter] that is used when SortShuffleManager.md#getWriter[SortShuffleManager returns a ShuffleWriter for a ShuffleHandle] (and the more specialized BypassMergeSortShuffleWriter.md[BypassMergeSortShuffleWriter] and UnsafeShuffleWriter.md[UnsafeShuffleWriter] could not be used).
SortShuffleWriter is created when SortShuffleManager.md#getWriter[SortShuffleManager returns a ShuffleWriter for the fallback BaseShuffleHandle].
SortShuffleWriter[K, V, C] is a parameterized type with K keys, V values, and C combiner values.
== [[creating-instance]] Creating Instance
SortShuffleWriter takes the following to be created:
- [[shuffleBlockResolver]] IndexShuffleBlockResolver.md[IndexShuffleBlockResolver]
- [[handle]] spark-shuffle-BaseShuffleHandle.md[BaseShuffleHandle]
- [[mapId]] Map ID
- [[context]] TaskContext
== [[mapStatus]] MapStatus
SortShuffleWriter uses an internal variable for a scheduler:MapStatus.md[MapStatus] that is created at the end of <<write, writing records>> and returned when the writer is requested to <<stop, stop>> (with success enabled).
== [[write]] Writing Records (Into Shuffle Partitioned File In Disk Store)
[source, scala]
----
write(
  records: Iterator[Product2[K, V]]): Unit
----
write creates an ExternalSorter.md[ExternalSorter] based on the spark-shuffle-BaseShuffleHandle.md#dependency[ShuffleDependency] (of the <<handle, BaseShuffleHandle>>).
write requests the ExternalSorter to insert all the given records.
write requests the <<shuffleBlockResolver, IndexShuffleBlockResolver>> for the shuffle data output file (for the shuffle and the <<mapId, map ID>>) and creates a temporary file alongside it.
write creates a storage:BlockId.md#ShuffleBlockId[ShuffleBlockId] (for the shuffle and the <<mapId, map ID>>).
write requests the ExternalSorter to ExternalSorter.md#writePartitionedFile[write all the records (inserted earlier) into the temporary partitioned file in the disk store]. ExternalSorter returns the length of every partition.
write requests the <<shuffleBlockResolver, IndexShuffleBlockResolver>> to write an index file and commit (with the temporary partitioned file and the partition lengths).
write creates a scheduler:MapStatus.md[MapStatus] (with the storage:BlockManager.md#shuffleServerId[location of the shuffle server] that serves the shuffle files and the sizes of the shuffle partitions). The MapStatus is later available as the <<mapStatus, mapStatus>> internal attribute.
write does not handle exceptions so when they occur, they will break the processing.
In the end, write deletes the temporary shuffle output file. write prints out the following ERROR message to the logs if the file could not be deleted:

[source]
----
Error while deleting temp file [path]
----
write is part of the ShuffleWriter.md#write[ShuffleWriter] abstraction.
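The steps above can be condensed into a simplified, in-memory sketch (all names here are illustrative stand-ins; the real write spills to disk via ExternalSorter and commits files through IndexShuffleBlockResolver):

```scala
// Simplified, in-memory sketch of the write flow (illustrative names only;
// not Spark's actual code).
object WriteSketch {

  // Hash-partitions records into numPartitions buckets and returns per-partition
  // record counts -- a stand-in for the partition lengths that
  // ExternalSorter.writePartitionedFile reports and a MapStatus is built from.
  def writePartitioned[K, V](
      records: Iterator[(K, V)],
      numPartitions: Int): Array[Long] = {
    val lengths = Array.fill(numPartitions)(0L)
    for ((k, _) <- records) {
      val p = math.floorMod(k.hashCode, numPartitions)
      lengths(p) += 1
    }
    lengths
  }
}
```

For example, `writePartitioned(Iterator(1 -> "a", 2 -> "b", 3 -> "c", 1 -> "d"), 2)` yields `Array(1, 3)`: one record hashes to partition 0 and three to partition 1, and these lengths are what the driver later needs to locate every reducer's data.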
== [[stop]] Closing SortShuffleWriter (and Calculating MapStatus)
[source, scala]
----
stop(
  success: Boolean): Option[MapStatus]
----
stop turns the internal <<stopping, stopping>> flag on and returns the internal <<mapStatus, MapStatus>> if the input success is enabled.

Otherwise, when the input success is disabled, stop returns no MapStatus (i.e. None).
In the end, stop requests the ExternalSorter to ExternalSorter.md#stop[stop] and increments the shuffle write time task metrics.
stop is part of the ShuffleWriter.md#contract[ShuffleWriter] abstraction.
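The interplay of the success flag and the stopping flag can be sketched as a minimal model (assumed, simplified names; a String stands in for MapStatus, and this is not Spark's exact code):

```scala
// Minimal model of stop's control flow (illustrative; not Spark's exact code).
class StopSketch {
  private var stopping = false                  // the internal stopping flag
  private var mapStatus: Option[String] = None  // stand-in for MapStatus

  // Stand-in for a successful write that records a MapStatus.
  def write(): Unit =
    mapStatus = Some("MapStatus(shuffleServerId, partitionLengths)")

  def stop(success: Boolean): Option[String] = {
    if (stopping) return None   // already stopping: report nothing
    stopping = true
    if (success) mapStatus else None  // MapStatus only on success
  }
}
```

A map task can call `stop(success = true)` and later `stop(success = false)` after an exception; the stopping flag makes any second call a no-op, so teardown work is never attempted twice.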
== [[shouldBypassMergeSort]] Requirements of BypassMergeSortShuffleHandle (as ShuffleHandle)
[source, scala]
----
shouldBypassMergeSort(
  conf: SparkConf,
  dep: ShuffleDependency[_, _, _]): Boolean
----
shouldBypassMergeSort returns true when all of the following hold:
. No map-side aggregation (the mapSideCombine flag of the given ShuffleDependency is off)
. Number of partitions (of the Partitioner of the given ShuffleDependency) is not greater than spark.shuffle.sort.bypassMergeThreshold configuration property
Otherwise, shouldBypassMergeSort returns false.
shouldBypassMergeSort is used when SortShuffleManager is requested to register a shuffle (and creates a ShuffleHandle).
== [[logging]] Logging
Enable ALL logging level for the org.apache.spark.shuffle.sort.SortShuffleWriter logger to see what happens inside.
Add the following line to conf/log4j.properties:

[source]
----
log4j.logger.org.apache.spark.shuffle.sort.SortShuffleWriter=ALL
----
Refer to spark-logging.md[Logging].
== [[internal-properties]] Internal Properties
[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| [[stopping]] stopping
| Internal flag to mark that SortShuffleWriter is being <<stop, stopped>> (so a repeated stop request does nothing and returns no MapStatus)

|===