ShuffleExternalSorter¶
ShuffleExternalSorter
is a specialized cache-efficient sorter that sorts arrays of compressed record pointers and partition ids.
ShuffleExternalSorter
uses only 8 bytes of space per record in the sorting array to fit more of the array into cache.
ShuffleExternalSorter
is created and used by UnsafeShuffleWriter only.
MemoryConsumer¶
ShuffleExternalSorter
is a MemoryConsumer with page size of 128 MB (unless TaskMemoryManager uses smaller).
ShuffleExternalSorter
can spill to disk to free up execution memory.
Configuration Properties¶
spark.shuffle.file.buffer¶
ShuffleExternalSorter
uses spark.shuffle.file.buffer configuration property for...FIXME
spark.shuffle.spill.numElementsForceSpillThreshold¶
ShuffleExternalSorter
uses spark.shuffle.spill.numElementsForceSpillThreshold configuration property for...FIXME
Creating Instance¶
ShuffleExternalSorter
takes the following to be created:
- TaskMemoryManager
- BlockManager
- TaskContext
- Initial Size
- Number of Partitions
- SparkConf
- ShuffleWriteMetricsReporter
ShuffleExternalSorter
is created when:
UnsafeShuffleWriter
is requested to open a ShuffleExternalSorter
ShuffleInMemorySorter¶
ShuffleExternalSorter
manages a ShuffleInMemorySorter:
-
ShuffleInMemorySorter
is created immediately whenShuffleExternalSorter
is -
ShuffleInMemorySorter
is requested to free up memory and dereferenced (null
ed) whenShuffleExternalSorter
is requested to cleanupResources and closeAndGetSpills
ShuffleExternalSorter
uses the ShuffleInMemorySorter
for the following:
Spilling To Disk¶
long spill(
long size,
MemoryConsumer trigger)
spill
is part of the MemoryConsumer abstraction.
spill
returns the memory bytes spilled (spill size).
spill
prints out the following INFO message to the logs:
Thread [threadId] spilling sort data of [memoryUsage] to disk ([spillsSize] [time|times] so far)
spill
writeSortedFile (with the isLastFile
flag disabled).
spill
frees up execution memory (and records the memory bytes spilled as spillSize
).
spill
requests the ShuffleInMemorySorter to reset.
In the end, spill
requests the TaskContext for TaskMetrics to increase the memory bytes spilled.
closeAndGetSpills¶
SpillInfo[] closeAndGetSpills()
closeAndGetSpills
...FIXME
closeAndGetSpills
is used when UnsafeShuffleWriter
is requested to closeAndWriteOutput.
getMemoryUsage¶
long getMemoryUsage()
getMemoryUsage
...FIXME
getMemoryUsage
is used when ShuffleExternalSorter
is created and requested to spill and updatePeakMemoryUsed.
updatePeakMemoryUsed¶
void updatePeakMemoryUsed()
updatePeakMemoryUsed
...FIXME
updatePeakMemoryUsed
is used when ShuffleExternalSorter
is requested to getPeakMemoryUsedBytes and freeMemory.
writeSortedFile¶
void writeSortedFile(
boolean isLastFile)
writeSortedFile
...FIXME
writeSortedFile
is used when:
ShuffleExternalSorter
is requested to spill and closeAndGetSpills
cleanupResources¶
void cleanupResources()
cleanupResources
...FIXME
cleanupResources
is used when UnsafeShuffleWriter
is requested to write records and stop.
Inserting Serialized Record Into ShuffleInMemorySorter¶
void insertRecord(
Object recordBase,
long recordOffset,
int length,
int partitionId)
insertRecord
...FIXME
insertRecord
growPointerArrayIfNecessary.
insertRecord
...FIXME
insertRecord
acquireNewPageIfNecessary.
insertRecord
...FIXME
insertRecord
is used when UnsafeShuffleWriter
is requested to insertRecordIntoSorter
growPointerArrayIfNecessary¶
void growPointerArrayIfNecessary()
growPointerArrayIfNecessary
...FIXME
acquireNewPageIfNecessary¶
void acquireNewPageIfNecessary(
int required)
acquireNewPageIfNecessary
...FIXME
freeMemory¶
long freeMemory()
freeMemory
...FIXME
freeMemory
is used when ShuffleExternalSorter
is requested to spill, cleanupResources, and closeAndGetSpills.
Peak Memory Used¶
long getPeakMemoryUsedBytes()
getPeakMemoryUsedBytes
...FIXME
getPeakMemoryUsedBytes
is used when UnsafeShuffleWriter
is requested to updatePeakMemoryUsed.
Logging¶
Enable ALL
logging level for org.apache.spark.shuffle.sort.ShuffleExternalSorter
logger to see what happens inside.
Add the following line to conf/log4j.properties
:
log4j.logger.org.apache.spark.shuffle.sort.ShuffleExternalSorter=ALL
Refer to Logging.