Snapshot
Snapshot is an immutable snapshot of the state of a Delta table at a given version.
Tip
Use Demo: DeltaTable, DeltaLog And Snapshots to learn more.
Creating Instance
Snapshot takes the following to be created:
- Hadoop Path to the log directory
- Version
- LogSegment
- minFileRetentionTimestamp (that is exactly DeltaLog.minFileRetentionTimestamp)
- DeltaLog
- Timestamp
- Optional VersionChecksum
While being created, Snapshot prints out the following INFO message to the logs and initializes:
Created snapshot [this]
Snapshot is created when SnapshotManagement is requested for one.
Initializing
init(): Unit
init requests the DeltaLog to protocolRead the Protocol (i.e. to check that the table can be read with this protocol).
Computed State
computedState: State
Scala lazy value
computedState is a Scala lazy value and is initialized once at the first access. Once computed, it stays unchanged for the Snapshot instance.
lazy val computedState: State
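A minimal plain-Scala illustration of that behaviour (LazySnapshot is a hypothetical class, not Delta Lake's Snapshot): the initializer of a lazy val runs on the first access only, and every later access returns the cached value.

```scala
// Hypothetical stand-in for Snapshot, only to show how a Scala lazy val behaves.
class LazySnapshot(val version: Long) {
  // Counts how many times the initializer below runs (for demonstration only).
  var initializations = 0

  // The right-hand side runs once, on first access; the result is then cached.
  lazy val computedState: String = {
    initializations += 1
    s"state of version $version"
  }
}

val snapshot = new LazySnapshot(5L)
val before = snapshot.initializations // nothing computed yet
val first = snapshot.computedState    // triggers the initializer
val second = snapshot.computedState   // reuses the cached value
```

The same semantics apply to every other lazy value of Snapshot described on this page.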
computedState takes the current cached set of actions and reads the latest state (executes a state.select(...).first() query).
Note
The state.select(...).first() query uses aggregate standard functions (e.g. last, collect_set, sum, count) and so is a global aggregation, i.e. it indirectly uses groupBy over the whole dataset (with no grouping keys) and always produces exactly one row.
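The following plain-Scala sketch models what such an aggregate-only query computes; it is not Delta Lake code. The Action and State case classes and the computeState function are hypothetical simplifications (single-field stand-ins for the real actions), and ordinary collection operations stand in for last, collect_set, sum and count. The point is that the aggregation runs over all rows at once and yields a single result.

```scala
// Hypothetical, simplified action row: at most one field is set per row.
final case class Action(
  protocol: Option[Int] = None,        // stand-in for a Protocol action
  metaData: Option[String] = None,     // stand-in for a Metadata action
  txn: Option[(String, Long)] = None,  // stand-in for a SetTransaction (appId, version)
  addSize: Option[Long] = None         // stand-in for an AddFile (its size)
)

// Hypothetical, simplified result: one value per aggregate.
final case class State(
  protocol: Option[Int],
  metadata: Option[String],
  setTransactions: Set[(String, Long)],
  sizeInBytes: Long,
  numOfFiles: Long
)

// One pass over *all* rows produces exactly one State (a global aggregation).
def computeState(actions: Seq[Action]): State = State(
  protocol = actions.flatMap(_.protocol).lastOption,      // like last(..., ignoreNulls = true)
  metadata = actions.flatMap(_.metaData).lastOption,      // like last(..., ignoreNulls = true)
  setTransactions = actions.flatMap(_.txn).toSet,         // like collect_set
  sizeInBytes = actions.flatMap(_.addSize).sum,           // like sum (0 here instead of null when empty)
  numOfFiles = actions.count(_.addSize.isDefined).toLong  // like count(add)
)

val state = computeState(Seq(
  Action(protocol = Some(1)),
  Action(metaData = Some("table-1")),
  Action(addSize = Some(262L)),
  Action(addSize = Some(429L)),
  Action(txn = Some("app-1" -> 3L))
))
```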
computedState assumes that the protocol and metadata (actions) are defined. computedState throws an IllegalStateException when either action is not defined and the spark.databricks.delta.stateReconstructionValidation.enabled configuration property is enabled:
The [action] of your Delta table couldn't be recovered while Reconstructing version: [version]. Did you manually delete files in the _delta_log directory?
Note
The state.select(...).first() query uses last with the ignoreNulls flag set to true, and so the single row returned by first() may have no protocol or metadata (i.e. null).
computedState makes sure that the State to be returned has at least the default protocol and metadata (actions) defined.
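A sketch of how the validation and the fallback to defaults could fit together, assuming simplified, hypothetical Protocol, Metadata and State types (None plays the role of a null column) and a hypothetical finalizeState function. The default field values are placeholders, not necessarily Delta Lake's.

```scala
// Hypothetical, simplified types; the real actions carry more fields.
final case class Protocol(minReaderVersion: Int = 1, minWriterVersion: Int = 2) // placeholder defaults
final case class Metadata(id: String = "default")                               // placeholder default
final case class State(protocol: Protocol, metadata: Metadata)

// Fail fast when validation is enabled, otherwise fall back to the defaults.
def finalizeState(
    protocol: Option[Protocol], // result of last(protocol, ignoreNulls = true); None ~ null
    metadata: Option[Metadata], // result of last(metaData, ignoreNulls = true); None ~ null
    version: Long,
    validationEnabled: Boolean): State = {
  def missing(action: String): Nothing =
    throw new IllegalStateException(
      s"The $action of your Delta table couldn't be recovered while Reconstructing " +
        s"version: $version. Did you manually delete files in the _delta_log directory?")
  if (validationEnabled && protocol.isEmpty) missing("protocol")
  if (validationEnabled && metadata.isEmpty) missing("metadata")
  State(protocol.getOrElse(Protocol()), metadata.getOrElse(Metadata()))
}

// Validation disabled: missing actions are replaced with defaults.
val recovered = finalizeState(None, None, version = 3L, validationEnabled = false)

// Validation enabled: a missing protocol fails the reconstruction.
val failure =
  try { finalizeState(None, Some(Metadata("t")), 3L, validationEnabled = true); None }
  catch { case e: IllegalStateException => Some(e.getMessage) }
```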
Configuration Properties
spark.databricks.delta.snapshotPartitions
Snapshot uses the spark.databricks.delta.snapshotPartitions configuration property for the number of partitions to use for state reconstruction.
spark.databricks.delta.stateReconstructionValidation.enabled
Snapshot uses the spark.databricks.delta.stateReconstructionValidation.enabled configuration property to decide whether computedState throws an IllegalStateException when the protocol or metadata cannot be reconstructed.
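Both properties can be set on a session like any other Spark SQL configuration property, for example with spark.conf.set. The snippet creates a local session explicitly (in spark-shell, spark already exists); the keys only take effect when Delta Lake reads them, and the partition count is an illustrative value, not a recommendation.

```scala
import org.apache.spark.sql.SparkSession

// A local session for illustration; in spark-shell use the predefined `spark`.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("delta-snapshot-conf")
  .getOrCreate()

// Number of partitions used for state reconstruction (illustrative value).
spark.conf.set("spark.databricks.delta.snapshotPartitions", 10L)

// Fail instead of silently falling back to defaults when protocol or metadata is missing.
spark.conf.set("spark.databricks.delta.stateReconstructionValidation.enabled", true)
```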
State Dataset of Actions
state: Dataset[SingleAction]
state simply requests the cached delta state to get the delta state from the cache.
state is used when:
- Checkpoints utility is used to writeCheckpoint
- Snapshot is requested for computedState, all files and files removed (tombstones)
- VacuumCommand utility is requested for garbage collection
All AddFiles
allFiles: Dataset[AddFile]
allFiles simply takes the state dataset and selects AddFiles (adds a where clause for add IS NOT NULL and a select over the fields of AddFiles).
Note
allFiles simply adds where and select clauses. No computation happens yet, as the result is (a description of) a distributed computation as a Dataset[AddFile].
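The laziness described in the note can be illustrated without Spark using a Scala collection view, which, like a Dataset, records transformations and runs them only when forced. The SingleAction case class below is a hypothetical one-field simplification; the counter only exists to show when the filter actually runs.

```scala
// Hypothetical, simplified action rows: `add` holds an AddFile path when the row is an add.
final case class SingleAction(add: Option[String] = None, remove: Option[String] = None)

var evaluated = 0 // counts how many rows the filter has looked at

val state = List(
  SingleAction(add = Some("part-00000.parquet")),
  SingleAction(remove = Some("part-00001.parquet")),
  SingleAction(add = Some("part-00003.parquet"))
)

// Like `where add IS NOT NULL` plus `select add`: only a description (a lazy view).
val allFiles = state.view.filter { a => evaluated += 1; a.add.isDefined }.map(_.add.get)

val beforeAction = evaluated // still 0: nothing has run yet
val files = allFiles.toList  // forcing the view plays the role of a Spark action
```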
import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, "/tmp/delta/users")
val files = deltaLog.snapshot.allFiles
scala> :type files
org.apache.spark.sql.Dataset[org.apache.spark.sql.delta.actions.AddFile]
scala> files.show(truncate = false)
+-------------------------------------------------------------------+---------------+----+----------------+----------+-----+----+
|path |partitionValues|size|modificationTime|dataChange|stats|tags|
+-------------------------------------------------------------------+---------------+----+----------------+----------+-----+----+
|part-00000-4050db39-e0f5-485d-ab3b-3ca72307f621-c000.snappy.parquet|[] |262 |1578083748000 |false |null |null|
|part-00000-ba39f292-2970-4528-a40c-8f0aa5f796de-c000.snappy.parquet|[] |262 |1578083570000 |false |null |null|
|part-00003-99f9d902-24a7-4f76-a15a-6971940bc245-c000.snappy.parquet|[] |429 |1578083748000 |false |null |null|
|part-00007-03d987f1-5bb3-4b5b-8db9-97b6667107e2-c000.snappy.parquet|[] |429 |1578083748000 |false |null |null|
|part-00011-a759a8c2-507d-46dd-9da7-dc722316214b-c000.snappy.parquet|[] |429 |1578083748000 |false |null |null|
|part-00015-2e685d29-25ed-4262-90a7-5491847fd8d0-c000.snappy.parquet|[] |429 |1578083748000 |false |null |null|
|part-00015-ee0ac1af-e1e0-4422-8245-12da91ced0a2-c000.snappy.parquet|[] |429 |1578083570000 |false |null |null|
+-------------------------------------------------------------------+---------------+----+----------------+----------+-----+----+
allFiles is used when:
- PartitionFiltering is requested for the files to scan (matching projection attributes and predicates)
- DeltaSourceSnapshot is requested for the initial files (indexed AddFiles)
- GenerateSymlinkManifestImpl is requested to generateIncrementalManifest and generateFullManifest
- DeltaDataSource is requested for an Insertable HadoopFsRelation
stateReconstruction Dataset of Actions
stateReconstruction: Dataset[SingleAction]
Note
stateReconstruction returns a Dataset[SingleAction] and so does not do any computation per se.
stateReconstruction is the Dataset of SingleActions used as the dataset part of the cachedState.
stateReconstruction loads the log file indices (which gives a Dataset[SingleAction]).
stateReconstruction maps over partitions (using Dataset.mapPartitions) and canonicalizes the paths of AddFile and RemoveFile actions.
stateReconstruction adds a file column that uses a UDF to assert that input_file_name() (the log file an action was read from) belongs to the transaction log of the Delta table.
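A plain-Scala sketch of such a check, assuming it verifies that the parent directory of the file an action was read from is the table's _delta_log directory. assertLogBelongsToTable here is an ordinary, hypothetical function standing in for the UDF (in Spark it would be wrapped with org.apache.spark.sql.functions.udf); it returns the file path so that the result can become the file column.

```scala
import java.net.URI
import java.nio.file.Paths

// Hypothetical helper: the file must sit directly in the transaction log directory.
def assertLogBelongsToTable(logBasePath: URI)(filePath: String): String = {
  val parent = Paths.get(new URI(filePath).getPath).getParent
  if (parent != Paths.get(logBasePath.getPath))
    throw new AssertionError(
      s"File ($filePath) doesn't belong in the transaction log at $logBasePath.")
  filePath // becomes the value of the `file` column
}

val logPath = new URI("file:/tmp/delta/users/_delta_log")

val ok = assertLogBelongsToTable(logPath)(
  "file:/tmp/delta/users/_delta_log/00000000000000000000.json")

val rejected =
  try {
    assertLogBelongsToTable(logPath)("file:/tmp/other/_delta_log/00000000000000000000.json")
    false
  } catch { case _: AssertionError => true }
```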
Note
This UDF-based check is very clever.
stateReconstruction repartitions the Dataset by the path of add or remove actions (using the configurable number of partitions) and then uses Dataset.sortWithinPartitions to sort every partition by the file column.
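Why both steps matter: repartitioning by path puts every add and remove of the same file into the same partition, and because Delta log files are named after zero-padded commit versions (for example 00000000000000000010.json), sorting by the file name puts those actions back in commit order. The sketch below mimics the two steps on an in-memory collection; LogRow and repartitionAndSort are hypothetical, and String.hashCode stands in for the Murmur3 hashing Spark uses for hash partitioning.

```scala
// Hypothetical, simplified row: the path of the add/remove action and the log file it came from.
final case class LogRow(path: String, file: String, isAdd: Boolean)

// Mimics repartition(n, path) followed by sortWithinPartitions("file").
def repartitionAndSort(rows: Seq[LogRow], numPartitions: Int): Map[Int, Seq[LogRow]] =
  rows
    .groupBy(r => Math.floorMod(r.path.hashCode, numPartitions)) // same path => same partition
    .map { case (p, rs) => p -> rs.sortBy(_.file) }              // commit order within a partition

val rows = Seq(
  LogRow("a.parquet", "00000000000000000002.json", isAdd = false),
  LogRow("b.parquet", "00000000000000000001.json", isAdd = true),
  LogRow("a.parquet", "00000000000000000000.json", isAdd = true)
)

val partitions = repartitionAndSort(rows, numPartitions = 4)
```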
In the end, stateReconstruction maps over partitions (using Dataset.mapPartitions) and, for every partition, creates an InMemoryLogReplay, requests it to append the actions (as version 0) and returns its checkpoint (the reconciled actions).
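The per-partition replay can be sketched as follows. SimpleLogReplay is a hypothetical, much reduced stand-in for InMemoryLogReplay (only adds and removes; no protocol, metadata or transactions), assuming that the last action for a path wins and that tombstones are kept only while their deletion timestamp is newer than minFileRetentionTimestamp. Since the actions already arrive in commit order, the sketch takes no version at all.

```scala
import scala.collection.mutable

// Hypothetical, simplified actions; Delta's AddFile and RemoveFile carry many more fields.
sealed trait FileAction { def path: String }
final case class AddFile(path: String, size: Long) extends FileAction
final case class RemoveFile(path: String, deletionTimestamp: Long) extends FileAction

// Sketch of in-memory log replay: actions must arrive in commit order.
final class SimpleLogReplay(minFileRetentionTimestamp: Long) {
  private val activeFiles = mutable.LinkedHashMap.empty[String, AddFile]
  private val tombstones = mutable.LinkedHashMap.empty[String, RemoveFile]

  def append(actions: Iterator[FileAction]): Unit = actions.foreach {
    case add: AddFile =>
      activeFiles(add.path) = add // a (re-)added file is live again
      tombstones -= add.path
    case rm: RemoveFile =>
      activeFiles -= rm.path      // a removed file is no longer live
      tombstones(rm.path) = rm
  }

  // Reconciled state: live files plus tombstones still within the retention window.
  def checkpoint: Seq[FileAction] =
    activeFiles.values.toList ++
      tombstones.values.filter(_.deletionTimestamp > minFileRetentionTimestamp).toList
}

val replay = new SimpleLogReplay(minFileRetentionTimestamp = 100L)
replay.append(Iterator(
  AddFile("a.parquet", 262L),
  AddFile("b.parquet", 429L),
  RemoveFile("a.parquet", deletionTimestamp = 150L),
  AddFile("c.parquet", 429L),
  RemoveFile("c.parquet", deletionTimestamp = 50L) // older than the retention window
))
val reconciled = replay.checkpoint
```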
stateReconstruction is used when Snapshot is requested for a cached state.
Loading Log File Indices
loadActions: Dataset[SingleAction]
loadActions takes fileIndices and...FIXME
fileIndices
fileIndices: Seq[DeltaLogFileIndex]
Scala lazy value
fileIndices is a Scala lazy value and is initialized once at the first access. Once computed, it stays unchanged for the Snapshot instance.
lazy val fileIndices: Seq[DeltaLogFileIndex]
fileIndices is a collection of the checkpointFileIndexOpt and the deltaFileIndexOpt (if they are available).
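Assuming the checkpoint index is listed before the commit index (which matches replay order: checkpoint state first, then the later commits), fileIndices boils down to concatenating two Options. LogFileIndex is a hypothetical stand-in for DeltaLogFileIndex.

```scala
// Hypothetical stand-in for DeltaLogFileIndex: a file format and the files it covers.
final case class LogFileIndex(format: String, files: Seq[String])

// Absent indices (None) simply contribute nothing to the result.
def fileIndices(
    checkpointFileIndexOpt: Option[LogFileIndex],
    deltaFileIndexOpt: Option[LogFileIndex]): Seq[LogFileIndex] =
  checkpointFileIndexOpt.toSeq ++ deltaFileIndexOpt.toSeq

val checkpoint = LogFileIndex("parquet", Seq("00000000000000000010.checkpoint.parquet"))
val deltas = LogFileIndex("json", Seq("00000000000000000011.json"))

val both = fileIndices(Some(checkpoint), Some(deltas))
val onlyDeltas = fileIndices(None, Some(deltas)) // e.g. no checkpoint written yet
```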
Commit File Index
deltaFileIndexOpt: Option[DeltaLogFileIndex]
Scala lazy value
deltaFileIndexOpt is a Scala lazy value and is initialized once when first accessed. Once computed, it stays unchanged for the Snapshot instance.
lazy val deltaFileIndexOpt: Option[DeltaLogFileIndex]
deltaFileIndexOpt is a DeltaLogFileIndex (in JsonFileFormat) for the delta (commit) files of the LogSegment.
Checkpoint File Index
checkpointFileIndexOpt: Option[DeltaLogFileIndex]
Scala lazy value
checkpointFileIndexOpt is a Scala lazy value and is initialized once when first accessed. Once computed, it stays unchanged for the Snapshot instance.
lazy val checkpointFileIndexOpt: Option[DeltaLogFileIndex]
checkpointFileIndexOpt is a DeltaLogFileIndex (in ParquetFileFormat) for the checkpoint files of the LogSegment.
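A sketch of how both optional indices can come out of a LogSegment: commit files go into a JSON index, checkpoint files into a Parquet index, and an index is only created when there are files to index (hence the Option). LogSegment, LogFileIndex and indexOpt below are hypothetical simplifications, not Delta Lake's types.

```scala
// Hypothetical, simplified LogSegment: just the log file names it references.
final case class LogSegment(deltas: Seq[String], checkpoints: Seq[String])
final case class LogFileIndex(format: String, files: Seq[String])

// No files, no index.
def indexOpt(format: String, files: Seq[String]): Option[LogFileIndex] =
  if (files.isEmpty) None else Some(LogFileIndex(format, files))

val segment = LogSegment(
  deltas = Seq("00000000000000000011.json", "00000000000000000012.json"),
  checkpoints = Seq("00000000000000000010.checkpoint.parquet"))

val deltaFileIndexOpt = indexOpt("json", segment.deltas)             // commit files
val checkpointFileIndexOpt = indexOpt("parquet", segment.checkpoints) // checkpoint files
val noCheckpoint = indexOpt("parquet", Seq.empty)
```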
emptyActions Dataset (of Actions)
emptyActions: Dataset[SingleAction]
emptyActions is an empty dataset of SingleActions used by stateReconstruction and load.
Transaction Version By App ID
transactions: Map[String, Long]
transactions takes the SetTransaction actions (from the state dataset) and makes them a lookup table of transaction version by appId.
Scala lazy value
transactions is a Scala lazy value and is initialized once at the first access. Once computed, it stays unchanged for the Snapshot instance.
lazy val transactions: Map[String, Long]
transactions is used when OptimisticTransactionImpl is requested for the transaction version for a given (streaming query) id.
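The lookup table is a plain Map built from the SetTransaction actions. A minimal sketch with a simplified, hypothetical SetTransaction (the real action has more fields) and a hypothetical txnVersion helper; using -1 for an unknown appId is an assumption made for illustration.

```scala
// Hypothetical, simplified SetTransaction action.
final case class SetTransaction(appId: String, version: Long)

val setTransactions = Seq(
  SetTransaction("query-1", 4L),
  SetTransaction("query-2", 7L))

// Lookup table of transaction version by appId (toMap keeps the last pair per key).
val transactions: Map[String, Long] =
  setTransactions.map(t => t.appId -> t.version).toMap

// Hypothetical caller-side lookup; -1 as "no transaction seen yet" is an assumption.
def txnVersion(id: String): Long = transactions.getOrElse(id, -1L)
```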
All RemoveFile Actions (Tombstones)
tombstones: Dataset[RemoveFile]
tombstones ...FIXME
scala> deltaLog.snapshot.tombstones.show(false)
+----+-----------------+----------+
|path|deletionTimestamp|dataChange|
+----+-----------------+----------+
+----+-----------------+----------+
cachedState
cachedState: CachedDS[SingleAction]
Scala lazy value
cachedState is a Scala lazy value and is initialized once at the first access. Once computed, it stays unchanged for the Snapshot instance.
lazy val cachedState: CachedDS[SingleAction]
cachedState creates a Cached Delta State with the following:
- The dataset part is the stateReconstruction dataset of SingleActions
- The name in the format Delta Table State #version - [redactedPath] (with the version and the redacted path)
cachedState is used when Snapshot is requested for the state (Dataset[SingleAction]).