Snapshot

Snapshot is an immutable snapshot of the state of a delta table (in the deltaLog) at the given version.

Snapshot uses aggregation expressions while computing state (as State).

Snapshot loads the actions (per the DeltaLogFileIndices) and builds a DataFrame.

Creating Instance

Snapshot takes the following to be created:

While being created, Snapshot prints out the following INFO message to the logs and triggers initialization.

Created snapshot [this]

Snapshot is created when:

  • SnapshotManagement is requested for a Snapshot

Initializing

init(): Unit

init validates the protocol and the metadata:

  1. Requests the DeltaLog to assert that the table can be read with the Protocol (protocolRead)
  2. Requests the DeltaLog to assertTableFeaturesMatchMetadata (with the Protocol and the Metadata)
  3. Records undefined types (in the Delta protocol) in the schema (of the Metadata)

Maximum Number of Indexed Columns

numIndexedCols: Int

numIndexedCols is the value of the dataSkippingNumIndexedCols table property.

Lazy Value

numIndexedCols is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the Scala Language Specification.
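
The once-only semantics can be seen in plain Scala. The following is a minimal sketch, not Delta Lake code: LazyDemo, initCount and expensive are made-up names used only to make the initialization observable.

```scala
// Hypothetical class with a lazy value (all names are illustrative only)
class LazyDemo {
  var initCount = 0 // counts how many times the initializer runs

  lazy val expensive: Int = {
    initCount += 1 // side effect that makes initialization observable
    42
  }
}

val demo = new LazyDemo
assert(demo.initCount == 0) // nothing is computed before the first access

val first = demo.expensive  // runs the initializer
val second = demo.expensive // reuses the value computed above
assert(demo.initCount == 1) // the initializer ran exactly once
```

The same applies to every lazy value of Snapshot: the first access pays the cost and later accesses return the stored value.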

numIndexedCols is part of the StatisticsCollection abstraction.

Demo

Computed State

computedState: State
Lazy Value

computedState is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the Scala Language Specification.

computedState takes the current cached set of actions and reads the latest state (executes a state.select(...).first() query) with the aggregations (that are then mapped to a State).

Note

The state.select(...).first() query uses aggregate standard functions (e.g. last, collect_set, sum, count) and so uses groupBy over the whole dataset indirectly.

Details for Query in web UI

computedState asserts that the State to be returned has at least the default protocol and metadata (actions) defined.


While executing the aggregation query, computedState uses withStatusCode with the following:

Property        Value
statusCode      DELTA
defaultMessage  Compute snapshot for version: [version]

Tip

Use event logs for the INFO messages and web UI to monitor execution of the aggregation query with the following job description:

Delta: Compute snapshot for version: [version]

computedState assumes that the protocol and metadata (actions) are defined. computedState throws an IllegalStateException when either action is not defined and the spark.databricks.delta.stateReconstructionValidation.enabled configuration property is enabled.

The [action] of your Delta table couldn't be recovered while Reconstructing
version: [version]. Did you manually delete files in the _delta_log directory?

Note

The state.select(...).first() query uses last with ignoreNulls flag true and so may give no rows for first().

aggregationsToComputeState

aggregationsToComputeState: Map[String, Column]

Alias                 Aggregation Expression
sizeInBytes           coalesce(sum(col("add.size")), lit(0L))
numOfSetTransactions  count(col("txn"))
numOfFiles            count(col("add"))
numOfRemoves          count(col("remove"))
numOfMetadata         count(col("metaData"))
numOfProtocol         count(col("protocol"))
setTransactions       collect_set(col("txn"))
metadata              last(col("metaData"), ignoreNulls = true)
protocol              last(col("protocol"), ignoreNulls = true)
fileSizeHistogram     lit(null).cast(FileSizeHistogram.schema)
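
To see how such a map of aliases and aggregate expressions turns into a single-row query, here is a minimal sketch with a subset of the expressions above. It does not use Delta Lake: the JSON lines are made-up actions in the shape of a commit file, and the local SparkSession, the actions dataset and the values are assumptions for illustration only.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, count, last, lit, sum}

// In spark-shell this returns the already running session
val spark = SparkSession.builder().master("local[*]").appName("aggregations-sketch").getOrCreate()
import spark.implicits._

// Made-up actions, one JSON object per line (not read from a real _delta_log)
val actions = spark.read.json(Seq(
  """{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}""",
  """{"metaData":{"id":"hypothetical-table-id"}}""",
  """{"add":{"path":"part-00000.parquet","size":262}}""",
  """{"add":{"path":"part-00001.parquet","size":429}}""",
  """{"txn":{"appId":"app-1","version":7}}"""
).toDS)

// A subset of the aliases and aggregation expressions from the table above
val aggregations: Map[String, Column] = Map(
  "sizeInBytes" -> coalesce(sum(col("add.size")), lit(0L)),
  "numOfFiles" -> count(col("add")),
  "numOfSetTransactions" -> count(col("txn")),
  "numOfMetadata" -> count(col("metaData")),
  "protocol" -> last(col("protocol"), ignoreNulls = true))

// No groupBy: the aggregates run over the whole dataset
val row = actions
  .select(aggregations.map { case (alias, agg) => agg.as(alias) }.toSeq: _*)
  .first()

val sizeInBytes = row.getAs[Long]("sizeInBytes")
val numOfFiles = row.getAs[Long]("numOfFiles")
```

Keeping the expressions in a Map keyed by alias makes it straightforward to look the results up by name in the resulting row.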

Configuration Properties

spark.databricks.delta.snapshotPartitions

Snapshot uses the spark.databricks.delta.snapshotPartitions configuration property for the number of partitions to use for state reconstruction.

spark.databricks.delta.stateReconstructionValidation.enabled

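Snapshot uses the spark.databricks.delta.stateReconstructionValidation.enabled configuration property to control whether computedState throws an IllegalStateException when the protocol or metadata (actions) cannot be recovered while reconstructing state.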
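
Both properties are regular Spark SQL configuration properties, so they can be set when a SparkSession is built. A minimal sketch follows; the values are examples only (not recommendations or defaults), and the application name is made up.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("snapshot-config-sketch")
  // Example values only
  .config("spark.databricks.delta.snapshotPartitions", "10")
  .config("spark.databricks.delta.stateReconstructionValidation.enabled", "true")
  .getOrCreate()

// Read the value back from the session configuration
val snapshotPartitions = spark.conf.get("spark.databricks.delta.snapshotPartitions")
```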

State Dataset (of Actions)

state: Dataset[SingleAction]

state requests the cached delta table state for the current state (from the cache).

state is used when:

Cached State

cachedState: CachedDS[SingleAction]
Lazy Value

cachedState is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the Scala Language Specification.

cachedState caches the stateReconstruction DataFrame under the following name (with the version and the redactedPath):

Delta Table State #[version] - [redactedPath]

All AddFiles

allFiles: Dataset[AddFile]

allFiles simply takes the state dataset and selects AddFiles (adds a where clause for add IS NOT NULL and a select over the fields of AddFiles).

Note

allFiles simply adds where and select clauses. No computation happens yet as it is (a description of) a distributed computation as a Dataset[AddFile].

import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, "/tmp/delta/users")
val files = deltaLog.snapshot.allFiles

scala> :type files
org.apache.spark.sql.Dataset[org.apache.spark.sql.delta.actions.AddFile]

scala> files.show(truncate = false)
+-------------------------------------------------------------------+---------------+----+----------------+----------+-----+----+
|path                                                               |partitionValues|size|modificationTime|dataChange|stats|tags|
+-------------------------------------------------------------------+---------------+----+----------------+----------+-----+----+
|part-00000-4050db39-e0f5-485d-ab3b-3ca72307f621-c000.snappy.parquet|[]             |262 |1578083748000   |false     |null |null|
|part-00000-ba39f292-2970-4528-a40c-8f0aa5f796de-c000.snappy.parquet|[]             |262 |1578083570000   |false     |null |null|
|part-00003-99f9d902-24a7-4f76-a15a-6971940bc245-c000.snappy.parquet|[]             |429 |1578083748000   |false     |null |null|
|part-00007-03d987f1-5bb3-4b5b-8db9-97b6667107e2-c000.snappy.parquet|[]             |429 |1578083748000   |false     |null |null|
|part-00011-a759a8c2-507d-46dd-9da7-dc722316214b-c000.snappy.parquet|[]             |429 |1578083748000   |false     |null |null|
|part-00015-2e685d29-25ed-4262-90a7-5491847fd8d0-c000.snappy.parquet|[]             |429 |1578083748000   |false     |null |null|
|part-00015-ee0ac1af-e1e0-4422-8245-12da91ced0a2-c000.snappy.parquet|[]             |429 |1578083570000   |false     |null |null|
+-------------------------------------------------------------------+---------------+----+----------------+----------+-----+----+

allFiles is used when:

stateReconstruction Dataset of Actions

stateReconstruction: Dataset[SingleAction]

Note

stateReconstruction returns a Dataset[SingleAction] and so does not do any computation per se.

stateReconstruction is a Dataset of SingleActions (that is the dataset part) of the cachedState.

stateReconstruction loads the log file indices (that gives a Dataset[SingleAction]).

stateReconstruction maps over partitions (using Dataset.mapPartitions) and canonicalizes the paths for AddFile and RemoveFile actions.

stateReconstruction adds file column that uses a UDF to assert that input_file_name() belongs to the Delta table.

Note

This UDF-based check is very clever.

stateReconstruction repartitions the Dataset using the path of add or remove actions (with the configurable number of partitions) and Dataset.sortWithinPartitions by the file column.

In the end, stateReconstruction maps over partitions (using Dataset.mapPartitions) that creates an InMemoryLogReplay, requests it to append the actions (as version 0) and checkpoint.
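
The general idea of replaying file actions can be sketched in plain Scala. This is a simplified model under stated assumptions, not the InMemoryLogReplay implementation: FileAction, Add, Remove and replay are hypothetical names, and only the rule "the latest action for a path wins" is modelled.

```scala
// Hypothetical, simplified actions (the real AddFile and RemoveFile carry more fields)
sealed trait FileAction { def path: String }
final case class Add(path: String, size: Long) extends FileAction
final case class Remove(path: String) extends FileAction

// Replays actions in order: the latest action for a path wins,
// so a Remove cancels an earlier Add and a later Add replaces an earlier one
def replay(actions: Seq[FileAction]): Map[String, Add] =
  actions.foldLeft(Map.empty[String, Add]) {
    case (active, add: Add)     => active + (add.path -> add)
    case (active, Remove(path)) => active - path
  }

val activeFiles = replay(Seq(
  Add("a.parquet", 262L),
  Add("b.parquet", 429L),
  Remove("a.parquet"),
  Add("b.parquet", 500L)))
```

Since the outcome depends on the order of actions per path, it matters that all actions for one path end up in the same partition and are sorted before they are replayed, which is what the repartitioning and sorting steps above arrange.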

stateReconstruction is used when:

Loading Actions

loadActions: Dataset[SingleAction]

loadActions creates a union of Dataset[SingleAction]s for the indices (as LogicalRelations over a HadoopFsRelation) or defaults to an empty dataset.
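
The "union or empty" shape can be sketched with plain Datasets. This is an illustration under assumptions, not the loadActions code: strings stand in for SingleActions, unionOrEmpty is a hypothetical helper, and each input Dataset plays the role of one file index.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// In spark-shell this returns the already running session
val spark = SparkSession.builder().master("local[*]").appName("load-actions-sketch").getOrCreate()
import spark.implicits._

// Hypothetical helper: unions all datasets or falls back to an empty one
def unionOrEmpty(perIndex: Seq[Dataset[String]]): Dataset[String] =
  perIndex.reduceOption(_ union _).getOrElse(spark.emptyDataset[String])

// Made-up contents standing in for actions loaded from two file indices
val checkpointActions = Seq("protocol", "metaData", "add-1").toDS
val deltaActions = Seq("add-2", "remove-1").toDS

val loaded = unionOrEmpty(Seq(checkpointActions, deltaActions))
val noActions = unionOrEmpty(Seq.empty)
```

Using reduceOption avoids a special case for the empty sequence: there is nothing to reduce, so the empty dataset is returned.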

indexToRelation

indexToRelation(
  index: DeltaLogFileIndex,
  schema: StructType = logSchema): LogicalRelation

indexToRelation converts the DeltaLogFileIndex to a LogicalRelation (Spark SQL) leaf logical operator (using the logSchema).

indexToRelation creates a LogicalRelation over a HadoopFsRelation (Spark SQL) with the given index and the schema.

emptyActions Dataset (of Actions)

emptyActions: Dataset[SingleAction]

emptyActions is an empty dataset of SingleActions for loadActions (and InitialSnapshot's state).

Table Properties

getProperties: mutable.HashMap[String, String]

getProperties returns the following:

getProperties is used when:

File Indices

fileIndices: Seq[DeltaLogFileIndex]
Lazy Value

fileIndices is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the Scala Language Specification.

fileIndices is the checkpointFileIndexOpt and the deltaFileIndexOpt (if available).
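
Combining two optional indices into a sequence can be sketched in plain Scala. FileIndex is a hypothetical stand-in for DeltaLogFileIndex, and the file names are made up for illustration.

```scala
// Hypothetical stand-in for DeltaLogFileIndex (illustration only)
final case class FileIndex(format: String, files: Seq[String])

// Keeps only the indices that are defined, in the order listed above
def fileIndices(checkpoint: Option[FileIndex], deltas: Option[FileIndex]): Seq[FileIndex] =
  checkpoint.toSeq ++ deltas.toSeq

// No checkpoint yet: only the index over JSON files remains
val deltasOnly = fileIndices(
  None,
  Some(FileIndex("json", Seq("00000000000000000000.json"))))

// Both available: two indices
val both = fileIndices(
  Some(FileIndex("parquet", Seq("00000000000000000010.checkpoint.parquet"))),
  Some(FileIndex("json", Seq("00000000000000000011.json"))))
```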

Commit File Index

deltaFileIndexOpt: Option[DeltaLogFileIndex]
Lazy Value

deltaFileIndexOpt is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the Scala Language Specification.

deltaFileIndexOpt is a DeltaLogFileIndex (in JsonFileFormat) for the delta files of the LogSegment.

Checkpoint File Index

checkpointFileIndexOpt: Option[DeltaLogFileIndex]
Lazy Value

checkpointFileIndexOpt is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the Scala Language Specification.

checkpointFileIndexOpt is a DeltaLogFileIndex (in ParquetFileFormat) for the checkpoint files of the LogSegment.

Transaction Version By App ID

transactions: Map[String, Long]

transactions takes the SetTransaction actions (from the state dataset) and makes them a lookup table of transaction version by appId.

Scala lazy value

transactions is a Scala lazy value and is initialized once at the first access. Once computed it stays unchanged for the Snapshot instance.

lazy val transactions: Map[String, Long]

transactions is used when OptimisticTransactionImpl is requested for the transaction version for a given (streaming query) id.
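
Turning SetTransaction actions into a lookup table can be sketched in plain Scala. SetTxn is a hypothetical, reduced stand-in for the SetTransaction action, and a local Seq stands in for the actions collected from the state dataset.

```scala
// Hypothetical stand-in for the SetTransaction action (the real one has more fields)
final case class SetTxn(appId: String, version: Long)

// Made-up actions; in Snapshot they come from the state dataset
val setTransactions = Seq(
  SetTxn("streaming-query-1", 5L),
  SetTxn("streaming-query-2", 12L))

// Lookup table of transaction version by appId
val transactions: Map[String, Long] =
  setTransactions.map(t => t.appId -> t.version).toMap

val lastVersion = transactions.get("streaming-query-1")
// -1 as the "not seen" marker is a choice of this sketch only
val unknown = transactions.getOrElse("no-such-app", -1L)
```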

All RemoveFile Actions (Tombstones)

tombstones: Dataset[RemoveFile]

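tombstones takes the state dataset and selects the RemoveFile actions (the counterpart of allFiles for remove actions).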

scala> deltaLog.snapshot.tombstones.show(false)
+----+-----------------+----------+
|path|deletionTimestamp|dataChange|
+----+-----------------+----------+
+----+-----------------+----------+

Data Schema (of Delta Table)

dataSchema: StructType

dataSchema requests the Metadata for the data schema.

dataSchema is part of the StatisticsCollection abstraction.

Metadata

metadata: Metadata

metadata is part of the SnapshotDescriptor and DataSkippingReaderBase abstractions.


metadata requests the computedState for the Metadata.