
ExternalSorter

ExternalSorter is a Spillable of WritablePartitionedPairCollection of pairs (of K keys and C values).

ExternalSorter[K, V, C] is a parameterized type of K keys, V values, and C combiner (partial) values.

ExternalSorter is used for the following:

Creating Instance

ExternalSorter takes the following to be created:

ExternalSorter is created when:

Inserting Records

insertAll(
  records: Iterator[Product2[K, V]]): Unit

insertAll branches on whether the optional Aggregator was specified (when creating the ExternalSorter).

insertAll consumes the given records iterator eagerly, i.e. all records are read before insertAll returns.

Map-Side Aggregator Specified

With an Aggregator given, insertAll creates an update function based on the mergeValue and createCombiner functions of the Aggregator.

For every record, insertAll increments the internal read counter.

insertAll requests the PartitionedAppendOnlyMap to changeValue for the key (made up of the partition of the key of the current record and the key itself, i.e. (partition, key)) with the update function.

In the end, insertAll spills the in-memory collection to disk if needed with the usingMap flag enabled (to indicate that the PartitionedAppendOnlyMap was updated).
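The map-side combine flow above can be sketched in plain Scala. This is a simplified illustration, not Spark's code: SimpleAggregator and MapSideCombineSketch are hypothetical names, a mutable.HashMap stands in for the PartitionedAppendOnlyMap, and the read counter and spilling are left out.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's Aggregator (only the two functions used here).
final case class SimpleAggregator[V, C](
    createCombiner: V => C,
    mergeValue: (C, V) => C)

object MapSideCombineSketch {
  // Combines records per (partition, key), following the insertAll flow described above.
  def insertAll[K, V, C](
      records: Iterator[Product2[K, V]],
      aggregator: SimpleAggregator[V, C],
      getPartition: K => Int): mutable.Map[(Int, K), C] = {
    val map = mutable.HashMap.empty[(Int, K), C]
    while (records.hasNext) {
      val kv = records.next()
      // The update function: merge into an existing combiner or create a new one.
      val update = (hadValue: Boolean, oldValue: C) =>
        if (hadValue) aggregator.mergeValue(oldValue, kv._2)
        else aggregator.createCombiner(kv._2)
      val key = (getPartition(kv._1), kv._1)
      // Stand-in for changeValue on the in-memory map (an assumption of this sketch).
      val newValue = map.get(key) match {
        case Some(old) => update(true, old)
        case None      => update(false, null.asInstanceOf[C])
      }
      map(key) = newValue
    }
    map
  }
}
```

In this sketch, the update function closes over the current record, so the map never has to know about V values; it only ever stores C combiners.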

No Map-Side Aggregator Specified

With no Aggregator given, insertAll iterates over all the records and uses the PartitionedPairBuffer instead.

For every record, insertAll increments the internal read counter.

insertAll requests the PartitionedPairBuffer to insert with the partition of the key of the current record, the key itself and the value of the current record.

In the end, insertAll spills the in-memory collection to disk if needed with the usingMap flag disabled (since this time the PartitionedPairBuffer was updated, not the PartitionedAppendOnlyMap).
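For contrast with the aggregating path, the buffer-based path can be sketched as follows. BufferInsertSketch is a hypothetical name and an ArrayBuffer stands in for the PartitionedPairBuffer; the read counter and spilling are omitted.

```scala
import scala.collection.mutable.ArrayBuffer

object BufferInsertSketch {
  // Appends every record as ((partition, key), value); nothing is combined,
  // so repeated keys produce repeated entries.
  def insertAll[K, V](
      records: Iterator[Product2[K, V]],
      getPartition: K => Int): ArrayBuffer[((Int, K), V)] = {
    val buffer = ArrayBuffer.empty[((Int, K), V)]
    while (records.hasNext) {
      val kv = records.next()
      val entry = ((getPartition(kv._1), kv._1), kv._2)
      buffer += entry
    }
    buffer
  }
}
```

Unlike the map-based sketch, the number of entries here always equals the number of input records.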

Spilling In-Memory Collection to Disk

maybeSpillCollection(
  usingMap: Boolean): Unit

maybeSpillCollection branches on the input usingMap flag (which indicates the in-memory collection in use: the PartitionedAppendOnlyMap or the PartitionedPairBuffer).

maybeSpillCollection requests the collection to estimate its size (in bytes). The estimate is recorded as the peakMemoryUsedBytes metric whenever it is larger than the value currently recorded.

maybeSpillCollection spills the collection to disk if needed. If spilled, maybeSpillCollection creates a new collection (a new PartitionedAppendOnlyMap or a new PartitionedPairBuffer).
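A minimal model of this bookkeeping is sketched below. SpillTrackerSketch and its members are hypothetical; the size estimate is passed in by the caller, and a fixed byte threshold is assumed in place of the actual spill decision made by the Spillable abstraction.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of the spill-if-needed bookkeeping.
class SpillTrackerSketch(memoryThresholdBytes: Long) {
  private var collection = ArrayBuffer.empty[(Int, String)]
  private var _peakMemoryUsedBytes = 0L
  private var _spillCount = 0

  def peakMemoryUsedBytes: Long = _peakMemoryUsedBytes
  def spillCount: Int = _spillCount
  def inMemoryRecords: Int = collection.size

  def insert(partition: Int, key: String): Unit = {
    val entry = (partition, key)
    collection += entry
  }

  def maybeSpillCollection(estimatedSize: Long): Unit = {
    // Assumption: spill once the estimate reaches a fixed threshold.
    if (estimatedSize >= memoryThresholdBytes) {
      _spillCount += 1 // stand-in for writing the collection to disk
      collection = ArrayBuffer.empty[(Int, String)] // start over with a new collection
    }
    // Track the peak: only ever move the recorded value upwards.
    if (estimatedSize > _peakMemoryUsedBytes) _peakMemoryUsedBytes = estimatedSize
  }
}
```

Note that the peak is updated even when a spill happened, so it reflects the largest size the collection reached just before it was written out.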

Usage

insertAll is used when:

In-Memory Collections of Records

ExternalSorter uses PartitionedPairBuffers or PartitionedAppendOnlyMaps to store records in memory before spilling to disk.

ExternalSorter uses PartitionedPairBuffers when created with no Aggregator. Otherwise, ExternalSorter uses PartitionedAppendOnlyMaps.

ExternalSorter inserts records into the collections when requested to insertAll.

ExternalSorter spills the in-memory collection to disk if needed and, if so, creates a new collection.

ExternalSorter releases the collections (nulls them) when requested to forceSpill and stop. Once they are no longer referenced, the JVM garbage collector can reclaim their memory.

Peak Size of In-Memory Collection

ExternalSorter tracks the peak size (in bytes) of the in-memory collection whenever requested to spill the in-memory collection to disk if needed.

The peak size is used when:

Spills

spills: ArrayBuffer[SpilledFile]

ExternalSorter creates the spills internal buffer of SpilledFiles when created.

A new SpilledFile is added when ExternalSorter is requested to spill.

Note

An empty spills buffer indicates that there is only in-memory data.

SpilledFiles are deleted physically from disk and the spills buffer is cleared when ExternalSorter is requested to stop.

ExternalSorter uses the spills buffer when requested for a partitionedIterator.

Number of Spills

numSpills: Int

numSpills is the number of spill files this sorter has spilled.

SpilledFile

SpilledFile is the metadata of a spilled file:

  • File (Java)
  • BlockId
  • Serializer Batch Sizes (Array[Long])
  • Elements per Partition (Array[Long])

Spilling Data to Disk

spill(
  collection: WritablePartitionedPairCollection[K, C]): Unit

spill is part of the Spillable abstraction.

spill requests the given WritablePartitionedPairCollection for a destructive WritablePartitionedIterator.

spill calls spillMemoryIteratorToDisk (with the destructive WritablePartitionedIterator), which creates a SpilledFile.

In the end, spill adds the SpilledFile to the spills internal registry.
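The spill flow and the spills registry can be illustrated with a reduced model. Everything here is a hypothetical stand-in: SpilledFileSketch keeps only the file and the per-partition element counts, records are written as tab-separated text instead of serialized batches, and sorting by partition before writing is a simplification of the destructive iterator.

```scala
import java.io.{File, PrintWriter}
import java.nio.file.Files
import scala.collection.mutable.ArrayBuffer

// Hypothetical, reduced SpilledFile: the file and the element counts per partition.
final case class SpilledFileSketch(file: File, elementsPerPartition: Array[Long])

class SpillSketch(numPartitions: Int) {
  val spills = ArrayBuffer.empty[SpilledFileSketch]

  def numSpills: Int = spills.length

  // Writes the records as text lines to a temporary file and counts them per partition.
  private def spillMemoryIteratorToDisk(
      it: Iterator[((Int, String), Int)]): SpilledFileSketch = {
    val file = Files.createTempFile("spill-sketch", ".txt").toFile
    val elementsPerPartition = new Array[Long](numPartitions)
    val out = new PrintWriter(file)
    try {
      it.foreach { case ((partition, key), value) =>
        out.println(s"$partition\t$key\t$value")
        elementsPerPartition(partition) += 1
      }
    } finally out.close()
    SpilledFileSketch(file, elementsPerPartition)
  }

  def spill(collection: ArrayBuffer[((Int, String), Int)]): Unit = {
    // "Destructive" in this sketch: the collection is emptied once its records are taken.
    val sorted = collection.sortBy(_._1._1).toList
    collection.clear()
    spills += spillMemoryIteratorToDisk(sorted.iterator)
  }

  // Deletes the spill files and clears the buffer, as described for stop.
  def stop(): Unit = {
    spills.foreach(_.file.delete())
    spills.clear()
  }
}
```

Keeping the per-partition counts next to the file is what would later let a reader skip straight to the records of one partition without scanning the whole file.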

spillMemoryIteratorToDisk

spillMemoryIteratorToDisk(
  inMemoryIterator: WritablePartitionedIterator): SpilledFile

spillMemoryIteratorToDisk...FIXME

spillMemoryIteratorToDisk is used when:

  • ExternalSorter is requested to spill
  • SpillableIterator is requested to spill

partitionedIterator

partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])]

partitionedIterator...FIXME

partitionedIterator is used when:

writePartitionedMapOutput

writePartitionedMapOutput(
  shuffleId: Int,
  mapId: Long,
  mapOutputWriter: ShuffleMapOutputWriter): Unit

writePartitionedMapOutput...FIXME

writePartitionedMapOutput is used when:

Iterator

iterator: Iterator[Product2[K, C]]

iterator turns the isShuffleSort flag off (false).

iterator requests the partitionedIterator and takes the combined values (the second elements) only.
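Given the two signatures, dropping the partition IDs amounts to flattening the per-partition iterators. A minimal sketch, with IteratorSketch as a hypothetical name and the isShuffleSort flag left out:

```scala
object IteratorSketch {
  // Flattens (partitionId, records) pairs into a single iterator of records,
  // discarding the partition IDs.
  def iterator[K, C](
      partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])]): Iterator[Product2[K, C]] =
    partitionedIterator.flatMap(pair => pair._2)
}
```

Since flatMap on an Iterator is lazy, the per-partition iterators are only consumed as the resulting iterator is read.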

iterator is used when:

Stopping ExternalSorter

stop(): Unit

stop...FIXME

stop is used when:

  • BlockStoreShuffleReader is requested to read records (with ordering defined)
  • SortShuffleWriter is requested to stop

Logging

Enable ALL logging level for org.apache.spark.util.collection.ExternalSorter logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.util.collection.ExternalSorter=ALL

Refer to Logging.