Snapshot

Snapshot is an immutable view of the state of a Delta table at a given version.

Creating Instance

Snapshot takes the following to be created:

While being created, Snapshot prints out the following INFO message to the logs and initializes:

Created snapshot [this]

Snapshot is created when SnapshotManagement is requested for one.

Initializing

init(): Unit

init requests the DeltaLog for the protocolRead for the Protocol.

Computed State

computedState: State

Scala lazy value

computedState is a Scala lazy value and is initialized once at the first access. Once computed it stays unchanged for the Snapshot instance.

lazy val computedState: State
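The once-only initialization described above is plain Scala lazy val behavior, and a tiny standalone sketch may make it concrete. LazyDemo and its members are hypothetical names for illustration and are not part of Delta Lake.

```scala
// Hypothetical stand-in for a class with a lazy value; not Delta Lake code.
final class LazyDemo {
  // Counts how many times the initializer body runs.
  var initCount: Int = 0

  // Evaluated on first access only; the result is cached afterwards.
  lazy val computed: String = {
    initCount += 1
    s"state-$initCount"
  }
}

object LazyDemoApp {
  def main(args: Array[String]): Unit = {
    val demo = new LazyDemo
    println(demo.initCount) // the initializer has not run yet
    println(demo.computed)  // first access runs the initializer
    println(demo.computed)  // second access reuses the cached value
    println(demo.initCount)
  }
}
```

The same reasoning applies to the other lazy values on this page: the first access pays the cost, later accesses do not.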

computedState takes the current cached set of actions and reads the latest state (executes a state.select(...).first() query).

Note

The state.select(...).first() query uses aggregate standard functions (e.g. last, collect_set, sum, count) and so uses groupBy over the whole dataset indirectly.

(Figure: Details for Query in web UI)
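To illustrate what such a single-row aggregation computes, here is a plain-Scala model that mimics last (ignoring nulls), count, sum and collect_set over an in-memory sequence. It is only a sketch: ActionRow, Summary and summarize are hypothetical names, the fields are heavily simplified, and none of this is the actual Spark query that Delta Lake runs.

```scala
object StateAggregationSketch {
  // Hypothetical, simplified stand-in for SingleAction:
  // at most one of the fields is defined per row.
  final case class ActionRow(
      protocol: Option[String] = None,
      metaData: Option[String] = None,
      addSize: Option[Long] = None,
      txnAppId: Option[String] = None)

  // Hypothetical summary; the real State is assumed to carry more fields.
  final case class Summary(
      protocol: Option[String],
      metaData: Option[String],
      numOfFiles: Long,
      sizeInBytes: Long,
      appIds: Set[String])

  def summarize(rows: Seq[ActionRow]): Summary = Summary(
    // like last(..., ignoreNulls = true): the last defined value, if any
    protocol = rows.flatMap(_.protocol).lastOption,
    metaData = rows.flatMap(_.metaData).lastOption,
    // like count(...): the number of rows that carry an add action
    numOfFiles = rows.count(_.addSize.isDefined).toLong,
    // like sum(...): the total size of the added files
    sizeInBytes = rows.flatMap(_.addSize).sum,
    // like collect_set(...): the distinct values
    appIds = rows.flatMap(_.txnAppId).toSet)
}
```

Every field is derived from the whole input, which is why the note above speaks of an (implicit) grouping over the entire dataset.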

computedState assumes that the protocol and metadata (actions) are defined. computedState throws an IllegalStateException when the actions are not defined and spark.databricks.delta.stateReconstructionValidation.enabled configuration property is enabled.

The [action] of your Delta table couldn't be recovered while Reconstructing
version: [version]. Did you manually delete files in the _delta_log directory?

Note

The state.select(...).first() query uses last with ignoreNulls flag true and so may give no rows for first().

computedState makes sure that the State to be returned has at least the default protocol and metadata (actions) defined.
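The validation and the fallback to defaults can be modelled in a few lines of plain Scala. This is a sketch under assumptions: CheckedState, validate and the two default values are hypothetical, and only the exception type and the message text come from the description above.

```scala
object StateValidationSketch {
  // Hypothetical placeholders for the default protocol and metadata.
  val DefaultProtocol = "default-protocol"
  val DefaultMetadata = "default-metadata"

  // Hypothetical, simplified result type.
  final case class CheckedState(protocol: String, metadata: String)

  def validate(
      protocol: Option[String],
      metadata: Option[String],
      version: Long,
      validationEnabled: Boolean): CheckedState = {
    // Mirrors the error message quoted in the text.
    def missing(action: String): Nothing =
      throw new IllegalStateException(
        s"The $action of your Delta table couldn't be recovered while Reconstructing " +
          s"version: $version. Did you manually delete files in the _delta_log directory?")

    // Fail only when validation is enabled and an action is undefined.
    if (validationEnabled) {
      if (protocol.isEmpty) missing("protocol")
      if (metadata.isEmpty) missing("metadata")
    }
    // Otherwise fall back to defaults for whatever is undefined.
    CheckedState(
      protocol.getOrElse(DefaultProtocol),
      metadata.getOrElse(DefaultMetadata))
  }
}
```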

Configuration Properties

spark.databricks.delta.snapshotPartitions

Snapshot uses the spark.databricks.delta.snapshotPartitions configuration property for the number of partitions to use for state reconstruction.

spark.databricks.delta.stateReconstructionValidation.enabled

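Snapshot uses the spark.databricks.delta.stateReconstructionValidation.enabled configuration property to decide whether to validate the reconstructed state, i.e. whether computedState throws an IllegalStateException when the protocol or metadata (actions) are not defined.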
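Both properties can be set like any other Spark SQL configuration. The fragment below assumes a spark-shell session in which spark is the active SparkSession; the values are arbitrary examples, not recommendations.

```scala
// Assumes a spark-shell session where `spark` is the active SparkSession.
// The values are arbitrary examples, not recommendations.
spark.conf.set("spark.databricks.delta.snapshotPartitions", "10")
spark.conf.set("spark.databricks.delta.stateReconstructionValidation.enabled", "true")

// Read a value back to see what the session currently uses.
println(spark.conf.get("spark.databricks.delta.snapshotPartitions"))
```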

State Dataset of Actions

state: Dataset[SingleAction]

state simply requests the cached delta state to get the delta state from the cache.

state is used when:

All AddFile Actions

allFiles: Dataset[AddFile]

allFiles simply takes the state dataset and selects AddFiles (adds a where clause with add IS NOT NULL and a select over the fields of AddFile).

Note

allFiles only adds where and select clauses. No computation happens yet, since a Dataset[AddFile] is merely a description of a distributed computation.

import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, "/tmp/delta/users")
val files = deltaLog.snapshot.allFiles

scala> :type files
org.apache.spark.sql.Dataset[org.apache.spark.sql.delta.actions.AddFile]

scala> files.show(truncate = false)
+-------------------------------------------------------------------+---------------+----+----------------+----------+-----+----+
|path                                                               |partitionValues|size|modificationTime|dataChange|stats|tags|
+-------------------------------------------------------------------+---------------+----+----------------+----------+-----+----+
|part-00000-4050db39-e0f5-485d-ab3b-3ca72307f621-c000.snappy.parquet|[]             |262 |1578083748000   |false     |null |null|
|part-00000-ba39f292-2970-4528-a40c-8f0aa5f796de-c000.snappy.parquet|[]             |262 |1578083570000   |false     |null |null|
|part-00003-99f9d902-24a7-4f76-a15a-6971940bc245-c000.snappy.parquet|[]             |429 |1578083748000   |false     |null |null|
|part-00007-03d987f1-5bb3-4b5b-8db9-97b6667107e2-c000.snappy.parquet|[]             |429 |1578083748000   |false     |null |null|
|part-00011-a759a8c2-507d-46dd-9da7-dc722316214b-c000.snappy.parquet|[]             |429 |1578083748000   |false     |null |null|
|part-00015-2e685d29-25ed-4262-90a7-5491847fd8d0-c000.snappy.parquet|[]             |429 |1578083748000   |false     |null |null|
|part-00015-ee0ac1af-e1e0-4422-8245-12da91ced0a2-c000.snappy.parquet|[]             |429 |1578083570000   |false     |null |null|
+-------------------------------------------------------------------+---------------+----+----------------+----------+-----+----+

allFiles is used when:

stateReconstruction Dataset of Actions

stateReconstruction: Dataset[SingleAction]

Note

stateReconstruction returns a Dataset[SingleAction] and so does not do any computation per se.

stateReconstruction is the Dataset of SingleActions that backs the cachedState (that is, its dataset part).

stateReconstruction loads the log file indices (that gives a Dataset[SingleAction]).

stateReconstruction maps over partitions (using Dataset.mapPartitions) and canonicalizes the paths of AddFile and RemoveFile actions.

stateReconstruction adds a file column that uses a UDF to assert that input_file_name() belongs to the Delta table.

Note

This UDF-based check is very clever.

stateReconstruction repartitions the Dataset by the path of add or remove actions (with the configurable number of partitions) and sorts every partition by the file column (using Dataset.sortWithinPartitions).

In the end, stateReconstruction maps over partitions (using Dataset.mapPartitions): for every partition it creates an InMemoryLogReplay, requests it to append the actions (as version 0) and then to checkpoint.
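The effect of sorting the actions and replaying them can be sketched in plain Scala. This is a simplified model, not Delta Lake's InMemoryLogReplay: Add, Remove and replay are hypothetical names, a numeric version stands in for the ordering that the file column provides, and the only rule modelled is that the latest action per path wins.

```scala
object LogReplaySketch {
  // Hypothetical, simplified actions: only a path and the commit version.
  sealed trait FileAction {
    def path: String
    def version: Long
  }
  final case class Add(path: String, version: Long) extends FileAction
  final case class Remove(path: String, version: Long) extends FileAction

  /** Replays actions in version order; returns (active paths, tombstoned paths). */
  def replay(actions: Seq[FileAction]): (Set[String], Set[String]) = {
    val latestPerPath: Map[String, FileAction] =
      actions
        .sortBy(_.version) // stands in for sorting within a partition
        .foldLeft(Map.empty[String, FileAction]) { (acc, action) =>
          acc.updated(action.path, action) // a later action overrides an earlier one
        }
    val active = latestPerPath.values.collect { case Add(p, _) => p }.toSet
    val tombstones = latestPerPath.values.collect { case Remove(p, _) => p }.toSet
    (active, tombstones)
  }
}
```

Because all actions for a given path are handled together, repartitioning by path lets every partition be replayed independently of the others.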

stateReconstruction is used when Snapshot is requested for a cached state.

Loading Log File Indices

loadActions: Dataset[SingleAction]

loadActions takes fileIndices and...FIXME

fileIndices

fileIndices: Seq[DeltaLogFileIndex]

Scala lazy value

fileIndices is a Scala lazy value and is initialized once at the first access. Once computed it stays unchanged for the Snapshot instance.

lazy val fileIndices: Seq[DeltaLogFileIndex]

fileIndices is a collection of the checkpointFileIndexOpt and the deltaFileIndexOpt (if they are available).
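Combining two optional indices into one collection is a common Option idiom in Scala. The sketch below uses a hypothetical FileIndex case class in place of DeltaLogFileIndex, and the file names are illustrative only.

```scala
object FileIndicesSketch {
  // Hypothetical stand-in for DeltaLogFileIndex.
  final case class FileIndex(format: String, files: Seq[String])

  // Keeps only the indices that are defined, checkpoint first.
  def fileIndices(
      checkpoint: Option[FileIndex],
      deltas: Option[FileIndex]): Seq[FileIndex] =
    checkpoint.toList ++ deltas.toList
}
```

With both indices undefined the result is simply an empty collection, so callers need no special case.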

Commit File Index

deltaFileIndexOpt: Option[DeltaLogFileIndex]

Scala lazy value

deltaFileIndexOpt is a Scala lazy value and is initialized once when first accessed. Once computed, it stays unchanged for the Snapshot instance.

lazy val deltaFileIndexOpt: Option[DeltaLogFileIndex]

deltaFileIndexOpt is a DeltaLogFileIndex (in JsonFileFormat) for the delta (commit) files of the LogSegment.

Checkpoint File Index

checkpointFileIndexOpt: Option[DeltaLogFileIndex]

Scala lazy value

checkpointFileIndexOpt is a Scala lazy value and is initialized once when first accessed. Once computed, it stays unchanged for the Snapshot instance.

lazy val checkpointFileIndexOpt: Option[DeltaLogFileIndex]

checkpointFileIndexOpt is a DeltaLogFileIndex (in ParquetFileFormat) for the checkpoint files of the LogSegment.

emptyActions Dataset (of Actions)

emptyActions: Dataset[SingleAction]

emptyActions is an empty dataset of SingleActions for stateReconstruction and load.

Transaction Version By App ID

transactions: Map[String, Long]

transactions takes the SetTransaction actions (from the state dataset) and makes them a lookup table of transaction version by appId.

Scala lazy value

transactions is a Scala lazy value and is initialized once at the first access. Once computed it stays unchanged for the Snapshot instance.

lazy val transactions: Map[String, Long]

transactions is used when OptimisticTransactionImpl is requested for the transaction version for a given (streaming query) id.
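The lookup table described in this section boils down to turning (appId, version) pairs into a Map. A minimal sketch, assuming a simplified SetTransaction with just those two fields (the real action may carry more) and a hypothetical transactions helper:

```scala
object TransactionsSketch {
  // Hypothetical, simplified SetTransaction with only the two fields used here.
  final case class SetTransaction(appId: String, version: Long)

  // Builds the appId -> version lookup table.
  def transactions(txns: Seq[SetTransaction]): Map[String, Long] =
    txns.map(t => t.appId -> t.version).toMap
}
```

A lookup for an unknown appId yields None, which a caller can treat as "no transaction recorded yet".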

All RemoveFile Actions (Tombstones)

tombstones: Dataset[RemoveFile]

tombstones...FIXME

scala> deltaLog.snapshot.tombstones.show(false)
+----+-----------------+----------+
|path|deletionTimestamp|dataChange|
+----+-----------------+----------+
+----+-----------------+----------+

cachedState

cachedState: CachedDS[SingleAction]

Scala lazy value

cachedState is a Scala lazy value and is initialized once at the first access. Once computed it stays unchanged for the Snapshot instance.

lazy val cachedState: CachedDS[SingleAction]

cachedState creates a Cached Delta State with the following:

Used when Snapshot is requested for the state (Dataset[SingleAction])


Last update: 2020-10-05