
Demo: Vacuum

This demo shows the VACUUM command in action.

Start Spark Shell

./bin/spark-shell \
  --packages io.delta:delta-spark_2.12:3.2.1 \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

Create Delta Table

val path = "/tmp/delta/t1"

Make sure that there is no delta table at the location. Remove it if it exists and start over.

import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, path)
assert(deltaLog.tableExists == false)
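
If the assertion fails because a previous run left files behind, one way to wipe the directory without leaving spark-shell is a recursive delete through the Hadoop FileSystem API (a minimal sketch, assuming the local /tmp/delta/t1 path above):

import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
// Recursively delete the table directory; returns false if there was nothing to delete.
fs.delete(new Path(path), true)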

Create a demo delta table (using the Scala API). Write some data to the delta table, effectively creating the first version.

spark.range(4)
  .withColumn("p", 'id % 2)
  .write
  .format("delta")
  .partitionBy("p")
  .save(path)

Display the available versions of the delta table.

import io.delta.tables.DeltaTable
val dt = DeltaTable.forPath(path)
val history = dt.history.select('version, 'operation, 'operationMetrics)
history.show(truncate = false)
+-------+---------+-----------------------------------------------------------+
|version|operation|operationMetrics                                           |
+-------+---------+-----------------------------------------------------------+
|0      |WRITE    |{numFiles -> 4, numOutputRows -> 4, numOutputBytes -> 1912}|
+-------+---------+-----------------------------------------------------------+

Alternatively, you could use the DESCRIBE HISTORY SQL command.

sql(s"DESC HISTORY delta.`$path`")
  .select('version, 'operation, 'operationMetrics)
  .show(truncate = false)
+-------+---------+-----------------------------------------------------------+
|version|operation|operationMetrics                                           |
+-------+---------+-----------------------------------------------------------+
|0      |WRITE    |{numFiles -> 4, numOutputRows -> 4, numOutputBytes -> 1912}|
+-------+---------+-----------------------------------------------------------+

Delete All

Delete all data in the delta table, effectively creating the second version.

sql(s"""
DELETE FROM delta.`$path`
""").show(truncate = false)
import io.delta.tables.DeltaTable
DeltaTable.forPath(path).delete
+-----------------+
|num_affected_rows|
+-----------------+
|4                |
+-----------------+
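
As a quick sanity check (a minimal sketch), the table should now report no rows, even though the physical parquet files stay on disk until vacuumed.

// All 4 rows are gone as of the latest version.
assert(spark.read.format("delta").load(path).count == 0)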

Display the available versions of the delta table.

sql(s"""
DESC HISTORY delta.`$path`
""")
  .select('version, 'operation, 'operationMetrics)
  .show(truncate = false)
val history = dt.history.select('version, 'operation, 'operationMetrics)
history.show(truncate = false)
+-------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|operation|operationMetrics                                                                                                                                                                                                           |
+-------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1      |DELETE   |{numRemovedFiles -> 4, numRemovedBytes -> 1912, numCopiedRows -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 1866, numDeletedRows -> 4, scanTimeMs -> 1860, numAddedFiles -> 0, numAddedBytes -> 0, rewriteTimeMs -> 0}|
|0      |WRITE    |{numFiles -> 4, numOutputRows -> 4, numOutputBytes -> 1912}                                                                                                                                                                |
+-------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Vacuum DRY RUN (IllegalArgumentException)

Let's vacuum the delta table (in DRY RUN mode).

sql(s"""
VACUUM delta.`$path` RETAIN 0 HOURS DRY RUN
""")
java.lang.IllegalArgumentException: requirement failed: Are you sure you would like to vacuum files with such a low retention period? If you have
writers that are currently writing to this table, there is a risk that you may corrupt the
state of your Delta table.

If you are certain that there are no operations being performed on this table, such as
insert/upsert/delete/optimize, then you may turn off this check by setting:
spark.databricks.delta.retentionDurationCheck.enabled = false

If you are not sure, please use a value not less than "168 hours".

Attempting to vacuum the delta table (even with DRY RUN) gives an IllegalArgumentException because of the default values of the spark.databricks.delta.retentionDurationCheck.enabled configuration property and the delta.deletedFileRetentionDuration table property. The following sections cover both.
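
To see what those defaults currently are, you can peek at the configuration property and the (still empty) table configuration. A minimal sketch, assuming spark-shell with the demo table above:

// The retention duration check is enabled by default.
spark.conf.get("spark.databricks.delta.retentionDurationCheck.enabled", "true")
// No delta.deletedFileRetentionDuration is set yet, so the default of 168 hours (1 week) applies.
import org.apache.spark.sql.delta.DeltaLog
DeltaLog.forTable(spark, path).update().metadata.configuration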

Vacuum DRY RUN

retentionDurationCheck.enabled Configuration Property

Turn the spark.databricks.delta.retentionDurationCheck.enabled configuration property off and give the VACUUM command a go again.

sql("""
SET spark.databricks.delta.retentionDurationCheck.enabled=false
""")
  .show(truncate = false)
+-----------------------------------------------------+-----+
|key                                                  |value|
+-----------------------------------------------------+-----+
|spark.databricks.delta.retentionDurationCheck.enabled|false|
+-----------------------------------------------------+-----+
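
Equivalently, you could flip the flag with the Scala configuration API (the same call is used later to turn it back on):

spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", false)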
val q = sql(s"VACUUM delta.`$path` RETAIN 0 HOURS DRY RUN")

You should see the following message in the console:

Found 4 files and directories in a total of 3 directories that are safe to delete.

The result DataFrame contains the paths that are safe to delete, which are all of the data files in the delta table.

q.show(truncate = false)
+------------------------------------------------------------------------------------------+
|path                                                                                      |
+------------------------------------------------------------------------------------------+
|file:/tmp/delta/t1/p=0/part-00011-40983cc5-18bf-4d91-8b7b-eb805b8c862d.c000.snappy.parquet|
|file:/tmp/delta/t1/p=1/part-00015-9576ff56-28f7-410e-8ea3-e43352a5083c.c000.snappy.parquet|
|file:/tmp/delta/t1/p=0/part-00003-8e300857-ad6a-4889-b944-d31624a8024f.c000.snappy.parquet|
|file:/tmp/delta/t1/p=1/part-00007-4fba265a-0884-44c3-84ed-4e9aee524e3a.c000.snappy.parquet|
+------------------------------------------------------------------------------------------+

deletedFileRetentionDuration Table Property

Let's use DESCRIBE DETAIL to review the current table properties (incl. delta.deletedFileRetentionDuration).

val tid = s"delta.`$path`"
sql(s"""
DESCRIBE DETAIL $tid
""")
  .select('format, 'location, 'properties)
  .show(truncate = false)
+------+------------------+----------+
|format|location          |properties|
+------+------------------+----------+
|delta |file:/tmp/delta/t1|{}        |
+------+------------------+----------+

Prefix the deletedFileRetentionDuration table property with delta. so that ALTER TABLE accepts it as a Delta table property.

sql(s"""
ALTER TABLE $tid
SET TBLPROPERTIES (
  delta.deletedFileRetentionDuration = '0 hours'
)
""")

Use the DESCRIBE DETAIL SQL command again to confirm the change.

sql(s"""
DESCRIBE DETAIL $tid
""")
  .select('format, 'location, 'properties)
  .show(truncate = false)
+------+------------------+-----------------------------------------------+
|format|location          |properties                                     |
+------+------------------+-----------------------------------------------+
|delta |file:/tmp/delta/t1|{delta.deletedFileRetentionDuration -> 0 hours}|
+------+------------------+-----------------------------------------------+

Display the available versions of the delta table and note that ALTER TABLE created a new version. This time, include the operationParameters column (rather than operationMetrics, which is less relevant here).

dt.history
  .select('version, 'operation, 'operationParameters)
  .show(truncate = false)
+-------+-----------------+----------------------------------------------------------------+
|version|operation        |operationParameters                                             |
+-------+-----------------+----------------------------------------------------------------+
|2      |SET TBLPROPERTIES|{properties -> {"delta.deletedFileRetentionDuration":"0 hours"}}|
|1      |DELETE           |{predicate -> ["true"]}                                         |
|0      |WRITE            |{mode -> ErrorIfExists, partitionBy -> ["p"]}                   |
+-------+-----------------+----------------------------------------------------------------+

You can access the table properties (table configuration) using the DeltaLog Scala API.

import org.apache.spark.sql.delta.DeltaLog
val log = DeltaLog.forTable(spark, path)
log.snapshot.metadata.configuration
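
At this point, the table configuration should include the property set earlier. A quick sanity check (a minimal sketch):

assert(log.update().metadata.configuration("delta.deletedFileRetentionDuration") == "0 hours")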

Revert the latest change to spark.databricks.delta.retentionDurationCheck.enabled and turn it back on.

spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", true)

Tree Delta Table Directory

In a terminal (outside spark-shell), run tree or a similar command to review what the directory of the delta table looks like.

tree /tmp/delta/t1
/tmp/delta/t1
├── _delta_log
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.json
│   └── 00000000000000000002.json
├── p=0
│   ├── part-00002-81e59926-8644-4bd4-8984-5a9e889911e1.c000.snappy.parquet
│   └── part-00008-27bfe3eb-1812-4861-908a-3a727f2ad9cb.c000.snappy.parquet
└── p=1
    ├── part-00005-ef9a916a-12b9-4f91-a004-c45e1f21ed81.c000.snappy.parquet
    └── part-00011-c7403108-0438-401b-b13f-41d70d786024.c000.snappy.parquet

4 directories, 7 files
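
If you'd rather stay in spark-shell, a rough equivalent of tree (a minimal sketch using java.nio, assuming the local filesystem) is:

import java.nio.file.{Files, Paths}
// Walk the table directory and print every file and directory under it.
Files.walk(Paths.get(path)).forEach(p => println(p))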

Vacuum Retain 0 Hours

Let's clean up (vacuum) the delta table entirely, effectively deleting all the data files physically from disk.

Back in spark-shell, run the VACUUM SQL command again, but this time without DRY RUN.

sql(s"""
VACUUM delta.`$path`
RETAIN 0 HOURS
""")
  .show(truncate = false)
Deleted 4 files and directories in a total of 3 directories.
+------------------+
|path              |
+------------------+
|file:/tmp/delta/t1|
+------------------+
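
Since the data files backing version 0 are now physically gone, time travel to that version can no longer read them. Attempting it (a hedged sketch) should fail at execution time, typically with a FileNotFoundException:

// Expected to fail: the parquet files for version 0 were removed by VACUUM.
spark.read.format("delta").option("versionAsOf", 0).load(path).count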

In a terminal (outside spark-shell), run tree or a similar command to review what the directory of the delta table looks like.

tree /tmp/delta/t1
/tmp/delta/t1
├── _delta_log
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.json
│   ├── 00000000000000000002.json
│   ├── 00000000000000000003.json
│   └── 00000000000000000004.json
├── p=0
└── p=1

4 directories, 5 files

Switch to spark-shell and display the available versions of the delta table. This time, expect two extra versions recorded by the VACUUM command itself.

dt.history
  .select('version, 'operation, 'operationParameters)
  .show(truncate = false)

As of Delta Lake 2.3.0, VACUUM operations are recorded in the transaction log.

+-------+-----------------+-------------------------------------------------------------------------------------------+
|version|operation        |operationParameters                                                                        |
+-------+-----------------+-------------------------------------------------------------------------------------------+
|4      |VACUUM END       |{status -> COMPLETED}                                                                      |
|3      |VACUUM START     |{retentionCheckEnabled -> true, defaultRetentionMillis -> 0, specifiedRetentionMillis -> 0}|
|2      |SET TBLPROPERTIES|{properties -> {"delta.deletedFileRetentionDuration":"0 hours"}}                           |
|1      |DELETE           |{predicate -> ["true"]}                                                                    |
|0      |WRITE            |{mode -> ErrorIfExists, partitionBy -> ["p"]}                                              |
+-------+-----------------+-------------------------------------------------------------------------------------------+