Snapshot¶
Snapshot is an immutable snapshot of the state of a delta table (in the deltaLog) at the given version.
Snapshot uses aggregation expressions while computing state (as State).
Snapshot loads the actions (per the DeltaLogFileIndices) and builds a DataFrame.
Creating Instance¶
Snapshot takes the following to be created:
- Hadoop Path to the log directory
- Version
- LogSegment
- minFileRetentionTimestamp (that is exactly DeltaLog.minFileRetentionTimestamp)
- DeltaLog
- Timestamp
- VersionChecksum
While being created, Snapshot prints out the following INFO message to the logs and triggers initialization.
Created snapshot [this]
Snapshot is created when:
- SnapshotManagement is requested for a Snapshot
Initializing¶
init(): Unit
init validates the protocol and the metadata:
- Requests the DeltaLog to assert that the Protocol is protocolRead
- Requests the DeltaLog to assertTableFeaturesMatchMetadata (with the Protocol and the Metadata)
- Records undefined types (in the Delta protocol) in the schema (of the Metadata)
Maximum Number of Indexed Columns¶
numIndexedCols: Int
numIndexedCols is the value of dataSkippingNumIndexedCols table property.
Lazy Value
numIndexedCols is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.
Learn more in the Scala Language Specification.
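The once-only evaluation can be illustrated with a minimal plain-Scala sketch that can be pasted into a Scala REPL or script. It does not use Delta Lake: `initCount` and `indexedCols` are hypothetical names, and 32 is an arbitrary value chosen for the example.

```scala
// Counts how many times the initializer of the lazy value runs
var initCount = 0

// Hypothetical stand-in for an expensive computation
lazy val indexedCols: Int = {
  initCount += 1
  32
}

val before = initCount   // the initializer has not run yet
val first = indexedCols  // the first access runs the initializer
val second = indexedCols // the second access reuses the computed value
```

After both accesses, `initCount` is 1, which is the behavior the lazy values on this page rely on.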
numIndexedCols is part of the StatisticsCollection abstraction.
Demo¶
Computed State¶
computedState: State
Lazy Value
computedState is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.
Learn more in the Scala Language Specification.
computedState takes the current cached set of actions and reads the latest state (executes a state.select(...).first() query) with the aggregations (that are then mapped to a State).
Note
The state.select(...).first() query uses aggregate standard functions (e.g. last, collect_set, sum, count) and so uses groupBy over the whole dataset indirectly.

computedState asserts that the State to be returned has at least the default protocol and metadata (actions) defined.
computedState wraps the execution of the aggregation query in withStatusCode with the following:
| Property | Value |
|---|---|
| statusCode | DELTA |
| defaultMessage | Compute snapshot for version: [version] |
Tip
Use event logs for the INFO messages and web UI to monitor execution of the aggregation query with the following job description:
Delta: Compute snapshot for version: [version]
computedState assumes that the protocol and metadata (actions) are defined. computedState throws an IllegalStateException when the actions are not defined and spark.databricks.delta.stateReconstructionValidation.enabled configuration property is enabled.
The [action] of your Delta table couldn't be recovered while Reconstructing
version: [version]. Did you manually delete files in the _delta_log directory?
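The validation described above can be sketched roughly as follows. This is an illustration only, not the Delta Lake code: `requireAction` and its parameters are hypothetical, and only the error message is taken from the text above.

```scala
import scala.util.Try

// Hypothetical helper (not part of Delta Lake): returns the action or fails
// with the error message quoted above when validation is enabled
def requireAction[A](
    action: Option[A],
    name: String,
    version: Long,
    validationEnabled: Boolean): Option[A] = {
  if (action.isEmpty && validationEnabled) {
    throw new IllegalStateException(
      s"The $name of your Delta table couldn't be recovered while Reconstructing " +
        s"version: $version. Did you manually delete files in the _delta_log directory?")
  }
  action
}

// Missing metadata with validation enabled fails...
val strict = Try(requireAction(Option.empty[String], "metadata", 3L, validationEnabled = true))
// ...and is passed through as None when validation is disabled
val lenient = requireAction(Option.empty[String], "metadata", 3L, validationEnabled = false)
```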
Note
The state.select(...).first() query uses last with ignoreNulls flag true and so may give no rows for first().
aggregationsToComputeState¶
aggregationsToComputeState: Map[String, Column]
| Alias | Aggregation Expression |
|---|---|
| sizeInBytes | coalesce(sum(col("add.size")), lit(0L)) |
| numOfSetTransactions | count(col("txn")) |
| numOfFiles | count(col("add")) |
| numOfRemoves | count(col("remove")) |
| numOfMetadata | count(col("metaData")) |
| numOfProtocol | count(col("protocol")) |
| setTransactions | collect_set(col("txn")) |
| metadata | last(col("metaData"), ignoreNulls = true) |
| protocol | last(col("protocol"), ignoreNulls = true) |
| fileSizeHistogram | lit(null).cast(FileSizeHistogram.schema) |
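To make the meaning of a few of these aggregations concrete, here is a plain-Scala model that computes the same quantities over an in-memory collection. It is only a sketch: it does not use Spark, and `AddRow`, `TxnRow`, `MetadataRow` and `ActionRow` are hypothetical, simplified stand-ins for the real action classes, with `Option` playing the role of a nullable column.

```scala
// Hypothetical, simplified stand-ins for Delta actions (not the real classes)
case class AddRow(path: String, size: Long)
case class TxnRow(appId: String, version: Long)
case class MetadataRow(id: String)

// One row of the state: at most one field is defined, the others are "null"
case class ActionRow(
    add: Option[AddRow] = None,
    txn: Option[TxnRow] = None,
    metaData: Option[MetadataRow] = None)

val stateRows = Seq(
  ActionRow(metaData = Some(MetadataRow("m1"))),
  ActionRow(add = Some(AddRow("a.parquet", 100L))),
  ActionRow(add = Some(AddRow("b.parquet", 200L))),
  ActionRow(txn = Some(TxnRow("app-1", 7L))),
  ActionRow(metaData = Some(MetadataRow("m2"))))

// coalesce(sum(col("add.size")), lit(0L)): sum over non-null adds, 0 when there are none
val sizeInBytes = stateRows.flatMap(_.add).map(_.size).sum
// count(col("add")): number of rows with a non-null add
val numOfFiles = stateRows.count(_.add.isDefined)
// collect_set(col("txn")): distinct non-null transactions
val setTransactions = stateRows.flatMap(_.txn).toSet
// last(col("metaData"), ignoreNulls = true): last non-null metadata, if any
val metadata = stateRows.flatMap(_.metaData).lastOption
```

In the real query all aliases are computed in a single pass over the state dataset. The model above computes them one by one for readability.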
Configuration Properties¶
spark.databricks.delta.snapshotPartitions¶
Snapshot uses the spark.databricks.delta.snapshotPartitions configuration property for the number of partitions to use for state reconstruction.
spark.databricks.delta.stateReconstructionValidation.enabled¶
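Snapshot uses the spark.databricks.delta.stateReconstructionValidation.enabled configuration property to control whether computedState throws an IllegalStateException when the protocol or metadata (actions) could not be reconstructed.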
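Both properties can be set like any other Spark SQL configuration property. The fragment below assumes an active SparkSession available as `spark` (as in spark-shell). The values are arbitrary examples, not recommendations.

```scala
// Assumes an active SparkSession named `spark` (e.g. in spark-shell)
// 10 is an arbitrary example value for the number of partitions
spark.conf.set("spark.databricks.delta.snapshotPartitions", "10")
// Enables validation of the reconstructed state
spark.conf.set("spark.databricks.delta.stateReconstructionValidation.enabled", "true")
```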
State Dataset (of Actions)¶
state: Dataset[SingleAction]
state requests the cached delta table state for the current state (from the cache).
state is used when:
- Checkpoints utility is used to writeCheckpoint
- Snapshot is requested for computedState, all files and files removed (tombstones)
- VacuumCommand utility is requested for garbage collection
Cached State¶
cachedState: CachedDS[SingleAction]
Lazy Value
cachedState is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.
Learn more in the Scala Language Specification.
withStatsCache caches the stateReconstruction DataFrame under the following name (with the version and the redactedPath):
Delta Table State #[version] - [redactedPath]
All AddFiles¶
allFiles: Dataset[AddFile]
allFiles simply takes the state dataset and selects AddFiles (adds where clause for add IS NOT NULL and select over the fields of AddFiles).
Note
allFiles simply adds where and select clauses. No computation happens yet as it is (a description of) a distributed computation as a Dataset[AddFile].
import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, "/tmp/delta/users")
val files = deltaLog.snapshot.allFiles
scala> :type files
org.apache.spark.sql.Dataset[org.apache.spark.sql.delta.actions.AddFile]
scala> files.show(truncate = false)
+-------------------------------------------------------------------+---------------+----+----------------+----------+-----+----+
|path |partitionValues|size|modificationTime|dataChange|stats|tags|
+-------------------------------------------------------------------+---------------+----+----------------+----------+-----+----+
|part-00000-4050db39-e0f5-485d-ab3b-3ca72307f621-c000.snappy.parquet|[] |262 |1578083748000 |false |null |null|
|part-00000-ba39f292-2970-4528-a40c-8f0aa5f796de-c000.snappy.parquet|[] |262 |1578083570000 |false |null |null|
|part-00003-99f9d902-24a7-4f76-a15a-6971940bc245-c000.snappy.parquet|[] |429 |1578083748000 |false |null |null|
|part-00007-03d987f1-5bb3-4b5b-8db9-97b6667107e2-c000.snappy.parquet|[] |429 |1578083748000 |false |null |null|
|part-00011-a759a8c2-507d-46dd-9da7-dc722316214b-c000.snappy.parquet|[] |429 |1578083748000 |false |null |null|
|part-00015-2e685d29-25ed-4262-90a7-5491847fd8d0-c000.snappy.parquet|[] |429 |1578083748000 |false |null |null|
|part-00015-ee0ac1af-e1e0-4422-8245-12da91ced0a2-c000.snappy.parquet|[] |429 |1578083570000 |false |null |null|
+-------------------------------------------------------------------+---------------+----+----------------+----------+-----+----+
allFiles is used when:
- PartitionFiltering is requested for the files to scan (matching projection attributes and predicates)
- DeltaSourceSnapshot is requested for the initial files (indexed AddFiles)
- GenerateSymlinkManifestImpl is requested to generateIncrementalManifest and generateFullManifest
- DeltaDataSource is requested for an Insertable HadoopFsRelation
stateReconstruction Dataset of Actions¶
stateReconstruction: Dataset[SingleAction]
Note
stateReconstruction returns a Dataset[SingleAction] and so does not do any computation per se.
stateReconstruction is a Dataset of SingleActions (that is the dataset part) of the cachedState.
stateReconstruction loads the log file indices (that gives a Dataset[SingleAction]).
stateReconstruction maps over partitions (using Dataset.mapPartitions) and canonicalizes the paths for AddFile and RemoveFile actions.
stateReconstruction adds file column that uses a UDF to assert that input_file_name() belongs to the Delta table.
Note
This UDF-based check is very clever.
stateReconstruction repartitions the Dataset using the path of add or remove actions (with the configurable number of partitions) and Dataset.sortWithinPartitions by the file column.
In the end, stateReconstruction maps over partitions (using Dataset.mapPartitions) and, for every partition, creates an InMemoryLogReplay, requests it to append the actions (as version 0) and checkpoint.
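The idea behind the final replay step can be sketched in plain Scala. This is a simplified model under stated assumptions, not InMemoryLogReplay itself: `FileAction`, `Added`, `Removed` and `replay` are hypothetical names, and the model ignores repartitioning, sorting and all non-file actions.

```scala
// Hypothetical, simplified actions (the real ones carry many more fields)
sealed trait FileAction { def path: String }
case class Added(path: String, size: Long) extends FileAction
case class Removed(path: String, deletionTimestamp: Long) extends FileAction

// Replays actions in log order: the latest action for a path wins
def replay(actions: Seq[FileAction]): (Map[String, Added], Map[String, Removed]) =
  actions.foldLeft((Map.empty[String, Added], Map.empty[String, Removed])) {
    case ((active, removed), a: Added) =>
      (active + (a.path -> a), removed - a.path)
    case ((active, removed), r: Removed) =>
      (active - r.path, removed + (r.path -> r))
  }

val log = Seq(
  Added("part-0.parquet", 262L),
  Added("part-1.parquet", 429L),
  Removed("part-0.parquet", deletionTimestamp = 1578083748000L))

val (activeFiles, removedFiles) = replay(log)
```

Because all actions for a given path end up in the same partition and are sorted by file, a per-partition replay of this kind is enough to decide whether the path is active or a tombstone.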
stateReconstruction is used when:
- Snapshot is requested for a cached Delta table state
Loading Actions¶
loadActions: Dataset[SingleAction]
loadActions creates a union of Dataset[SingleAction]s for the indices (as LogicalRelations over a HadoopFsRelation) or defaults to an empty dataset.
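The "union or empty" shape can be modelled in plain Scala as below. This is only a sketch: each dataset is represented by a `Seq[String]` instead of a `Dataset[SingleAction]`, and the function body is an assumption about the general pattern, not a copy of the Delta Lake code.

```scala
// Hypothetical model: each "dataset" is just a Seq of action descriptions
val emptyActionsModel: Seq[String] = Seq.empty

def loadActionsModel(datasets: Seq[Seq[String]]): Seq[String] =
  // Union all datasets pairwise; fall back to the empty dataset when there are none
  datasets.reduceOption(_ ++ _).getOrElse(emptyActionsModel)

val fromCheckpointAndDeltas =
  loadActionsModel(Seq(Seq("add a", "add b"), Seq("remove a")))
val fromNothing = loadActionsModel(Seq.empty)
```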
indexToRelation¶
indexToRelation(
index: DeltaLogFileIndex,
schema: StructType = logSchema): LogicalRelation
indexToRelation converts the DeltaLogFileIndex to a LogicalRelation (Spark SQL) leaf logical operator (using the logSchema).
indexToRelation creates a LogicalRelation over a HadoopFsRelation (Spark SQL) with the given index and the schema.
emptyActions Dataset (of Actions)¶
emptyActions: Dataset[SingleAction]
emptyActions is an empty dataset of SingleActions for loadActions (and InitialSnapshot's state).
Table Properties¶
getProperties: mutable.HashMap[String, String]
getProperties returns the following:
- Configuration (of the Metadata) without path
- delta.minReaderVersion to be the minReaderVersion (of the Protocol)
- delta.minWriterVersion to be the minWriterVersion (of the Protocol)
getProperties is used when:
- DeltaTableV2 is requested for the table properties
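How the returned map is assembled can be sketched in plain Scala. The input values here (`configuration`, `minReaderVersion`, `minWriterVersion`) are hypothetical examples standing in for the Metadata and the Protocol of a table.

```scala
import scala.collection.mutable

// Hypothetical inputs standing in for the Metadata configuration and the Protocol
val configuration = Map("path" -> "/tmp/delta/users", "delta.appendOnly" -> "true")
val minReaderVersion = 1
val minWriterVersion = 2

val properties = mutable.HashMap[String, String]()
// Configuration without the path entry
properties ++= (configuration - "path")
// Protocol versions exposed as table properties
properties("delta.minReaderVersion") = minReaderVersion.toString
properties("delta.minWriterVersion") = minWriterVersion.toString
```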
File Indices¶
fileIndices: Seq[DeltaLogFileIndex]
Lazy Value
fileIndices is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.
Learn more in the Scala Language Specification.
fileIndices is the checkpointFileIndexOpt and the deltaFileIndexOpt (if available).
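The "if available" part amounts to concatenating two optional values. A minimal plain-Scala sketch follows, with a hypothetical `FileIndexModel` case class standing in for DeltaLogFileIndex and an example checkpoint file name.

```scala
// Hypothetical stand-in for DeltaLogFileIndex: a file format and the files it covers
case class FileIndexModel(format: String, files: Seq[String])

val checkpointIndexOpt: Option[FileIndexModel] =
  Some(FileIndexModel("parquet", Seq("00000000000000000010.checkpoint.parquet")))
// For example, there are no commits after the checkpoint
val deltaIndexOpt: Option[FileIndexModel] = None

// Only the defined indices end up in the sequence
val indices: Seq[FileIndexModel] = checkpointIndexOpt.toSeq ++ deltaIndexOpt.toSeq
```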
Commit File Index¶
deltaFileIndexOpt: Option[DeltaLogFileIndex]
Lazy Value
deltaFileIndexOpt is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.
Learn more in the Scala Language Specification.
deltaFileIndexOpt is a DeltaLogFileIndex (in JsonFileFormat) for the delta files of the LogSegment.
Checkpoint File Index¶
checkpointFileIndexOpt: Option[DeltaLogFileIndex]
Lazy Value
checkpointFileIndexOpt is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.
Learn more in the Scala Language Specification.
checkpointFileIndexOpt is a DeltaLogFileIndex (in ParquetFileFormat) for the checkpoint files of the LogSegment.
Transaction Version By App ID¶
transactions: Map[String, Long]
transactions takes the SetTransaction actions (from the state dataset) and makes them a lookup table of transaction version by appId.
Scala lazy value
transactions is a Scala lazy value and is initialized once at the first access. Once computed it stays unchanged for the Snapshot instance.
lazy val transactions: Map[String, Long]
transactions is used when OptimisticTransactionImpl is requested for the transaction version for a given (streaming query) id.
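The lookup table can be illustrated with a small plain-Scala sketch. `SetTxn` is a hypothetical, simplified stand-in for SetTransaction, and the `-1L` default for an unknown appId is an assumption made for the example.

```scala
// Hypothetical, simplified SetTransaction (the real action has more fields)
case class SetTxn(appId: String, version: Long)

val setTxns = Seq(SetTxn("stream-a", 12L), SetTxn("stream-b", 3L))

// Lookup table of transaction version by appId
val txnVersions: Map[String, Long] =
  setTxns.map(t => t.appId -> t.version).toMap

// Version of a known appId, and a default for an unknown one
val known = txnVersions.getOrElse("stream-a", -1L)
val unknown = txnVersions.getOrElse("stream-c", -1L)
```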
All RemoveFile Actions (Tombstones)¶
tombstones: Dataset[RemoveFile]
tombstones...FIXME
scala> deltaLog.snapshot.tombstones.show(false)
+----+-----------------+----------+
|path|deletionTimestamp|dataChange|
+----+-----------------+----------+
+----+-----------------+----------+
Data Schema (of Delta Table)¶
dataSchema: StructType
dataSchema requests the Metadata for the data schema.
dataSchema is part of the StatisticsCollection abstraction.
Metadata¶
metadata: Metadata
metadata is part of the SnapshotDescriptor and DataSkippingReaderBase abstractions.
metadata requests the computedState for the Metadata.