
Checkpoints

Checkpoints is an abstraction of DeltaLogs that can checkpoint the current state of a delta table.

Checkpoints can only be mixed into DeltaLog (or its subtypes).
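In Scala, a restriction like this is typically expressed with a self-type annotation. The following is a minimal sketch under that assumption. DeltaLog, CheckpointingDeltaLog and lastCheckpointPath are simplified, hypothetical stand-ins, not the actual Delta Lake classes.

```scala
// Hypothetical, simplified stand-in for DeltaLog.
class DeltaLog(val logPath: String)

// The self-type `self: DeltaLog =>` means Checkpoints can only be mixed into DeltaLog (or a subtype).
trait Checkpoints { self: DeltaLog =>
  // DeltaLog members (logPath) are visible here thanks to the self-type.
  def lastCheckpointPath: String = s"$logPath/_last_checkpoint"
}

// Compiles: Checkpoints is mixed into a DeltaLog subtype.
class CheckpointingDeltaLog(path: String) extends DeltaLog(path) with Checkpoints

// Would NOT compile (self-type not satisfied):
// class NotALog extends Checkpoints
```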

Contract

dataPath

dataPath: Path

Hadoop Path to the data directory of the delta table

Cleaning Up Expired Logs

doLogCleanup(
  snapshotToCleanup: Snapshot): Unit
Procedure

doLogCleanup is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).

Performs log cleanup
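As an illustration only, log cleanup can be pictured as deleting log files older than a retention cutoff. The sketch below assumes a plain time-based policy over an in-memory directory (LogFile and LogCleanup.cleanUpExpiredLogs are hypothetical). The actual cleanup in Delta Lake (governed by the delta.logRetentionDuration table property) also keeps the files needed to reconstruct the table from a retained checkpoint, which is not modeled here. Unlike doLogCleanup, the sketch returns the deleted names so the effect can be inspected.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a file in the log directory.
final case class LogFile(name: String, modificationTime: Long)

object LogCleanup {
  // Simplified, hypothetical policy: delete every file modified before (now - retention).
  def cleanUpExpiredLogs(
      logDir: mutable.Map[String, LogFile],
      now: Long,
      retentionMillis: Long): Seq[String] = {
    val cutoff = now - retentionMillis
    val expired = logDir.values.filter(_.modificationTime < cutoff).map(_.name).toSeq.sorted
    // Remove the expired entries from the (in-memory) log directory.
    expired.foreach(name => logDir.remove(name))
    expired
  }
}
```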

See:

Used when:

logPath

logPath: Path

Hadoop Path to the log directory of the delta table

metadata

metadata: Metadata

Metadata of the delta table

snapshot

snapshot: Snapshot

Snapshot of the delta table

store

store: LogStore

LogStore
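The contract members above can be summarized as a Scala trait. This is a sketch with hypothetical stand-in types: Path, Metadata, Snapshot, LogStore, CheckpointsContract and InMemoryTable are simplified placeholders, not the Hadoop or Delta Lake classes.

```scala
// Hypothetical stand-ins for Hadoop Path, Metadata, Snapshot and LogStore.
final case class Path(value: String)
final case class Metadata(id: String)
final case class Snapshot(version: Long, metadata: Metadata)
trait LogStore { def read(path: Path): Seq[String] }

// A sketch of the Checkpoints contract (abstract members only).
trait CheckpointsContract {
  def dataPath: Path
  def logPath: Path
  def snapshot: Snapshot
  def metadata: Metadata
  def store: LogStore
  def doLogCleanup(snapshotToCleanup: Snapshot): Unit
}

// A toy implementation that only records which snapshot versions were cleaned up.
final class InMemoryTable(val dataPath: Path, val snapshot: Snapshot) extends CheckpointsContract {
  val logPath: Path = Path(dataPath.value + "/_delta_log")
  def metadata: Metadata = snapshot.metadata
  val store: LogStore = new LogStore { def read(path: Path): Seq[String] = Seq.empty }

  val cleanedUp = scala.collection.mutable.Buffer.empty[Long]
  def doLogCleanup(snapshotToCleanup: Snapshot): Unit = cleanedUp += snapshotToCleanup.version
}
```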

Implementations

_last_checkpoint Metadata File

Checkpoints uses the _last_checkpoint metadata file (in the log directory) for the following:

Checkpointing

checkpoint(): Unit
checkpoint(
  snapshotToCheckpoint: Snapshot): CheckpointMetaData

checkpoint writes a checkpoint of the current state of the delta table (Snapshot). This produces checkpoint metadata (CheckpointMetaData) with the version, the number of actions and, for multi-part checkpoints, the number of parts.

checkpoint requests the LogStore to overwrite the _last_checkpoint file with the JSON-encoded checkpoint metadata.
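A minimal sketch of what the JSON-encoded checkpoint metadata could look like when written with overwrite semantics. It assumes the fields version, size and an optional parts (omitted for single-file checkpoints). CheckpointMetaDataJson and InMemoryLogStore are hypothetical helpers; Delta Lake itself uses a JSON library rather than hand-built strings.

```scala
final case class CheckpointMetaData(version: Long, size: Long, parts: Option[Int])

object CheckpointMetaDataJson {
  // Builds {"version":..,"size":..[,"parts":..]}; "parts" is omitted when None (assumption).
  def toJson(m: CheckpointMetaData): String = {
    val fields = Seq("\"version\":" + m.version, "\"size\":" + m.size) ++
      m.parts.map(p => "\"parts\":" + p)
    fields.mkString("{", ",", "}")
  }
}

// Hypothetical in-memory LogStore: write either overwrites or refuses an existing file.
final class InMemoryLogStore {
  private val files = scala.collection.mutable.Map.empty[String, Seq[String]]

  def write(path: String, lines: Seq[String], overwrite: Boolean): Unit = {
    if (!overwrite && files.contains(path))
      throw new java.nio.file.FileAlreadyExistsException(path)
    files(path) = lines
  }

  def read(path: String): Seq[String] =
    files.getOrElse(path, throw new java.io.FileNotFoundException(path))
}
```

Writing twice with overwrite = true leaves only the latest metadata in the file, so a reader always sees the most recent checkpoint version.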

In the end, checkpoint cleans up the expired logs (if enabled).


checkpoint is used when:

checkpointAndCleanUpDeltaLog

checkpointAndCleanUpDeltaLog(
  snapshotToCheckpoint: Snapshot): Unit
Procedure

checkpointAndCleanUpDeltaLog is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).

checkpointAndCleanUpDeltaLog does the following (in this order):

  1. writeCheckpointFiles
  2. writeLastCheckpointFile
  3. doLogCleanup
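The ordering can be sketched as follows. Only the sequence of calls is modeled: the method bodies and the CheckpointSteps class are hypothetical, and writeLastCheckpointFile is simplified (no deltaLog or addChecksum arguments).

```scala
// Hypothetical stand-ins; only the order of the three steps is modeled.
final case class Snapshot(version: Long)
final case class CheckpointMetaData(version: Long, size: Long, parts: Option[Int])

final class CheckpointSteps {
  val calls = scala.collection.mutable.ListBuffer.empty[String]

  def writeCheckpointFiles(snapshot: Snapshot): CheckpointMetaData = {
    calls += s"writeCheckpointFiles(${snapshot.version})"
    CheckpointMetaData(snapshot.version, size = 0L, parts = None)
  }

  def writeLastCheckpointFile(meta: CheckpointMetaData): Unit =
    calls += s"writeLastCheckpointFile(${meta.version})"

  def doLogCleanup(snapshot: Snapshot): Unit =
    calls += s"doLogCleanup(${snapshot.version})"

  // In this sketch, each step starts only after the previous one returned,
  // so _last_checkpoint is written only after the checkpoint files exist.
  def checkpointAndCleanUpDeltaLog(snapshot: Snapshot): Unit = {
    val meta = writeCheckpointFiles(snapshot)
    writeLastCheckpointFile(meta)
    doLogCleanup(snapshot)
  }
}
```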

writeCheckpointFiles

writeCheckpointFiles(
  snapshotToCheckpoint: Snapshot): CheckpointMetaData

writeCheckpointFiles writes out a checkpoint of the given Snapshot.

Writing Out State Checkpoint

writeCheckpoint(
  spark: SparkSession,
  deltaLog: DeltaLog,
  snapshot: Snapshot): CheckpointMetaData

writeCheckpoint writes out the contents of the given Snapshot into one or more checkpoint files (based on spark.databricks.delta.checkpoint.partSize configuration property).


writeCheckpoint creates the following accumulators:

  • checkpointRowCount
  • numOfFiles

writeCheckpoint reads the spark.databricks.delta.checkpoint.partSize configuration property to calculate the number of checkpoint parts and their paths (based on numOfFiles and numOfRemoves of the given Snapshot).
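A sketch of the part-count calculation, assuming the number of parts is the ceiling of (numOfFiles + numOfRemoves) divided by the configured part size, and a single part when the property is not set. CheckpointParts.numParts is a hypothetical helper.

```scala
object CheckpointParts {
  // partSize is an Option because the configuration property may be unset (assumed: one part then).
  def numParts(numOfFiles: Long, numOfRemoves: Long, partSize: Option[Long]): Long =
    partSize match {
      case Some(size) => math.ceil((numOfFiles + numOfRemoves).toDouble / size).toLong
      case None       => 1L
    }
}
```

For example, 2500 files and 500 removes with a part size of 1000 give 3 parts.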

For multiple checkpoint parts (paths), writeCheckpoint uses checkpointFileWithParts to build the checkpoint file paths. Otherwise, writeCheckpoint uses checkpointFileSingular.
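The checkpoint file names follow the Delta transaction log protocol: the version is zero-padded to 20 digits, and multi-part checkpoints add the part number and the total number of parts, each zero-padded to 10 digits. The sketch below uses plain strings instead of Hadoop Paths. CheckpointFileNames and checkpointPaths are hypothetical; only the two helper names mirror the ones mentioned above.

```scala
object CheckpointFileNames {
  // <version:20 digits>.checkpoint.parquet
  def checkpointFileSingular(logPath: String, version: Long): String =
    f"$logPath/$version%020d.checkpoint.parquet"

  // <version:20 digits>.checkpoint.<part:10 digits>.<numParts:10 digits>.parquet, parts numbered from 1
  def checkpointFileWithParts(logPath: String, version: Long, numParts: Int): Seq[String] =
    (1 to numParts).map(i => f"$logPath/$version%020d.checkpoint.$i%010d.$numParts%010d.parquet")

  // Multiple parts use the multi-part naming; a single part uses the singular name.
  def checkpointPaths(logPath: String, version: Long, numParts: Int): Seq[String] =
    if (numParts > 1) checkpointFileWithParts(logPath, version, numParts)
    else Seq(checkpointFileSingular(logPath, version))
}
```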

writeCheckpoint executes checkpointing as a distributed computation over stateDS, with as many tasks as there are checkpoint parts. writeCheckpoint uses a new execution ID with the name Delta checkpoint.
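To picture how actions end up in separate parts, the following local stand-in spreads action paths over numParts buckets by hash, one bucket per task. This is an assumption for illustration only: Spark's actual partitioning of stateDS is not reproduced, and CheckpointPartitioning.assignToParts is hypothetical.

```scala
object CheckpointPartitioning {
  // Each bucket stands for one checkpoint part (one task of the distributed job).
  def assignToParts(actionPaths: Seq[String], numParts: Int): Vector[Seq[String]] = {
    require(numParts > 0, "numParts must be positive")
    // floorMod keeps the bucket index non-negative even for negative hash codes.
    val byPart = actionPaths.groupBy(p => Math.floorMod(p.hashCode, numParts))
    // Sorted within each bucket only to make the output deterministic.
    Vector.tabulate(numParts)(i => byPart.getOrElse(i, Seq.empty).sorted)
  }
}
```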

writeLastCheckpointFile

writeLastCheckpointFile(
  deltaLog: DeltaLog,
  checkpointMetaData: CheckpointMetaData,
  addChecksum: Boolean): Unit

writeLastCheckpointFile...FIXME

Loading Latest Checkpoint Metadata

lastCheckpoint: Option[CheckpointMetaData]

lastCheckpoint uses loadMetadataFromFile to load the checkpoint metadata (allowing for 3 retries).
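A sketch of the retry behavior, assuming that a missing _last_checkpoint file means there is no checkpoint yet and that any other non-fatal failure (e.g. a partially written file) is retried up to 3 times. LastCheckpointLoader and the read/parse function parameters are hypothetical; the handling of a corrupted file is left out since it is not described here.

```scala
import java.io.FileNotFoundException
import scala.util.control.NonFatal

final case class CheckpointMetaData(version: Long, size: Long, parts: Option[Int])

object LastCheckpointLoader {
  val MaxRetries = 3

  // read: hypothetical stand-in for reading _last_checkpoint via the LogStore.
  // parse: throws on malformed input (e.g. a partially written file).
  def loadMetadataFromFile(
      read: () => String,
      parse: String => CheckpointMetaData,
      tries: Int = 0): Option[CheckpointMetaData] =
    try Some(parse(read()))
    catch {
      case _: FileNotFoundException => None // no checkpoint written yet
      case NonFatal(_) if tries < MaxRetries =>
        loadMetadataFromFile(read, parse, tries + 1) // a real implementation might wait first
      case NonFatal(_) => None // give up (corrupted-file handling not modeled)
    }

  // Starts at tries = 0: one initial attempt plus up to 3 retries.
  def lastCheckpoint(read: () => String, parse: String => CheckpointMetaData): Option[CheckpointMetaData] =
    loadMetadataFromFile(read, parse)
}
```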

lastCheckpoint is used when:

loadMetadataFromFile

loadMetadataFromFile(
  tries: Int): Option[CheckpointMetaData]

loadMetadataFromFile loads the JSON-encoded _last_checkpoint file and converts it to CheckpointMetaData (with a version, size and parts).
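A simplified parser for the _last_checkpoint content. It assumes the JSON carries numeric version and size fields and an optional parts field. LastCheckpointParser is hypothetical and uses regular expressions only to stay dependency-free; a real implementation would use a JSON library.

```scala
final case class CheckpointMetaData(version: Long, size: Long, parts: Option[Int])

object LastCheckpointParser {
  // Extracts a numeric field such as "version": 10 (simplified, not a full JSON parser).
  private def longField(json: String, name: String): Option[Long] =
    ("\"" + name + "\"\\s*:\\s*(\\d+)").r.findFirstMatchIn(json).map(_.group(1).toLong)

  def parse(json: String): CheckpointMetaData = {
    val version = longField(json, "version")
      .getOrElse(throw new IllegalArgumentException(s"missing version in: $json"))
    val size = longField(json, "size")
      .getOrElse(throw new IllegalArgumentException(s"missing size in: $json"))
    // parts is optional (absent for single-file checkpoints).
    CheckpointMetaData(version, size, longField(json, "parts").map(_.toInt))
  }
}
```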

loadMetadataFromFile uses the LogStore to read the _last_checkpoint file.

In case the _last_checkpoint file is corrupted, loadMetadataFromFile...FIXME