
RestoreTableCommand

RestoreTableCommand is a DeltaCommand to restore a delta table to a specified version or timestamp.

RestoreTableCommand is transactional (and starts a new transaction when executed).

Creating Instance

RestoreTableCommand takes the following to be created:

RestoreTableCommand is created when:

RestoreTableCommandBase

RestoreTableCommand is a RestoreTableCommandBase.

LeafRunnableCommand

RestoreTableCommand is a LeafRunnableCommand (Spark SQL).

Output Attributes

output: Seq[Attribute]

output is the outputSchema.

output is part of the Command (Spark SQL) abstraction.

Executing Command

run(
  spark: SparkSession): Seq[Row]

run requests the DeltaTableV2 for the DeltaLog.

run requests the DeltaTableV2 for the DeltaTimeTravelSpec to access the version.

Note

RestoreTableCommand does not check whether the optional DeltaTimeTravelSpec (of the DeltaTableV2) is defined. It simply assumes that it is available, and so is the version.

That is unlike the timestamp. Why?!

run retrieves the timestamp (if defined).

run determines the version to restore based on the version or the timestamp (whichever is defined). For the timestamp, run resolves the version by requesting the DeltaLog for the DeltaHistoryManager that in turn is requested for the active commit at that timestamp (with the canReturnLastCommit flag enabled).
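The following is a minimal sketch of that resolution step, assuming deltaLog, versionOpt (the version of the DeltaTimeTravelSpec) and timestampOpt (from getTimestamp, a java.sql.Timestamp-compatible string) are in scope; the names are illustrative only.

import java.sql.Timestamp

// Sketch only: use the version directly, or resolve the timestamp to a version
// via the DeltaHistoryManager of the DeltaLog
val versionToRestore: Option[Long] = versionOpt.orElse {
  timestampOpt.map { ts =>
    deltaLog.history
      .getActiveCommitAtTime(Timestamp.valueOf(ts), canReturnLastCommit = true)
      .version
  }
}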

Either the version or timestamp should be provided for restore

run requires that either the version or the timestamp is provided, and throws an IllegalArgumentException otherwise:

Either the version or timestamp should be provided for restore

run requests the DeltaLog to update to find the latest version.

Version to restore should be less then last available version

run requires that the version to restore is lower than the latest available version of the delta table, and throws an IllegalArgumentException otherwise:

Version to restore ([versionToRestore])should be less then last available version ([latestVersion])

NB: The typos in the exception message (the missing space and "less then") come from the Delta Lake source itself.
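A minimal sketch of the two validations above (versionToRestore and deltaLog are illustrative names; the exception messages are the ones quoted above, and require throws an IllegalArgumentException when its condition does not hold):

// Sketch only: fail fast when neither version nor timestamp was given
require(
  versionToRestore.nonEmpty,
  "Either the version or timestamp should be provided for restore")

// Sketch only: the requested version must precede the latest available one
val latestVersion = deltaLog.update().version
require(
  versionToRestore.get < latestVersion,
  s"Version to restore (${versionToRestore.get})" +
    s"should be less then last available version ($latestVersion)")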

run requests the DeltaLog to start a new transaction and does the heavy lifting.

run requests the OptimisticTransaction for the latest (current) Snapshot.

run requests the DeltaLog for the Snapshot at the version to restore (snapshot to restore to) and reconciles them using all the files (in the snapshots).

Dataset[AddFile]

All the files are Dataset[AddFile]s.

run determines filesToAdd (as a Dataset[AddFile]) using a left_anti join of the Dataset[AddFile] of the snapshot to restore to with the current snapshot on the path column. run marks the AddFiles (in the joined Dataset[AddFile]) as dataChange.

No Spark job started yet

No Spark job is started yet as run is only preparing it.

run determines filesToRemove (as a Dataset[RemoveFile]) using a left_anti join of the Dataset[AddFile] of the current snapshot with the snapshot to restore to on the path column. run marks the AddFiles (in the joined Dataset[AddFile]) to be removed with the current timestamp (turning them into RemoveFiles).

No Spark job started yet

No Spark job is started yet as run is only preparing it.
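Conceptually, the two joins mirror each other. The following is a minimal sketch, assuming latestSnapshot and snapshotToRestore are the current Snapshot and the Snapshot to restore to, respectively, and spark.implicits._ is in scope (removeWithTimestamp is the AddFile helper that turns an AddFile into a RemoveFile):

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.delta.actions.{AddFile, RemoveFile}

// Sketch only: files of the version to restore that are not in the current version
val filesToAdd: Dataset[AddFile] = snapshotToRestore.allFiles
  .join(latestSnapshot.allFiles, Seq("path"), "left_anti")
  .as[AddFile]
  .map(_.copy(dataChange = true))

// Sketch only: files of the current version that are not in the version to restore
val filesToRemove: Dataset[RemoveFile] = latestSnapshot.allFiles
  .join(snapshotToRestore.allFiles, Seq("path"), "left_anti")
  .as[AddFile]
  .map(_.removeWithTimestamp(System.currentTimeMillis(), dataChange = true))

Both datasets are lazy; no Spark job runs until they are used in an action.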

run checkSnapshotFilesAvailability unless the spark.sql.files.ignoreMissingFiles (Spark SQL) configuration property is enabled.

spark.sql.files.ignoreMissingFiles

spark.sql.files.ignoreMissingFiles is disabled by default.

run computeMetrics (as a metrics Spark job).

2 Spark Jobs Submitted

computeMetrics runs two aggregations, which results in two Spark jobs being submitted.
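A minimal sketch of the kind of aggregations involved (the metric names are assumptions for illustration and not necessarily the ones the command reports):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{coalesce, count, lit, sum}

// Sketch only: one aggregation (and hence one Spark job) per dataset
val Row(restoredFilesSize: Long, numRestoredFiles: Long) =
  filesToAdd.agg(coalesce(sum("size"), lit(0L)), count(lit(1))).head()
val Row(removedFilesSize: Long, numRemovedFiles: Long) =
  filesToRemove.agg(coalesce(sum("size"), lit(0L)), count(lit(1))).head()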

run creates a local (Scala) iterator over the filesToAdd dataset (as an add actions Spark job).

Local Scala Iterator is Multiple Spark Jobs

run uses the Dataset.toLocalIterator (Spark SQL) action, which triggers a separate Spark job per partition.

run creates a local (Scala) iterator over the filesToRemove dataset (as a remove actions Spark job).
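A minimal sketch of those two steps:

import scala.collection.JavaConverters._

// Sketch only: toLocalIterator triggers a Spark job per partition,
// streaming one partition at a time to the driver
val addActions = filesToAdd.toLocalIterator().asScala
val removeActions = filesToRemove.toLocalIterator().asScala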

run requests the OptimisticTransaction to update the metadata to the Metadata of the snapshot to restore to.

run commitLarge the current transaction with the following:

In the end, run returns the metrics.
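Putting the final steps together, a minimal sketch of the commit (the exact commitLarge parameters and the DeltaOperations.Restore operation are assumptions for illustration; consult the Delta Lake source for the authoritative shapes):

// Sketch only: bring the table metadata back to the restored version
txn.updateMetadata(snapshotToRestore.metadata)

// Sketch only: commit all add and remove actions as one large commit
txn.commitLarge(
  spark,
  addActions ++ removeActions,
  DeltaOperations.Restore(versionToRestore, timestampOpt),
  Map.empty[String, String],
  metrics)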


run is part of the RunnableCommand (Spark SQL) abstraction.

getTimestamp

getTimestamp(): Option[String]

getTimestamp...FIXME

Logging

Enable ALL logging level for org.apache.spark.sql.delta.commands.RestoreTableCommand logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.delta.commands.RestoreTableCommand=ALL

Refer to Logging.