BypassMergeSortShuffleWriter¶
BypassMergeSortShuffleWriter<K, V> is a ShuffleWriter for ShuffleMapTasks to write records into a single shuffle block data file.
Creating Instance¶
BypassMergeSortShuffleWriter takes the following to be created:
- BlockManager
- BypassMergeSortShuffleHandle (of K keys and V values)
- Map ID
- SparkConf
- ShuffleWriteMetricsReporter
- ShuffleExecutorComponents
BypassMergeSortShuffleWriter is created when:
- SortShuffleManager is requested for a ShuffleWriter (for a BypassMergeSortShuffleHandle)
DiskBlockObjectWriters¶
DiskBlockObjectWriter[] partitionWriters
BypassMergeSortShuffleWriter uses a DiskBlockObjectWriter per partition (based on the Partitioner).
BypassMergeSortShuffleWriter asserts that no partitionWriters have been created yet (the registry is null) when it starts writing out records to a shuffle file.
While writing, BypassMergeSortShuffleWriter requests the BlockManager for as many DiskBlockObjectWriters as there are partitions (in the Partitioner).
While writing, BypassMergeSortShuffleWriter requests the Partitioner for the partition of every record (using the record's key) and finds the per-partition DiskBlockObjectWriter that is requested to write out the record. After all records are written out to their shuffle files, the DiskBlockObjectWriters are requested to commitAndGet.
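The per-record routing can be sketched in plain Java, without Spark. This is a minimal sketch. It assumes a hash-based partitioner (a non-negative modulo of the key's hashCode, in the spirit of Spark's HashPartitioner). In-memory lists stand in for the DiskBlockObjectWriters. PartitionRouting, getPartition and route are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the routing step: one "writer" (an in-memory list) per partition.
class PartitionRouting {

    // Assumed hash partitioning: non-negative modulo of the key's hashCode (null keys go to 0).
    static int getPartition(Object key, int numPartitions) {
        int h = (key == null) ? 0 : key.hashCode();
        int mod = h % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    // Sends every record to the "writer" at index getPartition(key).
    static <K, V> List<List<Map.Entry<K, V>>> route(
            Iterable<Map.Entry<K, V>> records, int numPartitions) {
        List<List<Map.Entry<K, V>>> partitionWriters = new ArrayList<>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            partitionWriters.add(new ArrayList<>());
        }
        for (Map.Entry<K, V> record : records) {
            partitionWriters.get(getPartition(record.getKey(), numPartitions)).add(record);
        }
        return partitionWriters;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> records = List.of(
                Map.entry(1, "a"), Map.entry(2, "b"), Map.entry(4, "c"), Map.entry(-3, "d"));
        List<List<Map.Entry<Integer, String>>> out = route(records, 3);
        for (int p = 0; p < out.size(); p++) {
            System.out.println("partition " + p + ": " + out.get(p));
        }
    }
}
```

Because the partition ID is used directly as an array index, the partitioner must always return a value in [0, numPartitions). That is why the modulo is corrected for negative hash codes.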
BypassMergeSortShuffleWriter uses the partition writers while writing out partition data and removes references to them (nullifies them) in the end.
In other words, the partitionWriters internal registry becomes null once BypassMergeSortShuffleWriter has finished writing out partition data.
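The partitionWriters lifecycle can be modelled as a small state machine: null before writing, populated while writing, and null again afterwards. This is a hypothetical sketch. PartitionWritersLifecycle is a made-up class, and ByteArrayOutputStreams stand in for DiskBlockObjectWriters. IllegalStateException stands in for the assertions described above.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical model of the partitionWriters lifecycle (not Spark's code).
class PartitionWritersLifecycle {
    private final int numPartitions;
    private ByteArrayOutputStream[] partitionWriters; // null until write starts
    private int lastWriterCount;

    PartitionWritersLifecycle(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    boolean hasWriters() {
        return partitionWriters != null;
    }

    int lastWriterCount() {
        return lastWriterCount;
    }

    void write() {
        // write requires that no partition writers exist yet
        if (partitionWriters != null) {
            throw new IllegalStateException("partitionWriters already created");
        }
        partitionWriters = new ByteArrayOutputStream[numPartitions];
        for (int i = 0; i < numPartitions; i++) {
            partitionWriters[i] = new ByteArrayOutputStream();
        }
        // ... records would be routed to partitionWriters[partition] here ...
        writePartitionedData();
    }

    private void writePartitionedData() {
        // writePartitionedData requires the partition writers to be available
        if (partitionWriters == null) {
            throw new IllegalStateException("partitionWriters not available");
        }
        lastWriterCount = partitionWriters.length;
        // ... the per-partition data would be copied to the single output file here ...
        partitionWriters = null; // drop the references ("nullify" the registry)
    }

    public static void main(String[] args) {
        PartitionWritersLifecycle lifecycle = new PartitionWritersLifecycle(4);
        System.out.println("writers before write: " + lifecycle.hasWriters());
        lifecycle.write();
        System.out.println("writers used: " + lifecycle.lastWriterCount());
        System.out.println("writers after write: " + lifecycle.hasWriters());
    }
}
```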
IndexShuffleBlockResolver¶
BypassMergeSortShuffleWriter is given an IndexShuffleBlockResolver when created.
BypassMergeSortShuffleWriter uses the IndexShuffleBlockResolver for writing out records (to writeIndexFileAndCommit and getDataFile).
Serializer¶
When created, BypassMergeSortShuffleWriter requests the ShuffleDependency (of the given BypassMergeSortShuffleHandle) for the Serializer.
BypassMergeSortShuffleWriter creates a new instance of the Serializer for writing out records.
Configuration Properties¶
spark.shuffle.file.buffer¶
BypassMergeSortShuffleWriter uses spark.shuffle.file.buffer configuration property for...FIXME
spark.file.transferTo¶
BypassMergeSortShuffleWriter uses spark.file.transferTo configuration property to control whether to use Java New I/O while writing to a partitioned file.
Writing Out Records to Shuffle File¶
void write(
Iterator<Product2<K, V>> records)
write is part of the ShuffleWriter abstraction.
write creates a new instance of the Serializer.
write initializes the partitionWriters and partitionWriterSegments internal registries (for DiskBlockObjectWriters and FileSegments for every partition, respectively).
write requests the BlockManager for the DiskBlockManager and, for every partition, requests the DiskBlockManager for a shuffle block ID and the file. write creates a DiskBlockObjectWriter for the shuffle block (using the BlockManager) and stores the references to the DiskBlockObjectWriters in the partitionWriters internal registry.
After all DiskBlockObjectWriters are created, write requests the ShuffleWriteMetrics to increment the shuffle write time metric.
For every record (a key-value pair), write requests the Partitioner for the partition ID for the key. The partition ID is then used as an index of the partition writer (among the DiskBlockObjectWriters) to write the current record out to a block file.
Once all records have been written out to their respective block files, write does the following for every DiskBlockObjectWriter:
- Requests the DiskBlockObjectWriter to commit and return a corresponding FileSegment of the shuffle block
- Saves the (reference to the) FileSegment in the partitionWriterSegments internal registry
- Requests the DiskBlockObjectWriter to close
Note
At this point, all the records are in shuffle block files on a local disk. The records are split across the block files by partition (as determined by their keys).
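Here is a file-based sketch of the steps so far. It creates one temporary file per partition, routes records by key, and closes each writer into a (file, offset, length) segment. This is a hypothetical sketch. It assumes hash partitioning via Math.floorMod and writes records as "key=value" text lines as a stand-in for the Serializer. FileSegment here is a made-up class, and none of the names are Spark's API.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one temporary file (and writer) per partition.
class PerPartitionFiles {

    // Hypothetical stand-in for Spark's FileSegment: a byte range of a file.
    static final class FileSegment {
        final File file;
        final long offset;
        final long length;

        FileSegment(File file, long offset, long length) {
            this.file = file;
            this.offset = offset;
            this.length = length;
        }
    }

    static FileSegment[] writePerPartition(List<Map.Entry<Integer, String>> records, int numPartitions) {
        try {
            File[] files = new File[numPartitions];
            Writer[] writers = new Writer[numPartitions];
            for (int p = 0; p < numPartitions; p++) {
                files[p] = Files.createTempFile("temp_shuffle_", ".tmp").toFile();
                writers[p] = Files.newBufferedWriter(files[p].toPath(), StandardCharsets.UTF_8);
            }
            // Route every record to its partition's writer ("key=value\n" stands in for serialization).
            for (Map.Entry<Integer, String> record : records) {
                int p = Math.floorMod(record.getKey().hashCode(), numPartitions);
                writers[p].write(record.getKey() + "=" + record.getValue() + "\n");
            }
            // Stand-in for commitAndGet followed by close: flush, close, describe the written bytes.
            FileSegment[] segments = new FileSegment[numPartitions];
            for (int p = 0; p < numPartitions; p++) {
                writers[p].close();
                segments[p] = new FileSegment(files[p], 0L, files[p].length());
            }
            return segments;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FileSegment[] segments = writePerPartition(
                List.of(Map.entry(1, "a"), Map.entry(4, "c"), Map.entry(2, "b")), 3);
        for (FileSegment s : segments) {
            System.out.println(s.file.getName() + " offset=" + s.offset + " length=" + s.length);
            s.file.delete(); // clean up the temporary files
        }
    }
}
```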
write requests the IndexShuffleBlockResolver for the shuffle file for the shuffle and the map IDs.
write creates a temporary file (based on the name of the shuffle file) and writes all the per-partition shuffle files to it. The size of every per-partition shuffle file is saved in the partitionLengths internal registry.
Note
At this point, all the per-partition shuffle block files have been merged into a single map shuffle data file.
write requests the IndexShuffleBlockResolver to write shuffle index and data files for the shuffle and the map IDs (with the partitionLengths and the temporary shuffle output file).
write returns a shuffle map output status (with the shuffle server ID and the partitionLengths).
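The partitionLengths are enough to locate every partition in the single data file. This sketch assumes an index layout of numPartitions + 1 cumulative offsets, starting at 0 and stored as big-endian longs. The index is kept in memory rather than written to disk. IndexFileSketch, writeIndex and partitionRange are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical in-memory sketch of a shuffle index built from partitionLengths.
class IndexFileSketch {

    // Encodes (numPartitions + 1) big-endian longs: 0, then the running total of partition lengths.
    static byte[] writeIndex(long[] partitionLengths) {
        ByteBuffer index = ByteBuffer.allocate((partitionLengths.length + 1) * Long.BYTES);
        long offset = 0L;
        index.putLong(offset);
        for (long length : partitionLengths) {
            offset += length;
            index.putLong(offset);
        }
        return index.array();
    }

    // Returns the [start, end) byte range of one partition (reduce ID) in the data file.
    static long[] partitionRange(byte[] index, int reduceId) {
        ByteBuffer buffer = ByteBuffer.wrap(index);
        long start = buffer.getLong(reduceId * Long.BYTES);
        long end = buffer.getLong((reduceId + 1) * Long.BYTES);
        return new long[] {start, end};
    }

    public static void main(String[] args) {
        long[] partitionLengths = {10L, 0L, 25L};
        byte[] index = writeIndex(partitionLengths);
        for (int reduceId = 0; reduceId < partitionLengths.length; reduceId++) {
            System.out.println("partition " + reduceId + ": " + Arrays.toString(partitionRange(index, reduceId)));
        }
    }
}
```

Take partitionLengths of {10, 0, 25}. The ranges come out as [0, 10), [10, 10) (an empty partition) and [10, 35). An all-zero partitionLengths array yields an index of zeros only.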
No Records¶
When there are no records to write out, write initializes the partitionLengths internal array (of numPartitions size) with all elements being 0.
write requests the IndexShuffleBlockResolver to write shuffle index and data files, but the difference (compared to when there are records to write) is that the dataTmp argument is simply null.
write sets the internal mapStatus (with the address of the BlockManager in use and the partitionLengths).
Requirements¶
write requires that no DiskBlockObjectWriters have been created yet (i.e. partitionWriters is null).
Writing Out Partitioned Data¶
long[] writePartitionedData(
ShuffleMapOutputWriter mapOutputWriter)
writePartitionedData makes sure that DiskBlockObjectWriters are available (partitionWriters != null).
For every partition, writePartitionedData takes the partition file (from the FileSegments). Only when the partition file exists, writePartitionedData requests the given ShuffleMapOutputWriter for a ShufflePartitionWriter and writes out the partitioned data. At the end, writePartitionedData deletes the file.
writePartitionedData requests the ShuffleWriteMetricsReporter to increment the write time.
In the end, writePartitionedData requests the ShuffleMapOutputWriter to commitAllPartitions and returns the size of each partition of the output map file.
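The core loop of writePartitionedData can be approximated with plain java.io: copy every existing partition file, record its size, then delete it. In this hypothetical sketch, a single FileOutputStream stands in for the ShuffleMapOutputWriter and its ShufflePartitionWriters. InputStream.transferTo does the copying, and the names are illustrative only.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical sketch of concatenating per-partition files into one map output file.
class ConcatenatePartitions {

    // Copies every existing partition file into output (in partition order), deletes it
    // afterwards, and returns the number of bytes each partition contributed.
    static long[] writePartitionedData(File[] partitionFiles, File output) {
        long[] partitionLengths = new long[partitionFiles.length];
        try (FileOutputStream out = new FileOutputStream(output)) {
            for (int p = 0; p < partitionFiles.length; p++) {
                File file = partitionFiles[p];
                if (file == null || !file.exists()) {
                    continue; // no partition file: its length stays 0
                }
                try (FileInputStream in = new FileInputStream(file)) {
                    partitionLengths[p] = in.transferTo(out);
                }
                if (!file.delete()) {
                    System.err.println("Unable to delete " + file);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return partitionLengths;
    }

    // Helper for the usage example: creates a temporary file with the given content.
    static File tempFileWith(String content) {
        try {
            File file = Files.createTempFile("partition_", ".tmp").toFile();
            Files.write(file.toPath(), content.getBytes(StandardCharsets.UTF_8));
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        File[] parts = {tempFileWith("abc"), null, tempFileWith("de")};
        File output = tempFileWith("");
        long[] lengths = writePartitionedData(parts, output);
        System.out.println(java.util.Arrays.toString(lengths) + ", total " + output.length());
        output.delete();
    }
}
```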
Copying Raw Bytes Between Input Streams¶
copyStream(
in: InputStream,
out: OutputStream,
closeStreams: Boolean = false,
transferToEnabled: Boolean = false): Long
copyStream branches off depending on the types of the in and out streams, i.e. whether in is a FileInputStream and out is a FileOutputStream, with the transferToEnabled input flag enabled.
If they are (and transferToEnabled is enabled), copyStream gets their FileChannels and transfers bytes from the input file to the output file, counting the number of bytes (possibly zero) that were actually transferred.
NOTE: copyStream uses Java's java.nio.channels.FileChannel to manage file channels.
If in is not a FileInputStream, out is not a FileOutputStream, or the transferToEnabled flag is disabled (default), copyStream reads data from in, writes it to out and counts the number of bytes written.
copyStream can optionally close the in and out streams (depending on the input closeStreams flag, disabled by default).
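The two copyStream branches can be re-created in Java for experimentation. This is a hypothetical sketch, not Spark's Utils.copyStream. The class name, the 8 KiB buffer and the copyViaTempFiles helper are assumptions, and IOExceptions are rethrown unchecked for brevity.

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

// Hypothetical Java re-creation of the copyStream branching.
class CopyStreamSketch {

    static long copyStream(InputStream in, OutputStream out,
                           boolean closeStreams, boolean transferToEnabled) {
        try {
            if (in instanceof FileInputStream && out instanceof FileOutputStream && transferToEnabled) {
                // NIO branch: move the bytes channel-to-channel (from the start of the input file).
                FileChannel inChannel = ((FileInputStream) in).getChannel();
                FileChannel outChannel = ((FileOutputStream) out).getChannel();
                long size = inChannel.size();
                long transferred = 0L;
                while (transferred < size) {
                    transferred += inChannel.transferTo(transferred, size - transferred, outChannel);
                }
                return transferred;
            }
            // Plain branch: read into a buffer, write it out, and count the bytes.
            byte[] buffer = new byte[8192];
            long count = 0L;
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
                count += n;
            }
            return count;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (closeStreams) {
                try {
                    in.close();
                } catch (IOException ignored) {
                    // best-effort close
                }
                try {
                    out.close();
                } catch (IOException ignored) {
                    // best-effort close
                }
            }
        }
    }

    // Usage helper: copies data between two temporary files and returns the byte count,
    // or -1 if the copy does not match the input.
    static long copyViaTempFiles(byte[] data, boolean transferToEnabled) {
        try {
            Path src = Files.createTempFile("copy_src_", ".bin");
            Path dst = Files.createTempFile("copy_dst_", ".bin");
            Files.write(src, data);
            long copied = copyStream(new FileInputStream(src.toFile()),
                    new FileOutputStream(dst.toFile()), true, transferToEnabled);
            boolean same = Arrays.equals(data, Files.readAllBytes(dst));
            Files.delete(src);
            Files.delete(dst);
            return same ? copied : -1L;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = "hello shuffle".getBytes();
        System.out.println(copyViaTempFiles(data, true));  // NIO branch
        System.out.println(copyViaTempFiles(data, false)); // plain branch
    }
}
```

The NIO branch lets the operating system move file bytes without copying them through a Java byte array. FileChannel.transferTo may move fewer bytes than requested, which is why it is called in a loop.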
NOTE: Utils.copyStream is used when ...
Tip
Visit the official website of JSR 51: New I/O APIs for the Java Platform and read up on the java.nio package.
Stopping ShuffleWriter¶
Option<MapStatus> stop(
boolean success)
stop ...FIXME
stop is part of the ShuffleWriter abstraction.
Temporary Array of Partition Lengths¶
long[] partitionLengths
Temporary array of partition lengths after records are written to a shuffle system.
Initialized every time BypassMergeSortShuffleWriter writes out records (before passing it in to IndexShuffleBlockResolver). After IndexShuffleBlockResolver finishes, it is used to initialize the mapStatus internal property.
Logging¶
Enable ALL logging level for org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter=ALL
Refer to Logging.
Internal Properties¶
numPartitions¶
partitionWriterSegments¶
mapStatus¶
MapStatus that BypassMergeSortShuffleWriter returns when stopped
Initialized every time BypassMergeSortShuffleWriter writes out records.
Used when BypassMergeSortShuffleWriter stops (with success enabled) as a marker of whether any records were written, and returned if they were.
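This sketch shows how a mapStatus field can double as the "records were written" marker in stop. It is a hypothetical model. MapStatusStub stands in for Spark's MapStatus, and the "host:port" location is made up. The stopping guard and the IllegalStateException for stop(true) without a prior write are assumptions; this page does not document either.

```java
import java.util.Optional;

// Hypothetical model of stop(success) using mapStatus as the "records were written" marker.
class StopSketch {

    // Made-up stand-in for Spark's MapStatus: a location plus partition sizes.
    static final class MapStatusStub {
        final String location;
        final long[] partitionLengths;

        MapStatusStub(String location, long[] partitionLengths) {
            this.location = location;
            this.partitionLengths = partitionLengths;
        }
    }

    private MapStatusStub mapStatus; // set at the end of write
    private boolean stopping = false;

    void write(long[] partitionLengths) {
        mapStatus = new MapStatusStub("host:port", partitionLengths);
    }

    Optional<MapStatusStub> stop(boolean success) {
        if (stopping) {
            return Optional.empty(); // assumption: repeated stops report nothing
        }
        stopping = true;
        if (!success) {
            return Optional.empty(); // failed task: nothing to report
        }
        if (mapStatus == null) {
            // assumption: stop(true) without a prior write is a programming error
            throw new IllegalStateException("Cannot call stop(true) without having called write()");
        }
        return Optional.of(mapStatus);
    }

    public static void main(String[] args) {
        StopSketch writer = new StopSketch();
        writer.write(new long[] {3L, 0L, 2L});
        System.out.println(writer.stop(true).isPresent()); // true
        System.out.println(writer.stop(true).isPresent()); // false: already stopped
    }
}
```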