# CompactibleFileStreamLog

`CompactibleFileStreamLog` is an extension of the HDFSMetadataLog abstraction for metadata logs that can compact log files at regular intervals.
## Creating Instance

`CompactibleFileStreamLog` takes the following to be created:

- Version of the Metadata Log
- `SparkSession`
- Path of the Metadata Log

!!! note "Abstract Class"
    `CompactibleFileStreamLog` is an abstract class and cannot be created directly. It is created indirectly for the concrete CompactibleFileStreamLogs.
## Contract

### Filtering Out Obsolete Logs

```scala
compactLogs(
  logs: Seq[T]): Seq[T]
```

Used when storing metadata for a streaming batch and getting all files (except deleted)

!!! important
    `compactLogs` does nothing important in the available implementations. Consider this method a noop.

### Default Compact Interval

```scala
defaultCompactInterval: Int
```

Used for the compact interval

### File Cleanup Delay

```scala
fileCleanupDelayMs: Long
```

Used when deleting expired log entries

### isDeletingExpiredLog

```scala
isDeletingExpiredLog: Boolean
```

Used when storing metadata for a streaming batch
## Implementations

- `FileStreamSinkLog`
- `FileStreamSourceLog`
## Compaction

```scala
compact(
  batchId: Long,
  logs: Array[T]): Boolean
```

`compact` finds the valid metadata files for compaction (for the given compaction `batchId` and the compact interval) and makes sure that they are all available. `compact` tracks the elapsed time (`loadElapsedMs`).

`compact` filters out obsolete logs among the valid metadata files and the input `logs` (which actually does nothing important given the note in compactLogs).

`compact` stores the metadata (the filtered metadata files and the input `logs`) for the input `batchId`. `compact` tracks the elapsed time (`writeElapsedMs`).

`compact` prints out the following DEBUG message (only when the total elapsed time of `loadElapsedMs` and `writeElapsedMs` is below the unconfigurable 2000 ms):

```text
Compacting took [elapsedMs] ms (load: [loadElapsedMs] ms, write: [writeElapsedMs] ms) for compact batch [batchId]
```

In case the total elapsed time is above the unconfigurable 2000 ms, `compact` prints out the following WARN messages:

```text
Compacting took [elapsedMs] ms (load: [loadElapsedMs] ms, write: [writeElapsedMs] ms) for compact batch [batchId]
Loaded [allLogs] entries (estimated [allLogs] bytes in memory), and wrote [compactedLogs] entries for compact batch [batchId]
```

`compact` throws an `IllegalStateException` when one of the metadata files to compact is not valid (not accessible on a file system or of an incorrect format):

```text
[batchIdToPath] doesn't exist when compacting batch [batchId] (compactInterval: [compactInterval])
```

`compact` is used while storing metadata for a streaming batch.
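The steps above can be summarized in the following minimal sketch (assumptions: `get` loads the entries of a batch and `super.add` writes a batch out, as in HDFSMetadataLog; the timing metrics and the DEBUG/WARN logging are trimmed):

```scala
// A simplified sketch of the compaction flow, not the actual implementation
def compact(batchId: Long, logs: Array[T]): Boolean = {
  // All batch ids that this compaction batch has to merge
  val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
  // Load every previous batch; a missing one makes the metadata log invalid
  val allLogs = validBatches.flatMap { id =>
    get(id).getOrElse {
      throw new IllegalStateException(
        s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId " +
          s"(compactInterval: $compactInterval)")
    }
  } ++ logs
  // Filter out obsolete entries (a noop in the available implementations)
  // and write them all out as one compacted batch file
  super.add(batchId, compactLogs(allLogs).toArray)
}
```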
## spark.sql.streaming.fileSink.log.cleanupDelay

`CompactibleFileStreamLog` uses the spark.sql.streaming.fileSink.log.cleanupDelay configuration property to delete expired log entries.
## compact File Suffix

`CompactibleFileStreamLog` uses the `.compact` file suffix for batchIdToPath, getBatchIdFromFileName, and the compactInterval.
## Storing Metadata for Streaming Batch

```scala
add(
  batchId: Long,
  logs: Array[T]): Boolean
```

`add` checks whether the given `batchId` is a compaction batch or not (alongside the compact interval), as sketched below.

`add`...FIXME

`add` is part of the MetadataLog abstraction.
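A minimal sketch of the compaction-batch check and the resulting branching, assuming the convention that every `compactInterval`-th batch is a compaction batch (i.e. `(batchId + 1) % compactInterval == 0`):

```scala
// Assumed convention: batch ids are 0-based and every compactInterval-th
// batch triggers a compaction
def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
  (batchId + 1) % compactInterval == 0

def add(batchId: Long, logs: Array[T]): Boolean =
  if (isCompactionBatch(batchId, compactInterval)) {
    compact(batchId, logs)   // merge previous batches into a .compact file
  } else {
    super.add(batchId, logs) // write a regular (non-compacted) batch file
  }
```

With `compactInterval = 10`, batches 9, 19, 29, etc. would be compaction batches.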
## Deleting Expired Log Entries

```scala
deleteExpiredLog(
  currentBatchId: Long): Unit
```

`deleteExpiredLog`...FIXME

`deleteExpiredLog` does nothing and simply returns when the current batch ID incremented (`currentBatchId + 1`) is below the compact interval plus the minBatchesToRetain.
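That early-return guard could be sketched as follows (the actual cleanup of expired files is elided, as it is still a FIXME above):

```scala
// A sketch of the guard only; the deletion logic itself is not shown
def deleteExpiredLog(currentBatchId: Long): Unit = {
  if (currentBatchId + 1 < compactInterval + minBatchesToRetain) {
    // Too few batches so far to delete anything safely; keep everything
    return
  }
  // ...delete batch files beyond the retention boundary (FIXME)
}
```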
## Compact Interval

```scala
compactInterval: Int
```

`compactInterval` is the number of metadata log files between compactions.

!!! note "Lazy Value"
    `compactInterval` is a Scala lazy value which means that the code to initialize it is executed once only (when accessed for the first time) and cached afterwards.

`compactInterval` finds compacted batch IDs and determines the compact interval (as sketched below).

`compactInterval` requests the CheckpointFileManager for the files in the metadataPath that are batch files (possibly compacted). `compactInterval` takes the compacted files only (if available), converts them to batch IDs and sorts them in descending order.

`compactInterval` starts with the default compact interval.

- If there are two compacted IDs, their difference is the compact interval
- If there is one compacted ID only, `compactInterval` "derives" the compact interval (FIXME)

`compactInterval` asserts that the compact interval is a positive value or throws an `AssertionError`.

`compactInterval` prints out the following INFO message to the logs (with the defaultCompactInterval):

```text
Set the compact interval to [interval] [defaultCompactInterval: [defaultCompactInterval]]
```
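A simplified sketch of the lookup and the two-ID case (`fileManager` and `batchFilesFilter` mirror the CheckpointFileManager lookup described above; the single-ID derivation stays a FIXME):

```scala
lazy val compactInterval: Int = {
  // Compacted batch IDs in descending order
  val compactedBatchIds = fileManager
    .list(metadataPath, batchFilesFilter)
    .map(_.getPath)
    .filter(_.getName.endsWith(".compact"))
    .map(pathToBatchId)
    .sorted
    .reverse
  val interval =
    if (compactedBatchIds.length >= 2) {
      // The difference of the two latest compacted IDs is the interval
      (compactedBatchIds(0) - compactedBatchIds(1)).toInt
    } else {
      defaultCompactInterval // FIXME: a single compacted ID "derives" the interval
    }
  assert(interval > 0, s"compactInterval ($interval) must be positive")
  interval
}
```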
## All Files (Except Deleted)

```scala
allFiles(): Array[T]
```

`allFiles`...FIXME

`allFiles` is used when...FIXME
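Given the compaction scheme above, a plausible (hypothetical) sketch of `allFiles` is to merge the latest compacted batch with every later regular batch; `getAllValidBatches` is an assumed helper that returns the latest compaction batch ID followed by all later batch IDs:

```scala
// A hedged sketch only; the actual behavior is the FIXME above
def allFiles(): Array[T] = {
  getLatest() match { // the latest batch (id and entries), if any
    case Some((latestId, _)) =>
      val logs = getAllValidBatches(latestId, compactInterval).flatMap { id =>
        get(id).getOrElse(Array.empty[T])
      }
      compactLogs(logs).toArray
    case None => Array.empty[T]
  }
}
```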
## Converting Batch Id to Hadoop Path

```scala
batchIdToPath(
  batchId: Long): Path
```

`batchIdToPath`...FIXME

`batchIdToPath` is part of the HDFSMetadataLog abstraction.
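Based on the `.compact` file-suffix convention (see compact File Suffix above), the conversion presumably looks like this sketch:

```scala
import org.apache.hadoop.fs.Path

// Compaction batches get a ".compact" file name; others a plain number
override def batchIdToPath(batchId: Long): Path =
  if (isCompactionBatch(batchId, compactInterval)) {
    new Path(metadataPath, s"$batchId.compact")
  } else {
    new Path(metadataPath, batchId.toString)
  }
```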
## Converting Hadoop Path to Batch Id

```scala
pathToBatchId(
  path: Path): Long
```

`pathToBatchId`...FIXME

`pathToBatchId` is part of the HDFSMetadataLog abstraction.
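A one-line sketch, assuming it delegates to getBatchIdFromFileName (see Utilities below):

```scala
// Strip an optional ".compact" suffix and parse the remaining number
override def pathToBatchId(path: Path): Long =
  getBatchIdFromFileName(path.getName)
```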
## isBatchFile

```scala
isBatchFile(
  path: Path): Boolean
```

`isBatchFile` is `true` when a batchId can be extracted from the given `path`, and `false` otherwise.

`isBatchFile` is part of the HDFSMetadataLog abstraction.
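That success-or-failure check can be sketched as follows:

```scala
import scala.util.Try

// true exactly when pathToBatchId does not throw for the given path
override def isBatchFile(path: Path): Boolean =
  Try(pathToBatchId(path)).isSuccess
```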
## Serializing Metadata (Writing Metadata in Serialized Format)

```scala
serialize(
  logData: Array[T],
  out: OutputStream): Unit
```

`serialize` writes the version header (`v` followed by the metadata log version) out to the given output stream (in UTF-8).

`serialize` then writes out the log data (serialized using the Json4s (with Jackson binding) library). Entries are separated by newlines.

`serialize` is part of the HDFSMetadataLog abstraction.
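With a metadata log version of `1`, the serialized file would look roughly as follows (the entry fields are illustrative only and depend on the concrete implementation):

```text
v1
{"path":"hdfs://...","size":1024,"isDir":false,"modificationTime":0,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"hdfs://...","size":2048,"isDir":false,"modificationTime":0,"blockReplication":1,"blockSize":33554432,"action":"add"}
```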
## Deserializing Metadata

```scala
deserialize(
  in: InputStream): Array[T]
```

`deserialize`...FIXME

`deserialize` is part of the HDFSMetadataLog abstraction.
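Given the serialized layout above, `deserialize` plausibly reverses serialize: validate the version header line and parse one Json4s entry per remaining line. A minimal sketch:

```scala
import java.io.InputStream
import scala.io.Source
import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization

implicit val formats: Formats = Serialization.formats(NoTypeHints)

// A hedged sketch only; the actual behavior is the FIXME above
def deserialize[T: Manifest](in: InputStream): Array[T] = {
  val lines = Source.fromInputStream(in, "UTF-8").getLines()
  if (!lines.hasNext) throw new IllegalStateException("Incomplete log file")
  val versionLine = lines.next() // expected to be "v" + the metadata log version
  lines.map(line => Serialization.read[T](line)).toArray
}
```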
## Utilities

### getBatchIdFromFileName

```scala
getBatchIdFromFileName(
  fileName: String): Long
```

`getBatchIdFromFileName` simply removes the `.compact` suffix (if present) from the given `fileName` and converts the remaining part to a number.

`getBatchIdFromFileName` is used for pathToBatchId, isBatchFile, and deleting expired log entries.
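That description translates almost directly to code; a sketch with example results:

```scala
def getBatchIdFromFileName(fileName: String): Long =
  fileName.stripSuffix(".compact").toLong

getBatchIdFromFileName("8.compact") // 8
getBatchIdFromFileName("9")         // 9
```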
### getValidBatchesBeforeCompactionBatch

```scala
getValidBatchesBeforeCompactionBatch(
  compactionBatchId: Long,
  compactInterval: Int): Seq[Long]
```

`getValidBatchesBeforeCompactionBatch`...FIXME

`getValidBatchesBeforeCompactionBatch` is used for compaction.
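Assuming the compaction-batch convention sketched earlier, the valid batches before a compaction batch would be all batch IDs since the previous compaction batch. A hedged sketch:

```scala
def getValidBatchesBeforeCompactionBatch(
    compactionBatchId: Long,
    compactInterval: Int): Seq[Long] = {
  assert(isCompactionBatch(compactionBatchId, compactInterval),
    s"$compactionBatchId is not a compaction batch")
  // All batch ids since the previous compaction batch (or 0 for the first one)
  math.max(0, compactionBatchId - compactInterval) until compactionBatchId
}

// With compactInterval = 3, compaction batch 5 merges batches 2, 3 and 4
```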