ExternalSorter

ExternalSorter is a Spillable of WritablePartitionedPairCollection of pairs (of K keys and C values).

ExternalSorter[K, V, C] is a parameterized type of K keys, V values, and C combiner (partial) values.
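
To make the three type parameters concrete, here is a minimal sketch that groups Int values (V) of String keys (K) into a List[Int] combiner (C). SimpleAggregator and combine are hypothetical names made up for this illustration. SimpleAggregator only mirrors the three functions of Spark's Aggregator (createCombiner, mergeValue and mergeCombiners) and is not Spark's class.

```scala
// Hypothetical stand-in for Spark's Aggregator[K, V, C] (not the real class):
// the three functions that relate values (V) to combiners (C).
final case class SimpleAggregator[K, V, C](
    createCombiner: V => C,      // turn the first value of a key into a combiner
    mergeValue: (C, V) => C,     // fold another value into an existing combiner
    mergeCombiners: (C, C) => C) // merge two partial combiners of the same key

// K = String, V = Int, C = List[Int]: the combiner type differs from the value type
val groupValues = SimpleAggregator[String, Int, List[Int]](
  createCombiner = v => List(v),
  mergeValue = (c, v) => v :: c,
  mergeCombiners = (c1, c2) => c1 ++ c2)

// Combine the (non-empty) values of a single key into one combiner
def combine[K, V, C](agg: SimpleAggregator[K, V, C], values: Seq[V]): C =
  values.tail.foldLeft(agg.createCombiner(values.head))(agg.mergeValue)

val partial1 = combine(groupValues, Seq(1, 2)) // e.g. records seen before a spill
val partial2 = combine(groupValues, Seq(3))    // e.g. records seen after it
val merged = groupValues.mergeCombiners(partial1, partial2)
```

Because C can differ from V, mergeCombiners is what allows partial combiners of the same key (for example, ones built before and after a spill) to be merged later.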

Creating Instance

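ExternalSorter takes the following to be created:

  • TaskContext

  • Optional Aggregator (default: undefined)

  • Optional Partitioner (default: undefined)

  • Optional Ordering (Scala) for keys (default: undefined)

  • Serializer (default: the system Serializer from SparkEnv)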

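ExternalSorter is created when:

  • SortShuffleWriter is requested to write records

  • BlockStoreShuffleReader is requested to read records (with sort ordering defined)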

In-Memory Collections of Records

ExternalSorter uses PartitionedPairBuffers or PartitionedAppendOnlyMaps to store records in memory before spilling to disk. ExternalSorter uses PartitionedPairBuffers when created with no Aggregator specified. Otherwise, ExternalSorter uses PartitionedAppendOnlyMaps.

ExternalSorter creates a PartitionedPairBuffer and a PartitionedAppendOnlyMap when created.

ExternalSorter inserts records into the collections when requested to insertAll.

ExternalSorter spills the in-memory collection to disk if needed and, if so, creates a new collection.

ExternalSorter releases the collections (sets them to null) when requested to forceSpill and stop, so that the JVM garbage collector can reclaim their memory.

Peak Size of In-Memory Collection

ExternalSorter tracks the peak size (in bytes) of the in-memory collection whenever requested to spill the in-memory collection to disk if needed.

The peak size is used when:

Spills Internal Registry

ExternalSorter uses the spills internal registry (of SpilledFiles) to keep track of the files it has spilled to disk.

Inserting Records

insertAll(
  records: Iterator[Product2[K, V]]): Unit

insertAll branches off based on whether an Aggregator was specified (when the ExternalSorter was created).

insertAll consumes all the records of the given records iterator eagerly.

Map-Side Aggregator Specified

If there is an Aggregator specified, insertAll creates an update function based on the mergeValue and createCombiner functions of the Aggregator: mergeValue is used when the key already has a combined value, createCombiner otherwise.

For every record, insertAll increments the internal read counter.

insertAll requests the PartitionedAppendOnlyMap to changeValue for the (partition, key) pair (the partition of the current record's key and the key itself) with the update function.

After every record, insertAll spills the in-memory collection to disk if needed (with the usingMap flag on since the PartitionedAppendOnlyMap was updated).
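
A minimal sketch of the aggregator branch for a word count (K = String, V = Int, C = Int). TinyPartitionedMap and insertAllWithAggregator are hypothetical names; TinyPartitionedMap only models the changeValue contract of PartitionedAppendOnlyMap (the update function receives whether a value existed and the old value). The partition is assumed to come from hash partitioning of the key, and the spill check after every record is left out.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for PartitionedAppendOnlyMap: changeValue looks up
// the (partition, key) pair and stores whatever the update function returns.
final class TinyPartitionedMap[K, C] {
  private val data = mutable.HashMap.empty[(Int, K), C]

  def changeValue(key: (Int, K), updateFunc: (Boolean, C) => C): C = {
    val newValue = data.get(key) match {
      case Some(oldValue) => updateFunc(true, oldValue)
      case None           => updateFunc(false, null.asInstanceOf[C]) // no old value yet
    }
    data(key) = newValue
    newValue
  }

  def toMap: Map[(Int, K), C] = data.toMap
}

// Sketch of the aggregator branch of insertAll (the per-record spill check is omitted)
def insertAllWithAggregator(
    records: Iterator[(String, Int)],
    numPartitions: Int,
    createCombiner: Int => Int,
    mergeValue: (Int, Int) => Int): TinyPartitionedMap[String, Int] = {
  val map = new TinyPartitionedMap[String, Int]
  var kv: (String, Int) = null
  // The update function closes over the current record (kv)
  val update = (hadValue: Boolean, oldValue: Int) =>
    if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
  while (records.hasNext) {
    kv = records.next()
    val partition = Math.floorMod(kv._1.hashCode, numPartitions) // hash partitioning (assumed)
    map.changeValue((partition, kv._1), update)
  }
  map
}

val counts = insertAllWithAggregator(
  Iterator("a" -> 1, "b" -> 1, "a" -> 1),
  numPartitions = 2,
  createCombiner = v => v,
  mergeValue = _ + _).toMap
```

For this input, the "a" entry is created by createCombiner on the first "a" and updated by mergeValue on the second, ending at 2, while "b" stays at 1.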

No Map-Side Aggregator Specified

With no Aggregator specified, insertAll iterates over all the records and uses the PartitionedPairBuffer instead.

For every record, insertAll increments the internal read counter.

insertAll requests the PartitionedPairBuffer to insert the partition of the current record's key, the key itself and the value of the current record (cast to C).

After every record, insertAll spills the in-memory collection to disk if needed (with the usingMap flag off since this time the PartitionedPairBuffer was updated, not the PartitionedAppendOnlyMap).
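
The buffer branch can be sketched the same way. TinyPairBuffer and insertAllWithoutAggregator are hypothetical names, partitions are again assumed to come from hash partitioning, and the spill check is left out. Nothing is combined here: a repeated key is simply appended again.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for PartitionedPairBuffer: an append-only buffer of
// (partition, key, value) triples. No combining happens, so duplicate keys stay.
final class TinyPairBuffer[K, C] {
  val data = ArrayBuffer.empty[(Int, K, C)]
  def insert(partition: Int, key: K, value: C): Unit = data += ((partition, key, value))
}

// Sketch of the no-aggregator branch of insertAll (the per-record spill check is omitted)
def insertAllWithoutAggregator[K, V](
    records: Iterator[(K, V)],
    numPartitions: Int): (TinyPairBuffer[K, V], Long) = {
  val buffer = new TinyPairBuffer[K, V] // values are stored as-is (C = V here)
  var elementsRead = 0L                 // the internal read counter
  while (records.hasNext) {
    elementsRead += 1
    val (key, value) = records.next()
    val partition = Math.floorMod(key.##, numPartitions) // hash partitioning (assumed)
    buffer.insert(partition, key, value)
  }
  (buffer, elementsRead)
}

val (buf, read) =
  insertAllWithoutAggregator(Iterator("a" -> 1, "b" -> 1, "a" -> 1), numPartitions = 2)
```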

Usage

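insertAll is used when:

  • SortShuffleWriter is requested to write records

  • BlockStoreShuffleReader is requested to read records (with sort ordering defined)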

Writing All Records Into Partitioned File

writePartitionedFile(
  blockId: BlockId,
  outputFile: File): Array[Long]

writePartitionedFile…FIXME

writePartitionedFile is used when SortShuffleWriter is requested to write records.

Stopping ExternalSorter

stop(): Unit

stop…FIXME

stop is used when:

  • BlockStoreShuffleReader is requested to read records (with sort ordering defined)

  • SortShuffleWriter is requested to stop

Spilling Data to Disk

spill(
  collection: WritablePartitionedPairCollection[K, C]): Unit

spill requests the given WritablePartitionedPairCollection for a destructive WritablePartitionedIterator.

spill then calls spillMemoryIteratorToDisk with the destructive WritablePartitionedIterator, which writes the records to disk and returns a SpilledFile.

spill adds the SpilledFile to the spills internal registry.

spill is part of the Spillable abstraction.
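
The spill steps above can be sketched as follows, assuming String keys, Int values and a plain text file with one tab-separated record per line (Spark writes serialized records instead). TinySpilledFile and TinySpiller are hypothetical names: TinySpilledFile only mirrors the idea that a spill remembers where its data went and how many records each partition holds. For simplicity the sketch always sorts by partition and then by key.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a spill record: the file written and the number
// of records per partition (needed later to read one partition at a time).
final case class TinySpilledFile(path: Path, elementsPerPartition: Array[Long])

final class TinySpiller(numPartitions: Int) {
  // stand-in for the spills internal registry
  val spills = ArrayBuffer.empty[TinySpilledFile]

  def spill(collection: ArrayBuffer[(Int, String, Int)]): Unit = {
    // "destructive" sorted iterator: sort by (partition, key), then empty the collection
    val sorted = collection.sortBy { case (p, k, _) => (p, k) }
    collection.clear()
    spills += spillMemoryIteratorToDisk(sorted.iterator)
  }

  private def spillMemoryIteratorToDisk(records: Iterator[(Int, String, Int)]): TinySpilledFile = {
    val file = Files.createTempFile("tiny-spill-", ".tsv")
    file.toFile.deleteOnExit()
    val elementsPerPartition = new Array[Long](numPartitions)
    val out = new StringBuilder
    records.foreach { case (p, k, v) =>
      out.append(s"$p\t$k\t$v\n")
      elementsPerPartition(p) += 1
    }
    Files.write(file, out.toString.getBytes(UTF_8))
    TinySpilledFile(file, elementsPerPartition)
  }
}

val spiller = new TinySpiller(numPartitions = 2)
spiller.spill(ArrayBuffer((1, "b", 1), (0, "c", 2), (1, "a", 3)))
val spilled = spiller.spills.head
val lines = new String(Files.readAllBytes(spilled.path), UTF_8).split("\n").toList
```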

spillMemoryIteratorToDisk Method

spillMemoryIteratorToDisk(
  inMemoryIterator: WritablePartitionedIterator): SpilledFile

spillMemoryIteratorToDisk…FIXME

spillMemoryIteratorToDisk is used when:

  • ExternalSorter is requested to spill

  • SpillableIterator is requested to spill

Spilling In-Memory Collection to Disk

maybeSpillCollection(
  usingMap: Boolean): Unit

maybeSpillCollection branches off based on the input usingMap flag, which determines the in-memory collection to use: the PartitionedAppendOnlyMap (usingMap on) or the PartitionedPairBuffer (usingMap off).

maybeSpillCollection requests the collection to estimate its size (in bytes). Whenever the estimate is larger than the current peak, it is recorded as the peakMemoryUsedBytes metric.

maybeSpillCollection spills the collection to disk if needed. If spilled, maybeSpillCollection creates a new collection (a new PartitionedAppendOnlyMap or a new PartitionedPairBuffer).

maybeSpillCollection is used when ExternalSorter is requested to insertAll.
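
A minimal sketch of maybeSpillCollection for the buffer case, with hypothetical names throughout. It assumes a flat 16 bytes per record as the size estimate and a fixed spill threshold, and it "spills" into an in-memory list. Spark instead estimates sizes by sampling (SizeTracker), and Spillable.maybeSpill may first try to acquire more execution memory. What the sketch preserves is the order of the steps: estimate, maybe spill and replace the collection, then update the peak.

```scala
import scala.collection.mutable.ArrayBuffer

final class TinyCollectionSpiller(spillThresholdBytes: Long) {
  var buffer = ArrayBuffer.empty[(Int, String, Int)] // stand-in for PartitionedPairBuffer
  val spills = ArrayBuffer.empty[Vector[(Int, String, Int)]]
  var peakMemoryUsedBytes = 0L

  // Assumption: a flat 16 bytes per record instead of a sampled size estimate
  private def estimateSize(): Long = buffer.size * 16L

  // Assumption: spill once a fixed threshold is reached (kept in memory here, sorted by partition)
  private def maybeSpill(estimatedSize: Long): Boolean =
    if (estimatedSize >= spillThresholdBytes) {
      spills += buffer.sortBy(_._1).toVector
      true
    } else false

  def maybeSpillCollection(): Unit = {
    val estimatedSize = estimateSize()
    if (maybeSpill(estimatedSize)) {
      buffer = ArrayBuffer.empty // spilled, so continue with a new collection
    }
    // record the estimate if it is the largest one seen so far
    if (estimatedSize > peakMemoryUsedBytes) peakMemoryUsedBytes = estimatedSize
  }

  // like insertAll, check for a spill after every record
  def insert(record: (Int, String, Int)): Unit = {
    buffer += record
    maybeSpillCollection()
  }
}

val demo = new TinyCollectionSpiller(spillThresholdBytes = 48L)
Seq((1, "a", 1), (0, "b", 2), (1, "c", 3), (0, "d", 4)).foreach(demo.insert)
```

With a 48-byte threshold, the third record triggers a spill, the fourth lands in a fresh buffer, and the peak stays at 48 bytes.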

iterator Method

iterator: Iterator[Product2[K, C]]

iterator…FIXME

iterator is used when BlockStoreShuffleReader is requested to read combined records for a reduce task.

partitionedIterator Method

partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])]

partitionedIterator…FIXME

partitionedIterator is used when ExternalSorter is requested for an iterator and to write a partitioned file.

Logging

Enable ALL logging level for org.apache.spark.util.collection.ExternalSorter logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.util.collection.ExternalSorter=ALL

Refer to Logging.