ExternalSorter¶
ExternalSorter is a Spillable of WritablePartitionedPairCollection of pairs (of K keys and C values).
ExternalSorter[K, V, C] is a parameterized type of K keys, V values, and C combiner (partial) values.
ExternalSorter is used for the following:
- SortShuffleWriter to write records
- BlockStoreShuffleReader to read records (with a key ordering defined)
Creating Instance¶
ExternalSorter takes the following to be created:
- TaskContext
- Optional Aggregator (default: undefined)
- Optional Partitioner (default: undefined)
- Optional Ordering (Scala) for keys (default: undefined)
- Serializer (default: Serializer)
ExternalSorter is created when:
- BlockStoreShuffleReader is requested to read records (for a reduce task)
- SortShuffleWriter is requested to write records (as an ExternalSorter[K, V, C] or ExternalSorter[K, V, V] based on the Map-Side Partial Aggregation Flag)
Inserting Records¶
insertAll(
records: Iterator[Product2[K, V]]): Unit
insertAll branches on whether the optional Aggregator was specified (when creating the ExternalSorter).
insertAll is eager: it consumes the given records iterator in full before returning.
Map-Side Aggregator Specified¶
With an Aggregator given, insertAll creates an update function based on the mergeValue and createCombiner functions of the Aggregator.
For every record, insertAll increments the internal read counter.
insertAll requests the PartitionedAppendOnlyMap to changeValue for the key (made up of the partition of the key of the current record and the key itself, i.e. (partition, key)) with the update function.
In the end, insertAll spills the in-memory collection to disk if needed with the usingMap flag enabled (to indicate that the PartitionedAppendOnlyMap was updated).
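The map-side combine path above can be sketched as follows. This is a simplified model, not Spark's code: SimpleAggregator and SimplePartitionedMap are hypothetical stand-ins for Aggregator and PartitionedAppendOnlyMap, the update function takes an Option for the old value as an assumption made for readability, and spilling is left out.

```scala
import scala.collection.mutable

object MapSideCombineSketch {
  // Hypothetical stand-in for Aggregator: only the two functions used by the update function.
  final case class SimpleAggregator[K, V, C](
      createCombiner: V => C,
      mergeValue: (C, V) => C)

  // Hypothetical stand-in for PartitionedAppendOnlyMap, keyed by (partition, key).
  final class SimplePartitionedMap[K, C] {
    private val data = mutable.HashMap.empty[(Int, K), C]

    // Applies updateFunc(hadValue, oldValue) and stores the result under the key.
    def changeValue(key: (Int, K), updateFunc: (Boolean, Option[C]) => C): C = {
      val old = data.get(key)
      val updated = updateFunc(old.isDefined, old)
      data(key) = updated
      updated
    }

    def toMap: Map[(Int, K), C] = data.toMap
  }

  // Returns the combined values per (partition, key) and the number of records read.
  def insertAll[K, V, C](
      records: Iterator[(K, V)],
      aggregator: SimpleAggregator[K, V, C],
      getPartition: K => Int): (Map[(Int, K), C], Long) = {
    val map = new SimplePartitionedMap[K, C]
    var recordsRead = 0L
    records.foreach { case (k, v) =>
      recordsRead += 1
      // Merge into the existing combiner, or create a combiner for a first-seen key.
      val update = (hadValue: Boolean, old: Option[C]) =>
        if (hadValue) aggregator.mergeValue(old.get, v) else aggregator.createCombiner(v)
      map.changeValue((getPartition(k), k), update)
    }
    (map.toMap, recordsRead)
  }
}
```

With a summing aggregator, two records with the same key end up as a single (partition, key) entry holding their sum, which is why the combined collection is usually smaller than the input.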
No Map-Side Aggregator Specified¶
With no Aggregator given, insertAll iterates over all the records and uses the PartitionedPairBuffer instead.
For every record, insertAll increments the internal read counter.
insertAll requests the PartitionedPairBuffer to insert with the partition of the key of the current record, the key itself and the value of the current record.
In the end, insertAll spills the in-memory collection to disk if needed with the usingMap flag disabled (since this time the PartitionedPairBuffer was updated, not the PartitionedAppendOnlyMap).
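A minimal sketch of the no-aggregator path, assuming a hypothetical SimplePartitionedBuffer in place of PartitionedPairBuffer and ignoring spilling. Every record is kept as-is, so duplicate keys are not combined.

```scala
import scala.collection.mutable.ArrayBuffer

object BufferInsertSketch {
  // Hypothetical stand-in for PartitionedPairBuffer: appends records, never combines them.
  final class SimplePartitionedBuffer[K, V] {
    private val data = ArrayBuffer.empty[((Int, K), V)]

    def insert(partition: Int, key: K, value: V): Unit = {
      val entry = ((partition, key), value)
      data += entry
    }

    def toSeq: Seq[((Int, K), V)] = data.toList
  }

  // Returns the buffered records (in insertion order) and the number of records read.
  def insertAll[K, V](
      records: Iterator[(K, V)],
      getPartition: K => Int): (Seq[((Int, K), V)], Long) = {
    val buffer = new SimplePartitionedBuffer[K, V]
    var recordsRead = 0L
    records.foreach { case (k, v) =>
      recordsRead += 1
      buffer.insert(getPartition(k), k, v)
    }
    (buffer.toSeq, recordsRead)
  }
}
```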
Spilling In-Memory Collection to Disk¶
maybeSpillCollection(
usingMap: Boolean): Unit
maybeSpillCollection branches on the input usingMap flag (which indicates the in-memory collection to use, the PartitionedAppendOnlyMap or the PartitionedPairBuffer).
maybeSpillCollection requests the collection to estimate its size (in bytes). The estimate is recorded as the peakMemoryUsedBytes metric whenever it exceeds the currently recorded value.
maybeSpillCollection spills the collection to disk if needed. If spilled, maybeSpillCollection creates a new collection (a new PartitionedAppendOnlyMap or a new PartitionedPairBuffer).
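The estimate, track-peak, spill and replace steps can be sketched with a toy model. TinySorter, its fixed spillThresholdBytes and the character-count size estimate are all assumptions made for illustration; the real decision of when a spill is needed belongs to the Spillable abstraction.

```scala
import scala.collection.mutable.ArrayBuffer

object MaybeSpillSketch {
  // Hypothetical sorter model: buffers strings; "size" is the total character count.
  final class TinySorter(spillThresholdBytes: Long) {
    private var buffer = ArrayBuffer.empty[String]
    private var peak = 0L
    private var spillCount = 0

    def peakMemoryUsedBytes: Long = peak
    def numSpills: Int = spillCount
    def inMemoryRecords: Int = buffer.size

    def insert(record: String): Unit = {
      buffer += record
      maybeSpillCollection()
    }

    private def estimateSize(): Long = buffer.iterator.map(_.length.toLong).sum

    private def maybeSpillCollection(): Unit = {
      val estimatedSize = estimateSize()
      // Record the estimate only when it exceeds the current peak.
      if (estimatedSize > peak) peak = estimatedSize
      if (estimatedSize >= spillThresholdBytes) {
        spillCount += 1 // a real spill would write the records to disk here
        buffer = ArrayBuffer.empty[String] // start over with a new, empty collection
      }
    }
  }
}
```

Note that the peak survives the spill: after the collection is replaced, the in-memory size drops to zero while the recorded peak stays at its highest observed value.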
Usage¶
insertAll is used when:
- SortShuffleWriter is requested to write records (as an ExternalSorter[K, V, C] or ExternalSorter[K, V, V] based on the Map-Side Partial Aggregation Flag)
- BlockStoreShuffleReader is requested to read records (with a key ordering defined)
In-Memory Collections of Records¶
ExternalSorter uses PartitionedPairBuffers or PartitionedAppendOnlyMaps to store records in memory before spilling to disk.
ExternalSorter uses PartitionedPairBuffers when created with no Aggregator. Otherwise, ExternalSorter uses PartitionedAppendOnlyMaps.
ExternalSorter inserts records into the collections when requested to insertAll.
ExternalSorter spills the in-memory collection to disk if needed and, if so, creates a new collection.
ExternalSorter releases the collections (sets them to null) when requested to forceSpill and stop. That leaves them unreferenced, so the JVM garbage collector can reclaim their memory.
Peak Size of In-Memory Collection¶
ExternalSorter tracks the peak size (in bytes) of the in-memory collection whenever requested to spill the in-memory collection to disk if needed.
The peak size is used when:
- BlockStoreShuffleReader is requested to read combined records for a reduce task (with an ordering defined)
- ExternalSorter is requested to writePartitionedMapOutput
Spills¶
spills: ArrayBuffer[SpilledFile]
ExternalSorter creates the spills internal buffer of SpilledFiles when created.
A new SpilledFile is added when ExternalSorter is requested to spill.
Note
An empty spills buffer indicates that there is only in-memory data.
SpilledFiles are physically deleted from disk and the spills buffer is cleared when ExternalSorter is requested to stop.
ExternalSorter uses the spills buffer when requested for a partitionedIterator.
Number of Spills¶
numSpills: Int
numSpills is the number of spill files this sorter has spilled.
SpilledFile¶
SpilledFile is the metadata of a spilled file:
Spilling Data to Disk¶
spill(
collection: WritablePartitionedPairCollection[K, C]): Unit
spill is part of the Spillable abstraction.
spill requests the given WritablePartitionedPairCollection for a destructive WritablePartitionedIterator.
spill calls spillMemoryIteratorToDisk (with the destructive WritablePartitionedIterator), which creates a SpilledFile.
In the end, spill adds the SpilledFile to the spills internal registry.
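A rough sketch of the spill bookkeeping, under loud assumptions: SimpleSpilledFile and its fields are hypothetical (they are not SpilledFile's actual fields), records are written as tab-separated text to a temporary file rather than through a serializer, and a plain Iterator plays the role of the destructive iterator (it is consumed by the write).

```scala
import java.io.{File, PrintWriter}
import scala.collection.mutable.ArrayBuffer

object SpillSketch {
  // Hypothetical, trimmed-down metadata; the fields are assumptions for illustration only.
  final case class SimpleSpilledFile(file: File, recordCount: Long)

  final class SpillRegistry {
    private val spills = ArrayBuffer.empty[SimpleSpilledFile]

    def numSpills: Int = spills.size

    // Consumes the iterator, writes its records to a temp file and registers the file.
    def spill(records: Iterator[((Int, String), Int)]): SimpleSpilledFile = {
      val file = File.createTempFile("spill-sketch", ".txt")
      val out = new PrintWriter(file)
      var count = 0L
      try {
        records.foreach { case ((partition, key), value) =>
          out.println(s"$partition\t$key\t$value")
          count += 1
        }
      } finally out.close()
      val spilled = SimpleSpilledFile(file, count)
      spills += spilled
      spilled
    }

    // Deletes the spilled files from disk and clears the registry.
    def stop(): Unit = {
      spills.foreach(_.file.delete())
      spills.clear()
    }
  }
}
```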
spillMemoryIteratorToDisk¶
spillMemoryIteratorToDisk(
inMemoryIterator: WritablePartitionedIterator): SpilledFile
spillMemoryIteratorToDisk
...FIXME
spillMemoryIteratorToDisk is used when:
- ExternalSorter is requested to spill
- SpillableIterator is requested to spill
partitionedIterator¶
partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])]
partitionedIterator
...FIXME
partitionedIterator is used when:
- ExternalSorter is requested for an iterator and to writePartitionedMapOutput
writePartitionedMapOutput¶
writePartitionedMapOutput(
shuffleId: Int,
mapId: Long,
mapOutputWriter: ShuffleMapOutputWriter): Unit
writePartitionedMapOutput
...FIXME
writePartitionedMapOutput is used when:
- SortShuffleWriter is requested to write records
Iterator¶
iterator: Iterator[Product2[K, C]]
iterator turns the isShuffleSort flag off (false).
iterator requests the partitionedIterator and takes only the second element of every pair (the records with the combined values), dropping the partition IDs.
iterator is used when:
- BlockStoreShuffleReader is requested to read combined records for a reduce task
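The flattening step can be illustrated with plain Scala iterators. This is a sketch that uses tuples in place of Product2 and a hypothetical flatten helper; it only shows how an iterator of (partition ID, records) pairs becomes a single iterator of records.

```scala
object IteratorSketch {
  // Drops the partition IDs and lazily concatenates the per-partition record iterators.
  def flatten[K, C](partitioned: Iterator[(Int, Iterator[(K, C)])]): Iterator[(K, C)] =
    partitioned.flatMap { case (_, records) => records }
}
```

Records come out partition by partition, in the order the partitions appear in the input.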
Stopping ExternalSorter¶
stop(): Unit
stop
...FIXME
stop is used when:
- BlockStoreShuffleReader is requested to read records (with ordering defined)
- SortShuffleWriter is requested to stop
Logging¶
Enable ALL logging level for the org.apache.spark.util.collection.ExternalSorter logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.util.collection.ExternalSorter=ALL
Refer to Logging.