SnapshotManagement
SnapshotManagement is an extension of DeltaLog to manage Snapshots.
Current Snapshot
SnapshotManagement manages the currentSnapshot registry with the most recently loaded Snapshot (of a Delta table).
currentSnapshot is initialized as the latest available Snapshot (using getSnapshotAtInit) right when the DeltaLog is created, and is updated on demand.
currentSnapshot ...FIXME
currentSnapshot is used when:
- SnapshotManagement is requested to...FIXME
Loading Latest Snapshot at Initialization
getSnapshotAtInit: Snapshot
getSnapshotAtInit uses getLogSegmentFrom to fetch the LogSegment starting from the last checkpoint.
getSnapshotAtInit prints out the following INFO message to the logs:
Loading version [version][startCheckpoint]
getSnapshotAtInit creates a Snapshot (using createSnapshot) for the LogSegment.
getSnapshotAtInit records the current time in the lastUpdateTimestamp registry.
getSnapshotAtInit prints out the following INFO message to the logs:
Returning initial snapshot [snapshot]
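The steps above can be sketched as a small, self-contained Scala model. This is not Delta Lake's code: LogSegment, Snapshot, the injected logSegmentFrom function (standing in for getLogSegmentFrom) and the clock function are hypothetical stand-ins, and rendering [startCheckpoint] as " starting from checkpoint N." (or "." without a checkpoint) is an assumption.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of getSnapshotAtInit (not Delta Lake's implementation).
object SnapshotAtInitSketch {
  // Stand-ins for Delta Lake's LogSegment and Snapshot
  final case class LogSegment(version: Long, checkpointVersion: Option[Long])
  final case class Snapshot(logSegment: LogSegment) {
    def version: Long = logSegment.version
  }

  final class Model(
      lastCheckpoint: Option[Long],               // version of the last checkpoint, if any
      logSegmentFrom: Option[Long] => LogSegment, // stands in for getLogSegmentFrom
      clock: () => Long) {                        // stands in for the DeltaLog clock

    val infoLogs: ArrayBuffer[String] = ArrayBuffer.empty[String]
    @volatile var lastUpdateTimestamp: Long = -1L // below 0 until the first update

    def getSnapshotAtInit(): Snapshot = {
      // 1. Log segment starting from the last checkpoint
      val segment = logSegmentFrom(lastCheckpoint)
      // 2. "Loading version [version][startCheckpoint]" (suffix format is an assumption)
      val startCheckpoint =
        segment.checkpointVersion.map(v => s" starting from checkpoint $v.").getOrElse(".")
      infoLogs += s"Loading version ${segment.version}$startCheckpoint"
      // 3. Snapshot for the log segment (createSnapshot in Delta Lake)
      val snapshot = Snapshot(segment)
      // 4. Record the current time as the last successful update
      lastUpdateTimestamp = clock()
      // 5. "Returning initial snapshot [snapshot]"
      infoLogs += s"Returning initial snapshot $snapshot"
      snapshot
    }
  }
}
```

With a last checkpoint at version 10 and a segment reaching version 12, the model logs "Loading version 12 starting from checkpoint 10." and records the clock value as the last update.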
Fetching Log Files for Checkpointed Version
getLogSegmentFrom(
startingCheckpoint: Option[CheckpointMetaData]): LogSegment
getLogSegmentFrom fetches the log files (as a LogSegment) using getLogSegmentForVersion, with the version of the optional CheckpointMetaData as the starting checkpoint version to list log files from.
Fetching Latest Checkpoint and Delta Log Files for Version
getLogSegmentForVersion(
startCheckpoint: Option[Long],
versionToLoad: Option[Long] = None): LogSegment
getLogSegmentForVersion lists all the files (in a transaction log) from the given startCheckpoint (or 0 if not specified).
getLogSegmentForVersion filters out unnecessary files and leaves checkpoint and delta files only.
getLogSegmentForVersion filters out checkpoint files of size 0 (they are not valid checkpoints).
getLogSegmentForVersion takes the files up to and including the requested versionToLoad (if specified).
getLogSegmentForVersion splits the files into checkpoint and delta files.
getLogSegmentForVersion finds the latest complete checkpoint in the list.
In the end, getLogSegmentForVersion creates a LogSegment with the (checkpoint and delta) files.
getLogSegmentForVersion is used when:
- SnapshotManagement is requested for getLogSegmentFrom, updateInternal and getSnapshotAt
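To make the listing and filtering steps concrete, here is a minimal Scala sketch that works on an in-memory listing instead of a LogStore. It assumes delta files named %020d.json and single-part checkpoints named %020d.checkpoint.parquet; multi-part checkpoints (and so the completeness check) are not modelled. It also assumes that only deltas newer than the chosen checkpoint belong to the segment. LogFile and LogSegment are hypothetical stand-ins.

```scala
// Hypothetical, simplified model of getLogSegmentForVersion (not Delta Lake's implementation).
// Assumed names: "%020d.json" (delta) and "%020d.checkpoint.parquet" (single-part checkpoint).
object LogSegmentSketch {
  final case class LogFile(name: String, size: Long)
  final case class LogSegment(
      version: Long,
      checkpointVersion: Option[Long],
      checkpoint: Option[LogFile],
      deltas: Seq[LogFile])

  private val DeltaName = """(\d{20})\.json""".r
  private val CheckpointName = """(\d{20})\.checkpoint\.parquet""".r

  def isDelta(f: LogFile): Boolean = f.name match {
    case DeltaName(_) => true
    case _            => false
  }

  def isCheckpoint(f: LogFile): Boolean = f.name match {
    case CheckpointName(_) => true
    case _                 => false
  }

  // Only meant for delta and checkpoint files
  def fileVersion(f: LogFile): Long = f.name match {
    case DeltaName(v)      => v.toLong
    case CheckpointName(v) => v.toLong
    case other             => throw new IllegalArgumentException(s"Not a log file: $other")
  }

  def getLogSegmentForVersion(
      listing: Seq[LogFile],
      startCheckpoint: Option[Long],
      versionToLoad: Option[Long] = None): LogSegment = {
    val startPrefix = f"${startCheckpoint.getOrElse(0L)}%020d"
    val files = listing
      .sortBy(_.name)                                                 // lexicographic listing order
      .filter(_.name.compareTo(startPrefix) >= 0)                     // from startCheckpoint (or 0)
      .filter(f => isCheckpoint(f) || isDelta(f))                     // checkpoint and delta files only
      .filterNot(f => isCheckpoint(f) && f.size == 0)                 // drop 0-size checkpoints
      .takeWhile(f => versionToLoad.forall(v => fileVersion(f) <= v)) // up to versionToLoad
    val (checkpoints, deltas) = files.partition(isCheckpoint)
    val checkpoint = checkpoints.lastOption                           // sorted, so the last is the latest
    val checkpointVersion = checkpoint.map(fileVersion)
    // Assumption: only deltas after the checkpoint are part of the segment
    val newDeltas = deltas.filter(d => checkpointVersion.forall(fileVersion(d) > _))
    // Throws if no log files were found at all
    val version = (newDeltas.map(fileVersion) ++ checkpointVersion).max
    LogSegment(version, checkpointVersion, checkpoint, newDeltas)
  }
}
```

In this model, an empty checkpoint file at version 4 is skipped, so the segment falls back to the version 2 checkpoint plus the deltas for versions 3 and 4.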
Listing Files from Version Upwards
listFrom(
startVersion: Long): Iterator[FileStatus]
listFrom ...FIXME
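As a hedged illustration of listing files from a version upwards, the sketch below assumes that log file names start with a zero-padded 20-digit version (so lexicographic order matches version order) and that a listing returns the names greater than or equal to a given prefix, in sorted order. It works on an in-memory Seq[String] rather than FileStatus objects; versionPrefix and the object name are hypothetical.

```scala
// Hypothetical sketch: listing log file names from a version upwards.
// Works on an in-memory listing instead of a file system.
object ListFromSketch {
  // Zero-padded 20-digit version prefix, e.g. 5 becomes "00000000000000000005"
  def versionPrefix(version: Long): String = f"$version%020d"

  // Names lexicographically greater than or equal to the prefix, in sorted order
  def listFrom(listing: Seq[String], startVersion: Long): Iterator[String] = {
    val prefix = versionPrefix(startVersion)
    listing.sorted.iterator.filter(_.compareTo(prefix) >= 0)
  }
}
```

The zero padding is what makes this work: without it, "10.json" would sort before "9.json", and listing from version 9 would skip version 10.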
Creating Snapshot
createSnapshot(
segment: LogSegment,
minFileRetentionTimestamp: Long,
timestamp: Long): Snapshot
createSnapshot uses readChecksum (for the version of the given LogSegment) and creates a Snapshot.
createSnapshot is used when:
- SnapshotManagement is requested for getSnapshotAtInit, updateInternal and getSnapshotAt
Last Successful Update Timestamp
SnapshotManagement uses the lastUpdateTimestamp internal registry for the timestamp of the last successful update (below 0 until the first one).
Updating Current Snapshot
update(
stalenessAcceptable: Boolean = false): Snapshot
update determines whether to update asynchronously or not based on the input stalenessAcceptable flag and isSnapshotStale. The update is asynchronous only when the stalenessAcceptable flag is turned on and the current snapshot is not stale.
With the stalenessAcceptable flag turned off (the default value) or a stale snapshot, update updates synchronously (using updateInternal with the isAsync flag turned off).
update ...FIXME
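The choice between a synchronous and an asynchronous update boils down to a single boolean expression. The tiny sketch below (hypothetical names, plain Scala, no Spark) spells out the truth table: only an acceptable staleness combined with a snapshot that is not stale allows the asynchronous path.

```scala
// Hypothetical sketch of how update picks between a synchronous and an asynchronous update.
object UpdateModeSketch {
  sealed trait UpdateMode
  case object Sync extends UpdateMode  // updateInternal with isAsync turned off
  case object Async extends UpdateMode // asynchronous update (not modelled here)

  def updateMode(stalenessAcceptable: Boolean, isSnapshotStale: Boolean): UpdateMode =
    if (stalenessAcceptable && !isSnapshotStale) Async else Sync
}
```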
Usage
update is used when:
- DeltaHistoryManager is requested to getHistory, getActiveCommitAtTime, checkVersionExists
- DeltaLog is requested to start a transaction
- OptimisticTransactionImpl is requested to doCommit and getNextAttemptVersion
- DeltaTableV2 is requested for a Snapshot
- TahoeLogFileIndex is requested for a Snapshot
- DeltaSource is requested for the getStartingVersion
- In Delta commands...
isSnapshotStale
isSnapshotStale: Boolean
isSnapshotStale reads the spark.databricks.delta.stalenessLimit configuration property.
isSnapshotStale is enabled (true) when any of the following holds:
- spark.databricks.delta.stalenessLimit configuration property is 0 (the default)
- Internal lastUpdateTimestamp has never been updated (and is below 0)
- Internal lastUpdateTimestamp is at least spark.databricks.delta.stalenessLimit old
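The three conditions translate directly into a pure function. In this sketch the configuration value and the current time are passed in as plain numbers; treating stalenessLimit as milliseconds is an assumption, and the object name is hypothetical.

```scala
// Hypothetical sketch of isSnapshotStale as a pure function (not Delta Lake's code).
object StalenessSketch {
  /**
   * @param stalenessLimit      spark.databricks.delta.stalenessLimit (assumed to be milliseconds)
   * @param lastUpdateTimestamp last successful update, below 0 if there has been none
   * @param now                 current time, in the same unit as the other two
   */
  def isSnapshotStale(stalenessLimit: Long, lastUpdateTimestamp: Long, now: Long): Boolean =
    stalenessLimit == 0L ||                        // 0 (the default): always stale
      lastUpdateTimestamp < 0 ||                   // never updated
      now - lastUpdateTimestamp >= stalenessLimit  // at least stalenessLimit old
}
```

Note that with the default limit of 0, every snapshot counts as stale.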
tryUpdate
tryUpdate(
isAsync: Boolean = false): Snapshot
tryUpdate ...FIXME
updateInternal
updateInternal(
isAsync: Boolean): Snapshot
The isAsync flag is not used.
updateInternal requests the current Snapshot for the LogSegment that is in turn requested for the checkpointVersion. updateInternal then gets the LogSegment (using getLogSegmentForVersion) for that checkpointVersion.
If the two LogSegments are equal (and so no new files have been added), updateInternal updates the lastUpdateTimestamp registry to the current timestamp and returns the currentSnapshot.
Otherwise, if the fetched LogSegment is different from the current Snapshot's, updateInternal prints out the following INFO message to the logs:
Loading version [version][ starting from checkpoint version [v]]
updateInternal creates a new Snapshot (using createSnapshot) with the fetched LogSegment.
updateInternal replaces the current Snapshot with the new one (using replaceSnapshot) and prints out the following INFO message to the logs:
Updated snapshot to [newSnapshot]
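Below is a minimal Scala model of updateInternal and replaceSnapshot, useful to see when a snapshot is reused and when it is replaced. LogSegment, Snapshot (with a cached flag standing in for cached data) and the injected logSegmentForVersion function are hypothetical stand-ins, the unused isAsync flag is left out, the INFO messages approximate the templates above, and refreshing lastUpdateTimestamp after a replacement is an assumption the text does not state.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of updateInternal and replaceSnapshot (not Delta Lake's code).
object UpdateInternalSketch {
  final case class LogSegment(version: Long, checkpointVersion: Option[Long])

  final class Snapshot(val logSegment: LogSegment) {
    @volatile var cached: Boolean = true       // stands in for cached state
    def version: Long = logSegment.version
    def uncache(): Unit = { cached = false }   // drop any cached data
    override def toString: String = s"Snapshot(version=$version)"
  }

  final class Manager(
      initial: Snapshot,
      logSegmentForVersion: Option[Long] => LogSegment, // stands in for getLogSegmentForVersion
      clock: () => Long) {
    @volatile var currentSnapshot: Snapshot = initial
    @volatile var lastUpdateTimestamp: Long = -1L
    val infoLogs: ArrayBuffer[String] = ArrayBuffer.empty[String]

    def updateInternal(): Snapshot = {
      val segment = logSegmentForVersion(currentSnapshot.logSegment.checkpointVersion)
      if (segment == currentSnapshot.logSegment) {
        // No new files: refresh the timestamp and keep the current snapshot
        lastUpdateTimestamp = clock()
        currentSnapshot
      } else {
        val startingFrom =
          segment.checkpointVersion.map(v => s" starting from checkpoint version $v.").getOrElse(".")
        infoLogs += s"Loading version ${segment.version}$startingFrom"
        val newSnapshot = new Snapshot(segment) // createSnapshot in Delta Lake
        replaceSnapshot(newSnapshot)
        infoLogs += s"Updated snapshot to $newSnapshot"
        lastUpdateTimestamp = clock()           // assumption: not stated in the text
        currentSnapshot
      }
    }

    // Uncache the current snapshot and make the new one current
    private def replaceSnapshot(newSnapshot: Snapshot): Unit = {
      currentSnapshot.uncache()
      currentSnapshot = newSnapshot
    }
  }
}
```

Comparing LogSegments (here, case class equality) is what lets the no-change path skip creating a snapshot altogether.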
Replacing Snapshots
replaceSnapshot(
newSnapshot: Snapshot): Unit
replaceSnapshot requests the currentSnapshot to uncache (and drop any cached data) and makes the given newSnapshot the current one.
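Demo
// Run in spark-shell with Delta Lake on the classpath (spark is the SparkSession)
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.SnapshotManagement
// Hypothetical path to an existing Delta table with a single commit (version 0)
val dataPath = "/tmp/delta/demo"
val log = DeltaLog.forTable(spark, dataPath)
assert(log.isInstanceOf[SnapshotManagement], "DeltaLog is a SnapshotManagement")
val snapshot = log.update(stalenessAcceptable = false)
scala> :type snapshot
org.apache.spark.sql.delta.Snapshot
assert(snapshot.version == 0)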
Logging
SnapshotManagement is an extension of DeltaLog, so use DeltaLog logging to see what happens inside.