OptimisticTransactionImpl¶
OptimisticTransactionImpl is an extension of the TransactionalWrite abstraction for optimistic transactions that can modify a delta table (at a given version) and can be committed eventually.
In other words, OptimisticTransactionImpl is a set of actions as part of an Operation that changes the state of a delta table transactionally.
Contract¶
Clock¶
clock: Clock
DeltaLog¶
deltaLog: DeltaLog
DeltaLog (of a delta table) that this transaction commits changes to
deltaLog is part of the TransactionalWrite abstraction. OptimisticTransactionImpl seems to change it to a val (from a def).
Snapshot¶
snapshot: Snapshot
Snapshot (of the delta table) that this transaction commits changes to
snapshot is part of the TransactionalWrite abstraction. OptimisticTransactionImpl seems to change it to a val (from a def).
Implementations¶
Table Version at Reading Time¶
readVersion: Long
readVersion requests the Snapshot for the version.
readVersion is used when:
- OptimisticTransactionImpl is requested to updateMetadata and commit
- AlterDeltaTableCommand, ConvertToDeltaCommand, CreateDeltaTableCommand commands are executed
- DeltaCommand is requested to commitLarge
- WriteIntoDelta is requested to write
- ImplicitMetadataOperation is requested to updateMetadata
Transactional Commit¶
commit(
actions: Seq[Action],
op: DeltaOperations.Operation): Long
commit attempts to commit the given Actions (as part of the Operation) and gives the commit version.
Usage¶
commit is used when:
- ALTER TABLE commands are executed
- ConvertToDeltaCommand is executed
- CreateDeltaTableCommand is executed
- DeleteCommand is executed
- DeltaLog is requested to upgrade the protocol
- DeltaSink is requested to add a streaming micro-batch
- MergeIntoCommand is executed
- OptimizeTableCommand is executed (and requests OptimizeExecutor to commitAndRetry)
- StatisticsCollection is requested to recompute
- UpdateCommand is executed
- WriteIntoDelta is executed
Preparing Commit¶
commit prepares a commit (which gives the final actions to commit, and these may be different from the given actions).
Isolation Level¶
commit determines the isolation level based on FileActions (in the given actions) and their dataChange flag.
With the dataChange flag disabled (false) for all the actions, commit assumes no data changed and chooses SnapshotIsolation. Otherwise, commit chooses Serializable.
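The rule above can be sketched as follows. This is a minimal illustration rather than Delta Lake's code: the types below are simplified, hypothetical stand-ins for the real Action and IsolationLevel hierarchies, and chooseIsolationLevel is a made-up helper name.

```scala
object IsolationLevelSketch {
  // Simplified, hypothetical stand-ins for Delta's action and isolation-level types.
  sealed trait Action
  sealed trait FileAction extends Action { def dataChange: Boolean }
  final case class AddFile(path: String, dataChange: Boolean) extends FileAction
  final case class RemoveFile(path: String, dataChange: Boolean) extends FileAction
  final case class SetTransaction(appId: String, version: Long) extends Action

  sealed trait IsolationLevel
  case object Serializable extends IsolationLevel
  case object SnapshotIsolation extends IsolationLevel

  /** SnapshotIsolation when no FileAction changes data, Serializable otherwise. */
  def chooseIsolationLevel(actions: Seq[Action]): IsolationLevel = {
    // Only FileActions carry the dataChange flag; other actions are ignored here.
    val fileActions = actions.collect { case f: FileAction => f }
    val noDataChanged = fileActions.forall(f => !f.dataChange)
    if (noDataChanged) SnapshotIsolation else Serializable
  }
}
```

Note that a commit with no FileActions at all also ends up with SnapshotIsolation in this sketch, since "all flags disabled" holds vacuously.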
Blind Append¶
commit is considered a blind append when all of the following hold:
- There are only AddFiles among FileActions in the actions (onlyAddFiles)
- It does not depend on files, i.e. the readPredicates and readFiles are empty (dependsOnFiles)
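The two conditions can be expressed as a small predicate. The sketch below uses hypothetical, simplified types (predicates are plain strings here, not Catalyst expressions) and only mirrors the onlyAddFiles and dependsOnFiles names mentioned above.

```scala
object BlindAppendSketch {
  // Hypothetical, simplified file actions.
  sealed trait FileAction
  final case class AddFile(path: String) extends FileAction
  final case class RemoveFile(path: String) extends FileAction

  def isBlindAppend(
      fileActions: Seq[FileAction],
      readPredicates: Seq[String], // assumption: predicates modelled as strings
      readFiles: Set[AddFile]): Boolean = {
    // Condition 1: there are only AddFiles among the FileActions.
    val onlyAddFiles = fileActions.forall(_.isInstanceOf[AddFile])
    // Condition 2: the transaction has read neither predicates nor files.
    val dependsOnFiles = readPredicates.nonEmpty || readFiles.nonEmpty
    onlyAddFiles && !dependsOnFiles
  }
}
```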
CommitInfo¶
commit
...FIXME
Registering Post-Commit Hook¶
commit registers the GenerateSymlinkManifest post-commit hook when there is a FileAction among the actions and the compatibility.symlinkFormatManifest.enabled table property is enabled.
doCommitRetryIteratively¶
commit executes doCommitRetryIteratively.
commit prints out the following INFO message to the logs:
Committed delta #[commitVersion] to [logPath]
Performing Post-Commit Operations¶
commit executes postCommit (with the version committed and the needsCheckpoint flag).
Executing Post-Commit Hooks¶
In the end, commit runs post-commit hooks and returns the version of the successful commit.
doCommitRetryIteratively¶
doCommitRetryIteratively(
attemptVersion: Long,
currentTransactionInfo: CurrentTransactionInfo,
isolationLevel: IsolationLevel): (Long, CurrentTransactionInfo, Boolean)
doCommitRetryIteratively acquires a lock on the delta table if enabled for the commit.
doCommitRetryIteratively uses the attemptNumber internal counter to track the number of attempts. In case of a FileAlreadyExistsException, doCommitRetryIteratively increments the attemptNumber and tries again.
In the end, doCommitRetryIteratively returns a tuple with the following:
- Commit version (from the given attemptVersion inclusive up to spark.databricks.delta.maxCommitAttempts)
- CurrentTransactionInfo
- Whether the commit needs checkpoint or not (needsCheckpoint)
Firstly, doCommitRetryIteratively does the first attempt at commit. If successful, the commit is done.
If there is a retry, doCommitRetryIteratively executes checkForConflicts followed by another attempt at commit.
If the number of commit attempts (attemptNumber) is above the spark.databricks.delta.maxCommitAttempts configuration property, doCommitRetryIteratively throws a DeltaIllegalStateException:
This commit has failed as it has been tried <numAttempts> times but did not succeed.
This can be caused by the Delta table being committed continuously by many concurrent commits.
Commit started at version: [attemptNumber]
Commit failed at version: [attemptVersion]
Number of actions attempted to commit: [numActions]
Total time spent attempting this commit: [timeSpent] ms
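The retry flow described in this section can be sketched roughly as below. This is an assumption-laden illustration, not the actual implementation: commitWithRetries, tryWrite and nextAttemptVersion are hypothetical names, the conflict check is reduced to a function that gives the next version to try, and a plain IllegalStateException stands in for DeltaIllegalStateException.

```scala
import java.nio.file.FileAlreadyExistsException
import scala.annotation.tailrec

object CommitRetrySketch {
  def commitWithRetries(
      firstAttemptVersion: Long,
      maxCommitAttempts: Int,            // stands in for spark.databricks.delta.maxCommitAttempts
      tryWrite: Long => Unit,            // throws FileAlreadyExistsException if the version is taken
      nextAttemptVersion: Long => Long   // stands in for checkForConflicts
  ): Long = {

    @tailrec
    def loop(attemptNumber: Int, previousVersion: Long): Long = {
      if (attemptNumber > maxCommitAttempts) {
        throw new IllegalStateException(
          s"This commit has failed as it has been tried $attemptNumber times but did not succeed.")
      }
      // The first attempt uses the given version; retries ask for the next possible one.
      val attemptVersion =
        if (attemptNumber == 0) previousVersion else nextAttemptVersion(previousVersion)
      val committed =
        try { tryWrite(attemptVersion); true }
        catch { case _: FileAlreadyExistsException => false }
      if (committed) attemptVersion else loop(attemptNumber + 1, attemptVersion)
    }

    loop(0, firstAttemptVersion)
  }
}
```

For example, with versions 5 and 6 already written by concurrent writers, a commit that starts at version 5 would succeed at version 7 on the third attempt, provided maxCommitAttempts is at least 2.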
Checking Logical Conflicts with Concurrent Updates¶
checkForConflicts(
checkVersion: Long,
actions: Seq[Action],
attemptNumber: Int,
commitIsolationLevel: IsolationLevel): Long
checkForConflicts checks for logical conflicts (of the given actions) with concurrent updates (actions of the commits since the transaction has started).
checkForConflicts gives the next possible commit version unless any of the following happened between the time of read (checkVersion) and the time of this commit attempt:
- Client is no longer up to date with the table protocol for reading and writing (and hence no longer allowed to access the table)
- Protocol version has changed
- Metadata has changed
- AddFiles have been added that the txn should have read based on the given IsolationLevel (Concurrent Append)
- AddFiles that the txn read have been deleted (Concurrent Delete)
- Files have been deleted by the txn and since the time of read (Concurrent Delete)
- Idempotent transactions have conflicted (Multiple Streaming Queries with the same checkpoint location)
checkForConflicts takes the next possible commit version.
For every commit since the time of read (checkVersion) and this commit attempt, checkForConflicts does the following:
- FIXME
- Prints out the following INFO message to the logs:
Completed checking for conflicts Version: [version] Attempt: [attemptNumber] Time: [totalCheckAndRetryTime] ms
In the end, checkForConflicts prints out the following INFO message to the logs:
No logical conflicts with deltas [[checkVersion], [nextAttemptVersion]), retrying.
getPrettyPartitionMessage¶
getPrettyPartitionMessage(
partitionValues: Map[String, String]): String
getPrettyPartitionMessage
...FIXME
postCommit¶
postCommit(
commitVersion: Long,
needsCheckpoint: Boolean): Unit
postCommit turns the committed flag on.
With the given needsCheckpoint enabled (that comes indirectly from doCommit), postCommit requests the DeltaLog for the Snapshot at the given commitVersion followed by checkpointing.
prepareCommit¶
prepareCommit(
actions: Seq[Action],
op: DeltaOperations.Operation): Seq[Action]
prepareCommit adds the newMetadata action (if available) to the given actions.
prepareCommit executes verifyNewMetadata if there was one.
prepareCommit ...FIXME
prepareCommit requests the DeltaLog to protocolWrite.
prepareCommit ...FIXME
Multiple Metadata Changes Not Allowed¶
prepareCommit throws an AssertionError when there are multiple metadata changes in the transaction (by means of Metadata actions):
Cannot change the metadata more than once in a transaction.
Committing Transaction Allowed Once Only¶
prepareCommit throws an AssertionError when the committed internal flag is enabled:
Transaction already committed.
performCdcColumnMappingCheck¶
performCdcColumnMappingCheck(
actions: Seq[Action],
op: DeltaOperations.Operation): Unit
performCdcColumnMappingCheck
...FIXME
runPostCommitHooks¶
runPostCommitHooks(
version: Long,
committedActions: Seq[Action]): Unit
runPostCommitHooks simply runs every post-commit hook registered (in the postCommitHooks internal registry).
runPostCommitHooks clears the active transaction (making all follow-up operations non-transactional).
Note
Hooks may create new transactions.
Handling Non-Fatal Exceptions¶
For non-fatal exceptions, runPostCommitHooks prints out the following ERROR message to the logs, records the delta event, and requests the post-commit hook to handle the error.
Error when executing post-commit hook [name] for commit [version]
AssertionError¶
runPostCommitHooks throws an AssertionError when the committed flag is disabled:
Can't call post commit hooks before committing
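A minimal sketch of this behaviour follows, assuming a hypothetical, pared-down PostCommitHook trait and a Txn class that holds only the hook registry and the committed flag. Event recording and clearing the active transaction are left out.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object PostCommitHooksSketch {
  // Hypothetical, simplified hook contract (the real PostCommitHook differs).
  trait PostCommitHook {
    def name: String
    def run(version: Long): Unit
    def handleError(error: Throwable, version: Long): Unit = ()
  }

  final class Txn {
    private val postCommitHooks = ArrayBuffer.empty[PostCommitHook]
    var committed = false

    def registerPostCommitHook(hook: PostCommitHook): Unit = postCommitHooks += hook

    def runPostCommitHooks(version: Long): Unit = {
      // Scala's assert throws java.lang.AssertionError when the condition is false.
      assert(committed, "Can't call post commit hooks before committing")
      postCommitHooks.foreach { hook =>
        try hook.run(version)
        catch {
          case NonFatal(e) =>
            // A failing hook is reported and asked to handle the error;
            // the remaining hooks still run.
            println(s"Error when executing post-commit hook ${hook.name} for commit $version")
            hook.handleError(e, version)
        }
      }
    }
  }
}
```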
Next Possible Commit Version¶
getNextAttemptVersion(
previousAttemptVersion: Long): Long
getNextAttemptVersion requests the DeltaLog to update (and give the latest state snapshot of the delta table).
In the end, getNextAttemptVersion requests the Snapshot for the version and increments it.
Note
The input previousAttemptVersion argument is not used.
Operation Metrics¶
getOperationMetrics(
op: Operation): Option[Map[String, String]]
getOperationMetrics gives the metrics of the given Operation when the spark.databricks.delta.history.metricsEnabled configuration property is enabled. Otherwise, getOperationMetrics gives None.
CommitInfo¶
OptimisticTransactionImpl creates a CommitInfo when requested to commit with the spark.databricks.delta.commitInfo.enabled configuration property enabled.
OptimisticTransactionImpl uses the CommitInfo to recordDeltaEvent (as a CommitStats).
Attempting Commit¶
doCommit(
attemptVersion: Long,
currentTransactionInfo: CurrentTransactionInfo,
attemptNumber: Int,
isolationLevel: IsolationLevel): Boolean
doCommit returns whether or not this commit (attempt) should trigger checkpointing.
doCommit is used when:
- OptimisticTransactionImpl is requested to doCommitRetryIteratively
doCommit requests the given CurrentTransactionInfo for the final actions to commit (Actions).
doCommit prints out the following DEBUG message to the logs:
Attempting to commit version [attemptVersion] with [n] actions with [isolationLevel] isolation level
Writing Out¶
doCommit requests the DeltaLog for the LogStore to write out the actions to a delta file in the log directory with the attemptVersion version, e.g.
00000000000000000001.json
doCommit writes the actions out in JSON format.
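The example file name above is consistent with a version number zero-padded to 20 digits and followed by the .json extension. A small sketch of that naming scheme, with deltaFileName as a hypothetical helper name:

```scala
object DeltaFileNameSketch {
  /** Name of the delta file for a version: 20 zero-padded digits plus ".json". */
  def deltaFileName(version: Long): String = f"$version%020d.json"
}
```

With this helper, version 1 maps to the 00000000000000000001.json name shown above.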
Note
LogStores must throw a java.nio.file.FileAlreadyExistsException if the delta file already exists. Any FileAlreadyExistsException is caught by doCommit itself (to checkAndRetry).
lastCommitVersionInSession¶
doCommit sets the spark.databricks.delta.lastCommitVersionInSession configuration property to the given attemptVersion.
Post-Commit Snapshot¶
doCommit requests the DeltaLog to update.
Needs Checkpointing¶
doCommit determines whether or not this commit should trigger checkpointing based on the committed version (attemptVersion).
A commit triggers checkpointing when all of the following hold:
- The committed version is any version greater than 0
- The committed version is a multiple of delta.checkpointInterval table property
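The checkpointing rule boils down to a one-line predicate. In the sketch below, needsCheckpoint is modelled as a standalone function and checkpointInterval is passed in explicitly (in Delta Lake it would come from the delta.checkpointInterval table property).

```scala
object CheckpointSketch {
  def needsCheckpoint(committedVersion: Long, checkpointInterval: Int): Boolean =
    committedVersion > 0 && committedVersion % checkpointInterval == 0
}
```

With an interval of 10, versions 10, 20, 30 and so on trigger checkpointing, while version 0 never does.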
CommitStats¶
doCommit records a new CommitStats event.
writeCommitFile¶
writeCommitFile(
attemptVersion: Long,
jsonActions: Iterator[String],
currentTransactionInfo: CurrentTransactionInfo): Option[VersionChecksum]
writeCommitFile
...FIXME
Retrying Commit¶
checkAndRetry(
checkVersion: Long,
actions: Seq[Action],
attemptNumber: Int): Long
checkAndRetry ...FIXME
checkAndRetry is used when OptimisticTransactionImpl is requested to commit (and attempts a commit that failed with a FileAlreadyExistsException).
Verifying New Metadata¶
verifyNewMetadata(
metadata: Metadata): Unit
verifyNewMetadata validates the given Metadata (and throws an exception if incorrect).
verifyNewMetadata is used when:
- OptimisticTransactionImpl is requested to prepareCommit and updateMetadata
verifyNewMetadata asserts that there are no column duplicates in the schema (of the given Metadata). verifyNewMetadata throws a DeltaAnalysisException if there are duplicates.
verifyNewMetadata branches off based on the DeltaColumnMappingMode (of the given Metadata):
- In NoMapping mode, verifyNewMetadata checks the data schema and checks the partition columns (of the given Metadata). In case of an AnalysisException and with the spark.databricks.delta.partitionColumnValidity.enabled configuration property enabled, verifyNewMetadata throws a DeltaAnalysisException.
- For the other DeltaColumnMappingModes, verifyNewMetadata executes checkColumnIdAndPhysicalNameAssignments on the schema.
verifyNewMetadata validates generated columns if there are any (in the schema).
With the spark.databricks.delta.schema.typeCheck.enabled configuration property enabled, verifyNewMetadata ...FIXME
In the end, verifyNewMetadata checks the protocol requirements and, in case the protocol has been updated, records it in the newProtocol registry.
newProtocol¶
newProtocol: Option[Protocol]
OptimisticTransactionImpl defines the newProtocol registry for a new Protocol.
newProtocol is undefined (None) by default.
newProtocol is defined when:
newProtocol is used for the protocol and to prepareCommit.
withGlobalConfigDefaults¶
withGlobalConfigDefaults(
metadata: Metadata): Metadata
withGlobalConfigDefaults
...FIXME
withGlobalConfigDefaults is used when:
- OptimisticTransactionImpl is requested to updateMetadata and updateMetadataForNewTable
Looking Up Transaction Version (by Streaming Query ID)¶
txnVersion(
id: String): Long
txnVersion simply registers (adds) the given ID in the readTxn internal registry.
In the end, txnVersion requests the Snapshot for the transaction version for the given ID (or gives -1 if there is none).
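As a rough sketch (assuming the snapshot's per-query transaction versions can be modelled as a plain Map, which is a simplification), the two steps look as follows. The Txn class is hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

object TxnVersionSketch {
  /** snapshotTransactions is a hypothetical stand-in for the Snapshot's transaction versions. */
  final class Txn(snapshotTransactions: Map[String, Long]) {
    val readTxn: ArrayBuffer[String] = ArrayBuffer.empty[String]

    def txnVersion(id: String): Long = {
      readTxn += id                            // remember the ID as read by this transaction
      snapshotTransactions.getOrElse(id, -1L)  // -1 when the ID has not been seen yet
    }
  }
}
```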
txnVersion is used when:
- DeltaSink is requested to add a streaming micro-batch
User-Defined Metadata¶
getUserMetadata(
op: Operation): Option[String]
getUserMetadata returns the userMetadata of the given Operation (if defined) or the value of the spark.databricks.delta.commitInfo.userMetadata configuration property.
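The fallback order can be sketched with Option.orElse. The Operation case class and the confUserMetadata parameter below are hypothetical simplifications (the configuration value is passed in instead of being read from a Spark session).

```scala
object UserMetadataSketch {
  // Hypothetical, simplified Operation with an optional userMetadata.
  final case class Operation(name: String, userMetadata: Option[String] = None)

  def getUserMetadata(op: Operation, confUserMetadata: Option[String]): Option[String] =
    // The operation-level value wins; the configuration property is the fallback.
    op.userMetadata.orElse(confUserMetadata)
}
```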
getUserMetadata is used when:
- OptimisticTransactionImpl is requested to commit (and spark.databricks.delta.commitInfo.enabled configuration property is enabled)
- ConvertToDeltaCommand is executed (and in turn requests DeltaCommand to commitLarge)
Post-Commit Hooks¶
postCommitHooks: ArrayBuffer[PostCommitHook]
OptimisticTransactionImpl uses the postCommitHooks internal registry to manage PostCommitHooks to be executed right after a successful commit.
PostCommitHooks can be registered using registerPostCommitHook.
Blind-Append Transaction¶
var isBlindAppend: Boolean = false
OptimisticTransactionImpl defines the isBlindAppend flag to mark this transaction as blind-append.
isBlindAppend is disabled (false) by default.
isBlindAppend can be enabled (true) only when this OptimisticTransactionImpl is requested to commitImpl and all of the following hold:
- There are AddFiles only in this transaction
- This transaction does not depend on files (i.e., the readPredicates and the readFiles are all empty)
commitImpl¶
commitImpl(
actions: Seq[Action],
op: DeltaOperations.Operation,
canSkipEmptyCommits: Boolean,
tags: Map[String, String]): Option[Long]
commitImpl
...FIXME
commitImpl is used when:
- OptimisticTransactionImpl is requested to commit and commitIfNeeded
commitIfNeeded¶
commitIfNeeded(
actions: Seq[Action],
op: DeltaOperations.Operation,
tags: Map[String, String] = Map.empty): Unit
Procedure
commitIfNeeded is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).
commitIfNeeded executes commitImpl (with the canSkipEmptyCommits flag enabled).
commitIfNeeded is used when:
- Delete command is executed
- Merge command is executed (and commitAndRecordStats)
- Update command is executed (and performUpdate)
- WriteIntoDelta command is executed
Internal Registries¶
newMetadata¶
newMetadata: Option[Metadata]
OptimisticTransactionImpl uses the newMetadata internal registry for a new Metadata that should be committed with this transaction.
newMetadata is initially undefined (None). It can be updated only once and before the transaction writes out any files.
newMetadata is used when OptimisticTransactionImpl is requested to prepareCommit and doCommit (for statistics).
newMetadata is available using the metadata method.
readFiles¶
readFiles: HashSet[AddFile]
OptimisticTransactionImpl uses the readFiles registry to track AddFiles that have been seen (scanned) by this transaction (when requested to filterFiles).
readFiles is used to determine isBlindAppend and in checkForConflicts (to fail if files that the txn read have been deleted).
readPredicates¶
readPredicates: ArrayBuffer[Expression]
readPredicates holds predicate expressions for partitions the transaction is modifying.
A new predicate expression is added to readPredicates when OptimisticTransactionImpl is requested to filterFiles and readWholeTable.
readPredicates is used when OptimisticTransactionImpl is requested to checkAndRetry.
Internal Properties¶
committed¶
Controls whether the transaction has been committed or not (and prevents prepareCommit from being executed again)
Default: false
Enabled in postCommit
readTxn¶
Streaming query IDs that have been seen by this transaction
A new queryId is added when OptimisticTransactionImpl is requested for txnVersion
Used when OptimisticTransactionImpl is requested to checkAndRetry (to fail with a ConcurrentTransactionException for idempotent transactions that have conflicted)
snapshotMetadata¶
readWholeTable¶
readWholeTable(): Unit
readWholeTable simply adds a True literal to the readPredicates internal registry.
readWholeTable is used when:
- DeltaSink is requested to add a streaming micro-batch (and the batch reads the same Delta table as this sink is going to write to)
updateMetadataForNewTable¶
updateMetadataForNewTable(
metadata: Metadata): Unit
updateMetadataForNewTable
...FIXME
updateMetadataForNewTable
is used when:
- ConvertToDeltaCommand and CreateDeltaTableCommand are executed
Metadata¶
metadata: Metadata
metadata is part of the TransactionalWrite abstraction.
metadata is either the newMetadata (if defined) or the snapshotMetadata.
Updating Metadata¶
updateMetadata(
_metadata: Metadata): Unit
updateMetadata asserts the following:
- The current transaction has not written data out yet (and the hasWritten flag is still disabled since it is not allowed to update the metadata in a transaction that has already written data)
- The metadata has not been changed already (and the newMetadata has not been assigned yet since it is not allowed to change the metadata more than once in a transaction)
In the end, updateMetadata executes updateMetadataInternal.
updateMetadata is used when:
- OptimisticTransactionImpl is requested to updateMetadataForNewTable
- AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, AlterTableAddColumnsDeltaCommand, AlterTableChangeColumnDeltaCommand, AlterTableReplaceColumnsDeltaCommand are executed
- RestoreTableCommand is executed
- ImplicitMetadataOperation is requested to updateMetadata
updateMetadataInternal¶
updateMetadataInternal(
_metadata: Metadata): Unit
updateMetadataInternal
...FIXME
Files To Scan Matching Given Predicates¶
filterFiles(): Seq[AddFile] // No filters = all files
filterFiles(
filters: Seq[Expression]): Seq[AddFile]
filterFiles gives the files to scan for the given predicates (filter expressions).
Internally, filterFiles requests the Snapshot for the filesForScan (for no projection attributes and the given filters).
filterFiles finds the partition predicates among the given filters (and the partition columns of the Metadata).
filterFiles registers (adds) the partition predicates (in the readPredicates internal registry) and the files to scan (in the readFiles internal registry).
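The bookkeeping side of filterFiles can be illustrated with the sketch below. It rests on several assumptions: predicates are modelled as simple column-equals-value pairs (a hypothetical Predicate class, not Catalyst expressions), files are filtered by partition values only, and non-partition filters are simply ignored, whereas the real filesForScan may also use them.

```scala
import scala.collection.mutable

object FilterFilesSketch {
  final case class AddFile(path: String, partitionValues: Map[String, String])

  // Hypothetical equality predicate on a single column.
  final case class Predicate(column: String, value: String) {
    def matches(file: AddFile): Boolean = file.partitionValues.get(column).contains(value)
  }

  final class Txn(allFiles: Seq[AddFile], partitionColumns: Set[String]) {
    val readPredicates = mutable.ArrayBuffer.empty[Predicate]
    val readFiles = mutable.HashSet.empty[AddFile]

    def filterFiles(filters: Seq[Predicate] = Seq.empty): Seq[AddFile] = {
      // Keep only the predicates that refer to partition columns.
      val partitionFilters = filters.filter(p => partitionColumns.contains(p.column))
      val scannedFiles = allFiles.filter(f => partitionFilters.forall(_.matches(f)))
      // Record what this transaction has read (used later for conflict detection).
      readPredicates ++= partitionFilters
      readFiles ++= scannedFiles
      scannedFiles
    }
  }
}
```

Calling filterFiles with no filters gives all the files, which matches the first variant of the method.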
filterFiles is used when:
- DeltaSink is requested to add a streaming micro-batch (with Complete output mode)
- DeleteCommand, MergeIntoCommand, UpdateCommand and WriteIntoDelta are executed
- CreateDeltaTableCommand is executed
lockCommitIfEnabled¶
lockCommitIfEnabled[T](
body: => T): T
lockCommitIfEnabled executes the body with a lock on a delta table when isCommitLockEnabled. Otherwise, lockCommitIfEnabled does not acquire a lock.
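A minimal sketch of this pattern, using a by-name body parameter and a java.util.concurrent ReentrantLock as an assumed stand-in for the lock of a delta table. Passing isCommitLockEnabled as an explicit argument is also a simplification made here.

```scala
import java.util.concurrent.locks.ReentrantLock

object CommitLockSketch {
  // Hypothetical per-table lock.
  private val tableLock = new ReentrantLock()

  def lockCommitIfEnabled[T](isCommitLockEnabled: Boolean)(body: => T): T =
    if (isCommitLockEnabled) {
      tableLock.lock()
      try body finally tableLock.unlock() // always release, even if body throws
    } else {
      body // no locking at all
    }

  /** Exposed only so the effect of the lock can be observed. */
  def heldByCurrentThread: Boolean = tableLock.isHeldByCurrentThread
}
```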
lockCommitIfEnabled is used when:
- OptimisticTransactionImpl is requested to doCommitRetryIteratively
isCommitLockEnabled¶
isCommitLockEnabled: Boolean
isCommitLockEnabled is the value of spark.databricks.delta.commitLock.enabled configuration property (if defined) or isPartialWriteVisible (requesting the LogStore from the DeltaLog).
Note
isCommitLockEnabled is true by default given the following:
- spark.databricks.delta.commitLock.enabled configuration property is undefined by default
- isPartialWriteVisible is true by default
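The "configuration value if defined, else isPartialWriteVisible" logic is a plain Option fallback. In the sketch below both inputs are passed in explicitly, which is an assumption made to keep the example self-contained.

```scala
object CommitLockEnabledSketch {
  def isCommitLockEnabled(
      commitLockConf: Option[Boolean],   // spark.databricks.delta.commitLock.enabled, if set
      isPartialWriteVisible: => Boolean  // only evaluated when the property is undefined
  ): Boolean =
    commitLockConf.getOrElse(isPartialWriteVisible)
}
```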
Registering Post-Commit Hook¶
registerPostCommitHook(
hook: PostCommitHook): Unit
Procedure
registerPostCommitHook is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).
registerPostCommitHook registers (adds) the given PostCommitHook to the postCommitHooks internal registry.
registerPostCommitHook is used when:
- OptimisticTransactionImpl is requested to commitImpl
- TransactionalWrite is requested to write data out
setNewProtocolWithFeaturesEnabledByMetadata¶
setNewProtocolWithFeaturesEnabledByMetadata(
metadata: Metadata): Unit
setNewProtocolWithFeaturesEnabledByMetadata executes upgradeProtocolFromMetadataForExistingTable (with the given Metadata and the Protocol) and makes the result this transaction's protocol.
setNewProtocolWithFeaturesEnabledByMetadata is used when:
- OptimisticTransactionImpl is requested to updateMetadataInternal and prepareCommit
Committing Large (Data Files)¶
commitLarge(
spark: SparkSession,
actions: Iterator[Action],
op: DeltaOperations.Operation,
context: Map[String, String],
metrics: Map[String, String]): (Long, Snapshot)
commitLarge
...FIXME
commitLarge
is used for the following commands:
- CONVERT (and performConvert)
- CreateDeltaTableCommand (and handleClone)
- RESTORE
Metadata Can Be Updated¶
canUpdateMetadata: Boolean
canUpdateMetadata holds when this transaction has neither written data nor changed the metadata already.
In other words, canUpdateMetadata holds true when both of the following hold:
- hasWritten is false
- newMetadata is undefined
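Expressed in code, the condition is a conjunction of the two checks. The Txn class below is a hypothetical holder of the two pieces of state, with the metadata reduced to a String.

```scala
object CanUpdateMetadataSketch {
  final class Txn {
    var hasWritten: Boolean = false
    var newMetadata: Option[String] = None // assumption: metadata modelled as a String

    def canUpdateMetadata: Boolean = !hasWritten && newMetadata.isEmpty
  }
}
```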
canUpdateMetadata is used when:
- WriteIntoDelta is requested to write data out
Metadata Check for CDF Columns¶
performCdcMetadataCheck(): Unit
Procedure
performCdcMetadataCheck is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).
Noop
performCdcMetadataCheck does nothing (noop) when executed with either the newMetadata registry empty or isCDCEnabledOnTable disabled.
With the new metadata defined and the Change Data Feed feature enabled on the table, performCdcMetadataCheck takes the column names of the newly-assigned metadata and compares them with the reserved column names of CDF (CDF-aware read schema).
If there are any reserved CDF column names found in the new metadata, performCdcMetadataCheck throws a DeltaIllegalStateException for the following:
- CDF was not enabled previously (in the initial metadata of the table snapshot) but reserved columns are present in the new schema
- CDF was enabled but reserved columns are present in the new metadata (i.e., in the data)
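The check can be sketched as below under a few stated assumptions: the reserved CDF column names are taken to be _change_type, _commit_version and _commit_timestamp, names are compared exactly (case-sensitively), the Metadata class is a hypothetical simplification, and a plain IllegalStateException with made-up messages stands in for DeltaIllegalStateException.

```scala
object CdcMetadataCheckSketch {
  // Assumed reserved CDF column names.
  val reservedCdfColumns: Set[String] =
    Set("_change_type", "_commit_version", "_commit_timestamp")

  // Hypothetical, simplified metadata: column names plus a CDF-enabled flag.
  final case class Metadata(columns: Seq[String], cdfEnabled: Boolean)

  def performCdcMetadataCheck(newMetadata: Option[Metadata], snapshotMetadata: Metadata): Unit =
    // No-op when there is no new metadata or CDF is not enabled in it.
    newMetadata.filter(_.cdfEnabled).foreach { md =>
      val clashing = md.columns.filter(c => reservedCdfColumns.contains(c))
      if (clashing.nonEmpty) {
        val reason =
          if (!snapshotMetadata.cdfEnabled) "CDF is being enabled on a schema with reserved columns"
          else "reserved columns are present in a CDF-enabled table"
        throw new IllegalStateException(s"$reason: ${clashing.mkString(", ")}")
      }
    }
}
```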
performCdcMetadataCheck is used when:
- OptimisticTransactionImpl is requested to commitImpl
Logging¶
OptimisticTransactionImpl is a Scala trait and logging is configured using the logger of the implementations.