OptimisticTransactionImpl¶
OptimisticTransactionImpl is an extension of the TransactionalWrite abstraction for optimistic transactions that can modify a delta table (at a given version) and can be committed eventually.
In other words, OptimisticTransactionImpl is a set of actions as part of an Operation that changes the state of a delta table transactionally.
Contract¶
Clock¶
clock: Clock
DeltaLog¶
deltaLog: DeltaLog
DeltaLog (of a delta table) this transaction commits changes to
deltaLog is part of the TransactionalWrite abstraction and seems to change it to val (from def).
Snapshot¶
snapshot: Snapshot
Snapshot (of the delta table) this transaction commits changes to
snapshot is part of the TransactionalWrite contract and seems to change it to val (from def).
Implementations¶
Table Version at Reading Time¶
readVersion: Long
readVersion requests the Snapshot for the version.
readVersion is used when:
- OptimisticTransactionImpl is requested to updateMetadata and commit
- AlterDeltaTableCommand, ConvertToDeltaCommand, CreateDeltaTableCommand commands are executed
- DeltaCommand is requested to commitLarge
- WriteIntoDelta is requested to write
- ImplicitMetadataOperation is requested to updateMetadata
Transactional Commit¶
commit(
actions: Seq[Action],
op: DeltaOperations.Operation): Long
commit attempts to commit the given Actions (as part of the Operation) and gives the commit version.
Usage¶
commit is used when:
- ALTER TABLE commands are executed
- ConvertToDeltaCommand is executed
- CreateDeltaTableCommand is executed
- DeleteCommand is executed
- DeltaLog is requested to upgrade the protocol
- DeltaSink is requested to add a streaming micro-batch
- MergeIntoCommand is executed
- OptimizeTableCommand is executed (and requests OptimizeExecutor to commitAndRetry)
- StatisticsCollection is requested to recompute
- UpdateCommand is executed
- WriteIntoDelta is executed
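The following is a rough usage sketch of commit using Delta Lake's internal developer API (DeltaLog.forTable, withNewTransaction, filterFiles, DeltaOperations.Truncate); the exact signatures can differ between Delta versions, and the table path is just an example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("optimistic-txn-demo")
  .getOrCreate()

// DeltaLog of an existing delta table (example path)
val deltaLog = DeltaLog.forTable(spark, "/tmp/delta/users")

// withNewTransaction gives an OptimisticTransaction (an OptimisticTransactionImpl)
deltaLog.withNewTransaction { txn =>
  // filterFiles() with no filters gives all the files (and records them for conflict checking)
  val removes = txn.filterFiles().map(_.remove)
  // Commit the RemoveFiles as a TRUNCATE operation; commit gives the commit version
  val version = txn.commit(removes, DeltaOperations.Truncate())
  println(s"Committed version $version")
}
```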
Preparing Commit¶
commit then prepares the commit, which gives the final actions to commit (and these may differ from the given actions).
Isolation Level¶
commit determines the isolation level based on FileActions (in the given actions) and their dataChange flag.
When all the actions have the dataChange flag disabled (false), commit assumes that no data changed and chooses SnapshotIsolation; otherwise it chooses Serializable.
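A minimal sketch of that decision, with a hypothetical simplified FileAction stand-in (the real types live in the org.apache.spark.sql.delta.actions package):

```scala
sealed trait IsolationLevel
case object Serializable extends IsolationLevel
case object SnapshotIsolation extends IsolationLevel

// Hypothetical, simplified stand-in for Delta's FileAction
final case class FileAction(path: String, dataChange: Boolean)

def isolationLevelToUse(fileActions: Seq[FileAction]): IsolationLevel = {
  val dataChanged = fileActions.exists(_.dataChange)
  if (dataChanged) Serializable else SnapshotIsolation
}

isolationLevelToUse(Seq(FileAction("part-0000.parquet", dataChange = false)))
// SnapshotIsolation
```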
Blind Append¶
commit is considered a blind append (see the sketch after this list) when all of the following hold:
- There are only AddFiles among FileActions in the actions (onlyAddFiles)
- It does not depend on files, i.e. the readPredicates and readFiles are empty (dependsOnFiles)
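A simplified sketch of the two conditions (hypothetical stand-in types; the real flags are onlyAddFiles and dependsOnFiles):

```scala
// Hypothetical stand-ins for Delta's file actions
sealed trait FileAction
final case class AddFile(path: String) extends FileAction
final case class RemoveFile(path: String) extends FileAction

def isBlindAppend(
    fileActions: Seq[FileAction],
    readPredicates: Seq[String],   // predicates recorded by filterFiles / readWholeTable
    readFiles: Set[AddFile]): Boolean = {
  val onlyAddFiles = fileActions.forall(_.isInstanceOf[AddFile])
  val dependsOnFiles = readPredicates.nonEmpty || readFiles.nonEmpty
  onlyAddFiles && !dependsOnFiles
}
```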
CommitInfo¶
commit...FIXME
Registering Post-Commit Hook¶
commit registers the GenerateSymlinkManifest post-commit hook when there is a FileAction among the actions and the compatibility.symlinkFormatManifest.enabled table property is enabled.
doCommitRetryIteratively¶
commit doCommitRetryIteratively.
commit prints out the following INFO message to the logs:
Committed delta #[commitVersion] to [logPath]
Performing Post-Commit Operations¶
commit postCommit (with the version committed and the needsCheckpoint flag).
Executing Post-Commit Hooks¶
In the end, commit runs post-commit hooks and returns the version of the successful commit.
doCommitRetryIteratively¶
doCommitRetryIteratively(
attemptVersion: Long,
currentTransactionInfo: CurrentTransactionInfo,
isolationLevel: IsolationLevel): (Long, CurrentTransactionInfo, Boolean)
doCommitRetryIteratively acquires a lock on the delta table if enabled for the commit.
doCommitRetryIteratively uses the attemptNumber internal counter to track the number of attempts. In case of a FileAlreadyExistsException, doCommitRetryIteratively increments attemptNumber and tries again.
In the end, doCommitRetryIteratively returns a tuple with the following:
- Commit version (from the given attemptVersion inclusive up to spark.databricks.delta.maxCommitAttempts)
- CurrentTransactionInfo
- Whether the commit needs a checkpoint or not (needsCheckpoint)
Firstly, doCommitRetryIteratively does the first attempt at commit. If successful, the commit is done.
If there is a retry, doCommitRetryIteratively checkForConflicts followed by another attempt at commit.
If the number of commit attempts (attemptNumber) is above the spark.databricks.delta.maxCommitAttempts configuration property, doCommitRetryIteratively throws a DeltaIllegalStateException:
This commit has failed as it has been tried <numAttempts> times but did not succeed.
This can be caused by the Delta table being committed continuously by many concurrent commits.
Commit started at version: [attemptNumber]
Commit failed at version: [attemptVersion]
Number of actions attempted to commit: [numActions]
Total time spent attempting this commit: [timeSpent] ms
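A rough sketch of the retry loop. The helpers are passed in as functions to keep the sketch self-contained; the real method also records commit statistics and runs under the commit lock.

```scala
import java.nio.file.FileAlreadyExistsException

// Returns (commitVersion, needsCheckpoint)
def doCommitRetryIteratively(
    attemptVersion: Long,
    maxCommitAttempts: Int,
    doCommit: Long => Boolean,          // true if the commit needs checkpointing
    checkForConflicts: Long => Long): (Long, Boolean) = {
  var version = attemptVersion
  var attemptNumber = 0
  var result: Option[(Long, Boolean)] = None
  while (result.isEmpty) {
    try {
      if (attemptNumber == 0) {
        result = Some((version, doCommit(version)))
      } else if (attemptNumber > maxCommitAttempts) {
        sys.error(s"This commit has failed as it has been tried $attemptNumber times " +
          "but did not succeed.")
      } else {
        version = checkForConflicts(version)   // next possible commit version
        result = Some((version, doCommit(version)))
      }
    } catch {
      case _: FileAlreadyExistsException => attemptNumber += 1   // another writer won; retry
    }
  }
  result.get
}
```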
Checking Logical Conflicts with Concurrent Updates¶
checkForConflicts(
checkVersion: Long,
actions: Seq[Action],
attemptNumber: Int,
commitIsolationLevel: IsolationLevel): Long
checkForConflicts checks for logical conflicts (of the given actions) with concurrent updates (actions of the commits since the transaction has started).
checkForConflicts gives the next possible commit version unless the following happened between the time of read (checkVersion) and the time of this commit attempt:
- Client is up to date with the table protocol for reading and writing (and hence allowed to access the table)
- Protocol version has changed
- Metadata has changed
- AddFiles have been added that the txn should have read based on the given IsolationLevel (Concurrent Append)
- AddFiles that the txn read have been deleted (Concurrent Delete)
- Files have been deleted by the txn and since the time of read (Concurrent Delete)
- Idempotent transactions have conflicted (Multiple Streaming Queries with the same checkpoint location)
checkForConflicts takes the next possible commit version.
For every commit since the time of read (checkVersion) and this commit attempt, checkForConflicts does the following:
- FIXME
- Prints out the following INFO message to the logs:
Completed checking for conflicts Version: [version] Attempt: [attemptNumber] Time: [totalCheckAndRetryTime] ms
In the end, checkForConflicts prints out the following INFO message to the logs:
No logical conflicts with deltas [[checkVersion], [nextAttemptVersion]), retrying.
getPrettyPartitionMessage¶
getPrettyPartitionMessage(
partitionValues: Map[String, String]): String
getPrettyPartitionMessage...FIXME
postCommit¶
postCommit(
commitVersion: Long,
needsCheckpoint: Boolean): Unit
postCommit turns the committed flag on.
With the given needsCheckpoint enabled (that comes indirectly from doCommit), postCommit requests the DeltaLog for the Snapshot at the given commitVersion followed by checkpointing.
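A minimal sketch of the post-commit step, with hypothetical stand-ins for the DeltaLog and Snapshot types and a local var for the committed flag:

```scala
// Hypothetical stand-in; the real postCommit works with Delta's DeltaLog and Snapshot
trait DeltaLogLike {
  def getSnapshotAt(version: Long): AnyRef   // Snapshot in the real API
  def checkpoint(snapshot: AnyRef): Unit
}

var committed = false

def postCommit(deltaLog: DeltaLogLike, commitVersion: Long, needsCheckpoint: Boolean): Unit = {
  committed = true
  if (needsCheckpoint) {
    val snapshot = deltaLog.getSnapshotAt(commitVersion)
    deltaLog.checkpoint(snapshot)
  }
}
```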
prepareCommit¶
prepareCommit(
actions: Seq[Action],
op: DeltaOperations.Operation): Seq[Action]
prepareCommit adds the newMetadata action (if available) to the given actions.
prepareCommit verifyNewMetadata if there was one.
prepareCommit...FIXME
prepareCommit requests the DeltaLog to protocolWrite.
prepareCommit...FIXME
Multiple Metadata Changes Not Allowed¶
prepareCommit throws an AssertionError when there are multiple metadata changes in the transaction (by means of Metadata actions):
Cannot change the metadata more than once in a transaction.
Committing Transaction Allowed Once Only¶
prepareCommit throws an AssertionError when the committed internal flag is enabled:
Transaction already committed.
performCdcColumnMappingCheck¶
performCdcColumnMappingCheck(
actions: Seq[Action],
op: DeltaOperations.Operation): Unit
performCdcColumnMappingCheck...FIXME
runPostCommitHooks¶
runPostCommitHooks(
version: Long,
committedActions: Seq[Action]): Unit
runPostCommitHooks simply runs every post-commit hook registered (in the postCommitHooks internal registry).
runPostCommitHooks clears the active transaction (making all follow-up operations non-transactional).
Note
Hooks may create new transactions.
Handling Non-Fatal Exceptions¶
For non-fatal exceptions, runPostCommitHooks prints out the following ERROR message to the logs, records the delta event, and requests the post-commit hook to handle the error.
Error when executing post-commit hook [name] for commit [version]
AssertionError¶
runPostCommitHooks throws an AssertionError when committed flag is disabled:
Can't call post commit hooks before committing
Next Possible Commit Version¶
getNextAttemptVersion(
previousAttemptVersion: Long): Long
getNextAttemptVersion requests the DeltaLog to update (and give the latest state snapshot of the delta table).
In the end, getNextAttemptVersion requests the Snapshot for the version and increments it.
Note
The input previousAttemptVersion argument is not used.
Operation Metrics¶
getOperationMetrics(
op: Operation): Option[Map[String, String]]
getOperationMetrics gives the metrics of the given Operation when the spark.databricks.delta.history.metricsEnabled configuration property is enabled. Otherwise, getOperationMetrics gives None.
CommitInfo¶
OptimisticTransactionImpl creates a CommitInfo when requested to commit with spark.databricks.delta.commitInfo.enabled configuration enabled.
OptimisticTransactionImpl uses the CommitInfo to recordDeltaEvent (as a CommitStats).
Attempting Commit¶
doCommit(
attemptVersion: Long,
currentTransactionInfo: CurrentTransactionInfo,
attemptNumber: Int,
isolationLevel: IsolationLevel): Boolean
doCommit returns whether or not this commit (attempt) should trigger checkpointing.
doCommit is used when:
- OptimisticTransactionImpl is requested to doCommitRetryIteratively
doCommit requests the given CurrentTransactionInfo for the final actions to commit (Actions).
doCommit prints out the following DEBUG message to the logs:
Attempting to commit version [attemptVersion] with [n] actions with [isolationLevel] isolation level
Writing Out¶
doCommit requests the DeltaLog for the LogStore to write out the actions to a delta file in the log directory with the attemptVersion version, e.g.
00000000000000000001.json
doCommit writes the actions out in JSON format.
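For illustration, the delta (commit) file name is the commit version zero-padded to 20 digits; the helper below is hypothetical and only mimics the naming used in the _delta_log directory.

```scala
// Hypothetical helper that mimics the delta file naming in the _delta_log directory
def deltaFileName(version: Long): String = f"$version%020d.json"

deltaFileName(1)   // 00000000000000000001.json
deltaFileName(42)  // 00000000000000000042.json
```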
Note
LogStores must throw a java.nio.file.FileAlreadyExistsException exception if the delta file already exists. Any FileAlreadyExistsExceptions are caught by doCommit itself to checkAndRetry.
lastCommitVersionInSession¶
doCommit sets the spark.databricks.delta.lastCommitVersionInSession configuration property to the given attemptVersion.
Post-Commit Snapshot¶
doCommit requests the DeltaLog to update.
Needs Checkpointing¶
doCommit determines whether or not this commit should trigger checkpointing based on the committed version (attemptVersion).
A commit triggers checkpointing when the following all hold:
- The committed version is any version greater than 0
- The committed version is a multiple of the delta.checkpointInterval table property
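A sketch of that condition (assuming checkpointInterval is the value of the delta.checkpointInterval table property):

```scala
def needsCheckpoint(committedVersion: Long, checkpointInterval: Int): Boolean =
  committedVersion > 0 && committedVersion % checkpointInterval == 0

needsCheckpoint(0, 10)   // false
needsCheckpoint(10, 10)  // true
needsCheckpoint(15, 10)  // false
```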
CommitStats¶
doCommit records a new CommitStats event.
writeCommitFile¶
writeCommitFile(
attemptVersion: Long,
jsonActions: Iterator[String],
currentTransactionInfo: CurrentTransactionInfo): Option[VersionChecksum]
writeCommitFile...FIXME
Retrying Commit¶
checkAndRetry(
checkVersion: Long,
actions: Seq[Action],
attemptNumber: Int): Long
checkAndRetry...FIXME
checkAndRetry is used when OptimisticTransactionImpl is requested to commit (and attempts a commit that failed with a FileAlreadyExistsException).
Verifying New Metadata¶
verifyNewMetadata(
metadata: Metadata): Unit
verifyNewMetadata validates the given Metadata (and throws an exception if incorrect).
verifyNewMetadata is used when:
- OptimisticTransactionImpl is requested to prepareCommit and updateMetadata
verifyNewMetadata asserts that there are no column duplicates in the schema (of the given Metadata). verifyNewMetadata throws a DeltaAnalysisException if there are duplicates.
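A simplified sketch of such a duplicate check (hypothetical helper; the real check also walks nested fields of the schema):

```scala
def assertNoDuplicateColumns(columnNames: Seq[String]): Unit = {
  val duplicates = columnNames
    .groupBy(_.toLowerCase)
    .collect { case (name, occurrences) if occurrences.size > 1 => name }
  assert(duplicates.isEmpty, s"Found duplicate column(s): ${duplicates.mkString(", ")}")
}

assertNoDuplicateColumns(Seq("id", "name"))   // ok
// assertNoDuplicateColumns(Seq("id", "ID"))  // fails the assertion
```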
verifyNewMetadata branches off based on the DeltaColumnMappingMode (of the given Metadata):
- In NoMapping mode, verifyNewMetadata checks the data schema and checks the partition columns (of the given Metadata). In case of an AnalysisException and with the spark.databricks.delta.partitionColumnValidity.enabled configuration property enabled, verifyNewMetadata throws a DeltaAnalysisException.
- For the other DeltaColumnMappingModes, verifyNewMetadata checkColumnIdAndPhysicalNameAssignments of the schema.
verifyNewMetadata validates generated columns if there are any (in the schema).
With spark.databricks.delta.schema.typeCheck.enabled configuration property enabled, verifyNewMetadata...FIXME
In the end, verifyNewMetadata checks the protocol requirements and, in case the protocol has been updated, records it in the newProtocol registry.
newProtocol¶
newProtocol: Option[Protocol]
OptimisticTransactionImpl defines newProtocol registry for a new Protocol.
newProtocol is undefined (None) by default.
newProtocol is defined when:
newProtocol is used for the protocol and to prepareCommit.
withGlobalConfigDefaults¶
withGlobalConfigDefaults(
metadata: Metadata): Metadata
withGlobalConfigDefaults...FIXME
withGlobalConfigDefaults is used when:
- OptimisticTransactionImpl is requested to updateMetadata and updateMetadataForNewTable
Looking Up Transaction Version (by Streaming Query ID)¶
txnVersion(
id: String): Long
txnVersion simply registers (adds) the given ID in the readTxn internal registry.
In the end, txnVersion requests the Snapshot for the transaction version for the given ID or -1.
txnVersion is used when:
- DeltaSink is requested to add a streaming micro-batch
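A simplified sketch of the lookup (hypothetical helper; the real method reads SetTransaction versions from the Snapshot):

```scala
import scala.collection.mutable

val readTxn = mutable.ArrayBuffer.empty[String]   // streaming query ids seen by this txn

def txnVersion(id: String, committedTxnVersions: Map[String, Long]): Long = {
  readTxn += id                                    // recorded for conflict checking
  committedTxnVersions.getOrElse(id, -1L)          // -1 when the id has never committed
}

txnVersion("query-1", Map("query-1" -> 5L))        // 5
txnVersion("query-2", Map("query-1" -> 5L))        // -1
```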
User-Defined Metadata¶
getUserMetadata(
op: Operation): Option[String]
getUserMetadata returns the userMetadata of the given Operation (if defined) or the value of the spark.databricks.delta.commitInfo.userMetadata configuration property.
getUserMetadata is used when:
- OptimisticTransactionImpl is requested to commit (and spark.databricks.delta.commitInfo.enabled configuration property is enabled)
- ConvertToDeltaCommand is executed (and in turn requests DeltaCommand to commitLarge)
Post-Commit Hooks¶
postCommitHooks: ArrayBuffer[PostCommitHook]
OptimisticTransactionImpl uses postCommitHooks internal registry to manage PostCommitHooks to be executed right after a successful commit.
PostCommitHooks can be registered using registerPostCommitHook.
Blind-Append Transaction¶
var isBlindAppend: Boolean = false
OptimisticTransactionImpl defines isBlindAppend flag to mark this transaction as blind-append.
isBlindAppend is disabled (false) by default.
isBlindAppend can be enabled (true) only when this OptimisticTransactionImpl is requested to commitImpl and the following all hold:
- There are AddFiles only in this transaction
- This transaction does not depend on files (i.e., the readPredicates and the readFiles are all empty)
commitImpl¶
commitImpl(
actions: Seq[Action],
op: DeltaOperations.Operation,
canSkipEmptyCommits: Boolean,
tags: Map[String, String]): Option[Long]
commitImpl...FIXME
commitImpl is used when:
- OptimisticTransactionImpl is requested to commit and commitIfNeeded
commitIfNeeded¶
commitIfNeeded(
actions: Seq[Action],
op: DeltaOperations.Operation,
tags: Map[String, String] = Map.empty): Unit
Procedure
commitIfNeeded is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).
commitIfNeeded commitImpl (with canSkipEmptyCommits flag enabled).
commitIfNeeded is used when:
- Delete command is executed
- Merge command is executed (and commitAndRecordStats)
- Update command is executed (and performUpdate)
- WriteIntoDelta command is executed
Internal Registries¶
newMetadata¶
newMetadata: Option[Metadata]
OptimisticTransactionImpl uses the newMetadata internal registry for a new Metadata that should be committed with this transaction.
newMetadata is initially undefined (None). It can be updated only once and before the transaction writes out any files.
newMetadata is used in prepareCommit and doCommit (for statistics).
newMetadata is available using metadata method.
readFiles¶
readFiles: HashSet[AddFile]
OptimisticTransactionImpl uses readFiles registry to track AddFiles that have been seen (scanned) by this transaction (when requested to filterFiles).
Used to determine isBlindAppend and in checkForConflicts (to fail if the files that the txn read have been deleted).
readPredicates¶
readPredicates: ArrayBuffer[Expression]
readPredicates holds predicate expressions for partitions the transaction is modifying.
A new predicate expression is added to readPredicates in filterFiles and readWholeTable.
readPredicates is used when checkAndRetry.
Internal Properties¶
committed¶
Controls whether the transaction has been committed or not (and prevents prepareCommit from being executed again)
Default: false
Enabled in postCommit
readTxn¶
Streaming query IDs that have been seen by this transaction
A new queryId is added when OptimisticTransactionImpl is requested for txnVersion
Used when OptimisticTransactionImpl is requested to checkAndRetry (to fail with a ConcurrentTransactionException for idempotent transactions that have conflicted)
snapshotMetadata¶
readWholeTable¶
readWholeTable(): Unit
readWholeTable simply adds True literal to the readPredicates internal registry.
readWholeTable is used when:
- DeltaSink is requested to add a streaming micro-batch (and the batch reads the same Delta table as this sink is going to write to)
updateMetadataForNewTable¶
updateMetadataForNewTable(
metadata: Metadata): Unit
updateMetadataForNewTable...FIXME
updateMetadataForNewTable is used when:
- ConvertToDeltaCommand and CreateDeltaTableCommand are executed
Metadata¶
metadata: Metadata
metadata is part of the TransactionalWrite abstraction.
metadata is either the newMetadata (if defined) or the snapshotMetadata.
Updating Metadata¶
updateMetadata(
_metadata: Metadata): Unit
updateMetadata asserts the following:
- The current transaction has not written data out yet (and the hasWritten flag is still disabled since it is not allowed to update the metadata in a transaction that has already written data)
- The metadata has not been changed already (and the newMetadata has not been assigned yet since it is not allowed to change the metadata more than once in a transaction)
In the end, updateMetadata updateMetadataInternal.
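A sketch of the two assertions, with simplified fields standing in for the real hasWritten flag and newMetadata registry:

```scala
var hasWritten = false
var newMetadata: Option[String] = None   // the real registry holds a Metadata action

def updateMetadata(metadata: String): Unit = {
  assert(!hasWritten,
    "Cannot update the metadata in a transaction that has already written data")
  assert(newMetadata.isEmpty,
    "Cannot change the metadata more than once in a transaction.")
  newMetadata = Some(metadata)           // the real method goes through updateMetadataInternal
}
```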
updateMetadata is used when:
- OptimisticTransactionImpl is requested to updateMetadataForNewTable
- AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, AlterTableAddColumnsDeltaCommand, AlterTableChangeColumnDeltaCommand, AlterTableReplaceColumnsDeltaCommand are executed
- RestoreTableCommand is executed
- ImplicitMetadataOperation is requested to updateMetadata
updateMetadataInternal¶
updateMetadataInternal(
_metadata: Metadata): Unit
updateMetadataInternal...FIXME
Files To Scan Matching Given Predicates¶
filterFiles(): Seq[AddFile] // (1)
filterFiles(
filters: Seq[Expression]): Seq[AddFile]
- No filters = all files
filterFiles gives the files to scan for the given predicates (filter expressions).
Internally, filterFiles requests the Snapshot for the filesForScan (for no projection attributes and the given filters).
filterFiles finds the partition predicates among the given filters (and the partition columns of the Metadata).
filterFiles registers (adds) the partition predicates (in the readPredicates internal registry) and the files to scan (in the readFiles internal registry).
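A simplified sketch of splitting out the partition predicates among the given filters (using Spark's Catalyst Expression API; the real method delegates file listing to the Snapshot):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression

// A filter is a partition predicate when it references only partition columns
def partitionPredicates(
    filters: Seq[Expression],
    partitionColumns: Set[String]): Seq[Expression] =
  filters.filter(_.references.forall(attr => partitionColumns.contains(attr.name)))
```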
filterFiles is used when:
- DeltaSink is requested to add a streaming micro-batch (with the Complete output mode)
- DeleteCommand, MergeIntoCommand, UpdateCommand, and WriteIntoDelta are executed
- CreateDeltaTableCommand is executed
lockCommitIfEnabled¶
lockCommitIfEnabled[T](
body: => T): T
lockCommitIfEnabled executes the body with a lock on a delta table when isCommitLockEnabled. Otherwise, lockCommitIfEnabled does not acquire a lock.
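A minimal sketch using a plain object monitor (the real implementation delegates the locking to the DeltaLog):

```scala
def lockCommitIfEnabled[T](isCommitLockEnabled: Boolean, tableLock: AnyRef)(body: => T): T =
  if (isCommitLockEnabled) tableLock.synchronized(body)
  else body

// Usage
val lock = new Object
val version = lockCommitIfEnabled(isCommitLockEnabled = true, lock) {
  42L  // the commit attempt would run here
}
```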
lockCommitIfEnabled is used when:
- OptimisticTransactionImpl is requested to doCommitRetryIteratively
isCommitLockEnabled¶
isCommitLockEnabled: Boolean
isCommitLockEnabled is the value of spark.databricks.delta.commitLock.enabled configuration property (if defined) or isPartialWriteVisible (requesting the LogStore from the DeltaLog).
Note
isCommitLockEnabled is true by default given the following:
- spark.databricks.delta.commitLock.enabled configuration property is undefined by default
- isPartialWriteVisible is true by default
Registering Post-Commit Hook¶
registerPostCommitHook(
hook: PostCommitHook): Unit
Procedure
registerPostCommitHook is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).
registerPostCommitHook registers (adds) the given PostCommitHook to the postCommitHooks internal registry.
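A simplified, hypothetical sketch of the registry and how the hooks are run after a successful commit (not the real PostCommitHook trait; it only illustrates the register-then-run flow, including the non-fatal error handling described under runPostCommitHooks):

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

trait PostCommitHook {
  def name: String
  def run(committedVersion: Long): Unit
  def handleError(error: Throwable, version: Long): Unit = ()
}

val postCommitHooks = ArrayBuffer.empty[PostCommitHook]

def registerPostCommitHook(hook: PostCommitHook): Unit =
  if (!postCommitHooks.contains(hook)) postCommitHooks += hook

def runPostCommitHooks(version: Long): Unit =
  postCommitHooks.foreach { hook =>
    try hook.run(version)
    catch {
      case NonFatal(e) =>
        println(s"Error when executing post-commit hook ${hook.name} for commit $version")
        hook.handleError(e, version)
    }
  }
```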
registerPostCommitHook is used when:
- OptimisticTransactionImpl is requested to commitImpl
- TransactionalWrite is requested to write data out
setNewProtocolWithFeaturesEnabledByMetadata¶
setNewProtocolWithFeaturesEnabledByMetadata(
metadata: Metadata): Unit
setNewProtocolWithFeaturesEnabledByMetadata upgradeProtocolFromMetadataForExistingTable (with the given Metadata and the Protocol) and makes it this transaction's protocol.
setNewProtocolWithFeaturesEnabledByMetadata is used when:
- OptimisticTransactionImpl is requested to updateMetadataInternal and prepareCommit
Committing Large (Data Files)¶
commitLarge(
spark: SparkSession,
actions: Iterator[Action],
op: DeltaOperations.Operation,
context: Map[String, String],
metrics: Map[String, String]): (Long, Snapshot)
commitLarge...FIXME
commitLarge is used for the following commands:
- CONVERT (and performConvert)
- CreateDeltaTableCommand (and handleClone)
- RESTORE
Metadata Can Be Updated¶
canUpdateMetadata: Boolean
canUpdateMetadata holds when neither this transaction has already written data nor the metadata has been already changed.
In other words, canUpdateMetadata holds true when both of the following hold:
- hasWritten is false
- newMetadata is undefined
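A one-liner sketch of the condition (field names as above):

```scala
def canUpdateMetadata(hasWritten: Boolean, newMetadata: Option[_]): Boolean =
  !hasWritten && newMetadata.isEmpty
```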
canUpdateMetadata is used when:
- WriteIntoDelta is requested to write data out
Metadata Check for CDF Columns¶
performCdcMetadataCheck(): Unit
Procedure
performCdcMetadataCheck is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).
Noop
performCdcMetadataCheck does nothing (noop) when the newMetadata registry is empty or Change Data Feed is not enabled on the table (isCDCEnabledOnTable).
For the new metadata and Change Data Feed feature enabled on the table, performCdcMetadataCheck takes the column names of the newly-assigned metadata and compares them with the reserved column names of CDF (CDF-aware read schema).
If there are any reserved CDF column names found in the new metadata, performCdcMetadataCheck throws a DeltaIllegalStateException for the following:
- CDF was not enabled previously (in the initial metadata of the table snapshot) but reserved columns are present in the new schema
- CDF was enabled but reserved columns are present in the new metadata (i.e., in the data)
performCdcMetadataCheck is used when:
- OptimisticTransactionImpl is requested to commitImpl
Logging¶
OptimisticTransactionImpl is a Scala trait and logging is configured using the logger of the implementations.