Snapshot¶

Snapshot is an immutable snapshot of the state of a delta table (in the deltaLog) at the given version.

Snapshot uses aggregation expressions while computing the state (as State).

Snapshot loads the actions (per the DeltaLogFileIndices) and builds a DataFrame.
Creating Instance¶

Snapshot takes the following to be created:

- Hadoop Path to the log directory
- Version
- LogSegment
- minFileRetentionTimestamp (that is exactly DeltaLog.minFileRetentionTimestamp)
- DeltaLog
- Timestamp
- VersionChecksum
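A simplified sketch of the constructor shape (illustrative only, not the exact Delta Lake source):

```scala
import org.apache.hadoop.fs.Path

// Sketch: the Snapshot constructor arguments listed above
class Snapshot(
    val path: Path,                       // Hadoop Path to the log directory
    val version: Long,                    // Version
    val logSegment: LogSegment,           // LogSegment
    val minFileRetentionTimestamp: Long,  // DeltaLog.minFileRetentionTimestamp
    val deltaLog: DeltaLog,               // DeltaLog
    val timestamp: Long,                  // Timestamp
    val checksumOpt: Option[VersionChecksum])
```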
While being created, Snapshot prints out the following INFO message to the logs and triggers initialization:

Created snapshot [this]

Snapshot is created when:

- SnapshotManagement is requested for a Snapshot
Initializing¶

init(): Unit

init validates the protocol and the metadata:

- Requests the DeltaLog to protocolRead (assert that the table can be read with the Protocol)
- Requests the DeltaLog to assertTableFeaturesMatchMetadata (with the Protocol and the Metadata)
- Records undefined types (in the Delta protocol) in the schema (of the Metadata)
Maximum Number of Indexed Columns¶

numIndexedCols: Int

numIndexedCols is the value of the dataSkippingNumIndexedCols table property.

Lazy Value

numIndexedCols is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards. Learn more in the Scala Language Specification.

numIndexedCols is part of the StatisticsCollection abstraction.
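As a sketch, assuming DeltaConfigs.DATA_SKIPPING_NUM_INDEXED_COLS models the dataSkippingNumIndexedCols table property and the snapshot's Metadata is in scope, the value boils down to resolving the property against the table metadata:

```scala
// Sketch: resolve the table property (or its default) from the Metadata
lazy val numIndexedCols: Int =
  DeltaConfigs.DATA_SKIPPING_NUM_INDEXED_COLS.fromMetaData(metadata)
```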
Demo¶
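Below is a minimal demo, assuming a Delta table at /tmp/delta/users (the version shown is that of a table with a single commit):

```scala
import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, "/tmp/delta/users")
val snapshot = deltaLog.snapshot

scala> :type snapshot
org.apache.spark.sql.delta.Snapshot

scala> snapshot.version
res0: Long = 0
```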
Computed State¶

computedState: State

Lazy Value

computedState is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards. Learn more in the Scala Language Specification.

computedState takes the current cached set of actions and reads the latest state (executes a state.select(...).first() query) with the aggregations (that are then mapped to a State).
Note

The state.select(...).first() query uses aggregate standard functions (e.g. last, collect_set, sum, count) and so indirectly uses groupBy over the whole dataset.
computedState asserts that the State to be returned has at least the default protocol and metadata (actions) defined.

While executing the aggregation query, computedState uses withStatusCode with the following:

Property | Value |
---|---|
statusCode | DELTA |
defaultMessage | Compute snapshot for version: [version] |

Tip

Use the event logs (for the INFO messages) and the web UI to monitor execution of the aggregation query with the following job description:

Delta: Compute snapshot for version: [version]

computedState assumes that the protocol and metadata (actions) are defined. computedState throws an IllegalStateException when they are not and the spark.databricks.delta.stateReconstructionValidation.enabled configuration property is enabled:

The [action] of your Delta table couldn't be recovered while Reconstructing
version: [version]. Did you manually delete files in the _delta_log directory?
Note

The state.select(...).first() query uses last with the ignoreNulls flag enabled, so the protocol and metadata values can come out as nulls (when no such actions are available).
aggregationsToComputeState¶
aggregationsToComputeState: Map[String, Column]
Alias | Aggregation Expression |
---|---|
sizeInBytes | coalesce(sum(col("add.size")), lit(0L)) |
numOfSetTransactions | count(col("txn")) |
numOfFiles | count(col("add")) |
numOfRemoves | count(col("remove")) |
numOfMetadata | count(col("metaData")) |
numOfProtocol | count(col("protocol")) |
setTransactions | collect_set(col("txn")) |
metadata | last(col("metaData"), ignoreNulls = true) |
protocol | last(col("protocol"), ignoreNulls = true) |
fileSizeHistogram | lit(null).cast(FileSizeHistogram.schema) |
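As a sketch, the aggregation expressions above can be computed in a single pass over the actions (assuming state and the SingleAction column names are in scope):

```scala
import org.apache.spark.sql.functions._

// Sketch: one global aggregation over all the actions (no groupBy keys)
val row = state.select(
  coalesce(sum(col("add.size")), lit(0L)) as "sizeInBytes",
  count(col("txn")) as "numOfSetTransactions",
  count(col("add")) as "numOfFiles",
  count(col("remove")) as "numOfRemoves",
  collect_set(col("txn")) as "setTransactions",
  last(col("metaData"), ignoreNulls = true) as "metadata",
  last(col("protocol"), ignoreNulls = true) as "protocol"
).first()
```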
Configuration Properties¶

spark.databricks.delta.snapshotPartitions¶

Snapshot uses the spark.databricks.delta.snapshotPartitions configuration property for the number of partitions to use for state reconstruction.

spark.databricks.delta.stateReconstructionValidation.enabled¶

Snapshot uses the spark.databricks.delta.stateReconstructionValidation.enabled configuration property to control whether to validate the reconstructed state (i.e. that the protocol and metadata actions are defined).
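For example, the defaults can be overridden when a SparkSession is created (a sketch; the values below are arbitrary):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: override the Snapshot-related configuration properties
val spark = SparkSession.builder()
  .appName("snapshot-demo")
  .config("spark.databricks.delta.snapshotPartitions", 50L)
  .config("spark.databricks.delta.stateReconstructionValidation.enabled", true)
  .getOrCreate()
```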
State Dataset (of Actions)¶

state: Dataset[SingleAction]

state requests the cached delta table state for the current state (from the cache).

state is used when:

- Checkpoints utility is used to writeCheckpoint
- Snapshot is requested for computedState, all files and files removed (tombstones)
- VacuumCommand utility is requested for garbage collection
Cached State¶

cachedState: CachedDS[SingleAction]

Lazy Value

cachedState is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards. Learn more in the Scala Language Specification.

cachedState caches the stateReconstruction DataFrame under the following name (with the version and the redactedPath):

Delta Table State #[version] - [redactedPath]
All AddFiles¶

allFiles: Dataset[AddFile]

allFiles simply takes the state dataset and selects AddFiles (adds a where clause for add IS NOT NULL and a select over the fields of AddFile).

Note

allFiles simply adds the where and select clauses. No computation happens yet as it is (a description of) a distributed computation as a Dataset[AddFile].
import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, "/tmp/delta/users")
val files = deltaLog.snapshot.allFiles
scala> :type files
org.apache.spark.sql.Dataset[org.apache.spark.sql.delta.actions.AddFile]
scala> files.show(truncate = false)
+-------------------------------------------------------------------+---------------+----+----------------+----------+-----+----+
|path |partitionValues|size|modificationTime|dataChange|stats|tags|
+-------------------------------------------------------------------+---------------+----+----------------+----------+-----+----+
|part-00000-4050db39-e0f5-485d-ab3b-3ca72307f621-c000.snappy.parquet|[] |262 |1578083748000 |false |null |null|
|part-00000-ba39f292-2970-4528-a40c-8f0aa5f796de-c000.snappy.parquet|[] |262 |1578083570000 |false |null |null|
|part-00003-99f9d902-24a7-4f76-a15a-6971940bc245-c000.snappy.parquet|[] |429 |1578083748000 |false |null |null|
|part-00007-03d987f1-5bb3-4b5b-8db9-97b6667107e2-c000.snappy.parquet|[] |429 |1578083748000 |false |null |null|
|part-00011-a759a8c2-507d-46dd-9da7-dc722316214b-c000.snappy.parquet|[] |429 |1578083748000 |false |null |null|
|part-00015-2e685d29-25ed-4262-90a7-5491847fd8d0-c000.snappy.parquet|[] |429 |1578083748000 |false |null |null|
|part-00015-ee0ac1af-e1e0-4422-8245-12da91ced0a2-c000.snappy.parquet|[] |429 |1578083570000 |false |null |null|
+-------------------------------------------------------------------+---------------+----+----------------+----------+-----+----+
allFiles is used when:

- PartitionFiltering is requested for the files to scan (matching projection attributes and predicates)
- DeltaSourceSnapshot is requested for the initial files (indexed AddFiles)
- GenerateSymlinkManifestImpl is requested to generateIncrementalManifest and generateFullManifest
- DeltaDataSource is requested for an insertable HadoopFsRelation
stateReconstruction Dataset of Actions¶

stateReconstruction: Dataset[SingleAction]

Note

stateReconstruction returns a Dataset[SingleAction] and so does not do any computation per se.

stateReconstruction is a Dataset of SingleActions (that is the dataset part) of the cachedState.

stateReconstruction loads the log file indices (that gives a Dataset[SingleAction]).

stateReconstruction maps over partitions (using Dataset.mapPartitions) and canonicalizes the paths of the AddFile and RemoveFile actions.

stateReconstruction adds a file column that uses a UDF to assert that input_file_name() belongs to the Delta table.

Note

This UDF-based check is very clever.

stateReconstruction repartitions the Dataset by the path of the add or remove actions (with the configurable number of partitions) and uses Dataset.sortWithinPartitions on the file column.

In the end, stateReconstruction maps over partitions again (using Dataset.mapPartitions): for every partition, it creates an InMemoryLogReplay, requests it to append the actions (as version 0) and then takes its checkpoint. A sketch of this final step follows.
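The following is a minimal sketch of the replay step, assuming a canonicalized Dataset[SingleAction] (canonicalized), a snapshotPartitions number and the InMemoryLogReplay internals are in scope; it is simplified, not the exact Delta Lake source:

```scala
import org.apache.spark.sql.functions.{coalesce, col}

// Sketch: co-locate all actions of the same file, order them, then replay
// each partition to keep only the winning actions (the reconstructed state)
val reconstructed = canonicalized
  .repartition(snapshotPartitions, coalesce(col("add.path"), col("remove.path")))
  .sortWithinPartitions("file")
  .mapPartitions { actions =>
    val replay = new InMemoryLogReplay(minFileRetentionTimestamp)
    replay.append(0, actions.map(_.unwrap))  // replay all the actions as version 0
    replay.checkpoint.map(_.wrap)            // the surviving actions only
  }
```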
stateReconstruction is used when:

- Snapshot is requested for a cached Delta table state
Loading Actions¶

loadActions: Dataset[SingleAction]

loadActions creates a union of the Dataset[SingleAction]s for the indices (as LogicalRelations over a HadoopFsRelation) or defaults to an empty dataset.
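Conceptually (a sketch, not the actual implementation), loading the actions amounts to reading the commit (JSON) and checkpoint (parquet) files of the _delta_log directory with the log schema and unioning the results; the table path below is illustrative:

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.delta.actions.{Action, SingleAction}
import spark.implicits._

// Sketch: read the log files with the schema of actions and union them
val logSchema = Action.logSchema
val deltas = spark.read.schema(logSchema)
  .json("/tmp/delta/users/_delta_log/*.json")
val checkpoints = spark.read.schema(logSchema)
  .parquet("/tmp/delta/users/_delta_log/*.checkpoint.parquet")
val actions: Dataset[SingleAction] = deltas.union(checkpoints).as[SingleAction]
```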
indexToRelation¶

indexToRelation(
  index: DeltaLogFileIndex,
  schema: StructType = logSchema): LogicalRelation

indexToRelation converts the given DeltaLogFileIndex to a LogicalRelation (Spark SQL) leaf logical operator (using the logSchema).

indexToRelation creates a LogicalRelation over a HadoopFsRelation (Spark SQL) with the given index and schema.
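A sketch of the conversion, assuming Spark SQL's HadoopFsRelation and LogicalRelation internals (and that DeltaLogFileIndex exposes its FileFormat as format):

```scala
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types.StructType

// Sketch: wrap the file index in a HadoopFsRelation and expose it
// as a LogicalRelation leaf logical operator
def indexToRelation(
    index: DeltaLogFileIndex,
    schema: StructType): LogicalRelation = {
  val relation = HadoopFsRelation(
    location = index,
    partitionSchema = new StructType(),  // log files are not partitioned
    dataSchema = schema,
    bucketSpec = None,
    fileFormat = index.format,
    options = Map.empty)(spark)
  LogicalRelation(relation)
}
```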
emptyActions Dataset (of Actions)¶

emptyActions: Dataset[SingleAction]

emptyActions is an empty dataset of SingleActions for loadActions (and InitialSnapshot's state).
Table Properties¶

getProperties: mutable.HashMap[String, String]

getProperties returns the following (see the sketch after this list):

- Configuration (of the Metadata) without path
- delta.minReaderVersion as the minReaderVersion (of the Protocol)
- delta.minWriterVersion as the minWriterVersion (of the Protocol)
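A minimal sketch of the above, assuming the snapshot's metadata and protocol are in scope:

```scala
import scala.collection.mutable

// Sketch: table configuration (path excluded) plus the protocol versions
def getProperties: mutable.HashMap[String, String] = {
  val properties = new mutable.HashMap[String, String]()
  metadata.configuration.foreach { case (key, value) =>
    if (key != "path") properties.put(key, value)
  }
  properties.put("delta.minReaderVersion", protocol.minReaderVersion.toString)
  properties.put("delta.minWriterVersion", protocol.minWriterVersion.toString)
  properties
}
```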
getProperties is used when:

- DeltaTableV2 is requested for the table properties
File Indices¶

fileIndices: Seq[DeltaLogFileIndex]

Lazy Value

fileIndices is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards. Learn more in the Scala Language Specification.

fileIndices is the checkpointFileIndexOpt and the deltaFileIndexOpt (if available).
Commit File Index¶

deltaFileIndexOpt: Option[DeltaLogFileIndex]

Lazy Value

deltaFileIndexOpt is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards. Learn more in the Scala Language Specification.

deltaFileIndexOpt is a DeltaLogFileIndex (in JsonFileFormat) for the delta (commit) files of the LogSegment.
Checkpoint File Index¶

checkpointFileIndexOpt: Option[DeltaLogFileIndex]

Lazy Value

checkpointFileIndexOpt is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards. Learn more in the Scala Language Specification.

checkpointFileIndexOpt is a DeltaLogFileIndex (in ParquetFileFormat) for the checkpoint file of the LogSegment.
Transaction Version By App ID¶

transactions: Map[String, Long]

transactions takes the SetTransaction actions (from the state dataset) and makes them a lookup table of transaction version by appId.

Lazy Value

transactions is a Scala lazy value and is initialized once, at the first access. Once computed, it stays unchanged for the lifetime of the Snapshot instance.

transactions is used when OptimisticTransactionImpl is requested for the transaction version for a given (streaming query) id.
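A minimal sketch of how the lookup table can be built, assuming setTransactions is the snapshot's collected Seq[SetTransaction]:

```scala
// Sketch: transaction version by application id
val transactions: Map[String, Long] =
  setTransactions.map(txn => txn.appId -> txn.version).toMap
```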
All RemoveFile Actions (Tombstones)¶

tombstones: Dataset[RemoveFile]

tombstones takes the state dataset and selects RemoveFiles (adds a where clause for remove IS NOT NULL and a select over the fields of RemoveFile).
scala> deltaLog.snapshot.tombstones.show(false)
+----+-----------------+----------+
|path|deletionTimestamp|dataChange|
+----+-----------------+----------+
+----+-----------------+----------+
Data Schema (of Delta Table)¶

dataSchema: StructType

dataSchema requests the Metadata for the data schema.

dataSchema is part of the StatisticsCollection abstraction.

Metadata¶

metadata: Metadata

metadata is part of the SnapshotDescriptor and DataSkippingReaderBase abstractions.

metadata requests the computedState for the Metadata.