Demo: Rolling Back Table Changes (Restore Command)

This demo shows the RESTORE command in action (using the SQL variant).

Logging

Enable logging for RestoreTableCommand.
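
For example (a sketch, assuming the log4j 1.x-style conf/log4j.properties that Spark 3.2 ships with), the following line makes the INFO messages quoted later in this demo show up:

# conf/log4j.properties
# Log everything from the RestoreTableCommand logger
log4j.logger.org.apache.spark.sql.delta.commands.RestoreTableCommand=ALL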

Start Spark Shell

Start Spark Shell with Delta Lake.
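
A minimal sketch (assuming Delta Lake 2.0.0 built for Spark 3.2 and Scala 2.12; adjust the versions to your environment):

./bin/spark-shell \
  --packages io.delta:delta-core_2.12:2.0.0 \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog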

Create Delta Table

Let's create a delta table using a mixture of Scala and SQL.

sql("""
DROP TABLE IF EXISTS delta_demo
""")
CREATE DATASOURCE TABLE

Learn more in CREATE DATASOURCE TABLE.

Scala

spark.range(1).writeTo("delta_demo").using("delta").create

SQL

CREATE TABLE delta_demo
USING delta
COMMENT 'Demo delta table'
AS VALUES 0 t(id)

spark.table("delta_demo").show
+---+
| id|
+---+
|  0|
+---+

This is the initial (0th) version of the delta table, with just a single row.

sql("""
DESC HISTORY delta_demo;
""")
  .select('version, 'operation, 'operationParameters)
  .show(truncate = false)
+-------+----------------------+-----------------------------------------------------------------------------+
|version|operation             |operationParameters                                                          |
+-------+----------------------+-----------------------------------------------------------------------------+
|0      |CREATE TABLE AS SELECT|{isManaged -> true, description -> null, partitionBy -> [], properties -> {}}|
+-------+----------------------+-----------------------------------------------------------------------------+
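
If you prefer the Scala API, DeltaTable.history should give the same answer (a sketch, assuming the io.delta.tables.DeltaTable API is on the classpath):

import io.delta.tables.DeltaTable
// history() with no arguments returns the entire history as a DataFrame
DeltaTable.forName("delta_demo")
  .history
  .select("version", "operation", "operationParameters")
  .show(truncate = false)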

Create Multiple Table Versions

Let's create multiple versions of the delta table by inserting some new rows.

INSERT INTO

Learn more in INSERT INTO.

sql("""
INSERT INTO delta_demo VALUES 1
""")

That gives us another version.
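
As a quick sanity check (not part of the original demo), the table should now contain rows 0 and 1:

spark.table("delta_demo").orderBy('id).show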

MERGE INTO

Let's use Delta Lake's own MERGE INTO command.

sql("""
MERGE INTO delta_demo target
USING (VALUES 2 source(id))
ON target.id = source.id
WHEN NOT MATCHED THEN INSERT *
""")

DESC HISTORY

sql("""
DESC HISTORY delta_demo;
""")
  .select('version, 'operation, 'operationParameters)
  .show(truncate = false)
+-------+----------------------+----------------------------------------------------------------------------------------------------------------------------------+
|version|operation             |operationParameters                                                                                                               |
+-------+----------------------+----------------------------------------------------------------------------------------------------------------------------------+
|2      |MERGE                 |{predicate -> (target.id = CAST(source.id AS BIGINT)), matchedPredicates -> [], notMatchedPredicates -> [{"actionType":"insert"}]}|
|1      |WRITE                 |{mode -> Append, partitionBy -> []}                                                                                               |
|0      |CREATE TABLE AS SELECT|{isManaged -> true, description -> null, partitionBy -> [], properties -> {}}                                                     |
+-------+----------------------+----------------------------------------------------------------------------------------------------------------------------------+

Roll Back with RESTORE

The most recent version is 2 with the following rows:

spark.table("delta_demo").orderBy('id).show
+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+

Let's revert some changes to the delta table using the RESTORE TABLE command.

Let's restore the initial (0th) version and review the history of this delta table.

sql("""
RESTORE delta_demo TO VERSION AS OF 0
""").show

You should see the following INFO messages in the logs:

RestoreTableCommand: DELTA: RestoreTableCommand: compute missing files validation  (table path file:/Users/jacek/dev/apps/spark-3.2.2-bin-hadoop3.2/spark-warehouse/delta_demo)
RestoreTableCommand: DELTA: Done
RestoreTableCommand: DELTA: RestoreTableCommand: compute metrics  (table path file:/Users/jacek/dev/apps/spark-3.2.2-bin-hadoop3.2/spark-warehouse/delta_demo)
RestoreTableCommand: DELTA: Done
RestoreTableCommand: DELTA: RestoreTableCommand: compute add actions  (table path file:/Users/jacek/dev/apps/spark-3.2.2-bin-hadoop3.2/spark-warehouse/delta_demo)
RestoreTableCommand: DELTA: Done
RestoreTableCommand: DELTA: RestoreTableCommand: compute remove actions  (table path file:/Users/jacek/dev/apps/spark-3.2.2-bin-hadoop3.2/spark-warehouse/delta_demo)
RestoreTableCommand: DELTA: Done
RestoreTableCommand: Committed delta #3 to file:/Users/jacek/dev/apps/spark-3.2.2-bin-hadoop3.2/spark-warehouse/delta_demo/_delta_log. Wrote 4 actions.

RestoreTableCommand should also give you some statistics.

+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
|table_size_after_restore|num_of_files_after_restore|num_removed_files|num_restored_files|removed_files_size|restored_files_size|
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
|                     774|                         2|                2|                 0|               956|                  0|
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
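
For completeness, the same rollback is available in the Scala API (a sketch; DeltaTable.restoreToVersion is part of the Scala API in recent Delta Lake releases):

import io.delta.tables.DeltaTable
// Restore the table back to version 0; returns the same metrics as the SQL variant
DeltaTable.forName("delta_demo").restoreToVersion(0).show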

Let's query the rows.

spark.table("delta_demo").show
+---+
| id|
+---+
|  0|
+---+

That looks OK. That's the row of the 0th version. Use the following query to prove it.

spark.read.format("delta").option("versionAsOf", 0).table("delta_demo").show

Alternatively, load the 0th snapshot directly using Delta Lake's internal API:

import org.apache.spark.sql.catalyst.TableIdentifier
val tid = TableIdentifier("delta_demo")

import org.apache.spark.sql.delta.DeltaTableIdentifier
val did = DeltaTableIdentifier(table = Some(tid))

val log = did.getDeltaLog(spark)
val snapshotAt = log.getSnapshotAt(0)

val relation = log.createRelation(snapshotToUseOpt = Some(snapshotAt))
val df = spark.baseRelationToDataFrame(relation)
df.show
+---+
| id|
+---+
|  0|
+---+

Let's review the history.

sql("""
DESC HISTORY delta_demo;
""")
  .select('version, 'operation, 'operationParameters)
  .show(truncate = false)
+-------+----------------------+----------------------------------------------------------------------------------------------------------------------------------+
|version|operation             |operationParameters                                                                                                               |
+-------+----------------------+----------------------------------------------------------------------------------------------------------------------------------+
|3      |RESTORE               |{version -> 0, timestamp -> null}                                                                                                 |
|2      |MERGE                 |{predicate -> (target.id = CAST(source.id AS BIGINT)), matchedPredicates -> [], notMatchedPredicates -> [{"actionType":"insert"}]}|
|1      |WRITE                 |{mode -> Append, partitionBy -> []}                                                                                               |
|0      |CREATE TABLE AS SELECT|{isManaged -> true, description -> null, partitionBy -> [], properties -> {}}                                                     |
+-------+----------------------+----------------------------------------------------------------------------------------------------------------------------------+

web UI

Open the web UI to review all the Spark jobs submitted.
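
With the default configuration, the web UI of a local spark-shell is available at http://localhost:4040.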

RESTORE and ALTER TABLE SET TBLPROPERTIES

What happens when we enable Change Data Feed on a delta table and restore the table to the version before the change?

Enable Change Data Feed

sql("""
ALTER TABLE delta_demo
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
""")
sql("""
SHOW TBLPROPERTIES delta_demo;
""").show(truncate = false)
+--------------------------+-------+
|key                       |value  |
+--------------------------+-------+
|Type                      |MANAGED|
|delta.enableChangeDataFeed|true   |
|delta.minReaderVersion    |1      |
|delta.minWriterVersion    |4      |
+--------------------------+-------+
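
Note that enabling Change Data Feed also bumped delta.minWriterVersion to 4, the minimum writer protocol version required for CDF.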

History

Let's review the history.

sql("""
DESC HISTORY delta_demo;
""")
  .select('version, 'operation)
  .show(truncate = false)
+-------+----------------------+
|version|operation             |
+-------+----------------------+
|4      |SET TBLPROPERTIES     |
|3      |RESTORE               |
|2      |MERGE                 |
|1      |WRITE                 |
|0      |CREATE TABLE AS SELECT|
+-------+----------------------+

Start Streaming Query

spark
  .readStream
  .format("delta")
  .option("readChangeFeed", true)
  .table("delta_demo")
  .writeStream
  .format("console")
  .start

You should see the following output.

-------------------------------------------
Batch: 0
-------------------------------------------
+---+------------+---------------+--------------------+
| id|_change_type|_commit_version|   _commit_timestamp|
+---+------------+---------------+--------------------+
|  0|      insert|              4|2022-08-05 12:47:...|
+---+------------+---------------+--------------------+

FIXME Why is the output printed out?

CDF was only just enabled, so none of the earlier writes were CDF-aware. How does that lead to this output?

Restore to pre-CDF Version

Let's restore to the previous version and see what happens with the streaming query.

sql("""
RESTORE delta_demo TO VERSION AS OF 3
""").show
INFO RestoreTableCommand: DELTA: RestoreTableCommand: compute missing files validation  (table path file:/Users/jacek/dev/apps/spark-3.2.2-bin-hadoop3.2/spark-warehouse/delta_demo)
INFO RestoreTableCommand: DELTA: Done
INFO RestoreTableCommand: DELTA: RestoreTableCommand: compute metrics  (table path file:/Users/jacek/dev/apps/spark-3.2.2-bin-hadoop3.2/spark-warehouse/delta_demo)
INFO RestoreTableCommand: DELTA: Done
INFO RestoreTableCommand: DELTA: RestoreTableCommand: compute add actions  (table path file:/Users/jacek/dev/apps/spark-3.2.2-bin-hadoop3.2/spark-warehouse/delta_demo)
INFO RestoreTableCommand: DELTA: Done
INFO RestoreTableCommand: DELTA: RestoreTableCommand: compute remove actions  (table path file:/Users/jacek/dev/apps/spark-3.2.2-bin-hadoop3.2/spark-warehouse/delta_demo)
INFO RestoreTableCommand: DELTA: Done
INFO RestoreTableCommand: Committed delta #5 to file:/Users/jacek/dev/apps/spark-3.2.2-bin-hadoop3.2/spark-warehouse/delta_demo/_delta_log. Wrote 2 actions.
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
|table_size_after_restore|num_of_files_after_restore|num_removed_files|num_restored_files|removed_files_size|restored_files_size|
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
|                     774|                         2|                0|                 0|                 0|                  0|
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+

What's interesting is that the streaming query produced no new data (as if nothing had really happened). That is consistent with the restore metrics above: no data files were removed or restored.

Let's review the history.

sql("""
DESC HISTORY delta_demo;
""")
  .select('version, 'operation)
  .show(truncate = false)
+-------+----------------------+
|version|operation             |
+-------+----------------------+
|5      |RESTORE               |
|4      |SET TBLPROPERTIES     |
|3      |RESTORE               |
|2      |MERGE                 |
|1      |WRITE                 |
|0      |CREATE TABLE AS SELECT|
+-------+----------------------+

Let's review the table properties (and delta.enableChangeDataFeed in particular).

sql("""
SHOW TBLPROPERTIES delta_demo;
""").show(truncate = false)
+----------------------+-------+
|key                   |value  |
+----------------------+-------+
|Type                  |MANAGED|
|delta.minReaderVersion|1      |
|delta.minWriterVersion|4      |
+----------------------+-------+
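
Note that delta.enableChangeDataFeed is gone: the RESTORE to version 3 reverted the table metadata (table properties included) along with the data.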