

OptimisticTransactionImpl is an extension of the TransactionalWrite abstraction for optimistic transactions that can modify a delta table (at a given version) and can be committed eventually.

In other words, OptimisticTransactionImpl is a set of actions as part of an Operation that changes the state of a delta table transactionally.



clock: Clock


deltaLog: DeltaLog

DeltaLog (of a delta table) that this transaction is changing

deltaLog is part of the TransactionalWrite abstraction; OptimisticTransactionImpl seems to change it to a val (from a def).


snapshot: Snapshot

Snapshot (of the delta table) that this transaction is changing

snapshot is part of the TransactionalWrite abstraction; OptimisticTransactionImpl seems to change it to a val (from a def).


Table Version at Reading Time

readVersion: Long

readVersion requests the Snapshot for the version.

readVersion is used when:

Transactional Commit

commit(
  actions: Seq[Action],
  op: DeltaOperations.Operation): Long

commit attempts to commit the given Actions (as part of the Operation) and gives the commit version.


commit is used when:

Preparing Commit

commit first prepares a commit (which gives the final actions to commit; these may differ from the given actions).

Isolation Level

commit determines the isolation level based on FileActions (in the given actions) and their dataChange flag.

When every FileAction has the dataChange flag disabled (false), commit assumes that no data changed and chooses SnapshotIsolation. Otherwise, commit chooses Serializable.
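The rule above can be sketched as follows. This is a minimal illustration, not the Delta Lake source: the types inside IsolationLevelSketch are simplified, hypothetical stand-ins for the real actions and isolation levels, and choose is a made-up helper name.

```scala
object IsolationLevelSketch {
  // Hypothetical, simplified stand-ins for Delta Lake's action types.
  sealed trait Action
  sealed trait FileAction extends Action { def dataChange: Boolean }
  final case class AddFile(path: String, dataChange: Boolean = true) extends FileAction
  final case class RemoveFile(path: String, dataChange: Boolean = true) extends FileAction
  final case class CommitInfo(operation: String) extends Action

  sealed trait IsolationLevel
  case object Serializable extends IsolationLevel
  case object SnapshotIsolation extends IsolationLevel

  // Only FileActions are inspected; other actions do not influence the choice.
  def choose(actions: Seq[Action]): IsolationLevel = {
    val noDataChanged =
      actions.collect { case f: FileAction => f }.forall(f => !f.dataChange)
    if (noDataChanged) SnapshotIsolation else Serializable
  }
}
```

Note that, in this sketch, a commit with no FileActions at all also ends up with SnapshotIsolation, since the condition holds vacuously.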

Blind Append

A commit is considered a blind append when both of the following hold:

  1. There are only AddFiles among FileActions in the actions (onlyAddFiles)
  2. It does not depend on files, i.e. the readPredicates and readFiles are empty (dependsOnFiles)
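The two conditions can be sketched as a single predicate. The names onlyAddFiles and dependsOnFiles come from the text above; everything else (the BlindAppendSketch object, the simplified case classes, and read predicates modelled as plain strings instead of Catalyst expressions) is an assumption made for illustration.

```scala
object BlindAppendSketch {
  // Hypothetical, simplified stand-ins for Delta Lake's file actions.
  sealed trait FileAction
  final case class AddFile(path: String) extends FileAction
  final case class RemoveFile(path: String) extends FileAction

  def isBlindAppend(
      fileActions: Seq[FileAction],
      readPredicates: Seq[String], // Expressions in the real code; strings here
      readFiles: Set[AddFile]): Boolean = {
    // Condition 1: every FileAction is an AddFile.
    val onlyAddFiles = fileActions.forall(_.isInstanceOf[AddFile])
    // Condition 2: the transaction read nothing it could depend on.
    val dependsOnFiles = readPredicates.nonEmpty || readFiles.nonEmpty
    onlyAddFiles && !dependsOnFiles
  }
}
```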



Registering Post-Commit Hook

commit registers the GenerateSymlinkManifest post-commit hook when there is a FileAction among the actions and the compatibility.symlinkFormatManifest.enabled table property is enabled.

Commit Version

commit calls doCommit with the next version, the actions, attempt number 0, and the selected isolation level.

commit prints out the following INFO message to the logs:

Committed delta #[commitVersion] to [logPath]

Performing Post-Commit Operations

commit calls postCommit (with the version committed and the actions).

Executing Post-Commit Hooks

In the end, commit runs post-commit hooks and returns the version of the successful commit.


  attemptVersion: Long,
  actions: Seq[Action],
  isolationLevel: IsolationLevel): Long


Checking Logical Conflicts with Concurrent Updates

checkForConflicts(
  checkVersion: Long,
  actions: Seq[Action],
  attemptNumber: Int,
  commitIsolationLevel: IsolationLevel): Long

checkForConflicts checks for logical conflicts (of the given actions) with concurrent updates (actions of the commits since the transaction has started).

checkForConflicts gives the next possible commit version unless the following happened between the time of read (checkVersion) and the time of this commit attempt:

  1. Client is no longer up to date with the table protocol for reading and writing (and hence not allowed to access the table)
  2. Protocol version has changed
  3. Metadata has changed
  4. AddFiles have been added that the txn should have read based on the given IsolationLevel (Concurrent Append)
  5. AddFiles that the txn read have been deleted (Concurrent Delete)
  6. Files deleted by the txn have also been deleted by another commit since the time of read (Concurrent Delete)
  7. Idempotent transactions have conflicted (Multiple Streaming Queries with the same checkpoint location)
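A few of the checks in the list above can be sketched for a single concurrent ("winning") commit. This is only an illustration under heavy simplification: it covers the protocol, metadata, and the two Concurrent Delete checks, paths are plain strings, and ConflictCheckSketch, firstConflict, and the returned descriptions are hypothetical names (the real code throws dedicated exceptions).

```scala
object ConflictCheckSketch {
  // Hypothetical, simplified stand-ins for Delta Lake's actions.
  sealed trait Action
  final case class AddFile(path: String) extends Action
  final case class RemoveFile(path: String) extends Action
  final case class Metadata(id: String) extends Action
  final case class Protocol(minReaderVersion: Int, minWriterVersion: Int) extends Action

  // Describes the first conflict between this txn and one concurrent commit, if any.
  def firstConflict(
      readFiles: Set[String],    // paths of AddFiles the txn has read
      deletedByTxn: Set[String], // paths the txn itself wants to remove
      winningCommit: Seq[Action]): Option[String] = {
    val removed = winningCommit.collect { case RemoveFile(p) => p }.toSet
    if (winningCommit.exists(_.isInstanceOf[Protocol])) Some("protocol changed")
    else if (winningCommit.exists(_.isInstanceOf[Metadata])) Some("metadata changed")
    else if (removed.exists(readFiles)) Some("concurrent delete of a read file")
    else if (removed.exists(deletedByTxn)) Some("concurrent delete of a deleted file")
    else None
  }
}
```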

checkForConflicts takes the next possible commit version.

For every commit since the time of read (checkVersion) and this commit attempt, checkForConflicts does the following:


  • Prints out the following INFO message to the logs:

    Completed checking for conflicts Version: [version] Attempt: [attemptNumber] Time: [totalCheckAndRetryTime] ms

In the end, checkForConflicts prints out the following INFO message to the logs:

No logical conflicts with deltas [[checkVersion], [nextAttemptVersion]), retrying.


  partitionValues: Map[String, String]): String



postCommit(
  commitVersion: Long,
  commitActions: Seq[Action]): Unit

postCommit turns the committed flag on.

postCommit requests the DeltaLog to checkpoint when the given commitVersion is not 0 (first commit) and the checkpoint interval has been reached (based on the given commitVersion).


commitActions argument is not used.

postCommit prints out the following WARN message to the logs in case of IllegalStateException:

Failed to checkpoint table state.
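The behaviour described above can be sketched as follows. The sketch assumes that "the checkpoint interval has been reached" means the commit version is a multiple of the interval; the PostCommitSketch object, the Txn class, and the checkpoint callback are hypothetical stand-ins (the real method asks the DeltaLog to checkpoint).

```scala
object PostCommitSketch {
  // checkpoint is a hypothetical callback standing in for DeltaLog.checkpoint.
  final class Txn(checkpointInterval: Int, checkpoint: Long => Unit) {
    var committed = false

    def postCommit(commitVersion: Long): Unit = {
      committed = true
      // Assumption: "interval reached" = version is a multiple of the interval.
      if (commitVersion != 0 && commitVersion % checkpointInterval == 0) {
        try checkpoint(commitVersion)
        catch {
          // A failed checkpoint is only logged; the commit itself stays successful.
          case _: IllegalStateException =>
            println("WARN Failed to checkpoint table state.")
        }
      }
    }
  }
}
```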


prepareCommit(
  actions: Seq[Action],
  op: DeltaOperations.Operation): Seq[Action]

prepareCommit adds the newMetadata action (if available) to the given actions.

prepareCommit calls verifyNewMetadata on the new metadata (if there is one).


prepareCommit requests the DeltaLog to protocolWrite.


Multiple Metadata Changes Not Allowed

prepareCommit throws an AssertionError when there are multiple metadata changes in the transaction (by means of Metadata actions):

Cannot change the metadata more than once in a transaction.

Committing Transaction Allowed Once Only

prepareCommit throws an AssertionError when the committed internal flag is enabled:

Transaction already committed.
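The two assertions can be sketched like this. It is a simplified illustration, not the actual method: PrepareCommitSketch and its types are hypothetical, and the metadata verification and the protocol check are left out.

```scala
object PrepareCommitSketch {
  // Hypothetical, simplified stand-ins for Delta Lake's actions.
  sealed trait Action
  final case class Metadata(name: String) extends Action
  final case class AddFile(path: String) extends Action

  final class Txn(newMetadata: Option[Metadata]) {
    var committed = false

    def prepareCommit(actions: Seq[Action]): Seq[Action] = {
      // Scala's assert throws java.lang.AssertionError when the condition is false.
      assert(!committed, "Transaction already committed.")
      // The new metadata (if any) is added to the given actions.
      val finalActions = newMetadata.toSeq ++ actions
      val metadataChanges = finalActions.collect { case m: Metadata => m }
      assert(
        metadataChanges.length <= 1,
        "Cannot change the metadata more than once in a transaction.")
      finalActions
    }
  }
}
```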

Registering Post-Commit Hook

registerPostCommitHook(
  hook: PostCommitHook): Unit

registerPostCommitHook registers (adds) the given PostCommitHook to the postCommitHooks internal registry.


runPostCommitHooks(
  version: Long,
  committedActions: Seq[Action]): Unit

runPostCommitHooks simply runs every post-commit hook registered (in the postCommitHooks internal registry).

runPostCommitHooks clears the active transaction (making all follow-up operations non-transactional).


Hooks may create new transactions.

Handling Non-Fatal Exceptions

For non-fatal exceptions, runPostCommitHooks prints out the following ERROR message to the logs, records the delta event, and requests the post-commit hook to handle the error.

Error when executing post-commit hook [name] for commit [version]


runPostCommitHooks throws an AssertionError when the committed flag is disabled:

Can't call post commit hooks before committing
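Registering and running hooks can be sketched as follows. The PostCommitHook trait here is a hypothetical, trimmed-down version (the real hooks receive more arguments), and the sketch leaves out clearing the active transaction and recording the delta event.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object PostCommitHooksSketch {
  // Hypothetical, simplified hook interface.
  trait PostCommitHook {
    def name: String
    def run(version: Long): Unit
    def handleError(error: Throwable, version: Long): Unit = ()
  }

  final class Txn {
    var committed = false
    val postCommitHooks: ArrayBuffer[PostCommitHook] = ArrayBuffer.empty

    def registerPostCommitHook(hook: PostCommitHook): Unit =
      postCommitHooks += hook

    def runPostCommitHooks(version: Long): Unit = {
      assert(committed, "Can't call post commit hooks before committing")
      postCommitHooks.foreach { hook =>
        try hook.run(version)
        catch {
          // Non-fatal errors are logged and handed to the hook; fatal ones propagate.
          case NonFatal(e) =>
            println(s"ERROR Error when executing post-commit hook ${hook.name} for commit $version")
            hook.handleError(e, version)
        }
      }
    }
  }
}
```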

Next Possible Commit Version

getNextAttemptVersion(
  previousAttemptVersion: Long): Long

getNextAttemptVersion requests the DeltaLog to update (and give the latest state snapshot of the delta table).

In the end, getNextAttemptVersion requests the Snapshot for the version and increments it.


The input previousAttemptVersion argument is not used.

Operation Metrics

getOperationMetrics(
  op: Operation): Option[Map[String, String]]

getOperationMetrics gives the metrics of the given Operation when the configuration property is enabled. Otherwise, getOperationMetrics gives None.


OptimisticTransactionImpl creates a CommitInfo when requested to commit with configuration enabled.

OptimisticTransactionImpl uses the CommitInfo to recordDeltaEvent (as a CommitStats).

Attempting Commit

doCommit(
  attemptVersion: Long,
  actions: Seq[Action],
  attemptNumber: Int,
  isolationLevel: IsolationLevel): Long

doCommit returns the given attemptVersion as the commit version if the commit is successful. Otherwise, doCommit calls checkAndRetry.

doCommit is used when:

Internally, doCommit prints out the following DEBUG message to the logs:

Attempting to commit version [attemptVersion] with [n] actions with [isolationLevel] isolation level

Writing Out

doCommit requests the DeltaLog for the LogStore to write out the given actions to a delta file in the log directory with the attemptVersion version, e.g.


doCommit writes the actions out in JSON format.


LogStores must throw a java.nio.file.FileAlreadyExistsException exception if the delta file already exists. Any FileAlreadyExistsExceptions are caught by doCommit itself to checkAndRetry.

Post-Commit Snapshot

doCommit requests the DeltaLog to update.


doCommit throws an IllegalStateException when the version of the snapshot after update is smaller than the given attemptVersion version.

The committed version is [attemptVersion] but the current version is [version].


doCommit records a new CommitStats and returns the given attemptVersion as the commit version.


doCommit catches FileAlreadyExistsExceptions and calls checkAndRetry.
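The write-then-retry loop can be sketched with an in-memory store. This is an illustration under stated assumptions only: InMemoryLogStore is a hypothetical stand-in for a LogStore, actions are plain strings, and the retry simply picks the next free version without the conflict checks that checkAndRetry performs.

```scala
import java.nio.file.FileAlreadyExistsException
import scala.annotation.tailrec
import scala.collection.mutable

object DoCommitSketch {
  // Hypothetical in-memory log store: writing an already-written version must fail.
  final class InMemoryLogStore {
    private val deltas = mutable.Map.empty[Long, Seq[String]]

    def write(version: Long, actions: Seq[String]): Unit = synchronized {
      if (deltas.contains(version))
        throw new FileAlreadyExistsException(s"delta file for version $version already exists")
      deltas(version) = actions
    }

    def latestVersion: Long = synchronized {
      if (deltas.isEmpty) -1L else deltas.keys.max
    }
  }

  @tailrec
  def doCommit(
      store: InMemoryLogStore,
      attemptVersion: Long,
      actions: Seq[String],
      attemptNumber: Int = 0): Long = {
    val written =
      try { store.write(attemptVersion, actions); true }
      catch { case _: FileAlreadyExistsException => false }
    if (written) attemptVersion
    // Someone else won this version: retry with the next possible one.
    else doCommit(store, store.latestVersion + 1, actions, attemptNumber + 1)
  }
}
```

Relying on the store to reject an existing file is what makes the commit optimistic: no lock is taken up front, and a lost race only costs a retry.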

Retrying Commit

checkAndRetry(
  checkVersion: Long,
  actions: Seq[Action],
  attemptNumber: Int): Long


checkAndRetry is used when OptimisticTransactionImpl is requested to commit (and attempts a commit that failed with a FileAlreadyExistsException).


verifyNewMetadata(
  metadata: Metadata): Unit


verifyNewMetadata is used when:


withGlobalConfigDefaults(
  metadata: Metadata): Metadata


withGlobalConfigDefaults is used when:

Looking Up Transaction Version For Given (Streaming Query) ID

txnVersion(
  id: String): Long

txnVersion simply registers (adds) the given ID in the readTxn internal registry.

In the end, txnVersion requests the Snapshot for the transaction version for the given ID or -1.
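The lookup can be sketched as follows. The Txn class and its snapshotTransactions map are hypothetical stand-ins for the transaction and for the Snapshot's per-application transaction versions.

```scala
import scala.collection.mutable.ArrayBuffer

object TxnVersionSketch {
  // snapshotTransactions stands in for the versions recorded in the Snapshot.
  final class Txn(snapshotTransactions: Map[String, Long]) {
    val readTxn: ArrayBuffer[String] = ArrayBuffer.empty

    def txnVersion(id: String): Long = {
      readTxn += id                            // remember that this ID was looked up
      snapshotTransactions.getOrElse(id, -1L)  // -1 when the ID is unknown
    }
  }
}
```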

txnVersion is used when:

User-Defined Metadata

getUserMetadata(
  op: Operation): Option[String]

getUserMetadata returns the userMetadata of the given Operation (if defined) or the value of the corresponding configuration property.

getUserMetadata is used when:

Internal Registries

Post-Commit Hooks

postCommitHooks: ArrayBuffer[PostCommitHook]

OptimisticTransactionImpl manages PostCommitHooks that will be executed right after a commit is successful.

Post-commit hooks can be registered, but only the GenerateSymlinkManifest post-commit hook is supported.


newMetadata: Option[Metadata]

OptimisticTransactionImpl uses the newMetadata internal registry for a new Metadata that should be committed with this transaction.

newMetadata is initially undefined (None). It can be updated only once and before the transaction writes out any files.

newMetadata is used in prepareCommit and in doCommit (for statistics).

newMetadata is available using metadata method.


readFiles: HashSet[AddFile]

OptimisticTransactionImpl uses readFiles registry to track AddFiles that have been seen (scanned) by this transaction (when requested to filterFiles).

Used to determine isBlindAppend and in checkForConflicts (to fail if files that the txn read have been deleted).


readPredicates: ArrayBuffer[Expression]

readPredicates holds predicate expressions for partitions the transaction is modifying.

A new predicate expression is added to readPredicates by filterFiles and readWholeTable.

readPredicates is used in checkAndRetry.

Internal Properties


committed

Controls whether the transaction has been committed or not (and prevents prepareCommit from being executed again)

Default: false

Enabled in postCommit


readTxn

Streaming query IDs that have been seen by this transaction

A new queryId is added when OptimisticTransactionImpl is requested for txnVersion

Used when OptimisticTransactionImpl is requested to checkAndRetry (to fail with a ConcurrentTransactionException for idempotent transactions that have conflicted)


snapshotMetadata

Metadata of the Snapshot


readWholeTable(): Unit

readWholeTable simply adds True literal to the readPredicates internal registry.

readWholeTable is used when:


updateMetadataForNewTable(
  metadata: Metadata): Unit


updateMetadataForNewTable is used when:


metadata: Metadata

metadata is part of the TransactionalWrite abstraction.

metadata is either the newMetadata (if defined) or the snapshotMetadata.

Updating Metadata

updateMetadata(
  metadata: Metadata): Unit

updateMetadata updates the newMetadata internal property based on the readVersion:

  • For -1, updateMetadata replaces the configuration of the given metadata with one derived from the SQLConf (of the active SparkSession), the configuration of the given metadata, and a new Protocol

  • For other versions, updateMetadata leaves the given Metadata unchanged

updateMetadata is used when:


updateMetadata throws an AssertionError when the hasWritten flag is enabled:

Cannot update the metadata in a transaction that has already written data.


updateMetadata throws an AssertionError when the newMetadata is not empty:

Cannot change the metadata more than once in a transaction.
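The update-once rule, together with the metadata accessor, can be sketched as follows. UpdateMetadataSketch and its types are hypothetical simplifications, and the readVersion-dependent handling of the configuration is left out.

```scala
object UpdateMetadataSketch {
  // Hypothetical, simplified stand-in for Delta Lake's Metadata action.
  final case class Metadata(configuration: Map[String, String])

  final class Txn(snapshotMetadata: Metadata) {
    private var newMetadata: Option[Metadata] = None
    var hasWritten = false

    // The new metadata (if defined) or the metadata of the snapshot.
    def metadata: Metadata = newMetadata.getOrElse(snapshotMetadata)

    def updateMetadata(updated: Metadata): Unit = {
      assert(
        !hasWritten,
        "Cannot update the metadata in a transaction that has already written data.")
      assert(
        newMetadata.isEmpty,
        "Cannot change the metadata more than once in a transaction.")
      newMetadata = Some(updated)
    }
  }
}
```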

Files To Scan Matching Given Predicates

filterFiles(): Seq[AddFile] // (1)
filterFiles(
  filters: Seq[Expression]): Seq[AddFile]

  1. No filters = all files

filterFiles gives the files to scan for the given predicates (filter expressions).

Internally, filterFiles requests the Snapshot for the filesForScan (for no projection attributes and the given filters).

filterFiles finds the partition predicates among the given filters (and the partition columns of the Metadata).

filterFiles registers (adds) the partition predicates (in the readPredicates internal registry) and the files to scan (in the readFiles internal registry).
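The bookkeeping done by filterFiles can be sketched as follows. The sketch makes strong simplifying assumptions: filters are modelled as a hypothetical EqualTo(column, value) case class instead of Catalyst expressions, the snapshot is a plain list of files, and only partition filters are used for pruning.

```scala
import scala.collection.mutable

object FilterFilesSketch {
  // Hypothetical, simplified stand-ins for AddFile and a filter expression.
  final case class AddFile(path: String, partitionValues: Map[String, String])
  final case class EqualTo(column: String, value: String)

  final class Txn(allFiles: Seq[AddFile], partitionColumns: Set[String]) {
    val readPredicates: mutable.ArrayBuffer[EqualTo] = mutable.ArrayBuffer.empty
    val readFiles: mutable.HashSet[AddFile] = mutable.HashSet.empty

    def filterFiles(filters: Seq[EqualTo] = Nil): Seq[AddFile] = {
      // Keep only the filters that refer to partition columns.
      val partitionFilters = filters.filter(f => partitionColumns.contains(f.column))
      // Files whose partition values satisfy every partition filter.
      val scanFiles = allFiles.filter { file =>
        partitionFilters.forall(f => file.partitionValues.get(f.column).contains(f.value))
      }
      // Record what was read so that conflict detection can use it later.
      readPredicates ++= partitionFilters
      readFiles ++= scanFiles
      scanFiles
    }
  }
}
```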

filterFiles is used when:
