ExternalSorter
ExternalSorter is a Spillable of WritablePartitionedPairCollection of pairs (of K keys and C values).
ExternalSorter[K, V, C] is a parameterized type of K keys, V values, and C combiner (partial) values.
ExternalSorter is used for the following:
- SortShuffleWriter to write records
- BlockStoreShuffleReader to read records (with a key ordering defined)
Creating Instance
ExternalSorter takes the following to be created:
- TaskContext
- Optional Aggregator (default: undefined)
- Optional Partitioner (default: undefined)
- Optional Ordering (Scala) for keys (default: undefined)
- Serializer (default: the Serializer of SparkEnv)
ExternalSorter is created when:
- BlockStoreShuffleReader is requested to read records (for a reduce task)
- SortShuffleWriter is requested to write records (as an ExternalSorter[K, V, C] or an ExternalSorter[K, V, V] based on the Map-Side Partial Aggregation Flag)
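The choice between ExternalSorter[K, V, C] and ExternalSorter[K, V, V] can be sketched in plain Scala (no Spark dependency). Aggregator, SorterConfig and SorterChoice are hypothetical stand-ins, not Spark's classes, and numPartitions replaces the optional Partitioner. With map-side combine the sorter gets the aggregator (and key ordering) and produces combiners of type C; without it, values pass through unchanged, so C is V.

```scala
// Hypothetical stand-ins, not Spark's Aggregator or ExternalSorter.
final case class Aggregator[K, V, C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C)

// Mirrors the optional constructor arguments (all undefined by default);
// numPartitions stands in for the optional Partitioner.
final case class SorterConfig[K, V, C](
    aggregator: Option[Aggregator[K, V, C]] = None,
    numPartitions: Option[Int] = None,
    ordering: Option[Ordering[K]] = None)

object SorterChoice {
  // Right: combining sorter (values become C). Left: pass-through sorter (C = V),
  // created with neither an aggregator nor a key ordering.
  def forWriter[K, V, C](
      mapSideCombine: Boolean,
      agg: Aggregator[K, V, C],
      numPartitions: Int,
      keyOrdering: Option[Ordering[K]]): Either[SorterConfig[K, V, V], SorterConfig[K, V, C]] =
    if (mapSideCombine) Right(SorterConfig[K, V, C](Some(agg), Some(numPartitions), keyOrdering))
    else Left(SorterConfig[K, V, V](aggregator = None, numPartitions = Some(numPartitions)))
}
```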
Inserting Records
insertAll(
records: Iterator[Product2[K, V]]): Unit
insertAll branches off based on whether the optional Aggregator was specified (when the ExternalSorter was created).
insertAll consumes all the records eagerly, i.e. it drains the given records iterator.
Map-Side Aggregator Specified
With an Aggregator given, insertAll creates an update function based on the mergeValue and createCombiner functions of the Aggregator.
For every record, insertAll increments the internal read counter.
insertAll requests the PartitionedAppendOnlyMap to changeValue for the key (made up of the partition of the key of the current record and the key itself, i.e. (partition, key)) with the update function.
Finally (still for every record), insertAll spills the in-memory collection to disk if needed, with the usingMap flag enabled (to indicate that the PartitionedAppendOnlyMap was updated).
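The per-record steps of the aggregating branch can be sketched as follows. This is a simplified model, not Spark's code: a mutable HashMap keyed by (partition, key) stands in for PartitionedAppendOnlyMap, the hypothetical getPartition uses a non-negative hash modulo (an assumption in the spirit of a hash partitioner), and the spill check is left as a comment.

```scala
import scala.collection.mutable

// Hypothetical sketch of insertAll with an aggregator (no Spark dependency).
class AggregatingInsert[K, V, C](
    numPartitions: Int,
    createCombiner: V => C,
    mergeValue: (C, V) => C) {

  val map = mutable.HashMap.empty[(Int, K), C] // stands in for PartitionedAppendOnlyMap
  var elementsRead = 0L                        // the read counter

  // Non-negative hash modulo; `##` also handles null keys (hash 0).
  def getPartition(key: K): Int = {
    val mod = key.## % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  def insertAll(records: Iterator[Product2[K, V]]): Unit =
    records.foreach { kv =>
      elementsRead += 1
      // The update function: merge into an existing combiner or create the first one.
      def update(oldValue: Option[C]): C = oldValue match {
        case Some(c) => mergeValue(c, kv._2)
        case None    => createCombiner(kv._2)
      }
      val key = (getPartition(kv._1), kv._1)
      map.update(key, update(map.get(key))) // the changeValue step
      // ExternalSorter would call maybeSpillCollection(usingMap = true) here.
    }
}
```

A word count over three records leaves two combiners, one of them summing two values.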
No Map-Side Aggregator Specified
With no Aggregator given, insertAll iterates over all the records and uses the PartitionedPairBuffer instead.
For every record, insertAll increments the internal read counter.
insertAll requests the PartitionedPairBuffer to insert with the partition of the key of the current record, the key itself and the value of the current record.
Finally (still for every record), insertAll spills the in-memory collection to disk if needed, with the usingMap flag disabled (since this time the PartitionedPairBuffer was updated, not the PartitionedAppendOnlyMap).
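For comparison, a minimal sketch of the non-aggregating branch, assuming an ArrayBuffer of (partition, key, value) triples as a stand-in for PartitionedPairBuffer and the same hypothetical hash-modulo getPartition. Nothing is combined: every record is appended as-is (in ExternalSorter the value is simply treated as a C, which is why this case uses ExternalSorter[K, V, V]).

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of insertAll without an aggregator (no Spark dependency).
class BufferingInsert[K, V](numPartitions: Int) {
  val buffer = ArrayBuffer.empty[(Int, K, V)] // stands in for PartitionedPairBuffer
  var elementsRead = 0L                       // the read counter

  def getPartition(key: K): Int = {
    val mod = key.## % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  def insertAll(records: Iterator[Product2[K, V]]): Unit =
    records.foreach { kv =>
      elementsRead += 1
      buffer += ((getPartition(kv._1), kv._1, kv._2)) // insert(partition, key, value)
      // ExternalSorter would call maybeSpillCollection(usingMap = false) here.
    }
}
```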
Spilling In-Memory Collection to Disk
maybeSpillCollection(
usingMap: Boolean): Unit
maybeSpillCollection branches based on the input usingMap flag (which indicates the in-memory collection to use, the PartitionedAppendOnlyMap or the PartitionedPairBuffer).
maybeSpillCollection requests the collection to estimate its size (in bytes). Whenever the estimate is bigger than the value currently recorded, it becomes the new peakMemoryUsedBytes metric.
maybeSpillCollection spills the collection to disk if needed. If spilled, maybeSpillCollection creates a new collection (a new PartitionedAppendOnlyMap or a new PartitionedPairBuffer).
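The estimate, spill and replace steps, plus the peak tracking, can be sketched as follows. This is a simplification under stated assumptions: the hypothetical SpillingBuffer estimates size as a fixed number of bytes per record, and its maybeSpill compares against a fixed threshold, whereas Spark's Spillable.maybeSpill first tries to acquire more memory from the task's memory manager. Spilled batches are kept in memory here in place of spill files.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: fixed-size records and a fixed spill threshold.
class SpillingBuffer(bytesPerRecord: Long, spillThresholdBytes: Long) {
  var buffer = ArrayBuffer.empty[(Int, String, Int)]
  var peakMemoryUsedBytes = 0L
  val spilledBatches = ArrayBuffer.empty[Seq[(Int, String, Int)]] // stands in for spill files

  def estimateSize(): Long = buffer.size * bytesPerRecord

  // Simplified maybeSpill: "write to disk" once the estimate reaches the threshold.
  private def maybeSpill(estimatedSize: Long): Boolean =
    if (estimatedSize >= spillThresholdBytes) {
      spilledBatches += buffer.toList
      true
    } else false

  def insert(partition: Int, key: String, value: Int): Unit = {
    buffer += ((partition, key, value))
    maybeSpillCollection()
  }

  def maybeSpillCollection(): Unit = {
    val estimatedSize = estimateSize()
    // After a spill, continue with a brand-new collection.
    if (maybeSpill(estimatedSize)) buffer = ArrayBuffer.empty[(Int, String, Int)]
    // Track the peak size of the in-memory collection.
    if (estimatedSize > peakMemoryUsedBytes) peakMemoryUsedBytes = estimatedSize
  }
}
```

With 10 bytes per record and a 30-byte threshold, the third insert triggers a spill, the fourth record lands in a fresh buffer, and the recorded peak stays at 30 bytes.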
Usage¶
insertAll is used when:
- SortShuffleWriter is requested to write records (as an ExternalSorter[K, V, C] or an ExternalSorter[K, V, V] based on the Map-Side Partial Aggregation Flag)
- BlockStoreShuffleReader is requested to read records (with a key ordering defined)
In-Memory Collections of Records
ExternalSorter uses PartitionedPairBuffers or PartitionedAppendOnlyMaps to store records in memory before spilling to disk.
ExternalSorter uses PartitionedPairBuffers when created with no Aggregator. Otherwise, ExternalSorter uses PartitionedAppendOnlyMaps.
ExternalSorter inserts records into the collections in insertAll.
ExternalSorter spills the in-memory collection to disk if needed and, if so, creates a new collection.
ExternalSorter releases the collections (sets them to null) when requested to forceSpill and stop, so that the JVM garbage collector can reclaim their memory.
Peak Size of In-Memory Collection
ExternalSorter tracks the peak size (in bytes) of the in-memory collection whenever requested to spill the in-memory collection to disk if needed.
The peak size is used when:
- BlockStoreShuffleReader is requested to read combined records for a reduce task (with an ordering defined)
- ExternalSorter is requested to writePartitionedMapOutput
Spills
spills: ArrayBuffer[SpilledFile]
ExternalSorter creates the spills internal buffer of SpilledFiles when created.
A new SpilledFile is added when ExternalSorter is requested to spill.
Note: An empty spills buffer indicates that there is only in-memory data.
SpilledFiles are deleted physically from disk and the spills buffer is cleared when ExternalSorter is requested to stop.
ExternalSorter uses the spills buffer when requested for a partitionedIterator.
Number of Spills
numSpills: Int
numSpills is the number of spill files this sorter has created so far (the size of the spills buffer).
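The life cycle of the spills buffer (registering spill files, counting them, and deleting them on stop) can be sketched in plain Scala. SpilledFileSketch and SpillRegistry are hypothetical names; Spark's SpilledFile also carries a BlockId and serializer batch sizes, which are omitted here.

```scala
import java.io.File
import scala.collection.mutable.ArrayBuffer

// Hypothetical, trimmed-down SpilledFile metadata.
final case class SpilledFileSketch(file: File, elementsPerPartition: Array[Long])

class SpillRegistry {
  val spills = ArrayBuffer.empty[SpilledFileSketch]

  // No elements in spills: all data is still in memory.
  def onlyInMemory: Boolean = spills.isEmpty

  def numSpills: Int = spills.size

  def addSpill(spilled: SpilledFileSketch): Unit = spills += spilled

  // Like stop: physically delete every spill file, then clear the buffer.
  def stop(): Unit = {
    spills.foreach(s => s.file.delete())
    spills.clear()
  }
}
```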
SpilledFile
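SpilledFile is metadata about a spilled file:
- File
- BlockId
- Serializer batch sizes (serializerBatchSizes)
- Number of elements per partition (elementsPerPartition)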
Spilling Data to Disk
spill(
collection: WritablePartitionedPairCollection[K, C]): Unit
spill is part of the Spillable abstraction.
spill requests the given WritablePartitionedPairCollection for a destructive sorted WritablePartitionedIterator.
spill then calls spillMemoryIteratorToDisk (with the destructive WritablePartitionedIterator), which writes the records to disk and creates a SpilledFile.
In the end, spill adds the SpilledFile to the spills internal registry.
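The three steps of spill can be sketched in plain Scala under a few assumptions. SpillSketch and Spilled are hypothetical names, records are written as tab-separated text lines instead of serialized batches, and the sketch always sorts by partition and then by key (the comparator ExternalSorter actually uses depends on its key ordering and aggregator). The iterator is destructive in the sense that the in-memory collection is dropped once the sorted iterator is handed out.

```scala
import java.io.{File, PrintWriter}
import scala.collection.mutable.ArrayBuffer
import scala.io.Source

// Hypothetical, trimmed-down SpilledFile metadata.
final case class Spilled(file: File, elementsPerPartition: Array[Long])

class SpillSketch(numPartitions: Int) {
  var buffer = ArrayBuffer.empty[(Int, String, Int)] // (partition, key, value)
  val spills = ArrayBuffer.empty[Spilled]

  // Destructive sorted iterator: sort by (partition, key), then drop the collection.
  private def destructiveSortedIterator(): Iterator[(Int, String, Int)] = {
    val sorted = buffer.sortBy(r => (r._1, r._2))
    buffer = ArrayBuffer.empty[(Int, String, Int)]
    sorted.iterator
  }

  // Writes records to a temp file and counts the elements per partition.
  private def spillMemoryIteratorToDisk(it: Iterator[(Int, String, Int)]): Spilled = {
    val file = File.createTempFile("spill", ".txt")
    file.deleteOnExit()
    val counts = new Array[Long](numPartitions)
    val out = new PrintWriter(file)
    try it.foreach { case (p, k, v) =>
      out.println(s"$p\t$k\t$v")
      counts(p) += 1
    } finally out.close()
    Spilled(file, counts)
  }

  def spill(): Unit = {
    val spillFile = spillMemoryIteratorToDisk(destructiveSortedIterator())
    spills += spillFile // register the new spill
  }

  // Reads a spill file back (for illustration only).
  def readBack(s: Spilled): List[String] = {
    val src = Source.fromFile(s.file)
    try src.getLines().toList finally src.close()
  }
}
```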
spillMemoryIteratorToDisk
spillMemoryIteratorToDisk(
inMemoryIterator: WritablePartitionedIterator): SpilledFile
spillMemoryIteratorToDisk...FIXME
spillMemoryIteratorToDisk is used when:
- ExternalSorter is requested to spill
- SpillableIterator is requested to spill
partitionedIterator
partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])]
partitionedIterator...FIXME
partitionedIterator is used when:
- ExternalSorter is requested for an iterator and to writePartitionedMapOutput
writePartitionedMapOutput
writePartitionedMapOutput(
shuffleId: Int,
mapId: Long,
mapOutputWriter: ShuffleMapOutputWriter): Unit
writePartitionedMapOutput...FIXME
writePartitionedMapOutput is used when:
- SortShuffleWriter is requested to write records
Iterator
iterator: Iterator[Product2[K, C]]
iterator turns the isShuffleSort flag off (false).
iterator requests partitionedIterator and keeps only the per-partition iterators of combined records (the second elements of the pairs), flattening them into a single iterator.
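The flattening can be sketched with plain Scala iterators that have the same shape as partitionedIterator's result. FlattenSketch and its partitioned parameter are hypothetical; the isShuffleSort flag is not modeled.

```scala
// Hypothetical sketch: drop the partition IDs and concatenate the records.
object FlattenSketch {
  def iterator[K, C](
      partitioned: Iterator[(Int, Iterator[Product2[K, C]])]): Iterator[Product2[K, C]] =
    partitioned.flatMap(pair => pair._2)
}
```

Records come out in partition order, since the per-partition iterators are consumed one after another.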
iterator is used when:
- BlockStoreShuffleReader is requested to read combined records for a reduce task
Stopping ExternalSorter
stop(): Unit
stop...FIXME
stop is used when:
- BlockStoreShuffleReader is requested to read records (with ordering defined)
- SortShuffleWriter is requested to stop
Logging¶
Enable ALL logging level for org.apache.spark.util.collection.ExternalSorter logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.util.collection.ExternalSorter=ALL
Refer to Logging.