CompactibleFileStreamLog¶
CompactibleFileStreamLog is an extension of the HDFSMetadataLog abstraction for metadata logs that can compact logs at regular intervals.
Creating Instance¶
CompactibleFileStreamLog takes the following to be created:
- Version of the Metadata Log
- SparkSession
- Path of the Metadata Log
Abstract Class
CompactibleFileStreamLog is an abstract class and cannot be created directly. It is created indirectly for the concrete CompactibleFileStreamLogs.
Contract¶
Filtering Out Obsolete Logs¶
compactLogs(
  logs: Seq[T]): Seq[T]
Used when storing metadata of a streaming batch and when requested for all files (except deleted)
Important
compactLogs does nothing important in the available implementations. Consider this method a noop.
Default Compact Interval¶
defaultCompactInterval: Int
Used to determine the compact interval
File Cleanup Delay¶
fileCleanupDelayMs: Long
Used when deleting expired log entries
isDeletingExpiredLog¶
isDeletingExpiredLog: Boolean
Used when storing metadata of a streaming batch (to decide whether to also delete expired log entries)
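Put together, the contract can be outlined as the following simplified Scala sketch (not the actual Spark source; the parent HDFSMetadataLog and the ClassTag context bound are left out):

```scala
import org.apache.spark.sql.SparkSession

// Simplified sketch of the CompactibleFileStreamLog contract (not the actual source)
abstract class CompactibleFileStreamLog[T <: AnyRef](
    metadataLogVersion: String,
    sparkSession: SparkSession,
    path: String) {

  // Filters out obsolete log entries (a noop in the available implementations)
  def compactLogs(logs: Seq[T]): Seq[T]

  // Fallback compact interval (when it cannot be derived from compacted files)
  protected def defaultCompactInterval: Int

  // How long to keep expired log files around before they are deleted
  protected def fileCleanupDelayMs: Long

  // Whether storing metadata should also trigger deleting expired log entries
  protected def isDeletingExpiredLog: Boolean
}
```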
Implementations¶
- FileStreamSinkLog
- FileStreamSourceLog
Compaction¶
compact(
  batchId: Long,
  logs: Array[T]): Boolean
compact finds valid metadata files for compaction (for the given compaction batchId and compact interval) and makes sure that they are all available. compact tracks elapsed time (loadElapsedMs).
compact filters out obsolete logs among the valid metadata files and the input logs (which, given the note in compactLogs, does nothing important).
compact stores the metadata (the filtered metadata files and the input logs) for the input batchId. compact tracks elapsed time (writeElapsedMs).
compact prints out the following DEBUG message (only when the total elapsed time of loadElapsedMs and writeElapsedMs is below the unconfigurable 2000 ms):
Compacting took [elapsedMs] ms (load: [loadElapsedMs] ms, write: [writeElapsedMs] ms) for compact batch [batchId]
In case the total elapsed time is above the unconfigurable 2000 ms, compact prints out the following WARN messages:
Compacting took [elapsedMs] ms (load: [loadElapsedMs] ms, write: [writeElapsedMs] ms) for compact batch [batchId]
Loaded [allLogs] entries (estimated [allLogs] bytes in memory), and wrote [compactedLogs] entries for compact batch [batchId]
compact throws an IllegalStateException when one of the metadata files to compact is not valid (not accessible on a file system or of incorrect format):
[batchIdToPath] doesn't exist when compacting batch [batchId] (compactInterval: [compactInterval])
compact is used while storing metadata for a streaming batch.
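The flow above can be condensed into a self-contained sketch (loadBatch, storeBatch and validBatchesBefore are stand-ins for the HDFSMetadataLog internals; only the 2000 ms threshold and the messages come from the description above):

```scala
object CompactSketch {
  type Entry = String
  val compactInterval = 10
  val warnThresholdMs = 2000L // the unconfigurable threshold mentioned above

  // Batch IDs between the previous compaction batch and this one (stand-in)
  def validBatchesBefore(compactionBatchId: Long): Seq[Long] =
    math.max(0L, compactionBatchId - compactInterval) until compactionBatchId

  def loadBatch(batchId: Long): Option[Seq[Entry]] = Some(Seq(s"entry-$batchId")) // stub
  def storeBatch(batchId: Long, entries: Seq[Entry]): Boolean = true              // stub
  def compactLogs(logs: Seq[Entry]): Seq[Entry] = logs // a noop (see compactLogs)

  def compact(batchId: Long, logs: Seq[Entry]): Boolean = {
    val loadStart = System.nanoTime()
    // Load the valid metadata files and fail fast when one is missing
    val previous = validBatchesBefore(batchId).flatMap { id =>
      loadBatch(id).getOrElse(throw new IllegalStateException(
        s"$id doesn't exist when compacting batch $batchId (compactInterval: $compactInterval)"))
    }
    val loadElapsedMs = (System.nanoTime() - loadStart) / 1000000
    val writeStart = System.nanoTime()
    // Store the filtered previous entries together with the input logs
    val added = storeBatch(batchId, compactLogs(previous ++ logs))
    val writeElapsedMs = (System.nanoTime() - writeStart) / 1000000
    val elapsedMs = loadElapsedMs + writeElapsedMs
    val message = s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms, " +
      s"write: $writeElapsedMs ms) for compact batch $batchId"
    if (elapsedMs < warnThresholdMs) println(s"DEBUG $message")
    else println(s"WARN $message")
    added
  }
}
```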
spark.sql.streaming.fileSink.log.cleanupDelay¶
CompactibleFileStreamLog uses spark.sql.streaming.fileSink.log.cleanupDelay configuration property to delete expired log entries.
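For example, the delay can be changed when creating a SparkSession (the 20m value is for illustration only; the property accepts a time string):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("file-sink-log-cleanup")
  // Keep expired file sink log files around for 20 minutes before deletion
  .config("spark.sql.streaming.fileSink.log.cleanupDelay", "20m")
  .getOrCreate()
```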
compact File Suffix¶
CompactibleFileStreamLog uses the .compact file suffix for batchIdToPath, getBatchIdFromFileName, and the compactInterval.
Storing Metadata for Streaming Batch¶
add(
  batchId: Long,
  logs: Array[T]): Boolean
add checks whether the given batchId is a compaction batch (based on the compact interval).
For a compaction batch, add compacts the metadata (with the given batchId and logs). For a regular batch, add delegates to the parent HDFSMetadataLog to store the metadata. In the end, add deletes expired log entries (for the batchId) when isDeletingExpiredLog is enabled and the batch was added successfully.
add is part of the MetadataLog abstraction.
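A self-contained sketch of the flow follows (compact, storeBatch and deleteExpiredLog are stubs standing in for the class internals, while isCompactionBatch mirrors the helper that determines compaction batches):

```scala
object AddFlowSketch {
  type Entry = String
  val compactInterval = 10
  val isDeletingExpiredLog = true

  // Every compactInterval-th batch (9, 19, 29, ... for an interval of 10)
  // is a compaction batch
  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
    (batchId + 1) % compactInterval == 0

  def compact(batchId: Long, logs: Array[Entry]): Boolean = true    // stub
  def storeBatch(batchId: Long, logs: Array[Entry]): Boolean = true // stub
  def deleteExpiredLog(batchId: Long): Unit = ()                    // stub

  def add(batchId: Long, logs: Array[Entry]): Boolean = {
    val batchAdded =
      if (isCompactionBatch(batchId, compactInterval)) compact(batchId, logs)
      else storeBatch(batchId, logs) // regular batch: delegate to HDFSMetadataLog
    if (batchAdded && isDeletingExpiredLog) deleteExpiredLog(batchId)
    batchAdded
  }
}
```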
Deleting Expired Log Entries¶
deleteExpiredLog(
  currentBatchId: Long): Unit
deleteExpiredLog does nothing and simply returns when the current batch ID incremented (currentBatchId + 1) is below the compact interval plus the minBatchesToRetain.
Otherwise, deleteExpiredLog finds the most recent compaction batch that still has to be retained (per the minBatchesToRetain) and deletes the metadata log files of the earlier batches, provided they have not been modified for at least the fileCleanupDelayMs.
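As a worked example of the guard, with a compact interval of 10 and the default spark.sql.streaming.minBatchesToRetain of 100, cleanup can only kick in from batch 109 onwards (a sketch with assumed names):

```scala
// Sketch of the deleteExpiredLog guard (names are stand-ins)
val compactInterval = 10
val minBatchesToRetain = 100 // spark.sql.streaming.minBatchesToRetain default

def mayCleanup(currentBatchId: Long): Boolean =
  currentBatchId + 1 >= compactInterval + minBatchesToRetain

assert(!mayCleanup(108)) // 109 < 110: deleteExpiredLog simply returns
assert(mayCleanup(109))  // 110 >= 110: expired log entries may be deleted
```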
Compact Interval¶
compactInterval: Int
compactInterval is the number of metadata log files between compactions.
Lazy Value
compactInterval is a Scala lazy value which means that the code to initialize it is executed once only (when accessed for the first time) and cached afterwards.
compactInterval finds compacted IDs and determines the compact interval.
compactInterval requests the CheckpointFileManager for the files in the metadataPath that are batch files (possibly compacted). compactInterval takes the compacted files only (if available), converts them to batch IDs, and sorts them in descending order.
compactInterval starts with the default compact interval.
- If there are two compacted IDs, their difference is the compact interval
- If there is one compacted ID only, compactInterval "derives" the compact interval (FIXME)
compactInterval asserts that the compact interval is a positive value or throws an AssertionError.
compactInterval prints out the following INFO message to the logs (with the defaultCompactInterval):
Set the compact interval to [interval] [defaultCompactInterval: [defaultCompactInterval]]
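The derivation can be sketched as follows (simplified; the single-compacted-ID case falls back to the default here, cf. the FIXME above):

```scala
// Derive the compact interval from the compacted batch IDs found on disk
def compactIntervalFrom(compactedBatchIds: Seq[Long], defaultCompactInterval: Int): Int =
  compactedBatchIds.sorted(Ordering[Long].reverse) match {
    case Seq(latest, previous, _*) => (latest - previous).toInt // two latest compacted IDs
    case _ => defaultCompactInterval // no (or, simplified here, one) compacted ID
  }

assert(compactIntervalFrom(Seq(9L, 19L), 3) == 10) // 19.compact and 9.compact on disk
assert(compactIntervalFrom(Seq.empty, 10) == 10)   // no compacted files yet
```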
All Files (Except Deleted)¶
allFiles(): Array[T]
allFiles takes the latest batch ID (returning an empty collection when there is none yet) and loads the metadata of all the valid batches up to and including it (given the compact interval). allFiles then filters out obsolete entries (compactLogs) and returns the rest.
allFiles is used when:
- FileStreamSource is created
- MetadataLogFileIndex is created
Converting Batch Id to Hadoop Path¶
batchIdToPath(
  batchId: Long): Path
batchIdToPath...FIXME
batchIdToPath is part of the HDFSMetadataLog abstraction.
Converting Hadoop Path to Batch Id¶
pathToBatchId(
  path: Path): Long
pathToBatchId...FIXME
pathToBatchId is part of the HDFSMetadataLog abstraction.
isBatchFile¶
isBatchFile(
  path: Path): Boolean
isBatchFile is true when the batch ID could successfully be extracted from the file name of the given path (getBatchIdFromFileName). Otherwise, isBatchFile is false.
isBatchFile is part of the HDFSMetadataLog abstraction.
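The conversions in the three sections above can be demonstrated together in a sketch (metadataPath and compactInterval are made up; the compaction check mirrors isCompactionBatch):

```scala
import org.apache.hadoop.fs.Path

val metadataPath = new Path("/tmp/checkpoint/_spark_metadata")
val compactInterval = 10
val CompactFileSuffix = ".compact"

def isCompactionBatch(batchId: Long): Boolean = (batchId + 1) % compactInterval == 0

// Compaction batches get the .compact suffix, regular batches just the batch ID
def batchIdToPath(batchId: Long): Path =
  if (isCompactionBatch(batchId)) new Path(metadataPath, s"$batchId$CompactFileSuffix")
  else new Path(metadataPath, batchId.toString)

def getBatchIdFromFileName(fileName: String): Long =
  fileName.stripSuffix(CompactFileSuffix).toLong

def pathToBatchId(path: Path): Long = getBatchIdFromFileName(path.getName)

def isBatchFile(path: Path): Boolean =
  scala.util.Try(pathToBatchId(path)).isSuccess

assert(batchIdToPath(9).getName == "9.compact") // compaction batch
assert(batchIdToPath(8).getName == "8")         // regular batch
assert(pathToBatchId(new Path("9.compact")) == 9)
assert(!isBatchFile(new Path("_spark_metadata")))
```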
Serializing Metadata (Writing Metadata in Serialized Format)¶
serialize(
  logData: Array[T],
  out: OutputStream): Unit
serialize firstly writes out the version header (v and the metadata log version) to the given output stream (in UTF-8).
serialize then writes the log data (serialized using Json4s (with Jackson binding) library). Entries are separated by new lines.
serialize is part of the HDFSMetadataLog abstraction.
Deserializing Metadata¶
deserialize(
  in: InputStream): Array[T]
deserialize reads the version header (the first line) and validates it against the expected metadata log version (throwing an IllegalStateException when the log file is empty). deserialize then deserializes the remaining lines (in JSON format) into metadata entries.
deserialize is part of the HDFSMetadataLog abstraction.
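The on-disk format (a version header line followed by one JSON entry per line) can be illustrated with Json4s, which Spark uses under the covers (LogEntry is a made-up metadata type):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

case class LogEntry(path: String, size: Long) // made-up metadata type

implicit val formats = Serialization.formats(NoTypeHints)
val metadataLogVersion = "1"

def serialize(logData: Array[LogEntry], out: OutputStream): Unit = {
  out.write(s"v$metadataLogVersion".getBytes(UTF_8)) // version header, e.g. v1
  logData.foreach { entry =>
    out.write('\n')
    out.write(Serialization.write(entry).getBytes(UTF_8)) // one JSON entry per line
  }
}

def deserialize(in: InputStream): Array[LogEntry] = {
  val lines = scala.io.Source.fromInputStream(in, UTF_8.name()).getLines()
  require(lines.hasNext, "Incomplete log file")
  require(lines.next() == s"v$metadataLogVersion", "Unsupported log version")
  lines.map(Serialization.read[LogEntry]).toArray
}

val out = new ByteArrayOutputStream()
serialize(Array(LogEntry("part-0000", 1024L)), out)
// v1
// {"path":"part-0000","size":1024}
val entries = deserialize(new ByteArrayInputStream(out.toByteArray))
assert(entries.head == LogEntry("part-0000", 1024L))
```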
Utilities¶
getBatchIdFromFileName¶
getBatchIdFromFileName(
  fileName: String): Long
getBatchIdFromFileName simply removes the .compact suffix from the given fileName and converts the remaining part to a number.
getBatchIdFromFileName is used for pathToBatchId, isBatchFile, and deleting expired log entries.
getValidBatchesBeforeCompactionBatch¶
getValidBatchesBeforeCompactionBatch(
  compactionBatchId: Long,
  compactInterval: Int): Seq[Long]
getValidBatchesBeforeCompactionBatch asserts that the given compactionBatchId is a compaction batch (based on the given compactInterval) or throws an AssertionError. getValidBatchesBeforeCompactionBatch returns the batch IDs from the previous compaction batch (or batch 0 when there is none) up to (and excluding) the given compactionBatchId.
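A minimal sketch consistent with the behavior described above:

```scala
def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
  (batchId + 1) % compactInterval == 0

def getValidBatchesBeforeCompactionBatch(
    compactionBatchId: Long,
    compactInterval: Int): Seq[Long] = {
  assert(isCompactionBatch(compactionBatchId, compactInterval),
    s"$compactionBatchId is not a compaction batch")
  math.max(0L, compactionBatchId - compactInterval) until compactionBatchId
}

// With a compact interval of 10, compaction batch 19 covers batches 9 to 18
// (batch 9 is the previous compaction batch that holds everything before it)
assert(getValidBatchesBeforeCompactionBatch(19, 10) == (9L until 19L))
```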
getValidBatchesBeforeCompactionBatch is used for compaction.