OptimisticTransactionImpl

OptimisticTransactionImpl is an extension of the TransactionalWrite contract for optimistic transactions that can be committed.

Table 1. OptimisticTransactionImpl Contract (Abstract Methods Only)
Method Description

deltaLog

deltaLog: DeltaLog

DeltaLog of a delta table

deltaLog is part of the TransactionalWrite contract; OptimisticTransactionImpl appears to redeclare it as a val (rather than a def).

snapshot

snapshot: Snapshot

Snapshot of a delta table

snapshot is part of the TransactionalWrite contract; OptimisticTransactionImpl appears to redeclare it as a val (rather than a def).

OptimisticTransaction is the only known OptimisticTransactionImpl in Delta Lake.

Post-Commit Hooks — postCommitHooks Internal Registry

postCommitHooks: ArrayBuffer[PostCommitHook]

OptimisticTransactionImpl manages post-commit hooks that will be executed after a commit is successful.

Post-commit hooks can be registered, but only the GenerateSymlinkManifest post-commit hook is supported (when…​FIXME).

newMetadata Internal Registry

newMetadata: Option[Metadata]

OptimisticTransactionImpl uses the newMetadata internal registry for a new Metadata that should be committed with this transaction.

newMetadata is initially undefined (None). It can be updated only once and before the transaction writes out any files.

newMetadata is used in prepareCommit (and in doCommit for statistics).

newMetadata is available using the metadata method.

metadata Method

metadata: Metadata

metadata is part of the TransactionalWrite Contract to…FIXME.

metadata is either the newMetadata (if defined) or the snapshotMetadata.
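The fallback rule above can be sketched as follows. This is a minimal illustration only: Metadata and Txn here are hypothetical stand-ins, not Delta Lake's own classes.

```scala
object MetadataSketch {
  // Hypothetical stand-in for Delta Lake's Metadata action
  final case class Metadata(id: String)

  // Hypothetical, heavily reduced transaction
  final class Txn(val snapshotMetadata: Metadata) {
    // Undefined (None) until a metadata update is staged in this transaction
    var newMetadata: Option[Metadata] = None

    // Prefer the staged metadata, otherwise fall back to the snapshot's
    def metadata: Metadata = newMetadata.getOrElse(snapshotMetadata)
  }
}
```

Until newMetadata is defined, readers of metadata keep seeing the metadata of the snapshot the transaction started from.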

readVersion Method

readVersion: Long

readVersion simply requests the Snapshot for the version.

readVersion is used when:

Updating Metadata — updateMetadata Method

updateMetadata(
  metadata: Metadata): Unit

updateMetadata updates the newMetadata internal property based on the readVersion:

  • For -1, updateMetadata replaces the configuration of the given Metadata with one derived from the SQLConf (of the active SparkSession), the configuration of the given Metadata, and a new Protocol

  • For other versions, updateMetadata leaves the given Metadata unchanged

updateMetadata throws an AssertionError when the hasWritten flag is enabled (true):

Cannot update the metadata in a transaction that has already written data.

updateMetadata throws an AssertionError when the newMetadata is not empty:

Cannot change the metadata more than once in a transaction.
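The two guards can be sketched with Scala's assert (which throws a java.lang.AssertionError). The Txn class and the order of the checks are assumptions made for illustration; the readVersion-dependent configuration handling is left out.

```scala
object UpdateMetadataSketch {
  // Hypothetical stand-in for Delta Lake's Metadata action
  final case class Metadata(configuration: Map[String, String])

  final class Txn {
    var hasWritten: Boolean = false
    var newMetadata: Option[Metadata] = None

    def updateMetadata(metadata: Metadata): Unit = {
      // Guard 1: no metadata changes once data files have been written
      assert(!hasWritten,
        "Cannot update the metadata in a transaction that has already written data.")
      // Guard 2: at most one metadata change per transaction
      assert(newMetadata.isEmpty,
        "Cannot change the metadata more than once in a transaction.")
      newMetadata = Some(metadata)
    }
  }
}
```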

updateMetadata is used when:

Files To Scan Matching Given Predicates — filterFiles Method

filterFiles(): Seq[AddFile]  // uses the true literal to mean that all files match
filterFiles(
  filters: Seq[Expression]): Seq[AddFile]

filterFiles gives the files to scan based on the given predicates (filter expressions).

Internally, filterFiles requests the Snapshot for the filesForScan (for no projection attributes and the given filters).

filterFiles finds the partition predicates among the given filters (and the partition columns of the Metadata).

filterFiles registers (adds) the partition predicates (in the readPredicates internal registry) and the files to scan (in the readFiles internal registry).
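The bookkeeping described above can be sketched with simplified types. Predicate and AddFile below are hypothetical reductions of Spark's Expression and Delta Lake's AddFile, and treating a filter as a partition predicate when it references partition columns only is an assumption of this sketch.

```scala
import scala.collection.mutable

object FilterFilesSketch {
  // Hypothetical reductions of Expression and AddFile
  final case class Predicate(references: Set[String], matches: Map[String, String] => Boolean)
  final case class AddFile(path: String, partitionValues: Map[String, String])

  // Plays the role of the true literal: references no columns, matches every file
  val AlwaysTrue: Predicate = Predicate(Set.empty, _ => true)

  final class Txn(allFiles: Seq[AddFile], partitionColumns: Set[String]) {
    val readPredicates = mutable.ArrayBuffer.empty[Predicate]
    val readFiles = mutable.HashSet.empty[AddFile]

    def filterFiles(filters: Seq[Predicate]): Seq[AddFile] = {
      // Assumption: a partition predicate references partition columns only
      val partitionFilters = filters.filter(_.references.subsetOf(partitionColumns))
      val filesToScan = allFiles.filter(f => partitionFilters.forall(_.matches(f.partitionValues)))
      // Remember what this transaction has read, for later conflict checks
      readPredicates ++= partitionFilters
      readFiles ++= filesToScan
      filesToScan
    }

    def filterFiles(): Seq[AddFile] = filterFiles(Seq(AlwaysTrue))
  }
}
```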

filterFiles is used when:

readWholeTable Method

readWholeTable(): Unit

readWholeTable…​FIXME

readWholeTable is used when…​FIXME

Committing Transaction — commit Method

commit(
  actions: Seq[Action],
  op: DeltaOperations.Operation): Long

commit first prepares the commit, which gives the final actions to commit (these may differ from the given actions).

commit determines the isolation level for this commit by checking whether any FileAction (in the given actions) has the dataChange flag on (true). If no data is changed, commit uses SnapshotIsolation; otherwise it uses Serializable.
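The rule for choosing the isolation level can be sketched as a small function. The action types are hypothetical stand-ins (the real ones carry many more fields), and isolationLevelFor is a name made up for this sketch.

```scala
object IsolationLevelSketch {
  sealed trait IsolationLevel
  case object Serializable extends IsolationLevel
  case object SnapshotIsolation extends IsolationLevel

  // Hypothetical, reduced action hierarchy
  sealed trait Action
  sealed trait FileAction extends Action { def dataChange: Boolean }
  final case class AddFile(path: String, dataChange: Boolean) extends FileAction
  final case class RemoveFile(path: String, dataChange: Boolean) extends FileAction
  final case class CommitInfo(operation: String) extends Action

  def isolationLevelFor(actions: Seq[Action]): IsolationLevel = {
    // Only file actions can change data; every other action is ignored here
    val dataChanged = actions.exists {
      case f: FileAction => f.dataChange
      case _ => false
    }
    if (dataChanged) Serializable else SnapshotIsolation
  }
}
```

For example, a commit that only rearranges existing data (every FileAction has dataChange off) would get SnapshotIsolation under this rule.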

commit…​FIXME

commit…​FIXME

commit registers the GenerateSymlinkManifest post-commit hook when there is a FileAction among the actions and the compatibility.symlinkFormatManifest.enabled table property (from the Metadata) is enabled (true).

compatibility.symlinkFormatManifest.enabled table property defaults to false.
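The registration condition, including the false default, can be sketched as a predicate. shouldRegister is a hypothetical helper, the table properties are modeled as a plain Map, and the property key is copied as written above.

```scala
object SymlinkHookConditionSketch {
  // Property name as given in the text above
  val PropertyKey = "compatibility.symlinkFormatManifest.enabled"

  // Hypothetical helper: true only when a FileAction is present and the
  // table property is explicitly enabled (a missing property counts as false)
  def shouldRegister(hasFileAction: Boolean, tableProperties: Map[String, String]): Boolean =
    hasFileAction && tableProperties.get(PropertyKey).exists(_.toBoolean)
}
```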

commit then calls doCommit with the next version, the actions, attempt number 0, and the selected isolation level.

commit prints out the following INFO message to the logs:

Committed delta #[commitVersion] to [logPath]

commit then calls postCommit (with the version committed and the actions).

In the end, commit runs post-commit hooks and returns the version of the successful commit.

commit is used when:

Preparing Commit — prepareCommit Method

prepareCommit(
  actions: Seq[Action],
  op: DeltaOperations.Operation): Seq[Action]

prepareCommit adds the newMetadata action (if available) to the given actions.

prepareCommit calls verifyNewMetadata on the new metadata, if there is one.

prepareCommit…​FIXME

prepareCommit requests the DeltaLog to protocolWrite.

prepareCommit…​FIXME

prepareCommit throws an AssertionError when the number of metadata changes in the transaction (by means of Metadata actions) is above 1:

Cannot change the metadata more than once in a transaction.

prepareCommit throws an AssertionError when the committed internal flag is turned on (true):

Transaction already committed.

prepareCommit is used exclusively when OptimisticTransactionImpl is requested to commit (at the beginning).
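The documented parts of prepareCommit (adding newMetadata and the two assertions) can be sketched as follows. The types are hypothetical stand-ins, the order of the checks is an assumption, and the steps marked FIXME above are omitted.

```scala
object PrepareCommitSketch {
  // Hypothetical, reduced action hierarchy
  sealed trait Action
  final case class Metadata(id: String) extends Action
  final case class AddFile(path: String) extends Action

  final class Txn {
    var committed: Boolean = false
    var newMetadata: Option[Metadata] = None

    def prepareCommit(actions: Seq[Action]): Seq[Action] = {
      assert(!committed, "Transaction already committed.")
      // Put the staged metadata (if any) in front of the given actions
      val finalActions: Seq[Action] = newMetadata.toSeq ++ actions
      val metadataChanges = finalActions.collect { case m: Metadata => m }
      assert(metadataChanges.length <= 1,
        "Cannot change the metadata more than once in a transaction.")
      finalActions
    }
  }
}
```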

Performing Post-Commit Operations — postCommit Method

postCommit(
  commitVersion: Long,
  commitActions: Seq[Action]): Unit

postCommit…​FIXME

postCommit is used exclusively when OptimisticTransactionImpl is requested to commit (at the end).

Registering Post-Commit Hook — registerPostCommitHook Method

registerPostCommitHook(
  hook: PostCommitHook): Unit

registerPostCommitHook registers (adds) the given PostCommitHook to the postCommitHooks internal registry.

registerPostCommitHook adds the hook only once.

registerPostCommitHook is used when OptimisticTransactionImpl is requested to commit (to register the GenerateSymlinkManifest post-commit hook).
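The add-only-once behaviour can be sketched with an ArrayBuffer and a membership check. PostCommitHook is reduced to a hypothetical one-member trait, and using contains for the check is an assumption of this sketch.

```scala
import scala.collection.mutable.ArrayBuffer

object RegisterHookSketch {
  // Hypothetical, reduced version of the PostCommitHook contract
  trait PostCommitHook { def name: String }

  final class Txn {
    val postCommitHooks = new ArrayBuffer[PostCommitHook]()

    // Registering the same hook again is a no-op
    def registerPostCommitHook(hook: PostCommitHook): Unit =
      if (!postCommitHooks.contains(hook)) postCommitHooks.append(hook)
  }
}
```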

Running Post-Commit Hooks — runPostCommitHooks Method

runPostCommitHooks(
  version: Long,
  committedActions: Seq[Action]): Unit

runPostCommitHooks simply runs every post-commit hook registered (in the postCommitHooks internal registry).

runPostCommitHooks clears the active transaction (making all follow-up operations non-transactional).

Hooks may create new transactions.

For any non-fatal exception, runPostCommitHooks prints out the following ERROR message to the logs, records the delta event, and requests the post-commit hook to handle the error.

Error when executing post-commit hook [name] for commit [version]

runPostCommitHooks throws an AssertionError when committed flag is turned off (false):

Can't call post commit hooks before committing

runPostCommitHooks is used when OptimisticTransactionImpl is requested to commit.
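The error handling around hooks can be sketched with scala.util.control.NonFatal. The PostCommitHook trait and Txn class are hypothetical reductions, errorLog stands in for the logger, and clearing the active transaction and recording the delta event are left out.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object RunHooksSketch {
  // Hypothetical, reduced version of the PostCommitHook contract
  trait PostCommitHook {
    def name: String
    def run(version: Long): Unit
    def handleError(error: Throwable, version: Long): Unit
  }

  final class Txn {
    var committed: Boolean = false
    val postCommitHooks = new ArrayBuffer[PostCommitHook]()
    val errorLog = new ArrayBuffer[String]() // stand-in for ERROR log output

    def runPostCommitHooks(version: Long): Unit = {
      assert(committed, "Can't call post commit hooks before committing")
      postCommitHooks.foreach { hook =>
        try hook.run(version)
        catch {
          // A failing hook is logged and asked to handle its own error;
          // the remaining hooks still run
          case NonFatal(e) =>
            errorLog += s"Error when executing post-commit hook ${hook.name} for commit $version"
            hook.handleError(e, version)
        }
      }
    }
  }
}
```

Because the commit has already succeeded at this point, a failing hook in this sketch does not undo it.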

Attempting Commit — doCommit Internal Method

doCommit(
  attemptVersion: Long,
  actions: Seq[Action],
  attemptNumber: Int): Long

doCommit returns the given attemptVersion as the commit version if the write succeeds, or otherwise the result of checkAndRetry.

Internally, doCommit prints out the following DEBUG message to the logs:

Attempting to commit version [attemptVersion] with [size] actions with [isolationLevel] isolation level

doCommit requests the LogStore (of the DeltaLog) to write out the given actions (serialized to JSON format) to a delta file (e.g. 00000000000000000001.json) in the log directory (of the DeltaLog) with the attemptVersion version.

LogStores must throw a java.nio.file.FileAlreadyExistsException if the delta file already exists. doCommit catches any FileAlreadyExistsException itself and calls checkAndRetry.

doCommit requests the DeltaLog to update.

doCommit throws an IllegalStateException if the version of the snapshot after update is smaller than the given attemptVersion version.

The committed version is [attemptVersion] but the current version is [version].

doCommit records a new CommitStats and returns the given attemptVersion as the commit version.

doCommit catches any FileAlreadyExistsException and falls back to checkAndRetry.

doCommit is used when OptimisticTransactionImpl is requested to commit and to checkAndRetry.
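The write-or-retry shape of doCommit can be sketched against an in-memory store that refuses to overwrite. InMemoryLogStore and deltaFile are hypothetical names, and the retry policy (simply trying the next version) is a placeholder assumption, since checkAndRetry is not described here.

```scala
import java.nio.file.FileAlreadyExistsException
import scala.collection.mutable

object DoCommitSketch {
  // Hypothetical in-memory stand-in for a LogStore: never overwrites a file
  final class InMemoryLogStore {
    private val files = mutable.Map.empty[String, Seq[String]]
    def write(path: String, lines: Seq[String]): Unit = synchronized {
      if (files.contains(path)) throw new FileAlreadyExistsException(path)
      files(path) = lines
    }
    def exists(path: String): Boolean = synchronized(files.contains(path))
  }

  // Zero-padded, 20-digit delta file name, e.g. 00000000000000000001.json
  def deltaFile(version: Long): String = f"$version%020d.json"

  def doCommit(
      store: InMemoryLogStore,
      attemptVersion: Long,
      actions: Seq[String],
      attemptNumber: Int = 0): Long = {
    val written =
      try { store.write(deltaFile(attemptVersion), actions); true }
      catch { case _: FileAlreadyExistsException => false }
    if (written) attemptVersion
    // Placeholder for checkAndRetry: no conflict checks, just the next version
    else doCommit(store, attemptVersion + 1, actions, attemptNumber + 1)
  }
}
```

The sketch relies on the same property the text requires of LogStores: a writer that loses the race for a version learns about it through the exception instead of silently overwriting the winner.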

Retrying Commit — checkAndRetry Internal Method

checkAndRetry(
  checkVersion: Long,
  actions: Seq[Action],
  attemptNumber: Int): Long

checkAndRetry…​FIXME

checkAndRetry is used when OptimisticTransactionImpl is requested to commit (and attempts a commit that failed with a FileAlreadyExistsException).

verifyNewMetadata Method

verifyNewMetadata(
  metadata: Metadata): Unit

verifyNewMetadata…​FIXME

verifyNewMetadata is used when OptimisticTransactionImpl is requested to prepareCommit and updateMetadata.

Looking Up Transaction Version For Given (Streaming Query) ID — txnVersion Method

txnVersion(
  id: String): Long

txnVersion simply registers (adds) the given ID in the readTxn internal registry.

In the end, txnVersion requests the Snapshot for the transaction version for the given ID or assumes -1.

txnVersion is used when DeltaSink is requested to add a streaming micro-batch.
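The two steps of txnVersion can be sketched as follows. The snapshot's transaction versions are modeled as a plain Map, which is an assumption made for illustration, and Txn is a hypothetical reduced class.

```scala
import scala.collection.mutable.ArrayBuffer

object TxnVersionSketch {
  // snapshotTransactions stands in for the Snapshot's (ID -> version) lookup
  final class Txn(snapshotTransactions: Map[String, Long]) {
    val readTxn = new ArrayBuffer[String]()

    def txnVersion(id: String): Long = {
      // Remember that this transaction has looked at the given ID
      readTxn += id
      // -1 when the snapshot knows nothing about the ID
      snapshotTransactions.getOrElse(id, -1L)
    }
  }
}
```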

getNextAttemptVersion Internal Method

getNextAttemptVersion(
  previousAttemptVersion: Long): Long

getNextAttemptVersion…​FIXME

getNextAttemptVersion is used when OptimisticTransactionImpl is requested to checkAndRetry.

Internal Properties

Name Description

committed

Flag that controls whether the transaction is committed or not (and prevents prepareCommit from being executed again)

Default: false

Enabled (set to true) exclusively in postCommit

dependsOnFiles

Flag that…​FIXME

Default: false

Enabled (set to true) in filterFiles and readWholeTable

Used in commit and checkAndRetry

readFiles

readPredicates

readTxn

Streaming query IDs that have been seen by this transaction

A new queryId is added when OptimisticTransactionImpl is requested for txnVersion

Used when OptimisticTransactionImpl is requested to checkAndRetry (to fail with a ConcurrentTransactionException for idempotent transactions that have conflicted)

snapshotMetadata