RestoreTableCommand¶
RestoreTableCommand is a DeltaCommand to restore a delta table to a specified version or timestamp.
RestoreTableCommand is transactional (and starts a new transaction when executed).
Creating Instance¶
RestoreTableCommand takes the following to be created:
- DeltaTableV2
- TableIdentifier of the delta table
RestoreTableCommand is created when:
DeltaAnalysis logical resolution rule is executed (to resolve a RestoreTableStatement)
RestoreTableCommandBase¶
RestoreTableCommand is a RestoreTableCommandBase.
LeafRunnableCommand¶
RestoreTableCommand is a LeafRunnableCommand (Spark SQL).
Output Attributes¶
output: Seq[Attribute]
output is the outputSchema.
output is part of the Command (Spark SQL) abstraction.
Executing Command¶
run(
spark: SparkSession): Seq[Row]
run requests the DeltaTableV2 for the DeltaLog.
run requests the DeltaTableV2 for the DeltaTimeTravelSpec to access the version.
Note
RestoreTableCommand does not check whether the optional DeltaTimeTravelSpec (of the DeltaTableV2) is defined. It is simply assumed to be available, and so is the version.
That is unlike the timestamp. Why?!
run retrieves the timestamp (if defined).
run determines the version to restore based on the version or the timestamp (whatever defined). For the timestamp, run resolves the version by requesting the DeltaLog for the DeltaHistoryManager that in turn is requested for the active commit at that timestamp (with the canReturnLastCommit flag enabled).
Either the version or timestamp should be provided for restore
run asserts that either the version or the timestamp is provided or throws an IllegalArgumentException:
Either the version or timestamp should be provided for restore
run requests the DeltaLog to update to find the latest version.
Version to restore should be less then last available version
run requires that the requested version to restore is lower than the latest version of the delta table or throws an IllegalArgumentException:
Version to restore ([versionToRestore])should be less then last available version ([latestVersion])
NB: You're right that there are a few typos in the exception message.
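The version-resolution and validation steps above can be sketched in plain Python (a simplified model only; `history` is a hypothetical stand-in for DeltaHistoryManager as a sorted list of `(version, commit_timestamp)` pairs):

```python
def resolve_version_to_restore(version, timestamp, history, latest_version):
    """Simplified model of RestoreTableCommand's version resolution.

    Exactly one of `version` and `timestamp` is expected to be defined.
    """
    if version is None and timestamp is None:
        raise ValueError(
            "Either the version or timestamp should be provided for restore")
    if version is not None:
        version_to_restore = version
    else:
        # Last commit at or before the timestamp
        # (canReturnLastCommit-like behaviour).
        candidates = [v for v, ts in history if ts <= timestamp]
        version_to_restore = candidates[-1]
    if version_to_restore >= latest_version:
        # Message typos reproduced as in the original exception.
        raise ValueError(
            f"Version to restore ({version_to_restore})"
            f"should be less then last available version ({latest_version})")
    return version_to_restore
```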
run requests the DeltaLog to start a new transaction and does the heavy lifting.
run requests the OptimisticTransaction for the latest (current) Snapshot.
run requests the DeltaLog for the Snapshot at the version to restore (snapshot to restore to) and reconciles them using all the files (in the snapshots).
Dataset[AddFile]
All the files are Dataset[AddFile]s.
run determines filesToAdd (as Dataset[AddFile]). run uses a left_anti join of the Dataset[AddFile] of the snapshot to restore to with that of the current snapshot on the path column. run marks the joined AddFiles with dataChange enabled.
No Spark job started yet
No Spark job is started yet as run is only preparing it.
run determines filesToRemove (as Dataset[RemoveFile]). run uses a left_anti join of the Dataset[AddFile] of the current snapshot with that of the snapshot to restore to on the path column. run converts the joined AddFiles to RemoveFiles with the current timestamp (as the deletion timestamp).
No Spark job started yet
No Spark job is started yet as run is only preparing it.
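The semantics of the two left_anti joins can be modelled with plain Python dicts keyed by file path (a sketch only; the real command operates on Dataset[AddFile]s with Spark's left_anti join):

```python
def reconcile(files_to_restore, current_files, now):
    """Model of the snapshot reconciliation in RestoreTableCommand.

    Both snapshot arguments are dicts mapping a file path to its
    metadata (standing in for the Dataset[AddFile] of each snapshot).
    """
    # left_anti join on path: files of the snapshot to restore to that
    # are absent from the current snapshot -> AddFiles with dataChange.
    files_to_add = {
        path: dict(meta, dataChange=True)
        for path, meta in files_to_restore.items()
        if path not in current_files
    }
    # left_anti join the other way: files only in the current snapshot
    # -> RemoveFiles with the current timestamp as deletion timestamp.
    files_to_remove = {
        path: {"deletionTimestamp": now, "dataChange": True}
        for path in current_files
        if path not in files_to_restore
    }
    return files_to_add, files_to_remove
```

Note that, like in the command itself, nothing here is executed eagerly in Spark: the joins only define the datasets.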
run checkSnapshotFilesAvailability when spark.sql.files.ignoreMissingFiles (Spark SQL) configuration property is enabled.
spark.sql.files.ignoreMissingFiles
spark.sql.files.ignoreMissingFiles is disabled by default.
run computeMetrics (as a metrics Spark job).
2 Spark Jobs Submitted
computeMetrics runs 2 aggregations, and hence 2 Spark jobs are submitted.
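As a rough, hypothetical illustration of what such a per-dataset aggregation looks like (the actual metric names and scope in computeMetrics may differ):

```python
def aggregate_file_metrics(files):
    """One count-and-sum aggregation over a collection of file actions
    (each modelled as a dict with a 'size' in bytes)."""
    return {
        "numFiles": len(files),
        "totalSize": sum(f["size"] for f in files),
    }

# Two such aggregations (e.g. one over the files to add and one over
# the files to remove) translate to two Spark jobs.
```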
run creates a local (Scala) iterator over the filesToAdd dataset (as a add actions Spark job).
Local Scala Iterator is Multiple Spark Jobs
run uses the Dataset.toLocalIterator (Spark SQL) action that triggers a Spark job per partition.
run creates a local (Scala) iterator over the filesToRemove dataset (as a remove actions Spark job).
run requests the OptimisticTransaction to update the metadata to the Metadata of the snapshot to restore to.
run commitLarge the current transaction with the following:
- AddFiles and RemoveFiles
- DeltaOperations.Restore
In the end, run returns the metrics.
run is part of the RunnableCommand (Spark SQL) abstraction.
getTimestamp¶
getTimestamp(): Option[String]
getTimestamp...FIXME
Logging¶
Enable ALL logging level for org.apache.spark.sql.delta.commands.RestoreTableCommand logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.delta.commands.RestoreTableCommand=ALL
Refer to Logging.