ShuffleExternalSorter

ShuffleExternalSorter is a specialized cache-efficient sorter that sorts arrays of compressed record pointers and partition ids. By using only 8 bytes of space per record in the sorting array, ShuffleExternalSorter can fit more of the array into cache.
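To illustrate how a record pointer and a partition id can share a single 8-byte word, here is a minimal sketch. It assumes a layout of 24 bits for the partition id, 13 bits for the memory page number, and 27 bits for the offset within the page. The class and method names (PackedPointerSketch, pack, and so on) are hypothetical and are not the API of Spark's PackedRecordPointer.

```java
import java.util.Arrays;

/** Simplified, hypothetical sketch of packing a record address and a partition id into one long. */
final class PackedPointerSketch {
  // Assumed layout, from the highest to the lowest bits:
  // [24-bit partition id][13-bit page number][27-bit offset in page]
  static final int OFFSET_BITS = 27;
  static final int PAGE_BITS = 13;
  static final int PARTITION_BITS = 24;

  static final long MAX_OFFSET = (1L << OFFSET_BITS) - 1;
  static final long MAX_PAGE = (1L << PAGE_BITS) - 1;
  static final long MAX_PARTITION = (1L << PARTITION_BITS) - 1;

  static long pack(int partitionId, int pageNumber, long offsetInPage) {
    if (partitionId < 0 || partitionId > MAX_PARTITION) {
      throw new IllegalArgumentException("partition id out of range: " + partitionId);
    }
    if (pageNumber < 0 || pageNumber > MAX_PAGE) {
      throw new IllegalArgumentException("page number out of range: " + pageNumber);
    }
    if (offsetInPage < 0 || offsetInPage > MAX_OFFSET) {
      throw new IllegalArgumentException("offset out of range: " + offsetInPage);
    }
    return ((long) partitionId << (PAGE_BITS + OFFSET_BITS))
        | ((long) pageNumber << OFFSET_BITS)
        | offsetInPage;
  }

  static int partitionId(long packed) {
    return (int) (packed >>> (PAGE_BITS + OFFSET_BITS));
  }

  static int pageNumber(long packed) {
    return (int) ((packed >>> OFFSET_BITS) & MAX_PAGE);
  }

  static long offsetInPage(long packed) {
    return packed & MAX_OFFSET;
  }

  public static void main(String[] args) {
    // One long per record: the array to sort holds nothing but these 8-byte words.
    long[] pointers = {pack(2, 0, 64), pack(0, 1, 128), pack(1, 3, 0)};
    // The partition id sits in the highest bits, so for the small ids used here
    // a plain signed sort groups the records by partition.
    Arrays.sort(pointers);
    for (long p : pointers) {
      System.out.println("partition=" + partitionId(p)
          + " page=" + pageNumber(p)
          + " offset=" + offsetInPage(p));
    }
  }
}
```

Keeping the partition id inside the sorted word means a sort only has to touch the pointer array itself, never the records it points to.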

Figure 1. ShuffleExternalSorter and UnsafeShuffleWriter

Creating Instance

ShuffleExternalSorter takes the following to be created:

ShuffleExternalSorter uses the spark.shuffle.file.buffer (for fileBufferSizeBytes) and spark.shuffle.spill.numElementsForceSpillThreshold (for numElementsForSpillThreshold) Spark properties.

ShuffleExternalSorter creates a ShuffleInMemorySorter (with spark.shuffle.sort.useRadixSort Spark property enabled by default).
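As a rough illustration of how the Spark properties mentioned above could be supplied to an application, the following sketch renders them as `--conf key=value` arguments for spark-submit. The values (64k, 1000000, false) are arbitrary examples rather than recommendations, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that turns Spark properties into spark-submit arguments. */
final class SubmitConfSketch {

  // Example settings only; the values are illustrative assumptions.
  static Map<String, String> exampleSettings() {
    Map<String, String> conf = new LinkedHashMap<>();
    conf.put("spark.shuffle.file.buffer", "64k");
    conf.put("spark.shuffle.spill.numElementsForceSpillThreshold", "1000000");
    conf.put("spark.shuffle.sort.useRadixSort", "false");
    return conf;
  }

  // Each property becomes a pair of arguments: "--conf" followed by "key=value".
  static List<String> toSubmitArgs(Map<String, String> conf) {
    List<String> args = new ArrayList<>();
    for (Map.Entry<String, String> e : conf.entrySet()) {
      args.add("--conf");
      args.add(e.getKey() + "=" + e.getValue());
    }
    return args;
  }

  public static void main(String[] args) {
    System.out.println(String.join(" ", toSubmitArgs(exampleSettings())));
  }
}
```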

ShuffleExternalSorter is created for UnsafeShuffleWriter.

ShuffleInMemorySorter

ShuffleExternalSorter manages a ShuffleInMemorySorter:

  • ShuffleInMemorySorter is created immediately when ShuffleExternalSorter is created

  • ShuffleInMemorySorter is requested to free up memory and dereferenced (nulled) when ShuffleExternalSorter is requested to cleanupResources and closeAndGetSpills

ShuffleExternalSorter uses the ShuffleInMemorySorter when requested for the following:

ShuffleExternalSorter as MemoryConsumer

ShuffleExternalSorter is a MemoryConsumer that can spill to disk to free up execution memory.

Page Size

As a MemoryConsumer, ShuffleExternalSorter uses the minimum of PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES and pageSizeBytes as the page size (along with the Tungsten memory mode).
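The page size selection can be sketched as a single min computation. This sketch assumes that MAXIMUM_PAGE_SIZE_BYTES equals 2^27 bytes (128 MiB), which is the largest page a 27-bit in-page offset can address. PageSizeSketch and effectivePageSize are hypothetical names used for illustration only.

```java
/** Hypothetical sketch of capping the page size at what a packed pointer can address. */
final class PageSizeSketch {
  // Assumption: the limit is 2^27 bytes (128 MiB).
  static final long MAXIMUM_PAGE_SIZE_BYTES = 1L << 27;

  // pageSizeBytes stands in for the page size reported by the memory manager.
  static long effectivePageSize(long pageSizeBytes) {
    return Math.min(MAXIMUM_PAGE_SIZE_BYTES, pageSizeBytes);
  }

  public static void main(String[] args) {
    long small = 64L * 1024 * 1024;   // 64 MiB, below the cap
    long large = 256L * 1024 * 1024;  // 256 MiB, above the cap
    System.out.println(effectivePageSize(small));
    System.out.println(effectivePageSize(large));
  }
}
```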

allocatedPages

ShuffleExternalSorter uses…​FIXME

getMemoryUsage Internal Method

long getMemoryUsage()

getMemoryUsage…​FIXME

getMemoryUsage is used when…​FIXME

writeSortedFile Method

void writeSortedFile(
  boolean isLastFile)

writeSortedFile…​FIXME

writeSortedFile is used when ShuffleExternalSorter is requested to spill and closeAndGetSpills.

cleanupResources Internal Method

void cleanupResources()

cleanupResources…​FIXME

cleanupResources is used when…​FIXME

Spilling To Disk

long spill(
  long size,
  MemoryConsumer trigger)

spill prints out the following INFO message to the logs:

Thread [threadId] spilling sort data of [memoryUsage] to disk ([spillsSize] [time|times] so far)

spill calls writeSortedFile (with the isLastFile flag disabled).

spill frees execution memory (and records the memory bytes spilled as spillSize).

spill then requests the ShuffleInMemorySorter to reset followed by requesting the TaskMetrics (of the TaskContext) to increase the memory bytes spilled.

In the end, spill returns the memory bytes spilled (spill size).

spill returns 0 when one of the following holds:

spill is part of the MemoryConsumer contract.
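The order of the spill steps described above can be sketched with a toy, in-memory model. Everything in it is a stand-in: the list of records plays the role of the ShuffleInMemorySorter, a list of sorted runs plays the role of the spill files, and a plain counter plays the role of the task metric. The conditions under which spill returns 0 are deliberately not modeled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of the spill sequence; not Spark's implementation. */
final class SpillFlowSketch {
  private final List<Long> records = new ArrayList<>();            // stand-in for the in-memory sorter
  private final List<List<Long>> spilledFiles = new ArrayList<>(); // stand-in for spill files on disk
  private final List<String> log = new ArrayList<>();
  private long memoryUsed;          // bytes currently held
  private long memoryBytesSpilled;  // stand-in for the task metric

  void insert(long record, long sizeBytes) {
    records.add(record);
    memoryUsed += sizeBytes;
  }

  long spill() {
    // 1. Log how much data is about to be spilled and how many spills happened so far.
    int spillsSoFar = spilledFiles.size();
    log.add("Thread " + Thread.currentThread().getId()
        + " spilling sort data of " + memoryUsed + " to disk ("
        + spillsSoFar + (spillsSoFar > 1 ? " times" : " time") + " so far)");

    // 2. Write the sorted records out (the isLastFile flag would be disabled here).
    List<Long> sorted = new ArrayList<>(records);
    Collections.sort(sorted);
    spilledFiles.add(sorted);

    // 3. Free the memory and remember how many bytes were released.
    long spillSize = memoryUsed;
    memoryUsed = 0;

    // 4. Reset the in-memory sorter so it can accept new records.
    records.clear();

    // 5. Record the spilled bytes in the metrics and return them.
    memoryBytesSpilled += spillSize;
    return spillSize;
  }

  long memoryBytesSpilled() { return memoryBytesSpilled; }
  long memoryUsed() { return memoryUsed; }
  List<List<Long>> spilledFiles() { return spilledFiles; }
  List<String> log() { return log; }

  public static void main(String[] args) {
    SpillFlowSketch sorter = new SpillFlowSketch();
    sorter.insert(3L, 100L);
    sorter.insert(1L, 50L);
    System.out.println("spilled bytes: " + sorter.spill());
    System.out.println(sorter.log().get(0));
  }
}
```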

growPointerArrayIfNecessary Method

void growPointerArrayIfNecessary()

growPointerArrayIfNecessary…​FIXME

growPointerArrayIfNecessary is used when…​FIXME

closeAndGetSpills Method

SpillInfo[] closeAndGetSpills()

closeAndGetSpills…​FIXME

closeAndGetSpills is used when…​FIXME

Inserting Serialized Record Into ShuffleInMemorySorter

void insertRecord(
  Object recordBase,
  long recordOffset,
  int length,
  int partitionId)

insertRecord requires that the ShuffleInMemorySorter is available.

insertRecord…​FIXME

insertRecord is used when…​FIXME

freeMemory Method

long freeMemory()

freeMemory…​FIXME

freeMemory is used when…​FIXME

getPeakMemoryUsedBytes Method

long getPeakMemoryUsedBytes()

getPeakMemoryUsedBytes…​FIXME

getPeakMemoryUsedBytes is used when…​FIXME

Logging

Enable ALL logging level for the org.apache.spark.shuffle.sort.ShuffleExternalSorter logger to see what happens inside ShuffleExternalSorter.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.ShuffleExternalSorter=ALL

Refer to Logging.