
CompactibleFileStreamLog

CompactibleFileStreamLog is an extension of the HDFSMetadataLog abstraction for metadata logs that can compact logs at regular intervals.

Creating Instance

CompactibleFileStreamLog takes the following to be created:

  • Version of the Metadata Log
  • SparkSession
  • Path of the Metadata Log
Abstract Class

CompactibleFileStreamLog is an abstract class and cannot be created directly. It is created indirectly for the concrete CompactibleFileStreamLogs.

Contract

Filtering Out Obsolete Logs

compactLogs(
  logs: Seq[T]): Seq[T]

Used when compacting metadata logs and when getting all files (except deleted)

Important

compactLogs does nothing important in the available implementations. Consider this method a noop.

Default Compact Interval

defaultCompactInterval: Int

Used to determine the compact interval

File Cleanup Delay

fileCleanupDelayMs: Long

Used when deleting expired log entries

isDeletingExpiredLog

isDeletingExpiredLog: Boolean

Used when storing metadata for a streaming batch (to control whether to delete expired log entries)

Implementations

  • FileStreamSinkLog
  • FileStreamSourceLog

Compaction

compact(
  batchId: Long,
  logs: Array[T]): Boolean

compact finds valid metadata files for compaction (for the given compaction batchId and compact interval) and makes sure that they are all available. compact tracks elapsed time (loadElapsedMs).

compact filters out obsolete logs among the valid metadata files and the input logs (which actually does nothing important given the note in compactLogs).

compact stores the metadata (the filtered metadata files and the input logs) for the input batchId. compact tracks elapsed time (writeElapsedMs).

compact prints out the following DEBUG message (only when the total elapsed time of loadElapsedMs and writeElapsedMs is below the unconfigurable 2000 ms):

Compacting took [elapsedMs] ms (load: [loadElapsedMs] ms, write: [writeElapsedMs] ms) for compact batch [batchId]

When the total elapsed time is above the unconfigurable 2000 ms, compact prints out the following WARN messages:

Compacting took [elapsedMs] ms (load: [loadElapsedMs] ms, write: [writeElapsedMs] ms) for compact batch [batchId]
Loaded [allLogs] entries (estimated [allLogs] bytes in memory), and wrote [compactedLogs] entries for compact batch [batchId]

compact throws an IllegalStateException when one of the metadata files to compact is not valid (not accessible on a file system or of incorrect format):

[batchIdToPath] doesn't exist when compacting batch [batchId] (compactInterval: [compactInterval])

compact is used while storing metadata for a streaming batch.
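
The following is a minimal, in-memory simulation of the compaction flow described above (an illustrative sketch, not the actual Spark code): collect the entries of all valid batches before the compaction batch, append the new entries, and store the result under the compaction batch ID. The store map and the compactInterval value are assumptions for illustration only.

object CompactSimulation {
  val compactInterval = 3
  // In-memory stand-in for the metadata log (batchId -> entries)
  val store = scala.collection.mutable.Map.empty[Long, Seq[String]]

  def compact(batchId: Long, logs: Seq[String]): Boolean = {
    // Valid batches since the previous compaction (or batch 0 for the first compaction)
    val validBatches = math.max(0, batchId - compactInterval) until batchId
    val allLogs = validBatches.flatMap { id =>
      store.getOrElse(id, throw new IllegalStateException(
        s"$id doesn't exist when compacting batch $batchId (compactInterval: $compactInterval)"))
    } ++ logs
    // compactLogs is a noop in the available implementations, so everything is kept
    store(batchId) = allLogs
    true
  }

  def main(args: Array[String]): Unit = {
    (0L to 1L).foreach(id => store(id) = Seq(s"file-$id"))
    compact(2, Seq("file-2"))
    println(store(2))  // entries of batches 0 and 1 plus the new entries for batch 2
  }
}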

spark.sql.streaming.fileSink.log.cleanupDelay

CompactibleFileStreamLog uses spark.sql.streaming.fileSink.log.cleanupDelay configuration property to delete expired log entries.
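
For example, assuming a SparkSession available as spark, the property can be set like any other Spark SQL configuration property (the value below is an example, not the default):

// Example only: delay deleting expired (compacted-away) log files by 30 minutes
spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay", "30m")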

compact File Suffix

CompactibleFileStreamLog uses the .compact file suffix for batchIdToPath, getBatchIdFromFileName, and the compactInterval.

Storing Metadata for Streaming Batch

add(
  batchId: Long,
  logs: Array[T]): Boolean

add checks whether the given batchId is a compaction batch (based on the compact interval).

add...FIXME

add is part of the MetadataLog abstraction.
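
A minimal sketch of the compaction-batch check mentioned above (an assumption, not the actual Spark code): a batch is a compaction batch when the number of batches written so far (batchId + 1) is a multiple of the compact interval.

def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
  (batchId + 1) % compactInterval == 0

// With a compact interval of 10, every 10th batch (9, 19, 29, ...) is a compaction batch
assert(isCompactionBatch(9, 10))
assert(!isCompactionBatch(10, 10))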

Deleting Expired Log Entries

deleteExpiredLog(
  currentBatchId: Long): Unit

deleteExpiredLog...FIXME

deleteExpiredLog does nothing (and simply returns) when the next batch ID (currentBatchId + 1) is below the compact interval plus minBatchesToRetain.
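
The early-return guard can be expressed as follows (a minimal sketch with hypothetical names, not the actual code):

def skipCleanup(currentBatchId: Long, compactInterval: Int, minBatchesToRetain: Int): Boolean =
  currentBatchId + 1 < compactInterval + minBatchesToRetain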

Compact Interval

compactInterval: Int

compactInterval is the number of metadata log files between compactions.

Lazy Value

compactInterval is a Scala lazy value which means that the code to initialize it is executed once only (when accessed for the first time) and cached afterwards.

compactInterval finds compacted IDs and determines the compact interval.

compactInterval requests the CheckpointFileManager for the batch files (possibly compacted) in the metadataPath. compactInterval takes the compacted files only (if available), converts them to batch IDs and sorts them in descending order.

compactInterval starts with the default compact interval.

  • If there are two compacted IDs, their difference is the compact interval
  • If there is one compacted ID only, compactInterval "derives" the compact interval (FIXME)

compactInterval asserts that the compact interval is a positive value or throws an AssertionError.

compactInterval prints out the following INFO message to the logs (with the defaultCompactInterval):

Set the compact interval to [interval] [defaultCompactInterval: [defaultCompactInterval]]
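
A simplified sketch of the rules above (an assumption, not the actual Spark code; in particular, the single-compacted-ID derivation is more involved than shown here):

def determineCompactInterval(
    compactedBatchIdsDescending: Seq[Long],
    defaultCompactInterval: Int): Int = {
  val interval = compactedBatchIdsDescending match {
    case latest +: previous +: _ => (latest - previous).toInt  // difference of the two latest compacted IDs
    case Seq(latest)             => (latest + 1).toInt         // simplified derivation from the only compacted ID
    case _                       => defaultCompactInterval     // no compacted batches yet
  }
  assert(interval > 0, s"compact interval ($interval) must be positive")
  interval
}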

All Files (Except Deleted)

allFiles(): Array[T]

allFiles...FIXME

allFiles is used when:

  • FileStreamSource is created
  • MetadataLogFileIndex is created

Converting Batch Id to Hadoop Path

batchIdToPath(
  batchId: Long): Path

batchIdToPath...FIXME

batchIdToPath is part of the HDFSMetadataLog abstraction.
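
Given the .compact file suffix described earlier, a plausible shape of batchIdToPath is the following (an assumption, not the actual code; the real method takes only the batchId): compaction batches get the .compact suffix while regular batches use the plain batch ID as the file name.

import org.apache.hadoop.fs.Path

def batchIdToPath(metadataPath: Path, batchId: Long, compactInterval: Int): Path =
  if ((batchId + 1) % compactInterval == 0) new Path(metadataPath, s"$batchId.compact")
  else new Path(metadataPath, batchId.toString)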

Converting Hadoop Path to Batch Id

pathToBatchId(
  path: Path): Long

pathToBatchId...FIXME

pathToBatchId is part of the HDFSMetadataLog abstraction.

isBatchFile

isBatchFile(
  path: Path): Boolean

isBatchFile is true when the batch ID can successfully be extracted from the given path (using getBatchIdFromFileName), and false otherwise.

isBatchFile is part of the HDFSMetadataLog abstraction.
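
In other words, a path is a batch file when its file name parses to a batch ID, for example (a sketch, not the actual implementation):

import scala.util.Try
import org.apache.hadoop.fs.Path

def isBatchFile(path: Path): Boolean =
  Try(path.getName.stripSuffix(".compact").toLong).isSuccess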

Serializing Metadata (Writing Metadata in Serialized Format)

serialize(
  logData: Array[T],
  out: OutputStream): Unit

serialize writes the version header (v and the metadataLogVersion) out to the given output stream (in UTF_8).

serialize then writes the log data out (serialized using the Json4s library with Jackson binding). Entries are separated by newlines.

serialize is part of the HDFSMetadataLog abstraction.
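
A self-contained sketch of the on-disk layout described above (the Entry case class, the serializeSketch name, and the explicit metadataLogVersion parameter are hypothetical): a v<version> header line followed by one JSON-serialized entry per line.

import java.io.OutputStream
import java.nio.charset.StandardCharsets.UTF_8
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

case class Entry(path: String, size: Long)  // hypothetical metadata type

def serializeSketch(metadataLogVersion: Int, logData: Array[Entry], out: OutputStream): Unit = {
  implicit val formats = Serialization.formats(NoTypeHints)
  // Version header, e.g. "v1"
  out.write(s"v$metadataLogVersion".getBytes(UTF_8))
  // One JSON entry per line
  logData.foreach { entry =>
    out.write('\n')
    out.write(Serialization.write(entry).getBytes(UTF_8))
  }
}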

Deserializing Metadata

deserialize(
  in: InputStream): Array[T]

deserialize...FIXME

deserialize is part of the HDFSMetadataLog abstraction.
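
Reading the same layout back could look as follows (a sketch only, reusing the hypothetical Entry case class from the serialize sketch above):

import java.io.InputStream
import java.nio.charset.StandardCharsets.UTF_8
import scala.io.Source
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

def deserializeSketch(in: InputStream): Array[Entry] = {
  implicit val formats = Serialization.formats(NoTypeHints)
  val lines = Source.fromInputStream(in, UTF_8.name).getLines()
  if (!lines.hasNext) throw new IllegalStateException("Incomplete log file")
  val versionHeader = lines.next()  // e.g. "v1"; a real implementation would validate it
  lines.map(line => Serialization.read[Entry](line)).toArray
}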

Utilities

getBatchIdFromFileName

getBatchIdFromFileName(
  fileName: String): Long

getBatchIdFromFileName simply removes the .compact suffix from the given fileName and converts the remaining part to a number.

getBatchIdFromFileName is used for pathToBatchId, isBatchFile, and deleting expired log entries.
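
For example (a sketch of the behaviour described above, not the actual code):

def getBatchIdFromFileName(fileName: String): Long =
  fileName.stripSuffix(".compact").toLong

assert(getBatchIdFromFileName("5.compact") == 5)
assert(getBatchIdFromFileName("7") == 7)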

getValidBatchesBeforeCompactionBatch

getValidBatchesBeforeCompactionBatch(
  compactionBatchId: Long,
  compactInterval: Int): Seq[Long]

getValidBatchesBeforeCompactionBatch...FIXME

getValidBatchesBeforeCompactionBatch is used for compaction.
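
A sketch of what the valid batches could be (an assumption based on the compaction description above): all batches since the previous compaction batch (inclusive, or batch 0 for the very first compaction), excluding the compaction batch itself.

def getValidBatchesBeforeCompactionBatch(
    compactionBatchId: Long,
    compactInterval: Int): Seq[Long] =
  math.max(0, compactionBatchId - compactInterval) until compactionBatchId

// With a compact interval of 10, compacting batch 19 covers batches 9 to 18
// (batch 9 is the previous compaction batch and already holds batches 0 to 8)
assert(getValidBatchesBeforeCompactionBatch(19, 10) == (9L until 19L))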