
= [[ExternalSorter]] ExternalSorter

ExternalSorter is a shuffle:Spillable.md[Spillable] of WritablePartitionedPairCollections of pairs of K keys and C values.

ExternalSorter[K, V, C] is a parameterized type of K keys, V values, and C combiner (partial) values.

== [[creating-instance]] Creating Instance

ExternalSorter takes the following to be created:

  • [[context]] scheduler:spark-TaskContext.md[TaskContext]
  • [[aggregator]] Optional rdd:Aggregator.md[Aggregator] (default: undefined)
  • [[partitioner]] Optional rdd:Partitioner.md[Partitioner] (default: undefined)
  • [[ordering]] Optional Scala's http://www.scala-lang.org/api/current/scala/math/Ordering.html[Ordering] for keys (default: undefined)
  • [[serializer]] Optional serializer:Serializer.md[Serializer] (default: core:SparkEnv.md#serializer[system Serializer])

ExternalSorter is created when:

  • SortShuffleWriter is requested to shuffle:SortShuffleWriter.md#write[write records] (as an ExternalSorter[K, V, C] or ExternalSorter[K, V, V] based on the Map-Side Partial Aggregation Flag)

  • BlockStoreShuffleReader is requested to shuffle:BlockStoreShuffleReader.md#read[read records] (with sort ordering defined)

== [[in-memory-collection]][[buffer]][[map]] In-Memory Collections of Records

ExternalSorter uses a PartitionedPairBuffer or a PartitionedAppendOnlyMap to store records in memory before spilling them to disk. ExternalSorter uses the PartitionedPairBuffer when created with no <<aggregator, Aggregator>> specified. Otherwise, ExternalSorter uses the PartitionedAppendOnlyMap.

ExternalSorter creates a PartitionedPairBuffer and a PartitionedAppendOnlyMap when created.
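
The choice between the two collections can be sketched as follows (a simplified stand-in, not Spark's actual classes; the collection names are returned as strings purely for illustration):

```scala
// Hedged sketch of the collection choice: with an Aggregator (map-side
// combine) ExternalSorter aggregates into a PartitionedAppendOnlyMap;
// without one it merely buffers records in a PartitionedPairBuffer.
object CollectionChoiceSketch {
  def collectionFor(aggregatorDefined: Boolean): String =
    if (aggregatorDefined) "PartitionedAppendOnlyMap"
    else "PartitionedPairBuffer"
}
```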

ExternalSorter inserts records into the collections when <<insertAll, inserting records>>.

ExternalSorter <<maybeSpillCollection, spills the in-memory collection to disk>> shuffle:Spillable.md#maybeSpill[when needed] and, if spilled, creates a new collection.

ExternalSorter releases the collections (nulls them) when requested to <<writePartitionedFile, write all records into a partitioned file>> and <<stop, stop>>. The JVM garbage collector then takes care of evicting them from memory completely.

== [[peakMemoryUsedBytes]][[_peakMemoryUsedBytes]] Peak Size of In-Memory Collection

ExternalSorter tracks the peak size (in bytes) of the <<in-memory-collection, in-memory collection>> whenever requested to <<maybeSpillCollection, spill the in-memory collection to disk>>.

The peak size is used when:

  • BlockStoreShuffleReader is requested to shuffle:BlockStoreShuffleReader.md#read[read combined records for a reduce task] (with a sort ordering defined)

  • ExternalSorter is requested to <<writePartitionedFile, write all records into a partitioned file>>

== [[spills]] Spills Internal Registry

ExternalSorter tracks the files with spilled records (SpilledFiles) in the spills internal registry.

== [[insertAll]] Inserting Records

[source, scala]
----
insertAll(
  records: Iterator[Product2[K, V]]): Unit
----

insertAll branches off per whether the optional <<aggregator, Aggregator>> was specified or not (to create the <<in-memory-collection, in-memory collection>> to use).

insertAll takes all records eagerly and materializes the given records iterator.

=== [[insertAll-shouldCombine]] Map-Side Aggregator Specified

With an Aggregator specified, insertAll creates an update function based on the rdd:Aggregator.md#mergeValue[mergeValue] and rdd:Aggregator.md#createCombiner[createCombiner] functions of the Aggregator.
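
The update function can be sketched as follows (a simplified stand-in with K = String, V = Int, C = Int and a running sum as the combiner; Spark's actual code lives in ExternalSorter.insertAll):

```scala
import scala.collection.mutable

// Hedged sketch of insertAll's update function: merge the new value
// into an existing combiner, or create a fresh combiner from the value.
object AggregatorUpdateSketch {
  // Stand-ins for Aggregator.createCombiner and Aggregator.mergeValue.
  val createCombiner: Int => Int = v => v
  val mergeValue: (Int, Int) => Int = (c, v) => c + v

  // Insert one record under its (partition, key) composite key,
  // combining with any previous combiner for that key.
  def changeValue(
      map: mutable.Map[(Int, String), Int],
      partition: Int, key: String, value: Int): Unit = {
    val k = (partition, key)
    map(k) = map.get(k) match {
      case Some(oldCombiner) => mergeValue(oldCombiner, value)
      case None              => createCombiner(value)
    }
  }
}
```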

For every record, insertAll shuffle:Spillable.md#addElementsRead[increments the internal read counter].

insertAll requests the <<map, PartitionedAppendOnlyMap>> to changeValue for the key (made up of the partition of the key of the current record and the key itself, i.e. (partition, key)) with the update function.

In the end, insertAll <<maybeSpillCollection, spills the in-memory collection to disk if needed>> (with the usingMap flag on since the <<map, PartitionedAppendOnlyMap>> was updated).

=== [[insertAll-no-aggregator]] No Map-Side Aggregator Specified

With no Aggregator specified, insertAll iterates over all the records and uses the <<buffer, PartitionedPairBuffer>> instead.

For every record, insertAll shuffle:Spillable.md#addElementsRead[increments the internal read counter].

insertAll requests the <<buffer, PartitionedPairBuffer>> to insert with the partition of the key of the current record, the key itself and the value of the current record.

In the end, insertAll <<maybeSpillCollection, spills the in-memory collection to disk if needed>> (with the usingMap flag off since this time the <<buffer, PartitionedPairBuffer>> was updated, not the <<map, PartitionedAppendOnlyMap>>).
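
The no-aggregator path can be sketched as follows (a simplified stand-in for the PartitionedPairBuffer, using a plain ArrayBuffer; partitionOf stands in for the Partitioner):

```scala
import scala.collection.mutable.ArrayBuffer

// Hedged sketch of insertAll without an Aggregator: every record is
// appended as ((partition, key), value) with no combining, so
// duplicate keys are kept as separate entries.
object BufferInsertSketch {
  def insertAll(
      records: Iterator[(String, Int)],
      partitionOf: String => Int): ArrayBuffer[((Int, String), Int)] = {
    val buffer = ArrayBuffer.empty[((Int, String), Int)]
    for ((key, value) <- records) {
      buffer.append(((partitionOf(key), key), value))
    }
    buffer
  }
}
```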

=== [[insertAll-usage]] Usage

insertAll is used when:

  • SortShuffleWriter is requested to shuffle:SortShuffleWriter.md#write[write records] (as an ExternalSorter[K, V, C] or ExternalSorter[K, V, V] based on the Map-Side Partial Aggregation Flag)

  • BlockStoreShuffleReader is requested to shuffle:BlockStoreShuffleReader.md#read[read records] (with sort ordering defined)

== [[writePartitionedFile]] Writing All Records Into Partitioned File

[source, scala]
----
writePartitionedFile(
  blockId: BlockId,
  outputFile: File): Array[Long]
----

writePartitionedFile...FIXME

writePartitionedFile is used when SortShuffleWriter is requested to shuffle:SortShuffleWriter.md#write[write records].
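
The idea behind writePartitionedFile can be sketched as follows (a simplified stand-in that writes to an in-memory stream rather than Spark's DiskBlockObjectWriter; the record encoding is illustrative only): records are written out partition by partition, and the byte length of each partition's segment is returned so the shuffle index can locate it.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

// Hedged sketch: write each partition's records sequentially and
// record how many bytes each partition's segment occupies.
object PartitionedWriteSketch {
  def writePartitioned(
      recordsByPartition: Seq[Seq[(String, Int)]]): Array[Long] = {
    val lengths = new Array[Long](recordsByPartition.size)
    val out = new ByteArrayOutputStream()
    val data = new DataOutputStream(out)
    for ((records, p) <- recordsByPartition.zipWithIndex) {
      val before = data.size()
      for ((k, v) <- records) { data.writeUTF(k); data.writeInt(v) }
      lengths(p) = (data.size() - before).toLong
    }
    lengths
  }
}
```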

== [[stop]] Stopping ExternalSorter

[source, scala]
----
stop(): Unit
----

stop...FIXME

stop is used when:

  • BlockStoreShuffleReader is requested to shuffle:BlockStoreShuffleReader.md#read[read records] (with sort ordering defined)

  • SortShuffleWriter is requested to shuffle:SortShuffleWriter.md#stop[stop]

== [[spill]] Spilling Data to Disk

[source, scala]
----
spill(
  collection: WritablePartitionedPairCollection[K, C]): Unit
----

spill requests the given WritablePartitionedPairCollection for a destructive WritablePartitionedIterator.

spill <<spillMemoryIteratorToDisk, spills the in-memory data to disk>> (with the destructive WritablePartitionedIterator), which creates a SpilledFile.

spill adds the SpilledFile to the <<spills, spills>> internal registry.
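
The registry bookkeeping can be sketched as follows (field names simplified; not Spark's actual SpilledFile, which also tracks block id and batch sizes): every spill produces a descriptor that is appended to an internal ArrayBuffer so the merge phase can find all on-disk runs later.

```scala
import java.io.File
import scala.collection.mutable.ArrayBuffer

// Hedged sketch of the spills internal registry.
object SpillRegistrySketch {
  final case class SpilledFile(
      file: File,
      elementsPerPartition: Array[Long])

  val spills = ArrayBuffer.empty[SpilledFile]

  // Record one spill; returns how many spills have happened so far.
  def recordSpill(file: File, elementsPerPartition: Array[Long]): Int = {
    spills += SpilledFile(file, elementsPerPartition)
    spills.size
  }
}
```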

spill is part of the shuffle:Spillable.md#spill[Spillable] abstraction.

== [[spillMemoryIteratorToDisk]] spillMemoryIteratorToDisk Method

[source, scala]
----
spillMemoryIteratorToDisk(
  inMemoryIterator: WritablePartitionedIterator): SpilledFile
----

spillMemoryIteratorToDisk...FIXME

spillMemoryIteratorToDisk is used when:

  • ExternalSorter is requested to <<spill, spill>>

  • SpillableIterator is requested to spill

== [[maybeSpillCollection]] Spilling In-Memory Collection to Disk

[source, scala]
----
maybeSpillCollection(
  usingMap: Boolean): Unit
----

maybeSpillCollection branches per the input usingMap flag (that is to determine which in-memory collection to use, the <<map, PartitionedAppendOnlyMap>> or the <<buffer, PartitionedPairBuffer>>).

maybeSpillCollection requests the collection to estimate its size (in bytes) that is tracked as the <<peakMemoryUsedBytes, peak memory used>> metric (for every size bigger than what is currently recorded).

maybeSpillCollection shuffle:Spillable.md#maybeSpill[spills the collection to disk if needed]. If spilled, maybeSpillCollection creates a new collection (a new PartitionedAppendOnlyMap or a new PartitionedPairBuffer).
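The control flow above can be sketched as follows (a simplified stand-in: a plain ArrayBuffer plays the in-memory collection, and an explicit size estimate and threshold stand in for Spillable's maybeSpill logic):

```scala
import scala.collection.mutable.ArrayBuffer

// Hedged sketch of maybeSpillCollection: track the peak estimated
// size and, after a spill, replace the collection with a fresh one.
object MaybeSpillSketch {
  var peakMemoryUsedBytes: Long = 0L

  // Returns the (possibly new) collection and whether a spill happened.
  def maybeSpill(
      collection: ArrayBuffer[Int],
      estimatedSize: Long,
      threshold: Long): (ArrayBuffer[Int], Boolean) = {
    // Track the peak size for every size bigger than what is recorded.
    if (estimatedSize > peakMemoryUsedBytes) {
      peakMemoryUsedBytes = estimatedSize
    }
    if (estimatedSize >= threshold) {
      // "Spilled" to disk: start over with an empty collection.
      (ArrayBuffer.empty[Int], true)
    } else {
      (collection, false)
    }
  }
}
```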

maybeSpillCollection is used when ExternalSorter is requested to <<insertAll, insert records>>.

== [[iterator]] iterator Method

[source, scala]
----
iterator: Iterator[Product2[K, C]]
----

iterator...FIXME

iterator is used when BlockStoreShuffleReader is requested to shuffle:BlockStoreShuffleReader.md#read[read combined records for a reduce task].

== [[partitionedIterator]] partitionedIterator Method

[source, scala]
----
partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])]
----

partitionedIterator...FIXME

partitionedIterator is used when ExternalSorter is requested for an <<iterator, iterator>> and to <<writePartitionedFile, write all records into a partitioned file>>.

== [[logging]] Logging

Enable ALL logging level for org.apache.spark.util.collection.ExternalSorter logger to see what happens inside.

Add the following line to conf/log4j.properties:

[source]
----
log4j.logger.org.apache.spark.util.collection.ExternalSorter=ALL
----

Refer to ROOT:spark-logging.md[Logging].


Last update: 2020-10-09