

OptimisticTransactionImpl is an extension of the TransactionalWrite abstraction for optimistic transactions that can modify a delta table (at a given version) and can be committed eventually.

In other words, OptimisticTransactionImpl is a set of actions as part of an Operation that changes the state of a delta table transactionally.



clock: Clock


deltaLog: DeltaLog

DeltaLog (of a delta table) this transaction commits changes to

deltaLog is part of the TransactionalWrite abstraction, and OptimisticTransactionImpl seems to change it to a val (from a def).


snapshot: Snapshot

Snapshot (of the delta table) this transaction commits changes to

snapshot is part of the TransactionalWrite contract, and OptimisticTransactionImpl seems to change it to a val (from a def).


Table Version at Reading Time

readVersion: Long

readVersion requests the Snapshot for the version.

readVersion is used when:

Transactional Commit

commit(
  actions: Seq[Action],
  op: DeltaOperations.Operation): Long

commit attempts to commit the given Actions (as part of the Operation) and gives the commit version.


commit is used when:


performCdcMetadataCheck(): Unit


Preparing Commit

commit then prepares a commit, which gives the final actions to commit (possibly different from the given actions).

Isolation Level

commit determines the isolation level based on FileActions (in the given actions) and their dataChange flag.

When all FileActions have the dataChange flag disabled (false), commit assumes no data changed and chooses SnapshotIsolation; otherwise, it chooses Serializable.
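The isolation-level choice can be sketched in Scala as follows. This is a minimal model, not Delta Lake's implementation: IsolationLevel, FileAction, AddFile, RemoveFile and chooseIsolationLevel are hypothetical stand-ins that keep only the dataChange flag.

```scala
// Hypothetical stand-ins for Delta Lake's types: only the dataChange flag is modelled.
sealed trait IsolationLevel
object IsolationLevel {
  case object Serializable extends IsolationLevel
  case object SnapshotIsolation extends IsolationLevel
}

sealed trait FileAction { def dataChange: Boolean }
final case class AddFile(path: String, dataChange: Boolean) extends FileAction
final case class RemoveFile(path: String, dataChange: Boolean) extends FileAction

// No FileAction changes data => SnapshotIsolation, otherwise Serializable.
def chooseIsolationLevel(fileActions: Seq[FileAction]): IsolationLevel =
  if (fileActions.forall(a => !a.dataChange)) IsolationLevel.SnapshotIsolation
  else IsolationLevel.Serializable
```

In this sketch, a commit with no FileActions at all also gets SnapshotIsolation, since forall holds vacuously for an empty sequence.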

Blind Append

A commit is considered a blind append when all of the following hold:

  1. There are only AddFiles among FileActions in the actions (onlyAddFiles)
  2. It does not depend on files, i.e. the readPredicates and readFiles are empty (dependsOnFiles)
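A minimal Scala sketch of the two blind-append conditions, assuming simplified, hypothetical Action types and modelling readPredicates and readFiles as plain strings (in Delta Lake they hold Catalyst expressions and AddFiles, respectively):

```scala
// Hypothetical stand-ins for Delta Lake's actions (not the real classes).
sealed trait Action
sealed trait FileAction extends Action
final case class AddFile(path: String) extends FileAction
final case class RemoveFile(path: String) extends FileAction
final case class Metadata(schemaString: String) extends Action

// readPredicates and readFiles are modelled as plain strings for illustration.
def isBlindAppend(
    actions: Seq[Action],
    readPredicates: Seq[String],
    readFiles: Set[String]): Boolean = {
  // 1. onlyAddFiles: every FileAction among the actions is an AddFile
  val onlyAddFiles = actions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile])
  // 2. dependsOnFiles: the transaction recorded read predicates or read files
  val dependsOnFiles = readPredicates.nonEmpty || readFiles.nonEmpty
  onlyAddFiles && !dependsOnFiles
}
```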



Registering Post-Commit Hook

commit registers the GenerateSymlinkManifest post-commit hook when there is a FileAction among the actions and the compatibility.symlinkFormatManifest.enabled table property is enabled.


commit then calls doCommitRetryIteratively.

commit prints out the following INFO message to the logs:

Committed delta #[commitVersion] to [logPath]

Performing Post-Commit Operations

commit executes postCommit (with the version committed and the needsCheckpoint flag).

Executing Post-Commit Hooks

In the end, commit runs post-commit hooks and returns the version of the successful commit.


doCommitRetryIteratively(
  attemptVersion: Long,
  currentTransactionInfo: CurrentTransactionInfo,
  isolationLevel: IsolationLevel): (Long, CurrentTransactionInfo, Boolean)

doCommitRetryIteratively acquires a lock on the delta table if enabled for the commit.

doCommitRetryIteratively uses the attemptNumber internal counter to track the number of attempts. In case of a FileAlreadyExistsException, doCommitRetryIteratively increments the attemptNumber and tries again.

In the end, doCommitRetryIteratively returns a tuple with the following:

  1. Commit version (from the given attemptVersion inclusive up to
  2. CurrentTransactionInfo
  3. Whether the commit needs checkpoint or not (needsCheckpoint)

First, doCommitRetryIteratively makes the first commit attempt. If successful, the commit is done.

On a retry, doCommitRetryIteratively calls checkForConflicts followed by another commit attempt.

If the number of commit attempts (attemptNumber) exceeds the value of the configuration property, doCommitRetryIteratively throws a DeltaIllegalStateException:

This commit has failed as it has been tried <numAttempts> times but did not succeed.
This can be caused by the Delta table being committed continuously by many concurrent commits.

Commit started at version: [attemptNumber]
Commit failed at version: [attemptVersion]
Number of actions attempted to commit: [numActions]
Total time spent attempting this commit: [timeSpent] ms
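The retry loop described above can be sketched as follows. It is a simplified model under stated assumptions: commitWithRetries is a hypothetical name, tryCommit stands in for doCommit, checkForConflicts for the conflict check that yields the next version to attempt, maxRetryAttempts for the (unnamed) configuration property, and an IllegalStateException replaces DeltaIllegalStateException.

```scala
import java.nio.file.FileAlreadyExistsException

// tryCommit stands in for doCommit: it throws FileAlreadyExistsException when the
// version is already taken. checkForConflicts returns the next version to attempt.
def commitWithRetries(
    attemptVersion: Long,
    maxRetryAttempts: Int,
    tryCommit: Long => Unit,
    checkForConflicts: Long => Long): Long = {
  val startedAt = System.nanoTime()
  var commitVersion = attemptVersion
  var attemptNumber = 0
  while (attemptNumber <= maxRetryAttempts) {
    try {
      // Every attempt after the first one re-checks for logical conflicts first.
      if (attemptNumber > 0) commitVersion = checkForConflicts(commitVersion)
      tryCommit(commitVersion)
      return commitVersion
    } catch {
      case _: FileAlreadyExistsException => attemptNumber += 1 // lost the race: try again
    }
  }
  val timeSpentMs = (System.nanoTime() - startedAt) / 1000000
  throw new IllegalStateException(
    s"This commit has failed as it has been tried $attemptNumber times but did not succeed. " +
      s"Commit started at version: $attemptVersion. Commit failed at version: $commitVersion. " +
      s"Total time spent attempting this commit: $timeSpentMs ms")
}
```

The loop makes at most maxRetryAttempts + 1 attempts: the first one at the given version, then one per retry, each preceded by a conflict check.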

Checking Logical Conflicts with Concurrent Updates

checkForConflicts(
  checkVersion: Long,
  actions: Seq[Action],
  attemptNumber: Int,
  commitIsolationLevel: IsolationLevel): Long

checkForConflicts checks for logical conflicts (of the given actions) with concurrent updates (actions of the commits since the transaction has started).

checkForConflicts gives the next possible commit version unless the following happened between the time of read (checkVersion) and the time of this commit attempt:

  1. Client is up to date with the table protocol for reading and writing (and hence allowed to access the table)
  2. Protocol version has changed
  3. Metadata has changed
  4. AddFiles have been added that the txn should have read based on the given IsolationLevel (Concurrent Append)
  5. AddFiles that the txn read have been deleted (Concurrent Delete)
  6. Files that the txn deletes have also been deleted since the time of read (Concurrent Delete)
  7. Idempotent transactions have conflicted (Multiple Streaming Queries with the same checkpoint location)
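A simplified Scala sketch of the protocol, metadata, concurrent-append and concurrent-delete checks (items 2 to 6 above). WinningCommit and this checkForConflicts are hypothetical: the read predicate is modelled as a function over file paths, Serializable semantics are assumed (any newly added file the transaction would have read is a conflict), the protocol-compatibility (item 1) and idempotent-transaction (item 7) checks are left out, and the next version is assumed to be the latest concurrent version plus one.

```scala
// Hypothetical summary of one concurrent ("winning") commit.
final case class WinningCommit(
    version: Long,
    protocolChanged: Boolean,
    metadataChanged: Boolean,
    addedFiles: Set[String],
    removedFiles: Set[String])

// Returns Left(reason) on the first logical conflict, or Right(next version to attempt).
def checkForConflicts(
    checkVersion: Long,
    readPredicate: String => Boolean, // would this txn have read a file at this path?
    readFiles: Set[String],           // files this txn actually read
    removedByTxn: Set[String],        // files this txn deletes
    winningCommits: Seq[WinningCommit]): Either[String, Long] = {
  def conflictIn(c: WinningCommit): Option[String] =
    if (c.protocolChanged) Some(s"Protocol changed in version ${c.version}")
    else if (c.metadataChanged) Some(s"Metadata changed in version ${c.version}")
    else if (c.addedFiles.exists(readPredicate)) Some(s"Concurrent append in version ${c.version}")
    else if (c.removedFiles.exists(readFiles)) Some(s"Concurrent delete of a read file in version ${c.version}")
    else if (c.removedFiles.exists(removedByTxn)) Some(s"Concurrent delete of a deleted file in version ${c.version}")
    else None

  winningCommits.flatMap(conflictIn).headOption match {
    case Some(reason) => Left(reason)
    case None => Right((checkVersion +: winningCommits.map(_.version + 1)).max)
  }
}
```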

checkForConflicts takes the next possible commit version.

For every commit since the time of read (checkVersion) and this commit attempt, checkForConflicts does the following:


  • Prints out the following INFO message to the logs:

    Completed checking for conflicts Version: [version] Attempt: [attemptNumber] Time: [totalCheckAndRetryTime] ms

In the end, checkForConflicts prints out the following INFO message to the logs:

No logical conflicts with deltas [[checkVersion], [nextAttemptVersion]), retrying.


  partitionValues: Map[String, String]): String



postCommit(
  commitVersion: Long,
  needsCheckpoint: Boolean): Unit

postCommit turns the committed flag on.

With the given needsCheckpoint enabled (that comes indirectly from doCommit), postCommit requests the DeltaLog for the Snapshot at the given commitVersion followed by checkpointing.


prepareCommit(
  actions: Seq[Action],
  op: DeltaOperations.Operation): Seq[Action]

prepareCommit adds the newMetadata action (if available) to the given actions.

prepareCommit then calls verifyNewMetadata if there is a new metadata.


prepareCommit requests the DeltaLog to protocolWrite.


Multiple Metadata Changes Not Allowed

prepareCommit throws an AssertionError when there are multiple metadata changes in the transaction (by means of Metadata actions):

Cannot change the metadata more than once in a transaction.

Committing Transaction Allowed Once Only

prepareCommit throws an AssertionError when the committed internal flag is enabled:

Transaction already committed.
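The two prepareCommit assertions, together with adding the new metadata to the given actions, can be sketched as follows. Action, AddFile and Metadata are hypothetical stand-ins; Scala's assert throws an AssertionError, matching the exceptions described above.

```scala
// Hypothetical stand-ins for Delta Lake's actions.
sealed trait Action
final case class AddFile(path: String) extends Action
final case class Metadata(id: String) extends Action

def prepareCommit(actions: Seq[Action], newMetadata: Option[Metadata], committed: Boolean): Seq[Action] = {
  assert(!committed, "Transaction already committed.")
  // Add the new metadata (if any) to the given actions.
  val finalActions = newMetadata.toSeq ++ actions
  val metadataChanges = finalActions.collect { case m: Metadata => m }
  assert(metadataChanges.length <= 1, "Cannot change the metadata more than once in a transaction.")
  finalActions
}
```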


  actions: Seq[Action],
  op: DeltaOperations.Operation): Unit


Registering Post-Commit Hook

registerPostCommitHook(
  hook: PostCommitHook): Unit

registerPostCommitHook registers (adds) the given PostCommitHook to the postCommitHooks internal registry.


runPostCommitHooks(
  version: Long,
  committedActions: Seq[Action]): Unit

runPostCommitHooks simply runs every post-commit hook registered (in the postCommitHooks internal registry).

runPostCommitHooks clears the active transaction (making all follow-up operations non-transactional).


Hooks may create new transactions.

Handling Non-Fatal Exceptions

For non-fatal exceptions, runPostCommitHooks prints out the following ERROR message to the logs, records the delta event, and requests the post-commit hook to handle the error.

Error when executing post-commit hook [name] for commit [version]


runPostCommitHooks throws an AssertionError when committed flag is disabled:

Can't call post commit hooks before committing
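A minimal sketch of the post-commit hook registry, assuming a simplified, hypothetical PostCommitHook trait (name, run, handleError) and a hypothetical HookRegistry class. It mirrors the behaviour described above: the committed assertion, running every registered hook in order, and logging a non-fatal error before delegating it to the hook. Clearing the active transaction is not modelled.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical hook interface (Delta Lake's PostCommitHook differs in its details).
trait PostCommitHook {
  def name: String
  def run(version: Long): Unit
  def handleError(error: Throwable, version: Long): Unit
}

final class HookRegistry {
  private val postCommitHooks = ArrayBuffer.empty[PostCommitHook]
  var committed = false // in OptimisticTransactionImpl, postCommit turns this on

  def registerPostCommitHook(hook: PostCommitHook): Unit = postCommitHooks += hook

  def runPostCommitHooks(version: Long): Unit = {
    assert(committed, "Can't call post commit hooks before committing")
    postCommitHooks.foreach { hook =>
      try hook.run(version)
      catch {
        case NonFatal(e) =>
          // Log the error and let the hook decide how to handle it.
          System.err.println(s"Error when executing post-commit hook ${hook.name} for commit $version")
          hook.handleError(e, version)
      }
    }
  }
}
```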

Next Possible Commit Version

getNextAttemptVersion(
  previousAttemptVersion: Long): Long

getNextAttemptVersion requests the DeltaLog to update (and give the latest state snapshot of the delta table).

In the end, getNextAttemptVersion requests the Snapshot for the version and increments it.


The input previousAttemptVersion argument is not used.

Operation Metrics

getOperationMetrics(
  op: Operation): Option[Map[String, String]]

getOperationMetrics gives the metrics of the given Operation when the configuration property is enabled. Otherwise, getOperationMetrics gives None.


OptimisticTransactionImpl creates a CommitInfo when requested to commit with the configuration property enabled.

OptimisticTransactionImpl uses the CommitInfo to recordDeltaEvent (as a CommitStats).

Attempting Commit

doCommit(
  attemptVersion: Long,
  currentTransactionInfo: CurrentTransactionInfo,
  attemptNumber: Int,
  isolationLevel: IsolationLevel): Boolean

doCommit returns whether or not this commit (attempt) should trigger checkpointing.

doCommit is used when:

doCommit requests the given CurrentTransactionInfo for the final actions to commit (Actions).

doCommit prints out the following DEBUG message to the logs:

Attempting to commit version [attemptVersion] with [n] actions with [isolationLevel] isolation level

Writing Out

doCommit requests the DeltaLog for the LogStore to write out the actions to a delta file in the log directory with the attemptVersion version.


doCommit writes the actions out in JSON format.


LogStores must throw a java.nio.file.FileAlreadyExistsException if the delta file already exists. doCommit catches any FileAlreadyExistsException itself and calls checkAndRetry.
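To illustrate the write-once contract, here is a single-machine sketch using java.nio with a local log directory. writeDelta and deltaFile are hypothetical helpers, not the LogStore API: StandardOpenOption.CREATE_NEW makes Files.write fail with a FileAlreadyExistsException if the delta file already exists, which signals that another writer won the race for this version. As with Delta Lake's delta files, the name is the version zero-padded to 20 digits with a .json extension, and each action goes on its own JSON line.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{FileAlreadyExistsException, Files, Path, StandardOpenOption}

// Delta file name: version zero-padded to 20 digits, with a .json extension.
def deltaFile(logPath: Path, version: Long): Path =
  logPath.resolve(f"$version%020d.json")

// Hypothetical single-node stand-in for a LogStore write without overwrite:
// CREATE_NEW fails with FileAlreadyExistsException if the file already exists.
def writeDelta(logPath: Path, version: Long, jsonActions: Seq[String]): Path = {
  val bytes = jsonActions.mkString("", "\n", "\n").getBytes(StandardCharsets.UTF_8)
  Files.write(deltaFile(logPath, version), bytes, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)
}
```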


doCommit sets the configuration property to the given attemptVersion.

Post-Commit Snapshot

doCommit requests the DeltaLog to update.

Needs Checkpointing

doCommit determines whether or not this commit should trigger checkpointing based on the committed version (attemptVersion).

A commit triggers checkpointing when all of the following hold:

  1. The committed version is greater than 0
  2. The committed version is a multiple of the delta.checkpointInterval table property
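The checkpointing rule boils down to a single predicate. A sketch, where needsCheckpoint is a hypothetical helper name and checkpointInterval stands for the value of the delta.checkpointInterval table property:

```scala
// checkpointInterval stands for the delta.checkpointInterval table property.
def needsCheckpoint(committedVersion: Long, checkpointInterval: Int): Boolean =
  committedVersion > 0 && committedVersion % checkpointInterval == 0
```

With an interval of 10 (the property's default), versions 10, 20, 30 and so on trigger checkpointing, while version 0 never does.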


doCommit records a new CommitStats event.

Retrying Commit

checkAndRetry(
  checkVersion: Long,
  actions: Seq[Action],
  attemptNumber: Int): Long


checkAndRetry is used when OptimisticTransactionImpl is requested to commit (and attempts a commit that failed with a FileAlreadyExistsException).

Verifying New Metadata

verifyNewMetadata(
  metadata: Metadata): Unit

verifyNewMetadata validates the given Metadata (and throws an exception if incorrect).

verifyNewMetadata is used when:

verifyNewMetadata asserts that there are no column duplicates in the schema (of the given Metadata). verifyNewMetadata throws a DeltaAnalysisException if there are duplicates.
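A sketch of the duplicate-column check on a flat list of top-level column names. This is an illustration only: duplicateColumns and verifyNoDuplicates are hypothetical, the comparison is assumed to be case-insensitive, nested fields are ignored, and an IllegalArgumentException (with a made-up message) stands in for DeltaAnalysisException.

```scala
import java.util.Locale

// Column names that occur more than once, compared case-insensitively (an assumption).
def duplicateColumns(columnNames: Seq[String]): Seq[String] =
  columnNames
    .groupBy(_.toLowerCase(Locale.ROOT))
    .filter { case (_, occurrences) => occurrences.size > 1 }
    .keys
    .toSeq
    .sorted

def verifyNoDuplicates(columnNames: Seq[String]): Unit = {
  val dups = duplicateColumns(columnNames)
  if (dups.nonEmpty)
    throw new IllegalArgumentException(s"Duplicate column(s) in the schema: ${dups.mkString(", ")}")
}
```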

verifyNewMetadata branches off based on the DeltaColumnMappingMode (of the given Metadata):

verifyNewMetadata validates generated columns if there are any (in the schema).

With configuration property enabled, verifyNewMetadata...FIXME

In the end, verifyNewMetadata checks the protocol requirements and, in case the protocol has been updated, records it in the newProtocol registry.


newProtocol: Option[Protocol]

OptimisticTransactionImpl defines the newProtocol registry for a new Protocol.

newProtocol is undefined (None) by default.

newProtocol is defined when:

newProtocol is used for the protocol and in prepareCommit.


withGlobalConfigDefaults(
  metadata: Metadata): Metadata


withGlobalConfigDefaults is used when:

Looking Up Transaction Version (by Streaming Query ID)

txnVersion(
  id: String): Long

txnVersion simply registers (adds) the given ID in the readTxn internal registry.

In the end, txnVersion requests the Snapshot for the transaction version for the given ID or -1.
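The txnVersion lookup can be sketched as follows. TxnTracker is hypothetical, and the Snapshot's transactions are modelled as a plain Map from application (streaming query) ID to the last committed transaction version.

```scala
import scala.collection.mutable

// Hypothetical stand-in: snapshotTransactions maps appId -> last committed txn version.
final class TxnTracker(snapshotTransactions: Map[String, Long]) {
  val readTxn = mutable.ArrayBuffer.empty[String] // IDs seen by this transaction

  def txnVersion(id: String): Long = {
    readTxn += id                           // register the ID (used later for conflict checks)
    snapshotTransactions.getOrElse(id, -1L) // -1 when the ID has never committed
  }
}
```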

txnVersion is used when:

User-Defined Metadata

getUserMetadata(
  op: Operation): Option[String]

getUserMetadata returns the userMetadata of the given Operation (if defined) or the value of the configuration property.

getUserMetadata is used when:

Internal Registries

Post-Commit Hooks

postCommitHooks: ArrayBuffer[PostCommitHook]

OptimisticTransactionImpl manages PostCommitHooks that will be executed right after a commit is successful.

Post-commit hooks can be registered, but only the GenerateSymlinkManifest post-commit hook is supported.


newMetadata: Option[Metadata]

OptimisticTransactionImpl uses the newMetadata internal registry for a new Metadata that should be committed with this transaction.

newMetadata is initially undefined (None). It can be updated only once and before the transaction writes out any files.

newMetadata is used in prepareCommit and doCommit (for statistics).

newMetadata is available using the metadata method.


readFiles: HashSet[AddFile]

OptimisticTransactionImpl uses the readFiles registry to track AddFiles that have been seen (scanned) by this transaction (when requested to filterFiles).

readFiles is used to determine isBlindAppend and in checkForConflicts (to fail if files that the txn read have been deleted).


readPredicates: ArrayBuffer[Expression]

readPredicates holds predicate expressions for partitions the transaction is modifying.

A new predicate expression is added to readPredicates in filterFiles and readWholeTable.

readPredicates is used in checkAndRetry.

Internal Properties


committed

Controls whether the transaction has been committed or not (and prevents prepareCommit from being executed again)

Default: false

Enabled in postCommit


readTxn

Streaming query IDs that have been seen by this transaction

A new queryId is added when OptimisticTransactionImpl is requested for txnVersion

Used when OptimisticTransactionImpl is requested to checkAndRetry (to fail with a ConcurrentTransactionException for idempotent transactions that have conflicted)


snapshotMetadata

Metadata of the Snapshot


readWholeTable(): Unit

readWholeTable simply adds a True literal to the readPredicates internal registry.

readWholeTable is used when:


updateMetadataForNewTable(
  metadata: Metadata): Unit


updateMetadataForNewTable is used when:


metadata: Metadata

metadata is part of the TransactionalWrite abstraction.

metadata is either the newMetadata (if defined) or the snapshotMetadata.

Updating Metadata

updateMetadata(
  _metadata: Metadata): Unit

updateMetadata asserts the following:

  • The current transaction has not written data out yet (and the hasWritten flag is still disabled since it is not allowed to update the metadata in a transaction that has already written data)
  • The metadata has not been changed already (and the newMetadata has not been assigned yet since it is not allowed to change the metadata more than once in a transaction)

In the end, updateMetadata calls updateMetadataInternal.
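A sketch of the two updateMetadata assertions together with the metadata accessor (newMetadata if defined, else snapshotMetadata). MetadataTxn and the single-field Metadata are hypothetical, and assigning newMetadata directly stands in for updateMetadataInternal.

```scala
// Hypothetical stand-in: only what the assertions need.
final case class Metadata(schemaString: String)

final class MetadataTxn(snapshotMetadata: Metadata) {
  var hasWritten = false
  private var newMetadata: Option[Metadata] = None

  // metadata is the new metadata (if defined) or the snapshot's metadata.
  def metadata: Metadata = newMetadata.getOrElse(snapshotMetadata)

  def updateMetadata(m: Metadata): Unit = {
    assert(!hasWritten, "Cannot update the metadata in a transaction that has already written data.")
    assert(newMetadata.isEmpty, "Cannot change the metadata more than once in a transaction.")
    newMetadata = Some(m) // stands in for updateMetadataInternal
  }
}
```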

updateMetadata is used when:


  _metadata: Metadata): Unit


Files To Scan Matching Given Predicates

filterFiles(): Seq[AddFile] // (1)
filterFiles(
  filters: Seq[Expression]): Seq[AddFile]

  1. No filters = all files

filterFiles gives the files to scan for the given predicates (filter expressions).

Internally, filterFiles requests the Snapshot for the filesForScan (for no projection attributes and the given filters).

filterFiles finds the partition predicates among the given filters (and the partition columns of the Metadata).

filterFiles registers (adds) the partition predicates (in the readPredicates internal registry) and the files to scan (in the readFiles internal registry).
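A simplified sketch of how filterFiles selects partition predicates and records what the transaction read. ScanTracker, Filter and the two-field AddFile are hypothetical: a filter is modelled as the set of columns it references plus a function over partition values, only filters that reference partition columns exclusively are used for pruning, and data skipping on non-partition columns is not modelled.

```scala
import scala.collection.mutable

// Hypothetical stand-ins: a file with its partition values, and a filter that
// lists the columns it references and evaluates against partition values.
final case class AddFile(path: String, partitionValues: Map[String, String])
final case class Filter(references: Set[String], eval: Map[String, String] => Boolean)

final class ScanTracker(allFiles: Seq[AddFile], partitionColumns: Set[String]) {
  val readPredicates = mutable.ArrayBuffer.empty[Filter]
  val readFiles = mutable.HashSet.empty[AddFile]

  def filterFiles(filters: Seq[Filter] = Nil): Seq[AddFile] = {
    // Partition predicates: filters that reference partition columns only.
    val partitionFilters = filters.filter(_.references.subsetOf(partitionColumns))
    val files = allFiles.filter(f => partitionFilters.forall(_.eval(f.partitionValues)))
    readPredicates ++= partitionFilters // what this txn depends on
    readFiles ++= files                 // what this txn scanned
    files
  }
}
```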

filterFiles is used when:


lockCommitIfEnabled[T](
  body: => T): T

lockCommitIfEnabled executes the body with a lock on a delta table when isCommitLockEnabled. Otherwise, lockCommitIfEnabled does not acquire a lock.

lockCommitIfEnabled is used when:


isCommitLockEnabled: Boolean

isCommitLockEnabled is the value of the configuration property (if defined) or isPartialWriteVisible (requesting the LogStore from the DeltaLog).


isCommitLockEnabled is true by default given the following:

  1. The configuration property is undefined by default
  2. isPartialWriteVisible is true by default
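A sketch of the commit lock and its default, assuming a single in-process ReentrantLock per table (tableLock) and passing the flag explicitly. isCommitLockEnabled here takes the (unnamed) configuration property as commitLockConf and the LogStore's answer as isPartialWriteVisible; lockCommitIfEnabled uses lockInterruptibly so that a waiting committer can be interrupted. All names besides the JDK classes are hypothetical.

```scala
import java.util.concurrent.locks.ReentrantLock

// The configuration property wins if set; otherwise fall back to the LogStore's answer.
def isCommitLockEnabled(commitLockConf: Option[Boolean], isPartialWriteVisible: Boolean): Boolean =
  commitLockConf.getOrElse(isPartialWriteVisible)

val tableLock = new ReentrantLock() // one lock per table in this sketch

// Runs body while holding tableLock if enabled, otherwise runs it without locking.
def lockCommitIfEnabled[T](enabled: Boolean)(body: => T): T =
  if (enabled) {
    tableLock.lockInterruptibly()
    try body finally tableLock.unlock()
  } else body
```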


OptimisticTransactionImpl is a Scala trait and logging is configured using the logger of the implementations.