VacuumCommand Utility¶
VacuumCommand is a concrete VacuumCommandImpl for garbage collection of a delta table for the following:

- DeltaTable.vacuum operator (as DeltaTableOperations is requested to execute vacuum command)
- VACUUM SQL command (as VacuumTableCommand is executed)
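Both entry points end up in the gc routine below. A quick illustration (the table path and retention period are made up; spark is an active SparkSession):

import io.delta.tables.DeltaTable

// Scala API: DeltaTable.vacuum (retention given in hours)
val table = DeltaTable.forPath(spark, "/tmp/delta/events")
table.vacuum(168) // 7 days

// SQL: VACUUM command with an explicit retention
spark.sql("VACUUM delta.`/tmp/delta/events` RETAIN 168 HOURS")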
Garbage Collection of Delta Table¶
gc(
  spark: SparkSession,
  deltaLog: DeltaLog,
  dryRun: Boolean = true,
  retentionHours: Option[Double] = None,
  clock: Clock = new SystemClock): DataFrame
gc requests the given DeltaLog to update (and hence get the latest Snapshot of the delta table).
retentionMillis¶
gc converts the retention hours to milliseconds and checks the retention period safety (checkRetentionPeriodSafety) with the deletedFileRetentionDuration table configuration.
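A minimal sketch of this step (configuredRetentionMillis stands in for the value of the deletedFileRetentionDuration table configuration):

import java.util.concurrent.TimeUnit

// hours (Option[Double]) to milliseconds
val retentionMillis = retentionHours.map(hours => TimeUnit.HOURS.toMillis(math.round(hours)))

// fails fast when the requested retention is unsafe (see checkRetentionPeriodSafety below)
checkRetentionPeriodSafety(spark, retentionMillis, configuredRetentionMillis)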
Timestamp to Delete Files Before¶
gc determines the timestamp to delete files before based on the retentionMillis (if defined) or defaults to the minFileRetentionTimestamp table configuration.
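That boils down to the following sketch (snapshot stands for the latest Snapshot of the delta table):

// "now minus retention" when retentionMillis is defined,
// minFileRetentionTimestamp otherwise
val deleteBeforeTimestamp = retentionMillis
  .map(millis => clock.getTimeMillis() - millis)
  .getOrElse(snapshot.minFileRetentionTimestamp)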
gc prints out the following INFO message to the logs (with the path of the given DeltaLog):
Starting garbage collection (dryRun = [dryRun]) of untracked files older than [deleteBeforeTimestamp] in [path]
Valid Files¶
gc requests the Snapshot for the state dataset and maps over partitions (Dataset.mapPartitions) with a map function that does the following (for every Action in a partition of SingleActions):

- Skips RemoveFiles with the deletion timestamp before the timestamp to delete files before (i.e. expired tombstones)
- Adds the path of FileActions (that live inside the directory of the table) with all subdirectories
- Skips other actions
gc converts the mapped state dataset into a DataFrame with a single path column.
Note
No DataFrame action is executed at this point, so there is no Spark processing yet.
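A condensed sketch of the valid-files mapping (pathWithSubdirs is a hypothetical helper for the path-plus-subdirectories step; stateDS is assumed to be the state dataset):

import org.apache.spark.sql.delta.actions.{FileAction, RemoveFile}
import spark.implicits._

// one row per path that has to survive the vacuum
val validFiles = snapshot.stateDS.mapPartitions { actions =>
  actions.flatMap { action =>
    action.unwrap match {
      // expired tombstone: the removed file may be vacuumed
      case tombstone: RemoveFile if tombstone.delTimestamp < deleteBeforeTimestamp =>
        Nil
      // live file: keep its path and all parent subdirectories
      case fileAction: FileAction =>
        pathWithSubdirs(fileAction) // hypothetical helper
      case _ => Nil
    }
  }
}.toDF("path")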
All Files and Directories Dataset¶
gc finds all the files and directories (recursively) in the data path (with spark.sql.sources.parallelPartitionDiscovery.parallelism as the number of file-listing tasks).
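In code, that is a parallel recursive listing, roughly as below (the parameters of DeltaFileOperations.recursiveListDirs vary across Delta Lake versions; basePath is the data path, hadoopConf a broadcast SerializableConfiguration):

import org.apache.spark.sql.delta.util.DeltaFileOperations

// distributed, recursive listing of the table directory
val allFilesAndDirs = DeltaFileOperations.recursiveListDirs(
  spark,
  Seq(basePath),
  hadoopConf)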
Caching All Files and Directories Dataset¶
gc caches the allFilesAndDirs dataset.
Number of Directories¶
gc counts the number of directories (as the count of the rows with the isDir column being true in the allFilesAndDirs dataset).
Note
This step submits a Spark job for Dataset.count.
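In other words (a sketch):

import org.apache.spark.sql.functions.col

// Dataset.count is an action and triggers a Spark job
val dirCounts = allFilesAndDirs.where(col("isDir")).count()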
Paths Dataset¶
gc creates a Spark SQL query to count the paths of the allFilesAndDirs dataset, taking the files with the modificationTime earlier than the deleteBeforeTimestamp and the directories (isDir true). That creates a DataFrame of path and count columns.

gc uses a left-anti join of the counted path DataFrame with the validFiles on path.

gc filters out paths with count greater than 1 and selects path.
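Put together, the query could look as follows (a condensed sketch; the step that explodes every entry into its parent directories is elided as a comment, and pathsToDelete names the resulting paths Dataset):

import org.apache.spark.sql.functions.col

val pathsToDelete = allFilesAndDirs
  .where(col("modificationTime") < deleteBeforeTimestamp || col("isDir"))
  // ... every file and directory also contributes its parent directories,
  // so a non-empty directory ends up with count > 1 ...
  .groupBy("path").count()
  .join(validFiles, Seq("path"), "leftanti") // drop everything still tracked
  .where(col("count") === 1)                 // untracked files and empty dirs only
  .select("path")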
Dry Run¶
gc counts the rows in the paths Dataset for the number of files and directories that are safe to delete (numFiles).
Note
This step submits a Spark job for Dataset.count.
gc prints out the following message to the console (with the dirCounts):
Found [numFiles] files and directories in a total of [dirCounts] directories that are safe to delete.
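The dry-run branch in a nutshell (a sketch; pathsToDelete is the paths Dataset from above):

// Dataset.count is an action and triggers a Spark job
val numFiles = pathsToDelete.count()
if (dryRun) {
  println(s"Found $numFiles files and directories in a total of " +
    s"$dirCounts directories that are safe to delete.")
}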
In the end, gc converts the paths to the Hadoop DFS format and creates a DataFrame with a single path column.
Deleting Files and Directories¶
gc prints out the following INFO message to the logs:
Deleting untracked files and empty directories in [path]
gc deletes the untracked files and empty directories (with the parallel delete enabled flag based on the spark.databricks.delta.vacuum.parallelDelete.enabled configuration property).
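One way to express the delete step (heavily simplified; the false default of the flag is an assumption here, and basePath, hadoopConf and pathsToDelete are as above):

import org.apache.hadoop.fs.Path
import spark.implicits._

val parallel = spark.conf
  .get("spark.databricks.delta.vacuum.parallelDelete.enabled", "false").toBoolean

val filesDeleted: Long =
  if (parallel) {
    // executors delete their partitions of paths concurrently
    pathsToDelete.as[String].mapPartitions { paths =>
      val fs = new Path(basePath).getFileSystem(hadoopConf.value.value)
      // non-recursive delete: files and empty directories only
      Iterator(paths.count(p => fs.delete(new Path(p), false)).toLong)
    }.reduce(_ + _)
  } else {
    // the driver deletes all paths sequentially
    val fs = new Path(basePath).getFileSystem(hadoopConf.value.value)
    pathsToDelete.as[String].collect().count(p => fs.delete(new Path(p), false)).toLong
  }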
gc prints out the following message to standard output (with the dirCounts):
Deleted [filesDeleted] files and directories in a total of [dirCounts] directories.
In the end, gc creates a DataFrame with a single path column with just the data path of the delta table to vacuum.
Unpersist All Files and Directories Dataset¶
gc unpersists the allFilesAndDirs dataset.
checkRetentionPeriodSafety¶
checkRetentionPeriodSafety(
  spark: SparkSession,
  retentionMs: Option[Long],
  configuredRetention: Long): Unit
checkRetentionPeriodSafety...FIXME
getValidFilesFromSnapshot¶
getValidFilesFromSnapshot(
  spark: SparkSession,
  basePath: String,
  snapshot: Snapshot,
  retentionMillis: Option[Long],
  hadoopConf: Broadcast[SerializableConfiguration],
  clock: Clock,
  checkAbsolutePathOnly: Boolean): DataFrame
getValidFilesFromSnapshot...FIXME
Logging¶
Enable ALL logging level for the org.apache.spark.sql.delta.commands.VacuumCommand logger to see what happens inside.

Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.delta.commands.VacuumCommand=ALL
Refer to Logging.