VacuumCommand Utility

VacuumCommand is a concrete VacuumCommandImpl for garbage collection of a delta table.

Garbage Collection of Delta Table

gc(
  spark: SparkSession,
  deltaLog: DeltaLog,
  dryRun: Boolean = true,
  retentionHours: Option[Double] = None,
  clock: Clock = new SystemClock): DataFrame

gc requests the given DeltaLog to update (and hence give the latest Snapshot of the delta table).

retentionMillis

gc converts the retention hours to milliseconds and runs checkRetentionPeriodSafety (with the deletedFileRetentionDuration table configuration).
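
The conversion can be sketched as follows. This is a minimal illustration rather than the Delta Lake source: RetentionSketch and hoursToMillis are hypothetical names, and rounding fractional hours to the nearest millisecond is an assumption.

```scala
import java.util.concurrent.TimeUnit

object RetentionSketch {
  // Hypothetical helper: converts optional retention hours to optional milliseconds.
  // Assumption: fractional hours are kept and the result is rounded to the nearest millisecond.
  def hoursToMillis(retentionHours: Option[Double]): Option[Long] =
    retentionHours.map(h => math.round(h * TimeUnit.HOURS.toMillis(1)))
}
```

With this sketch, a retention of 168 hours (7 days) maps to 604800000 milliseconds, and an undefined retention stays undefined.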

Timestamp to Delete Files Before

gc determines the timestamp to delete files before based on the retentionMillis (if defined) or defaults to minFileRetentionTimestamp table configuration.
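
A minimal sketch of that choice, assuming the cutoff is the current clock time minus the retention when a retention is given. DeleteBeforeSketch and its parameter names are hypothetical: nowMillis stands in for the time read from the Clock, and minFileRetentionTimestamp for the value taken from the table configuration.

```scala
object DeleteBeforeSketch {
  // Hypothetical helper, not part of Delta Lake.
  // Assumption: an explicit retention is subtracted from the current time;
  // otherwise the table-level default timestamp is used unchanged.
  def deleteBeforeTimestamp(
      nowMillis: Long,
      retentionMillis: Option[Long],
      minFileRetentionTimestamp: Long): Long =
    retentionMillis
      .map(millis => nowMillis - millis)
      .getOrElse(minFileRetentionTimestamp)
}
```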

gc prints out the following INFO message to the logs (with the path of the given DeltaLog):

Starting garbage collection (dryRun = [dryRun]) of untracked files older than [deleteBeforeTimestamp] in [path]

Valid Files

gc requests the Snapshot for the state dataset and maps over partitions (Dataset.mapPartitions) with a map function that does the following (for every Action in a partition of SingleActions):

  1. Skips RemoveFiles whose deletion timestamp is earlier than the timestamp to delete files before (expired tombstones)
  2. Adds the path of FileActions (that live inside the directory of the table) with all subdirectories
  3. Skips other actions

gc converts the mapped state dataset into a DataFrame with a single path column.
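
The map function can be sketched with plain Scala collections instead of a Spark Dataset. Everything below is a simplified stand-in: the Action classes are not the real Delta classes, pathWithParents is a hypothetical helper, and the sketch assumes that a tombstone is skipped when its deletion timestamp is earlier than the cutoff and that "all subdirectories" means the directories between the table root and the file.

```scala
object ValidFilesSketch {
  // Simplified stand-ins for Delta actions (not the real classes).
  sealed trait Action
  final case class AddFile(path: String) extends Action
  final case class RemoveFile(path: String, delTimestamp: Long) extends Action
  final case class Metadata(id: String) extends Action

  // A relative path followed by each of its parent directories,
  // e.g. "a/b/f" yields "a/b/f", "a/b", "a".
  def pathWithParents(path: String): Seq[String] = {
    val parts = path.split('/').filter(_.nonEmpty)
    (1 to parts.length).reverse.map(n => parts.take(n).mkString("/"))
  }

  def validPaths(actions: Iterator[Action], deleteBeforeTimestamp: Long): Iterator[String] =
    actions.flatMap {
      // Expired tombstones no longer protect their files.
      case r: RemoveFile if r.delTimestamp < deleteBeforeTimestamp => Seq.empty[String]
      // Any other file action keeps its path and directories valid.
      case AddFile(path) => pathWithParents(path)
      case RemoveFile(path, _) => pathWithParents(path)
      // All other actions contribute nothing.
      case _ => Seq.empty[String]
    }
}
```

The sketch leaves out the check that a file lives inside the table directory, since that depends on the file system in use.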

Note

No DataFrame action has been executed at this point, so Spark has not processed anything yet.

All Files and Directories Dataset

gc finds all the files and directories (recursively) in the data path (with spark.sql.sources.parallelPartitionDiscovery.parallelism number of file listing tasks).
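
As a rough local analogue only, a recursive listing can be sketched with java.nio on a local file system. This is not how gc does it (gc lists files with parallel Spark tasks over a Hadoop-compatible file system). ListingSketch, FileEntry and listRecursively are hypothetical names, and the three fields are assumed to mirror the path, isDir and modificationTime columns of the dataset.

```scala
import java.nio.file.{Files, Path}
import scala.collection.mutable.ArrayBuffer

object ListingSketch {
  // One row per file or directory found under the root.
  final case class FileEntry(path: String, isDir: Boolean, modificationTime: Long)

  // Single-threaded walk of a local directory tree.
  // Note: Files.walk also returns the root directory itself.
  def listRecursively(root: Path): Seq[FileEntry] = {
    val entries = ArrayBuffer.empty[FileEntry]
    val stream = Files.walk(root)
    try {
      val it = stream.iterator()
      while (it.hasNext) {
        val p = it.next()
        entries += FileEntry(
          p.toString,
          Files.isDirectory(p),
          Files.getLastModifiedTime(p).toMillis)
      }
    } finally stream.close()
    entries.toSeq
  }
}
```

Counting directories then amounts to listRecursively(root).count(_.isDir) in this sketch.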

Caching All Files and Directories Dataset

gc caches the allFilesAndDirs dataset.

Number of Directories

gc counts the number of directories (as the count of the rows with isDir column being true in the allFilesAndDirs dataset).

Note

This step submits a Spark job for Dataset.count.

Paths Dataset

gc creates a Spark SQL query to count the paths of allFilesAndDirs, restricted to the files with a modificationTime earlier than the deleteBeforeTimestamp and to the directories (isDir). That creates a DataFrame of path and count columns.

gc left-anti joins the path-count DataFrame with validFiles on path (which keeps only the paths that are not among the valid files).

gc filters out paths with count more than 1 and selects path.
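
The three steps (count per path, left-anti join, keep a count of 1) can be sketched on plain Scala collections. PathsSketch and pathsToDelete are hypothetical names, and the inputs are assumed to be already restricted to old files and directories. How a path comes to occur more than once is not covered here.

```scala
object PathsSketch {
  // candidatePaths: one entry per occurrence of a path among the old files and directories.
  // validPaths: paths that are still valid for the table.
  def pathsToDelete(candidatePaths: Seq[String], validPaths: Set[String]): Seq[String] =
    candidatePaths
      .groupBy(identity)
      .map { case (path, occurrences) => (path, occurrences.size) } // (path, count)
      .filter { case (path, _) => !validPaths.contains(path) }      // left-anti join on path
      .collect { case (path, count) if count == 1 => path }         // drop counts above 1
      .toSeq
      .sorted
}
```

For example, with candidate paths a, a, b, c and c as the only valid path, only b remains: a occurs twice and c is still valid.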

Dry Run

gc counts the rows in the paths Dataset for the number of files and directories that are safe to delete (numFiles).

Note

This step submits a Spark job for Dataset.count.

gc prints out the following message to the console (with the dirCounts):

Found [numFiles] files and directories in a total of [dirCounts] directories that are safe to delete.

In the end, gc converts the paths to Hadoop DFS format and creates a DataFrame with a single path column.

Deleting Files and Directories

gc prints out the following INFO message to the logs:

Deleting untracked files and empty directories in [path]

gc deletes the untracked files and empty directories (with parallel delete enabled flag based on spark.databricks.delta.vacuum.parallelDelete.enabled configuration property).

gc prints out the following message to standard output (with the dirCounts):

Deleted [filesDeleted] files and directories in a total of [dirCounts] directories.

In the end, gc creates a DataFrame with a single path column with just the data path of the delta table to vacuum.

Unpersist All Files and Directories Dataset

gc unpersists the allFilesAndDirs dataset.

checkRetentionPeriodSafety

checkRetentionPeriodSafety(
  spark: SparkSession,
  retentionMs: Option[Long],
  configuredRetention: Long): Unit

checkRetentionPeriodSafety...FIXME

getValidFilesFromSnapshot

getValidFilesFromSnapshot(
  spark: SparkSession,
  basePath: String,
  snapshot: Snapshot,
  retentionMillis: Option[Long],
  hadoopConf: Broadcast[SerializableConfiguration],
  clock: Clock,
  checkAbsolutePathOnly: Boolean): DataFrame

getValidFilesFromSnapshot...FIXME

Logging

Enable ALL logging level for org.apache.spark.sql.delta.commands.VacuumCommand logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.delta.commands.VacuumCommand=ALL

Refer to Logging.