ShuffleExternalSorter

ShuffleExternalSorter is a specialized cache-efficient sorter that sorts arrays of compressed record pointers and partition ids.

ShuffleExternalSorter uses only 8 bytes of space per record in the sorting array to fit more of the array into cache.
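To illustrate how a record pointer and a partition id can share a single 8-byte word, here is a minimal sketch. It assumes the bit layout of Spark's PackedRecordPointer (24 bits of partition id, 13 bits of memory page number, 27 bits of offset within the page). The class and method names are hypothetical, and the sketch performs no range checks.

```java
final class PackedPointerSketch {
    // Assumed layout of the 8-byte word, from high bits to low bits:
    // [24-bit partition id][13-bit page number][27-bit offset in page]
    static final int OFFSET_BITS = 27;
    static final int PAGE_BITS = 13;
    static final long OFFSET_MASK = (1L << OFFSET_BITS) - 1;
    static final long PAGE_MASK = (1L << PAGE_BITS) - 1;

    // Packs the three fields into one long (callers must pass in-range values).
    public static long pack(int partitionId, int pageNumber, long offsetInPage) {
        return ((long) partitionId << (PAGE_BITS + OFFSET_BITS))
             | ((long) pageNumber << OFFSET_BITS)
             | offsetInPage;
    }

    // The partition id occupies the top 24 bits, so an unsigned shift recovers it.
    public static int partitionId(long packed) {
        return (int) (packed >>> (PAGE_BITS + OFFSET_BITS));
    }

    public static int pageNumber(long packed) {
        return (int) ((packed >>> OFFSET_BITS) & PAGE_MASK);
    }

    public static long offsetInPage(long packed) {
        return packed & OFFSET_MASK;
    }
}
```

Because the partition id sits in the most significant bits, sorting the packed longs numerically (as unsigned values) groups records by partition without touching the records themselves. Under this assumed layout, a 27-bit offset can address at most 2^27 bytes (128 MB) within a page.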

ShuffleExternalSorter is created and used by UnsafeShuffleWriter only.

ShuffleExternalSorter and UnsafeShuffleWriter

MemoryConsumer

ShuffleExternalSorter is a MemoryConsumer with a page size of 128 MB (unless the TaskMemoryManager uses a smaller page size).
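The page size rule above amounts to taking the smaller of two values. The following is a minimal sketch, assuming the cap is exactly 2^27 bytes; the class, constant, and method names are hypothetical and are not Spark API.

```java
final class PageSizeSketch {
    // Assumed cap on the sorter's page size: 2^27 bytes = 128 MB.
    static final long MAX_SORTER_PAGE_SIZE_BYTES = 1L << 27;

    // Returns the page size the sorter would use for a given
    // TaskMemoryManager page size (both in bytes).
    public static long effectivePageSize(long taskMemoryManagerPageSizeBytes) {
        return Math.min(MAX_SORTER_PAGE_SIZE_BYTES, taskMemoryManagerPageSizeBytes);
    }
}
```

For example, a TaskMemoryManager page size of 64 MB would be used as-is, while a page size of 1 GB would be capped at 128 MB.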

ShuffleExternalSorter can spill to disk to free up execution memory.

Configuration Properties

spark.shuffle.file.buffer

ShuffleExternalSorter uses spark.shuffle.file.buffer configuration property for...FIXME

spark.shuffle.spill.numElementsForceSpillThreshold

ShuffleExternalSorter uses spark.shuffle.spill.numElementsForceSpillThreshold configuration property for...FIXME

Creating Instance

ShuffleExternalSorter takes the following to be created:

ShuffleExternalSorter is created by UnsafeShuffleWriter only.

ShuffleInMemorySorter

ShuffleExternalSorter manages a ShuffleInMemorySorter:

  • ShuffleInMemorySorter is created immediately when ShuffleExternalSorter is created

  • ShuffleInMemorySorter is requested to free up memory and dereferenced (nulled) when ShuffleExternalSorter is requested to cleanupResources and closeAndGetSpills

ShuffleExternalSorter uses the ShuffleInMemorySorter for the following:

Spilling To Disk

long spill(
  long size,
  MemoryConsumer trigger)

spill is part of the MemoryConsumer abstraction.


spill returns the memory bytes spilled (spill size).

spill prints out the following INFO message to the logs:

Thread [threadId] spilling sort data of [memoryUsage] to disk ([spillsSize] [time|times] so far)

spill calls writeSortedFile (with the isLastFile flag disabled).

spill frees up execution memory (and records the memory bytes spilled as spillSize).

spill requests the ShuffleInMemorySorter to reset.

In the end, spill requests the TaskContext for TaskMetrics to increase the memory bytes spilled.
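The order of the steps above can be sketched as a small, self-contained model. Everything in it is a hypothetical stand-in: the fields replace the real data pages, ShuffleInMemorySorter, and TaskMetrics, and the sketch omits any guard conditions the real method may apply before spilling.

```java
import java.util.ArrayList;
import java.util.List;

final class SpillFlowSketch {
    final List<String> steps = new ArrayList<>(); // records the order of operations
    long memoryInUse;        // hypothetical: bytes held by data pages
    long memoryBytesSpilled; // hypothetical stand-in for the TaskMetrics counter
    int recordsInMemory;     // hypothetical stand-in for the in-memory sorter
    int spillFiles;          // number of spill files written so far

    SpillFlowSketch(long memoryInUse, int recordsInMemory) {
        this.memoryInUse = memoryInUse;
        this.recordsInMemory = recordsInMemory;
    }

    // Mirrors the documented sequence: log, write, free, reset, update metrics.
    long spill() {
        steps.add("log: spilling sort data of " + memoryInUse + " bytes to disk");
        writeSortedFile(false);          // not the last file
        long spillSize = freeMemory();   // bytes released become the spill size
        resetInMemorySorter();
        memoryBytesSpilled += spillSize; // stand-in for the task metrics update
        steps.add("metrics");
        return spillSize;
    }

    private void writeSortedFile(boolean isLastFile) {
        spillFiles++;
        steps.add("writeSortedFile(isLastFile=" + isLastFile + ")");
    }

    private long freeMemory() {
        long freed = memoryInUse;
        memoryInUse = 0;
        steps.add("freeMemory");
        return freed;
    }

    private void resetInMemorySorter() {
        recordsInMemory = 0;
        steps.add("reset");
    }
}
```

The value returned by spill in this sketch is the number of bytes released, which is also the amount added to the spilled-bytes counter.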

closeAndGetSpills

SpillInfo[] closeAndGetSpills()

closeAndGetSpills...FIXME

closeAndGetSpills is used when UnsafeShuffleWriter is requested to closeAndWriteOutput.

getMemoryUsage

long getMemoryUsage()

getMemoryUsage...FIXME

getMemoryUsage is used when ShuffleExternalSorter is created and requested to spill and updatePeakMemoryUsed.

updatePeakMemoryUsed

void updatePeakMemoryUsed()

updatePeakMemoryUsed...FIXME

updatePeakMemoryUsed is used when ShuffleExternalSorter is requested to getPeakMemoryUsedBytes and freeMemory.
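The call relationships above (getPeakMemoryUsedBytes and freeMemory both calling updatePeakMemoryUsed, which in turn reads getMemoryUsage) suggest a running-maximum pattern. The following is a minimal sketch under that assumption, not the actual implementation; the memoryUsage field and setMemoryUsage method are hypothetical stand-ins for the real memory accounting.

```java
final class PeakMemorySketch {
    private long memoryUsage;         // hypothetical stand-in for current memory usage
    private long peakMemoryUsedBytes; // highest usage observed so far

    // Hypothetical helper that simulates allocating or releasing pages.
    public void setMemoryUsage(long bytes) {
        memoryUsage = bytes;
    }

    public long getMemoryUsage() {
        return memoryUsage;
    }

    // Assumed behavior: remember the largest memory usage seen so far.
    private void updatePeakMemoryUsed() {
        long mem = getMemoryUsage();
        if (mem > peakMemoryUsedBytes) {
            peakMemoryUsedBytes = mem;
        }
    }

    // Refreshes the peak before reporting it.
    public long getPeakMemoryUsedBytes() {
        updatePeakMemoryUsed();
        return peakMemoryUsedBytes;
    }

    // Captures the peak before releasing memory, so it is not lost.
    public long freeMemory() {
        updatePeakMemoryUsed();
        long freed = memoryUsage;
        memoryUsage = 0;
        return freed;
    }
}
```

Updating the peak just before memory is released is what keeps the reported peak correct even when usage later drops to zero.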

writeSortedFile

void writeSortedFile(
  boolean isLastFile)

writeSortedFile...FIXME


writeSortedFile is used when:

cleanupResources

void cleanupResources()

cleanupResources...FIXME

cleanupResources is used when UnsafeShuffleWriter is requested to write records and stop.

Inserting Serialized Record Into ShuffleInMemorySorter

void insertRecord(
  Object recordBase,
  long recordOffset,
  int length,
  int partitionId)

insertRecord...FIXME

insertRecord calls growPointerArrayIfNecessary.

insertRecord...FIXME

insertRecord calls acquireNewPageIfNecessary.

insertRecord...FIXME

insertRecord is used when UnsafeShuffleWriter is requested to insertRecordIntoSorter.

growPointerArrayIfNecessary

void growPointerArrayIfNecessary()

growPointerArrayIfNecessary...FIXME

acquireNewPageIfNecessary

void acquireNewPageIfNecessary(
  int required)

acquireNewPageIfNecessary...FIXME

freeMemory

long freeMemory()

freeMemory...FIXME

freeMemory is used when ShuffleExternalSorter is requested to spill, cleanupResources, and closeAndGetSpills.

Peak Memory Used

long getPeakMemoryUsedBytes()

getPeakMemoryUsedBytes...FIXME

getPeakMemoryUsedBytes is used when UnsafeShuffleWriter is requested to updatePeakMemoryUsed.

Logging

Enable ALL logging level for org.apache.spark.shuffle.sort.ShuffleExternalSorter logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.ShuffleExternalSorter=ALL

Refer to Logging.