SnapshotManagement
SnapshotManagement is an extension for DeltaLog to manage Snapshots.
Current Snapshot
```scala
currentSnapshot: CapturedSnapshot
```
SnapshotManagement uses the currentSnapshot registry for the most recently loaded Snapshot of the delta table.
currentSnapshot is initially the latest Snapshot and can be updated on demand (by installLogSegmentInternal and replaceSnapshot).
currentSnapshot...FIXME
currentSnapshot is used when SnapshotManagement is requested for the following:
- unsafeVolatileSnapshot
- update (tryUpdate, updateInternal, installLogSegmentInternal, replaceSnapshot)
- updateAfterCommit
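The currentSnapshot registry can be pictured with the following Scala sketch. It assumes that CapturedSnapshot pairs a Snapshot with the timestamp it was installed at; Snapshot, SnapshotHolder and capture are hypothetical, simplified stand-ins rather than Delta Lake's classes. Keeping both values in one immutable object behind a single @volatile reference means that one lock-free read (as in unsafeVolatileSnapshot) always yields a snapshot together with its own timestamp.

```scala
object CapturedSnapshotSketch {
  // Hypothetical, simplified stand-ins (not Delta Lake's classes)
  final case class Snapshot(version: Long)

  // A snapshot together with the time it was installed
  final case class CapturedSnapshot(snapshot: Snapshot, updateTimestamp: Long)

  final class SnapshotHolder(initial: CapturedSnapshot) {
    // One volatile reference to an immutable pair:
    // a single read yields a matching snapshot and timestamp
    @volatile private var currentSnapshot: CapturedSnapshot = initial

    // Lock-free read of the current snapshot, which may already be out of date
    def unsafeVolatileSnapshot: Snapshot = currentSnapshot.snapshot

    def lastUpdateTimestamp: Long = currentSnapshot.updateTimestamp

    // Publishes a new snapshot and its timestamp in one assignment
    def capture(snapshot: Snapshot, updateTimestamp: Long): Unit = {
      currentSnapshot = CapturedSnapshot(snapshot, updateTimestamp)
    }
  }
}
```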
Loading Latest Snapshot at Initialization
```scala
getSnapshotAtInit: CapturedSnapshot
```
getSnapshotAtInit finds the LogSegment of the delta table (using the last checkpoint file if available).
In the end, getSnapshotAtInit calls createSnapshotAtInitInternal.
createSnapshotAtInitInternal
```scala
createSnapshotAtInitInternal(
  initSegment: Option[LogSegment],
  lastCheckpointOpt: Option[CheckpointMetaData],
  timestamp: Long): CapturedSnapshot
```
createSnapshotAtInitInternal...FIXME
Fetching Log Files for Version Checkpointed
```scala
getLogSegmentFrom(
  startingCheckpoint: Option[LastCheckpointInfo]): Option[LogSegment]
```
getLogSegmentFrom fetches the log files for a version, using the optional LastCheckpointInfo as the starting checkpoint version to list log files from.
Fetching Latest LogSegment for Version
```scala
getLogSegmentForVersion(
  versionToLoad: Option[Long] = None,
  oldCheckpointProviderOpt: Option[UninitializedCheckpointProvider] = None,
  lastCheckpointInfo: Option[LastCheckpointInfo] = None): Option[LogSegment]

getLogSegmentForVersion(
  versionToLoad: Option[Long],
  files: Option[Array[FileStatus]],
  validateLogSegmentWithoutCompactedDeltas: Boolean,
  oldCheckpointProviderOpt: Option[UninitializedCheckpointProvider],
  lastCheckpointInfo: Option[LastCheckpointInfo]): Option[LogSegment]
```
getLogSegmentForVersion lists all the files (in the transaction log) from the starting checkpoint version (or 0 by default).
getLogSegmentForVersion filters out unnecessary files and leaves checkpoint and delta files only.
getLogSegmentForVersion filters out checkpoint files of size 0.
getLogSegmentForVersion keeps the files with versions up to (and including) the requested versionToLoad.
getLogSegmentForVersion splits the files into checkpoint and delta files.
getLogSegmentForVersion finds the latest checkpoint from the list.
In the end, getLogSegmentForVersion creates a LogSegment with the (checkpoint and delta) files.
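The steps of getLogSegmentForVersion can be sketched in plain Scala over file names. This is a simplified model, not Delta Lake's implementation: LogFile, the simplified LogSegment, fileName and the regular expressions are hypothetical, only single-part checkpoints (version.checkpoint.parquet) are recognized, and validation and checkpoint completeness checks are left out. Delta and checkpoint file names are assumed to start with a 20-digit zero-padded version.

```scala
object LogSegmentSketch {
  // Hypothetical, simplified model of a file in _delta_log: its name and size in bytes
  final case class LogFile(name: String, size: Long)

  // Hypothetical, simplified LogSegment: the latest checkpoint version (if any),
  // the delta versions after it, and the version the segment represents
  final case class LogSegment(checkpointVersion: Option[Long], deltaVersions: Seq[Long], version: Long)

  // Zero-padded delta and single-part checkpoint file names
  private val DeltaFile = """(\d{20})\.json""".r
  private val CheckpointFile = """(\d{20})\.checkpoint\.parquet""".r

  def fileName(version: Long, suffix: String): String = f"$version%020d.$suffix"

  def getLogSegmentForVersion(
      files: Seq[LogFile],
      startCheckpoint: Option[Long],
      versionToLoad: Option[Long]): Option[LogSegment] = {
    val start = startCheckpoint.getOrElse(0L)

    // Keep delta and non-empty checkpoint files only, tagged (version, isCheckpoint)
    val versioned: Seq[(Long, Boolean)] = files.collect {
      case LogFile(DeltaFile(v), _)                     => (v.toLong, false)
      case LogFile(CheckpointFile(v), size) if size > 0 => (v.toLong, true)
    }

    // Keep versions from the starting checkpoint up to (and including) versionToLoad
    val inRange = versioned
      .filter { case (v, _) => v >= start && versionToLoad.forall(v <= _) }
      .sortBy(_._1)

    // Split into checkpoint and delta files, and pick the latest checkpoint
    val (checkpoints, deltas) = inRange.partition(_._2)
    val checkpointVersion = checkpoints.map(_._1).lastOption

    // Only the deltas after the latest checkpoint are needed on top of it
    val deltasAfterCheckpoint = deltas.map(_._1).filter(v => checkpointVersion.forall(v > _))

    val version = (checkpointVersion.toSeq ++ deltasAfterCheckpoint).lastOption
    version.map(v => LogSegment(checkpointVersion, deltasAfterCheckpoint, v))
  }
}
```

For example, with deltas 0 to 4, a checkpoint at version 2 and an empty checkpoint file at version 4, the latest usable checkpoint is 2 and only deltas 3 and 4 are needed on top of it.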
getLogSegmentForVersion is used when:
- SnapshotManagement is requested to getUpdatedLogSegment, getLogSegmentAfterCommit, getLogSegmentFrom, getSnapshotAt, updateInternal
Listing Files from Version Upwards
```scala
listFrom(
  startVersion: Long): Iterator[FileStatus]
```
listFrom...FIXME
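listFrom is not described yet. The sketch below assumes it follows the listing contract of Delta Lake's LogStore.listFrom: return the files whose names are lexicographically greater than or equal to a zero-padded version prefix, sorted by name. listingPrefix and the in-memory list of names are hypothetical simplifications of a file-system listing.

```scala
object ListFromSketch {
  // Hypothetical helper: 20-digit zero-padded prefix of a version, e.g. "00000000000000000009."
  def listingPrefix(startVersion: Long): String = f"$startVersion%020d."

  // Names that sort at or after the prefix, in lexicographic order.
  // Zero-padding makes lexicographic order agree with numeric version order.
  def listFrom(fileNames: Seq[String], startVersion: Long): Iterator[String] = {
    val prefix = listingPrefix(startVersion)
    fileNames.sorted.iterator.filter(_.compareTo(prefix) >= 0)
  }
}
```

Zero-padding is what makes this work: without it, version 10 (10.json) would sort before version 9 (9.json).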
validateDeltaVersions
```scala
validateDeltaVersions(
  selectedDeltas: Array[FileStatus],
  checkpointVersion: Long,
  versionToLoad: Option[Long]): Unit
```
Procedure
validateDeltaVersions is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).
validateDeltaVersions...FIXME
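validateDeltaVersions is not described yet either. Assuming it checks that the selected delta versions form a gap-free sequence that starts right after checkpointVersion and, when versionToLoad is given, ends at that version, the check could look as follows. The function works on plain version numbers instead of FileStatus, and the error messages are made up for illustration.

```scala
object ValidateDeltaVersionsSketch {
  // Throws IllegalStateException unless the delta versions are contiguous, start right
  // after checkpointVersion and (when given) end at versionToLoad.
  // Assumption: checkpointVersion is -1 when there is no checkpoint, so deltas start at 0.
  def validateDeltaVersions(
      deltaVersions: Seq[Long],
      checkpointVersion: Long,
      versionToLoad: Option[Long]): Unit = {
    if (deltaVersions.nonEmpty) {
      // Neighbouring versions must differ by exactly one (no gaps)
      deltaVersions.zip(deltaVersions.tail).foreach { case (prev, next) =>
        if (next != prev + 1) {
          throw new IllegalStateException(s"Versions are not contiguous: $prev is followed by $next")
        }
      }
      val expectedStart = checkpointVersion + 1
      if (deltaVersions.head != expectedStart) {
        throw new IllegalStateException(
          s"Expected the first delta version $expectedStart but got ${deltaVersions.head}")
      }
      versionToLoad.foreach { expectedEnd =>
        if (deltaVersions.last != expectedEnd) {
          throw new IllegalStateException(
            s"Expected the last delta version $expectedEnd but got ${deltaVersions.last}")
        }
      }
    }
  }
}
```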
Creating Snapshot
```scala
createSnapshot(
  segment: LogSegment,
  minFileRetentionTimestamp: Long,
  timestamp: Long): Snapshot
```
createSnapshot reads the checksum (readChecksum) for the version of the given LogSegment and creates a Snapshot.
createSnapshot is used when:
- SnapshotManagement is requested for getSnapshotAtInit, updateInternal and getSnapshotAt
Last Successful Update Timestamp
SnapshotManagement uses the lastUpdateTimestamp internal registry for the timestamp of the last successful update.
Updating Snapshot
```scala
update(
  stalenessAcceptable: Boolean = false,
  checkIfUpdatedSinceTs: Option[Long] = None): Snapshot
```
update determines whether to update asynchronously based on the input stalenessAcceptable flag and isSnapshotStale.
With the stalenessAcceptable flag turned off (the default value) or when the state snapshot is stale, update updates synchronously (with the isAsync flag turned off).
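The decision between a synchronous and an asynchronous update can be sketched as a pure function. UpdateMode and updateMode are hypothetical names; the rule encoded here is that an asynchronous update is only an option when the caller accepts staleness and isSnapshotStale is false.

```scala
object UpdateDecisionSketch {
  // Hypothetical names for the two ways update can proceed
  sealed trait UpdateMode
  case object Synchronous extends UpdateMode
  case object Asynchronous extends UpdateMode

  // Update in the background only if staleness is acceptable
  // and the current snapshot is not stale yet; otherwise update right away
  def updateMode(stalenessAcceptable: Boolean, isSnapshotStale: Boolean): UpdateMode =
    if (stalenessAcceptable && !isSnapshotStale) Asynchronous else Synchronous
}
```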
update...FIXME
Usage
update is used when:
- DeltaHistoryManager is requested to getHistory, getActiveCommitAtTime, checkVersionExists
- DeltaLog is requested to start a transaction
- OptimisticTransactionImpl is requested to doCommit and getNextAttemptVersion
- DeltaTableV2 is requested for a Snapshot
- TahoeLogFileIndex is requested for a Snapshot
- DeltaSource is requested for the getStartingVersion
- In Delta commands...
isSnapshotStale
```scala
isSnapshotStale: Boolean
```
isSnapshotStale reads the spark.databricks.delta.stalenessLimit configuration property.
isSnapshotStale is enabled (true) when any of the following holds:
- spark.databricks.delta.stalenessLimit configuration property is 0 (the default)
- Internal lastUpdateTimestamp has never been updated (and is below 0) or is at least spark.databricks.delta.stalenessLimit configuration property old
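The isSnapshotStale conditions translate into a single boolean expression. In this sketch the configuration value and the current time are passed in explicitly (hypothetical parameters), and stalenessLimit is assumed to be in milliseconds.

```scala
object StalenessSketch {
  // stalenessLimitMs: spark.databricks.delta.stalenessLimit in milliseconds (0 by default)
  // lastUpdateTimestamp: negative until the first successful update
  def isSnapshotStale(stalenessLimitMs: Long, lastUpdateTimestamp: Long, nowMs: Long): Boolean =
    stalenessLimitMs == 0L ||                          // no staleness tolerated at all
      lastUpdateTimestamp < 0 ||                       // never updated
      nowMs - lastUpdateTimestamp >= stalenessLimitMs  // last update is too old
}
```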
tryUpdate
```scala
tryUpdate(
  isAsync: Boolean = false): Snapshot
```
tryUpdate...FIXME
updateInternal
```scala
updateInternal(
  isAsync: Boolean): Snapshot
```
updateInternal requests the current Snapshot for its LogSegment, and uses the checkpointVersion of that LogSegment as the starting point to get the latest LogSegment.
In the end, updateInternal calls installLogSegmentInternal.
installLogSegmentInternal
```scala
installLogSegmentInternal(
  previousSnapshot: Snapshot,
  segmentOpt: Option[LogSegment],
  updateTimestamp: Long,
  isAsync: Boolean): Snapshot // isAsync is not used
```
installLogSegmentInternal gives the Snapshot (possibly an InitialSnapshot) of the delta table at the logPath.
installLogSegmentInternal...FIXME
With no LogSegment specified, installLogSegmentInternal prints out the following INFO message to the logs and calls replaceSnapshot with a new InitialSnapshot (for the logPath).
No delta log found for the Delta table at [logPath]
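The documented no-LogSegment case of installLogSegmentInternal is sketched below with hypothetical, simplified types. The Some branch is an assumption (reusing the previous snapshot when the LogSegment has not changed, creating a new one otherwise), and the replaceSnapshot step and the isAsync flag are left out.

```scala
object InstallLogSegmentSketch {
  // Hypothetical, simplified stand-ins (not Delta Lake's classes)
  final case class LogSegment(version: Long)
  sealed trait Snapshot { def version: Long }
  final case class RegularSnapshot(segment: LogSegment) extends Snapshot {
    def version: Long = segment.version
  }
  // Snapshot of a table with no commits yet (version -1 is an assumption of this sketch)
  final case class InitialSnapshot(logPath: String) extends Snapshot {
    def version: Long = -1L
  }

  def installLogSegment(
      logPath: String,
      previousSnapshot: Snapshot,
      segmentOpt: Option[LogSegment]): Snapshot =
    segmentOpt match {
      case None =>
        // println stands in for the INFO message in the logs
        println(s"No delta log found for the Delta table at $logPath")
        InitialSnapshot(logPath)
      case Some(segment) =>
        previousSnapshot match {
          // Assumption: an unchanged LogSegment means there is nothing new to load
          case RegularSnapshot(previousSegment) if previousSegment == segment => previousSnapshot
          case _ => RegularSnapshot(segment)
        }
    }
}
```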
Replacing Snapshots
```scala
replaceSnapshot(
  newSnapshot: Snapshot): Unit
```
replaceSnapshot requests the currentSnapshot to uncache (and drop any cached data) and makes the given newSnapshot the current one.
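replaceSnapshot boils down to swapping a reference and releasing the old snapshot's cached data. In this sketch, Snapshot, its cached flag and Manager are hypothetical stand-ins, and uncache only flips a flag where Delta Lake would drop cached data. The swap is synchronized so that concurrent replacements do not interleave.

```scala
object ReplaceSnapshotSketch {
  // Hypothetical snapshot that only records whether its cached data is still held
  final class Snapshot(val version: Long) {
    @volatile var cached: Boolean = true

    // Stands in for dropping any cached data of this snapshot
    def uncache(): Unit = { cached = false }
  }

  final class Manager(initial: Snapshot) {
    @volatile private var current: Snapshot = initial

    def currentSnapshot: Snapshot = current

    // Makes newSnapshot the current one and uncaches the replaced snapshot
    def replaceSnapshot(newSnapshot: Snapshot): Unit = synchronized {
      val oldSnapshot = current
      current = newSnapshot
      oldSnapshot.uncache()
    }
  }
}
```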
updateAfterCommit
```scala
updateAfterCommit(
  committedVersion: Long,
  newChecksumOpt: Option[VersionChecksum],
  preCommitLogSegment: LogSegment): Snapshot
```
updateAfterCommit...FIXME
updateAfterCommit is used when:
- OptimisticTransactionImpl is requested to attempt a commit
getLogSegmentAfterCommit
```scala
getLogSegmentAfterCommit(
  oldCheckpointProvider: UninitializedCheckpointProvider): LogSegment
```
getLogSegmentAfterCommit...FIXME
getSnapshotAt
```scala
getSnapshotAt(
  version: Long,
  lastCheckpointProvider: CheckpointProvider): Snapshot
```
getSnapshotAt...FIXME
getSnapshotAt is used when:
- CheckpointHook is executed
getUpdatedLogSegment
```scala
getUpdatedLogSegment(
  oldLogSegment: LogSegment): (LogSegment, Seq[FileStatus])
```
getUpdatedLogSegment...FIXME
getUpdatedLogSegment is used when:
- OptimisticTransactionImpl is requested to getConflictingVersions
Demo
```scala
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.SnapshotManagement
// Run in spark-shell (spark is the active SparkSession).
// dataPath is the path of an existing delta table with a single commit (version 0).
val log = DeltaLog.forTable(spark, dataPath)
assert(log.isInstanceOf[SnapshotManagement], "DeltaLog is a SnapshotManagement")
val snapshot = log.update(stalenessAcceptable = false)
// scala> :type snapshot
// org.apache.spark.sql.delta.Snapshot
assert(snapshot.version == 0)
```
Logging
As an extension of DeltaLog, use DeltaLog logging to see what happens inside.