DiskBlockObjectWriter¶
DiskBlockObjectWriter is a disk writer of BlockManager.
DiskBlockObjectWriter is an OutputStream (Java) that BlockManager offers for writing data blocks to disk.
DiskBlockObjectWriter is used when:
- BypassMergeSortShuffleWriter is requested for partition writers
- UnsafeSorterSpillWriter is requested for a partition writer
- ShuffleExternalSorter is requested to writeSortedFile
- ExternalSorter is requested to spillMemoryIteratorToDisk
Creating Instance¶
DiskBlockObjectWriter takes the following to be created:
- File (Java)
- SerializerManager
- SerializerInstance
- Buffer size
- syncWrites flag (based on spark.shuffle.sync configuration property)
- ShuffleWriteMetricsReporter
- BlockId (default: null)
DiskBlockObjectWriter is created when:
- BlockManager is requested for a disk writer
Buffer Size¶
DiskBlockObjectWriter is given a buffer size when created.
The buffer size is specified by BlockManager. In most cases it is based on the spark.shuffle.file.buffer configuration property; in some cases it is hardcoded to 32k, which is also the default value of the property.
The buffer size is exactly the buffer size of the BufferedOutputStream.
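The effect of the buffer size can be illustrated with plain java.io, outside of Spark. This is only a sketch: BufferSizeSketch, openBuffered and lengthsAroundFlush are hypothetical names, and 32k is assumed to mean 32 KiB.

```scala
import java.io.{BufferedOutputStream, File, FileOutputStream}

object BufferSizeSketch {
  // 32k, the default mentioned above (assumed here to mean 32 KiB)
  val DefaultBufferSize: Int = 32 * 1024

  // Hypothetical helper: wraps an appending file stream in a buffer of the given size
  def openBuffered(file: File, bufferSize: Int = DefaultBufferSize): BufferedOutputStream =
    new BufferedOutputStream(new FileOutputStream(file, true), bufferSize)

  // Writes the payload to a temporary file and returns the file length
  // observed before and after flushing the buffer
  def lengthsAroundFlush(payload: Array[Byte]): (Long, Long) = {
    val file = File.createTempFile("buffer-size-sketch", ".bin")
    file.deleteOnExit()
    val out = openBuffered(file)
    try {
      out.write(payload)
      val before = file.length()
      out.flush()
      (before, file.length())
    } finally out.close()
  }
}
```

A payload smaller than the buffer stays in memory until the stream is flushed, which is why a larger buffer tends to mean fewer (but larger) writes to the file.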
SerializationStream¶
DiskBlockObjectWriter manages a SerializationStream for writing a key-value record:
- Opens it when requested to open
- Closes it when requested to commitAndGet
- Dereferences it (nulls it) when closeResources
States¶
DiskBlockObjectWriter can be in one of the following states (that match the state of the underlying output streams):
- Initialized
- Open
- Closed
Writing Out Record¶
write(
key: Any,
value: Any): Unit
write opens the underlying stream unless it is already open.
write requests the SerializationStream to write the key and then the value.
In the end, write updates the write metrics.
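The steps above (open lazily, write the key, write the value, update metrics) can be sketched without Spark. This is an illustration under assumptions, not the Spark implementation: RecordWriterSketch is a hypothetical class, java.io.ObjectOutputStream stands in for the SerializationStream, and a plain counter stands in for the write metrics.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

final class RecordWriterSketch {
  private val sink = new ByteArrayOutputStream()
  // Stand-in for the SerializationStream (assumption: Java serialization)
  private var objOut: ObjectOutputStream = null
  private var streamOpen = false
  // Stand-in for the write metrics
  var numRecordsWritten: Long = 0L

  private def open(): Unit = {
    objOut = new ObjectOutputStream(sink)
    streamOpen = true
  }

  def write(key: Any, value: Any): Unit = {
    if (!streamOpen) open()                       // open only on first use
    objOut.writeObject(key.asInstanceOf[AnyRef])   // key first
    objOut.writeObject(value.asInstanceOf[AnyRef]) // then value
    numRecordsWritten += 1                         // update metrics last
  }

  // Flushes any pending output and returns everything written so far
  def bytes: Array[Byte] = {
    if (objOut != null) objOut.flush()
    sink.toByteArray
  }
}
```

Reading the bytes back with a java.io.ObjectInputStream yields the keys and values in the order they were written.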
write is used when:
- BypassMergeSortShuffleWriter is requested to write records of a partition
- ExternalAppendOnlyMap is requested to spillMemoryIteratorToDisk
- ExternalSorter is requested to write all records into a partitioned file
- SpillableIterator is requested to spill
- WritablePartitionedPairCollection is requested for a destructiveSortedWritablePartitionedIterator
commitAndGet¶
commitAndGet(): FileSegment
With streamOpen enabled, commitAndGet...FIXME
Otherwise, commitAndGet returns a new FileSegment (with the File, committedPosition and 0 length).
commitAndGet is used when:
- BypassMergeSortShuffleWriter is requested to write
- ShuffleExternalSorter is requested to writeSortedFile
- DiskBlockObjectWriter is requested to close
- ExternalAppendOnlyMap is requested to spillMemoryIteratorToDisk
- ExternalSorter is requested to spillMemoryIteratorToDisk, writePartitionedFile
- UnsafeSorterSpillWriter is requested to close
Committing Writes and Closing Resources¶
close(): Unit
Only if initialized, close calls commitAndGet followed by closeResources. Otherwise, close does nothing.
close is used when:
- FIXME
revertPartialWritesAndClose¶
revertPartialWritesAndClose(): File
revertPartialWritesAndClose...FIXME
revertPartialWritesAndClose is used when...FIXME
Writing Bytes (From Byte Array Starting From Offset)¶
write(
kvBytes: Array[Byte],
offs: Int,
len: Int): Unit
write...FIXME
write is used when...FIXME
Opening DiskBlockObjectWriter¶
open(): DiskBlockObjectWriter
open opens the DiskBlockObjectWriter, i.e. initializes it (if needed) and resets the bs and objOut internal output streams.
Internally, open makes sure that DiskBlockObjectWriter is not closed (hasBeenClosed flag is disabled). If it is closed, open throws an IllegalStateException:
Writer already closed. Cannot be reopened.
Unless DiskBlockObjectWriter has already been initialized (initialized flag is enabled), open initializes it (and turns initialized flag on).
Regardless of whether DiskBlockObjectWriter was already initialized or not, open requests SerializerManager to wrap mcs output stream for encryption and compression (for blockId) and sets it as bs.
open requests the SerializerInstance to serialize bs output stream and sets it as objOut.
Note
open uses the SerializerInstance that was used to create the DiskBlockObjectWriter.
In the end, open turns streamOpen flag on.
open is used when DiskBlockObjectWriter writes out a record or bytes from a specified byte array and the stream is not open yet.
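The flag handling in open can be sketched as a small state machine. OpenLifecycleSketch and its counters are hypothetical; the stream creation is reduced to a counter, and the assumption that a terminal close turns hasBeenClosed on (while a commit only turns streamOpen off) is not spelled out in the text above.

```scala
final class OpenLifecycleSketch {
  private var initialized = false
  private var streamOpen = false
  private var hasBeenClosed = false

  var initializeCalls = 0 // how many times initialization ran
  var wrapCalls = 0       // how many times bs and objOut would be (re)created

  private def initialize(): Unit = {
    initializeCalls += 1
  }

  def open(): OpenLifecycleSketch = {
    if (hasBeenClosed) {
      throw new IllegalStateException("Writer already closed. Cannot be reopened.")
    }
    if (!initialized) {
      initialize()
      initialized = true
    }
    wrapCalls += 1 // stands in for wrapping mcs into bs and creating objOut
    streamOpen = true
    this
  }

  def isOpen: Boolean = streamOpen

  // Assumption: committing closes the streams but allows reopening
  def commit(): Unit = {
    streamOpen = false
  }

  // Assumption: a terminal close marks the writer as closed for good
  def close(): Unit = {
    streamOpen = false
    hasBeenClosed = true
  }
}
```

Opening, committing and opening again runs the initialization once but recreates the streams twice; opening after close fails with the exception message quoted above.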
Initialization¶
initialize(): Unit
initialize creates a FileOutputStream to write to the file (with the append flag enabled) and takes the FileChannel associated with this file output stream.
initialize creates a TimeTrackingOutputStream (with the ShuffleWriteMetricsReporter and the FileOutputStream).
With checksumEnabled, initialize...FIXME
In the end, initialize creates a BufferedOutputStream.
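The stream chain built by initialize can be approximated with java.io and java.nio alone. InitializeSketch is a hypothetical class; the TimeTrackingOutputStream (a Spark-internal class) and the checksum step are deliberately left out, so this only shows the append-mode file stream, its channel, and the buffered wrapper.

```scala
import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
import java.nio.channels.FileChannel

final class InitializeSketch(file: File, bufferSize: Int) {
  private var fos: FileOutputStream = null
  var channel: FileChannel = null
  var buffered: OutputStream = null

  def initialize(): Unit = {
    // Second argument enables append mode, so existing content is kept
    fos = new FileOutputStream(file, true)
    // The channel shares the file descriptor with the stream
    channel = fos.getChannel
    // Spark wraps fos in a TimeTrackingOutputStream at this point (omitted here)
    buffered = new BufferedOutputStream(fos, bufferSize)
  }
}
```

Because the file is opened in append mode, bytes written through the buffered stream land after whatever the file already contains.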
checksumEnabled Flag¶
DiskBlockObjectWriter defines checksumEnabled flag to...FIXME
checksumEnabled is false by default and can be enabled using setChecksum.
setChecksum¶
setChecksum(
checksum: Checksum): Unit
setChecksum...FIXME
setChecksum is used when:
- BypassMergeSortShuffleWriter is requested to write records (with spark.shuffle.checksum.enabled enabled)
- ShuffleExternalSorter is requested to writeSortedFile (with spark.shuffle.checksum.enabled enabled)
Recording Bytes Written¶
recordWritten(): Unit
recordWritten increases the numRecordsWritten counter.
recordWritten requests the ShuffleWriteMetricsReporter to incRecordsWritten.
recordWritten updates the bytes written metric every 16384 records written (based on the numRecordsWritten counter).
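A counter-driven periodic update of this kind can be sketched as follows. RecordCounterSketch is hypothetical, the metric update is reduced to a counter, and the modulo check on numRecordsWritten is an assumption about how the interval is detected.

```scala
final class RecordCounterSketch(updateInterval: Int = 16384) {
  var numRecordsWritten: Long = 0L
  // Stands in for calls to updateBytesWritten
  var metricUpdates: Int = 0

  def recordWritten(): Unit = {
    numRecordsWritten += 1
    // Assumption: the bytes written metric is refreshed once per interval
    if (numRecordsWritten % updateInterval == 0) {
      metricUpdates += 1
    }
  }
}
```

Refreshing the byte count only once per interval keeps the per-record cost to a counter increment, at the price of a metric that can lag slightly behind the file.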
recordWritten is used when:
- ShuffleExternalSorter is requested to writeSortedFile
- DiskBlockObjectWriter is requested to write
- UnsafeSorterSpillWriter is requested to write
Updating Bytes Written Metric¶
updateBytesWritten(): Unit
updateBytesWritten requests the FileChannel for the file position (i.e., the number of bytes from the beginning of the file to the current position) and uses it to incBytesWritten (using the ShuffleWriteMetricsReporter and the reportedPosition counter).
In the end, updateBytesWritten updates the reportedPosition counter to the current file position (so it can report incBytesWritten properly).
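The position bookkeeping can be sketched with a FileChannel directly. BytesWrittenSketch is hypothetical, a plain field stands in for the ShuffleWriteMetricsReporter, and reporting the difference between the current position and reportedPosition is an assumption about how the two are combined.

```scala
import java.io.{File, FileOutputStream}

final class BytesWrittenSketch(file: File) {
  private val fos = new FileOutputStream(file, true)
  private val channel = fos.getChannel
  // Last file position that has already been reported
  private var reportedPosition: Long = channel.position()
  // Stands in for ShuffleWriteMetricsReporter.incBytesWritten
  var bytesWrittenMetric: Long = 0L

  // Unbuffered write straight to the file
  def write(bytes: Array[Byte]): Unit = fos.write(bytes)

  def updateBytesWritten(): Unit = {
    val pos = channel.position()
    // Report only the bytes written since the previous update (assumption)
    bytesWrittenMetric += pos - reportedPosition
    reportedPosition = pos
  }

  def close(): Unit = fos.close()
}
```

Calling updateBytesWritten twice in a row without writing in between leaves the metric unchanged, since reportedPosition has already caught up with the file position.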
BufferedOutputStream¶
mcs: ManualCloseOutputStream
DiskBlockObjectWriter creates a custom BufferedOutputStream (Java) when requested to initialize.
The BufferedOutputStream is closed (and dereferenced) in closeResources.
The BufferedOutputStream is used to create the OutputStream when requested to open.
OutputStream¶
bs: OutputStream
DiskBlockObjectWriter creates an OutputStream when requested to open. The OutputStream can be encrypted and compressed if enabled.
The OutputStream is closed (and dereferenced) in closeResources.
The OutputStream is used to create the SerializationStream when requested to open.
The OutputStream is requested for the following:
- Write bytes out in write
- Flush in flush (and commitAndGet)