Demo: Rolling Back Table Changes (Restore Command)

This demo shows the RESTORE command in action (using the SQL variant).

Logging

Enable logging for RestoreTableCommand.
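
As a minimal sketch, assuming Spark's Log4j 2-based logging and that RestoreTableCommand lives in the org.apache.spark.sql.delta.commands package, add the following to conf/log4j2.properties:

# Enable ALL log levels for RestoreTableCommand (logger name is an assumption)
logger.RestoreTableCommand.name = org.apache.spark.sql.delta.commands.RestoreTableCommand
logger.RestoreTableCommand.level = all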

Start Spark Shell

Start Spark Shell with Delta Lake.
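
A sketch of the command, assuming Delta Lake is pulled in with --packages (the delta-core version and Scala build are assumptions; adjust them to your environment):

./bin/spark-shell \
  --packages io.delta:delta-core_2.12:2.0.0 \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog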

Create Delta Table

Let's create a delta table using a mixture of Scala and SQL.

val tableName = "demo01"
sql(s"DROP TABLE IF EXISTS $tableName")
Learn more in CREATE DATASOURCE TABLE.

Scala:

spark.range(1).writeTo(tableName).using("delta").create

SQL:

CREATE TABLE $tableName
USING delta
COMMENT 'Demo delta table'
AS VALUES 0 t(id)
spark.table(tableName).show
+---+
| id|
+---+
|  0|
+---+

This is the very first version (version 0) of the delta table, with just a single row.

sql(s"desc history $tableName")
  .select('version, 'timestamp, 'operation)
  .show(truncate = false)
SELECT version, timestamp, operation
FROM (DESC HISTORY $tableName)
+-------+-----------------------+----------------------+
|version|timestamp              |operation             |
+-------+-----------------------+----------------------+
|0      |2022-06-07 11:37:28.707|CREATE TABLE AS SELECT|
+-------+-----------------------+----------------------+

Create Multiple Table Versions

Let's create multiple versions of the delta table by inserting some new rows.

INSERT INTO

Learn more in INSERT INTO.

sql(s"INSERT INTO $tableName VALUES 1")

That gives us another version.

MERGE INTO

Let's use Delta Lake's own MERGE INTO command.

sql(s"MERGE INTO $tableName USING (VALUES 2 t(id)) ON demo01.id = t.id WHEN NOT MATCHED THEN INSERT *")

DESC HISTORY

sql(s"desc history $tableName")
  .select('version, 'timestamp, 'operation)
  .show(truncate = false)
SELECT version, timestamp, operation
FROM (DESC HISTORY $tableName)
+-------+-----------------------+----------------------+
|version|timestamp              |operation             |
+-------+-----------------------+----------------------+
|2      |2022-06-07 11:38:52.448|MERGE                 |
|1      |2022-06-07 11:38:42.148|WRITE                 |
|0      |2022-06-07 11:37:28.707|CREATE TABLE AS SELECT|
+-------+-----------------------+----------------------+

Roll Back with RESTORE

The most recent version is 2 with the following rows:

spark.table(tableName).show
+---+
| id|
+---+
|  2|
|  0|
|  1|
+---+

Let's revert some changes to the delta table using the RESTORE TABLE command.

Let's restore the initial (0th) version and review the history of this delta table.

sql(s"RESTORE TABLE $tableName TO VERSION AS OF 0").show

You should see the following INFO messages in the logs:

RestoreTableCommand: DELTA: RestoreTableCommand: compute missing files validation  (table path file:/Users/jacek/dev/oss/spark/spark-warehouse/demo01)
RestoreTableCommand: DELTA: Done
RestoreTableCommand: DELTA: RestoreTableCommand: compute metrics  (table path file:/Users/jacek/dev/oss/spark/spark-warehouse/demo01)
RestoreTableCommand: DELTA: Done
RestoreTableCommand: DELTA: RestoreTableCommand: compute add actions  (table path file:/Users/jacek/dev/oss/spark/spark-warehouse/demo01)
RestoreTableCommand: DELTA: Done
RestoreTableCommand: DELTA: RestoreTableCommand: compute remove actions  (table path file:/Users/jacek/dev/oss/spark/spark-warehouse/demo01)
RestoreTableCommand: DELTA: Done
RestoreTableCommand: Committed delta #3 to file:/Users/jacek/dev/oss/spark/spark-warehouse/demo01/_delta_log. Wrote 4 actions.

RestoreTableCommand should also give you command statistics.

+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
|table_size_after_restore|num_of_files_after_restore|num_removed_files|num_restored_files|removed_files_size|restored_files_size|
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
|                     774|                         2|                2|                 0|               956|                  0|
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+

Let's query the rows.

spark.table(tableName).show
+---+
| id|
+---+
|  0|
+---+

That looks OK. That is the single row of version 0. Use the following query to prove it.

spark.read.format("delta").option("versionAsOf", 0).table(tableName).show

Alternatively, use Delta Lake's lower-level (internal) APIs:

import org.apache.spark.sql.catalyst.TableIdentifier
val tid = TableIdentifier(tableName)

import org.apache.spark.sql.delta.DeltaTableIdentifier
val did = DeltaTableIdentifier(table = Some(tid))

// Access the transaction log and load the snapshot at version 0
val log = did.getDeltaLog(spark)
val snapshotAt = log.getSnapshotAt(0)

// Create a DataFrame over the historical snapshot
val relation = log.createRelation(snapshotToUseOpt = Some(snapshotAt))
val df = spark.baseRelationToDataFrame(relation)
df.show
+---+
| id|
+---+
|  0|
+---+

Let's review the history.

sql(s"desc history $tableName").select('version, 'timestamp, 'operation).show(truncate = false)
+-------+-----------------------+----------------------+
|version|timestamp              |operation             |
+-------+-----------------------+----------------------+
|3      |2022-06-07 11:40:09.496|RESTORE               |
|2      |2022-06-07 11:38:52.448|MERGE                 |
|1      |2022-06-07 11:38:42.148|WRITE                 |
|0      |2022-06-07 11:37:28.707|CREATE TABLE AS SELECT|
+-------+-----------------------+----------------------+

web UI

Open the web UI (http://localhost:4040 by default) to review all the Spark jobs submitted.