ExternalAppendOnlyMap

ExternalAppendOnlyMap is a Spillable of SizeTrackers.

ExternalAppendOnlyMap[K, V, C] is a parameterized type of K keys, V values, and C combiner (partial) values.

Creating Instance

ExternalAppendOnlyMap takes the following to be created:

  • [[createCombiner]] createCombiner function (V => C)
  • [[mergeValue]] mergeValue function ((C, V) => C)
  • [[mergeCombiners]] mergeCombiners function ((C, C) => C)
  • [[serializer]] Optional serializer:Serializer.md[Serializer] (default: core:SparkEnv.md#serializer[system Serializer])
  • [[blockManager]] Optional storage:BlockManager.md[BlockManager] (default: core:SparkEnv.md#blockManager[system BlockManager])
  • [[context]] Optional scheduler:spark-TaskContext.md[TaskContext] (default: scheduler:spark-TaskContext.md#get[current TaskContext])
  • [[serializerManager]] Optional serializer:SerializerManager.md[SerializerManager] (default: core:SparkEnv.md#serializerManager[system SerializerManager])
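
To make the roles of the three functions more concrete, the following is a minimal in-memory sketch. It does not use ExternalAppendOnlyMap itself and never spills; `CombinerSketch`, `combineByKey` and `mergePartials` are hypothetical names chosen for illustration, with K = String, V = Int and C = List[Int] as an assumed example.

```scala
object CombinerSketch {
  // Hypothetical combiner functions for K = String, V = Int, C = List[Int].
  val createCombiner: Int => List[Int] = v => List(v)
  val mergeValue: (List[Int], Int) => List[Int] = (c, v) => v :: c
  val mergeCombiners: (List[Int], List[Int]) => List[Int] = (c1, c2) => c1 ::: c2

  // Folds key-value pairs into per-key combiners, purely in memory.
  def combineByKey[K, V, C](
      entries: Iterator[(K, V)],
      createCombiner: V => C,
      mergeValue: (C, V) => C): Map[K, C] =
    entries.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      val combined = acc.get(k) match {
        case Some(c) => mergeValue(c, v)  // key seen before
        case None    => createCombiner(v) // first value for the key
      }
      acc.updated(k, combined)
    }

  // Merges two partial results (e.g. from two batches) with mergeCombiners.
  def mergePartials[K, C](
      left: Map[K, C],
      right: Map[K, C],
      mergeCombiners: (C, C) => C): Map[K, C] =
    right.foldLeft(left) { case (acc, (k, c)) =>
      acc.updated(k, acc.get(k).map(mergeCombiners(_, c)).getOrElse(c))
    }
}
```

In this sketch createCombiner and mergeValue build partial results from raw values, while mergeCombiners joins two partial results for the same key.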

ExternalAppendOnlyMap is created when:

  • Aggregator is requested to rdd:Aggregator.md#combineValuesByKey[combineValuesByKey] and rdd:Aggregator.md#combineCombinersByKey[combineCombinersByKey]

  • CoGroupedRDD is requested to compute a partition

== [[currentMap]] SizeTrackingAppendOnlyMap

ExternalAppendOnlyMap manages a SizeTrackingAppendOnlyMap.

A SizeTrackingAppendOnlyMap is created when ExternalAppendOnlyMap is created, and again every time <<insertAll, insertAll>> or <<forceSpill, forceSpill>> spills to disk.

The SizeTrackingAppendOnlyMap is dereferenced (set to null), so that its memory can be garbage-collected, in <<forceSpill, forceSpill>> and <<freeCurrentMap, freeCurrentMap>>.

SizeTrackingAppendOnlyMap is used when <>, <>, <> and <>.

== [[insertAll]] Inserting All Key-Value Pairs (from Iterator)

[source, scala]

insertAll(entries: Iterator[Product2[K, V]]): Unit


[[insertAll-update-function]] insertAll creates an update function that uses the <<mergeValue, mergeValue>> function for an existing value or the <<createCombiner, createCombiner>> function for a new value.

For every key-value pair (from the input iterator), insertAll does the following:

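  • Requests the <<currentMap, SizeTrackingAppendOnlyMap>> for the estimated size and, if it is greater than the <<_peakMemoryUsedBytes, _peakMemoryUsedBytes>> metric, updates the metric.

  • shuffle:Spillable.md#maybeSpill[Spills to disk if necessary] and, if spilled, creates a new <<currentMap, SizeTrackingAppendOnlyMap>>

  • Requests the <<currentMap, SizeTrackingAppendOnlyMap>> to change the value for the current key (with the <<insertAll-update-function, update function>>)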

  • shuffle:Spillable.md#addElementsRead[Increments the elements read counter]
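
The steps above can be sketched with a simplified, self-contained model. This is not Spark's implementation: `InsertAllSketch`, `maxInMemoryKeys`, `peakSize` and the key-count "size estimate" are assumptions made for illustration, and "spilling" only moves the in-memory map into a buffer instead of writing it to disk.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ExternalAppendOnlyMap (illustration only).
class InsertAllSketch[K, V, C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    maxInMemoryKeys: Int) { // assumed threshold in keys, not bytes

  var currentMap = mutable.HashMap.empty[K, C]
  val spilledMaps = mutable.ArrayBuffer.empty[Map[K, C]]
  var peakSize = 0L
  var elementsRead = 0L

  // Stand-in for the size estimate: number of keys rather than bytes.
  private def estimateSize(): Long = currentMap.size.toLong

  // Stand-in for maybeSpill: returns true when a spill happened.
  private def maybeSpill(estimatedSize: Long): Boolean =
    if (estimatedSize >= maxInMemoryKeys) {
      spilledMaps += currentMap.toMap
      true
    } else false

  def insertAll(entries: Iterator[(K, V)]): Unit =
    entries.foreach { case (key, value) =>
      val estimatedSize = estimateSize()
      // Track the largest estimate seen so far.
      if (estimatedSize > peakSize) peakSize = estimatedSize
      // Start over with a fresh map after a spill.
      if (maybeSpill(estimatedSize)) currentMap = mutable.HashMap.empty[K, C]
      // Update function: merge into an existing combiner or create a new one.
      val updated = currentMap.get(key) match {
        case Some(c) => mergeValue(c, value)
        case None    => createCombiner(value)
      }
      currentMap.update(key, updated)
      elementsRead += 1
    }
}
```

In this model a key can end up in both a spilled map and the current map, which suggests why a mergeCombiners function is needed when the partial results are read back.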

=== [[insertAll-usage]] Usage

insertAll is used when:

  • Aggregator is requested to rdd:Aggregator.md#combineValuesByKey[combineValuesByKey] and rdd:Aggregator.md#combineCombinersByKey[combineCombinersByKey]

  • CoGroupedRDD is requested to compute a partition

  • ExternalAppendOnlyMap is requested to <>

=== [[insertAll-requirements]] Requirements

insertAll throws an IllegalStateException when the <<currentMap, currentMap>> internal registry is null:

[source,plaintext]

Cannot insert new elements into a map after calling iterator
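
The guard can be illustrated with a small sketch. `InsertGuardSketch` is a hypothetical name, and passing the map as a parameter (with Int values summed per key) is an assumption made to keep the example self-contained; only the exception type and message come from the text above.

```scala
import scala.collection.mutable

object InsertGuardSketch {
  // currentMap == null models a map that is no longer available for inserts.
  def insertAll(
      currentMap: mutable.Map[String, Int],
      entries: Iterator[(String, Int)]): Unit = {
    if (currentMap == null) {
      throw new IllegalStateException(
        "Cannot insert new elements into a map after calling iterator")
    }
    // Hypothetical merge logic: sum the values per key.
    entries.foreach { case (k, v) =>
      currentMap.update(k, currentMap.getOrElse(k, 0) + v)
    }
  }
}
```

Failing fast here avoids a less descriptive NullPointerException later in the insert loop.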

== [[iterator]] Iterator of "Combined" Pairs

[source, scala]

iterator: Iterator[(K, C)]

iterator...FIXME

iterator is used when...FIXME

== [[spill]] Spilling to Disk if Necessary

[source, scala]

spill(collection: SizeTracker): Unit


spill...FIXME

spill is used when...FIXME

== [[forceSpill]] Forcing Disk Spilling

[source, scala]

forceSpill(): Boolean

forceSpill returns a flag to indicate whether spilling to disk has really happened (true) or not (false).

forceSpill branches off per the current state it is in (and should rather use a state-aware implementation).

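When a SpillableIterator is in use, forceSpill requests it to spill and, if it did, dereferences (nullifies) the <<currentMap, SizeTrackingAppendOnlyMap>>. forceSpill returns whatever the spilling of the SpillableIterator returned.

When there is at least one element in the <<currentMap, SizeTrackingAppendOnlyMap>>, forceSpill <<spill, spills>> it. forceSpill then creates a new <<currentMap, SizeTrackingAppendOnlyMap>> and always returns true.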

In other cases, forceSpill simply returns false.
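
The three branches can be modelled as a pure function over an explicit state. This is a sketch under stated assumptions, not Spark's code: `ForceSpillSketch`, `State` and its fields are hypothetical, the iterator in use is reduced to its spill action, and the in-memory map is a plain Vector.

```scala
object ForceSpillSketch {
  // readingIterator: spill action of an iterator in use, if any.
  // currentMap: the in-memory entries, or None once dereferenced.
  // spills: how many times the in-memory map has been spilled.
  final case class State(
      readingIterator: Option[() => Boolean],
      currentMap: Option[Vector[(String, Int)]],
      spills: Int = 0)

  // Returns the new state and whether a spill happened.
  def forceSpill(s: State): (State, Boolean) = s.readingIterator match {
    case Some(spillIterator) =>
      val spilled = spillIterator()
      // Drop the in-memory map only if the iterator did spill.
      (if (spilled) s.copy(currentMap = None) else s, spilled)
    case None if s.currentMap.exists(_.nonEmpty) =>
      // Spill the non-empty map and start over with an empty one.
      (s.copy(currentMap = Some(Vector.empty), spills = s.spills + 1), true)
    case None =>
      // Nothing to spill.
      (s, false)
  }
}
```

Making the state explicit is one way to read the remark about a state-aware implementation; each case corresponds to one of the branches described above.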

forceSpill is part of the shuffle:Spillable.md[Spillable] abstraction.

== [[freeCurrentMap]] Freeing Up SizeTrackingAppendOnlyMap and Releasing Memory

[source, scala]

freeCurrentMap(): Unit

freeCurrentMap dereferences (nullifies) the <<currentMap, SizeTrackingAppendOnlyMap>> (if there still is one) and then shuffle:Spillable.md#releaseMemory[releases all memory].

freeCurrentMap is used when SpillableIterator is requested to destroy itself.

== [[spillMemoryIteratorToDisk]] spillMemoryIteratorToDisk Method

[source, scala]

spillMemoryIteratorToDisk(inMemoryIterator: Iterator[(K, C)]): DiskMapIterator


spillMemoryIteratorToDisk...FIXME

spillMemoryIteratorToDisk is used when...FIXME


Last update: 2020-10-09