
OptimisticTransactionImpl

OptimisticTransactionImpl is an extension of the TransactionalWrite abstraction for optimistic transactions that can modify a delta table (at a given version) and can be committed eventually.

In other words, OptimisticTransactionImpl is a set of actions as part of an Operation that changes the state of a delta table transactionally.

Contract

Clock

clock: Clock

DeltaLog

deltaLog: DeltaLog

DeltaLog (of a delta table) this transaction commits changes to

deltaLog is part of the TransactionalWrite abstraction (OptimisticTransactionImpl seems to change it from a def to a val).

Snapshot

snapshot: Snapshot

Snapshot (of the delta table) this transaction commits changes to

snapshot is part of the TransactionalWrite contract (OptimisticTransactionImpl seems to change it from a def to a val).

Implementations

Table Version at Reading Time

readVersion: Long

readVersion requests the Snapshot for the version.

readVersion is used when:

Transactional Commit

commit(
  actions: Seq[Action],
  op: DeltaOperations.Operation): Long

commit attempts to commit the given Actions (as part of the Operation) and gives the commit version.

Usage

commit is used when:

Preparing Commit

commit prepares a commit, which gives the final actions to commit (possibly different from the given actions).

Isolation Level

commit determines the isolation level based on FileActions (in the given actions) and their dataChange flag.

When all the FileActions have the dataChange flag disabled (false), commit assumes that no data has changed and chooses SnapshotIsolation. Otherwise, commit chooses Serializable.
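The choice of isolation level can be sketched as follows. The action and isolation-level types below are hypothetical, simplified stand-ins for Delta's own classes. Note that a commit with no FileActions at all also ends up with SnapshotIsolation, because `forall` holds on an empty collection.

```scala
// Hypothetical, simplified stand-ins for Delta's action and isolation-level types
sealed trait Action
sealed trait FileAction extends Action { def dataChange: Boolean }
final case class AddFile(path: String, dataChange: Boolean) extends FileAction
final case class RemoveFile(path: String, dataChange: Boolean) extends FileAction
final case class CommitMarker(note: String) extends Action // a non-file action

sealed trait IsolationLevel
object IsolationLevel {
  case object SnapshotIsolation extends IsolationLevel
  case object Serializable extends IsolationLevel
}

object IsolationLevelChoice {
  // No FileAction changes data (e.g. files are only rearranged): SnapshotIsolation.
  // At least one FileAction with dataChange = true: Serializable.
  def isolationLevelFor(actions: Seq[Action]): IsolationLevel = {
    val noDataChanged = actions.collect { case f: FileAction => f }.forall(f => !f.dataChange)
    if (noDataChanged) IsolationLevel.SnapshotIsolation else IsolationLevel.Serializable
  }
}
```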

Blind Append

commit is considered a blind append when all of the following hold:

  1. There are only AddFiles among FileActions in the actions (onlyAddFiles)
  2. It does not depend on files, i.e. the readPredicates and readFiles are empty (dependsOnFiles)
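The two conditions above can be sketched as a single predicate. The action types are hypothetical simplifications, and plain collections of strings stand in for the readPredicates and readFiles registries.

```scala
// Hypothetical, simplified stand-ins for Delta's actions
sealed trait Action
sealed trait FileAction extends Action
final case class AddFile(path: String) extends FileAction
final case class RemoveFile(path: String) extends FileAction

object BlindAppend {
  // readPredicates and readFiles are modelled as plain string collections in this sketch
  def isBlindAppend(
      actions: Seq[Action],
      readPredicates: Seq[String],
      readFiles: Set[String]): Boolean = {
    // 1. Only AddFiles among the FileActions
    val onlyAddFiles = actions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile])
    // 2. Nothing was read, so the transaction does not depend on existing files
    val dependsOnFiles = readPredicates.nonEmpty || readFiles.nonEmpty
    onlyAddFiles && !dependsOnFiles
  }
}
```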

CommitInfo

commit...FIXME

Registering Post-Commit Hook

commit registers the GenerateSymlinkManifest post-commit hook when there is a FileAction among the actions and the compatibility.symlinkFormatManifest.enabled table property is enabled.

doCommitRetryIteratively

commit executes doCommitRetryIteratively.

commit prints out the following INFO message to the logs:

Committed delta #[commitVersion] to [logPath]

Performing Post-Commit Operations

commit executes postCommit (with the committed version and the needsCheckpoint flag).

Executing Post-Commit Hooks

In the end, commit runs post-commit hooks and returns the version of the successful commit.

doCommitRetryIteratively

doCommitRetryIteratively(
  attemptVersion: Long,
  currentTransactionInfo: CurrentTransactionInfo,
  isolationLevel: IsolationLevel): (Long, CurrentTransactionInfo, Boolean)

doCommitRetryIteratively acquires a lock on the delta table if enabled for the commit.

doCommitRetryIteratively uses the attemptNumber internal counter to track the number of attempts. In case of a FileAlreadyExistsException, doCommitRetryIteratively increments the attemptNumber and tries again.

In the end, doCommitRetryIteratively returns a tuple with the following:

  1. Commit version (from the given attemptVersion inclusive up to spark.databricks.delta.maxCommitAttempts)
  2. CurrentTransactionInfo
  3. Whether the commit needs checkpoint or not (needsCheckpoint)

Firstly, doCommitRetryIteratively does the first attempt at commit. If successful, the commit is done.

On a retry, doCommitRetryIteratively executes checkForConflicts followed by another attempt at commit.

If the number of commit attempts (attemptNumber) is above the spark.databricks.delta.maxCommitAttempts configuration property, doCommitRetryIteratively throws a DeltaIllegalStateException:

This commit has failed as it has been tried [numAttempts] times but did not succeed.
This can be caused by the Delta table being committed continuously by many concurrent commits.

Commit started at version: [attemptNumber]
Commit failed at version: [attemptVersion]
Number of actions attempted to commit: [numActions]
Total time spent attempting this commit: [timeSpent] ms
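The retry behaviour can be sketched with an in-memory log that, like a LogStore, refuses to overwrite an existing version. Everything below (`InMemoryLog`, `RetryLoop`, `commitWithRetries`) is hypothetical. The sketch leaves out locking and conflict checking, and it counts attempts from 1, so at most `maxCommitAttempts` writes are tried. Delta's exact counting may differ.

```scala
import java.nio.file.FileAlreadyExistsException
import scala.annotation.tailrec
import scala.collection.mutable

// Hypothetical in-memory stand-in for a LogStore: every version can be written exactly once
final class InMemoryLog {
  private val versions = mutable.Map.empty[Long, Seq[String]]

  def latestVersion: Long = synchronized { if (versions.isEmpty) -1L else versions.keys.max }

  def write(version: Long, actions: Seq[String]): Unit = synchronized {
    // Like a LogStore, refuse to overwrite an existing delta file
    if (versions.contains(version)) throw new FileAlreadyExistsException(version.toString)
    versions(version) = actions
  }
}

object RetryLoop {
  def commitWithRetries(
      log: InMemoryLog,
      startVersion: Long,
      actions: Seq[String],
      maxCommitAttempts: Int): Long = {
    @tailrec
    def attempt(attemptNumber: Int, attemptVersion: Long): Long = {
      if (attemptNumber > maxCommitAttempts) {
        throw new IllegalStateException(
          s"This commit has failed as it has been tried ${attemptNumber - 1} times but did not succeed.")
      }
      val written =
        try { log.write(attemptVersion, actions); true }
        catch { case _: FileAlreadyExistsException => false }
      // On failure, a real transaction would run checkForConflicts before retrying
      if (written) attemptVersion
      else attempt(attemptNumber + 1, log.latestVersion + 1)
    }
    attempt(attemptNumber = 1, attemptVersion = startVersion)
  }
}
```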

Checking Logical Conflicts with Concurrent Updates

checkForConflicts(
  checkVersion: Long,
  actions: Seq[Action],
  attemptNumber: Int,
  commitIsolationLevel: IsolationLevel): Long

checkForConflicts checks for logical conflicts (of the given actions) with concurrent updates (actions of the commits since the transaction has started).

checkForConflicts gives the next possible commit version unless any of the following checks fails for the commits between the time of read (checkVersion) and the time of this commit attempt:

  1. Client is up to date with the table protocol for reading and writing (and hence allowed to access the table)
  2. Protocol version has not changed
  3. Metadata has not changed
  4. No AddFiles have been added that the txn should have read based on the given IsolationLevel (Concurrent Append)
  5. No AddFiles that the txn read have been deleted (Concurrent Delete)
  6. No files deleted by the txn have also been deleted since the time of read (Concurrent Delete)
  7. No idempotent transactions have conflicted (Multiple Streaming Queries with the same checkpoint location)
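The sketch below covers only the two concurrent-delete checks (items 5 and 6). It uses hypothetical types: a `WinningCommit` summarizes the paths a concurrent commit removed, and file paths stand in for AddFiles and RemoveFiles. It reports the conflicts it finds instead of throwing Delta's conflict exceptions.

```scala
// Hypothetical, simplified summary of a concurrent ("winning") commit
final case class WinningCommit(version: Long, removedPaths: Set[String])

sealed trait Conflict
final case class ConcurrentDeleteRead(path: String, version: Long) extends Conflict
final case class ConcurrentDeleteDelete(path: String, version: Long) extends Conflict

object ConflictChecks {
  // readFiles: paths this transaction scanned; txnRemoved: paths this transaction removes
  def concurrentDeletes(
      readFiles: Set[String],
      txnRemoved: Set[String],
      winners: Seq[WinningCommit]): Seq[Conflict] =
    winners.flatMap { w =>
      // Files this transaction read that a concurrent commit deleted
      val deletedRead: Seq[Conflict] =
        readFiles.intersect(w.removedPaths).toSeq.sorted.map(p => ConcurrentDeleteRead(p, w.version))
      // Files deleted both by this transaction and by a concurrent commit
      val deletedTwice: Seq[Conflict] =
        txnRemoved.intersect(w.removedPaths).toSeq.sorted.map(p => ConcurrentDeleteDelete(p, w.version))
      deletedRead ++ deletedTwice
    }
}
```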

checkForConflicts starts by taking the next possible commit version (using getNextAttemptVersion).

For every commit between the time of read (checkVersion) and this commit attempt, checkForConflicts does the following:

  • FIXME

  • Prints out the following INFO message to the logs:

    Completed checking for conflicts Version: [version] Attempt: [attemptNumber] Time: [totalCheckAndRetryTime] ms
    

In the end, checkForConflicts prints out the following INFO message to the logs:

No logical conflicts with deltas [[checkVersion], [nextAttemptVersion]), retrying.

getPrettyPartitionMessage

getPrettyPartitionMessage(
  partitionValues: Map[String, String]): String

getPrettyPartitionMessage...FIXME

postCommit

postCommit(
  commitVersion: Long,
  needsCheckpoint: Boolean): Unit

postCommit turns the committed flag on.

With the given needsCheckpoint enabled (which comes indirectly from doCommit), postCommit requests the DeltaLog for the Snapshot at the given commitVersion and then checkpoints it.

prepareCommit

prepareCommit(
  actions: Seq[Action],
  op: DeltaOperations.Operation): Seq[Action]

prepareCommit adds the newMetadata action (if available) to the given actions.

prepareCommit executes verifyNewMetadata if there is a new metadata.

prepareCommit...FIXME

prepareCommit requests the DeltaLog to protocolWrite (to assert that this client is allowed to write to the table).

prepareCommit...FIXME

Multiple Metadata Changes Not Allowed

prepareCommit throws an AssertionError when there are multiple metadata changes in the transaction (by means of Metadata actions):

Cannot change the metadata more than once in a transaction.

Committing Transaction Allowed Once Only

prepareCommit throws an AssertionError when the committed internal flag is enabled:

Transaction already committed.
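The two assertions can be sketched as follows. `TxnSketch` and its simplified actions are hypothetical. As described above, the sketch puts the newMetadata (if any) in front of the given actions and then counts the Metadata actions. Scala's `assert` throws `java.lang.AssertionError` with an "assertion failed: " prefix.

```scala
// Hypothetical, simplified actions; only Metadata matters for these checks
sealed trait Action
final case class Metadata(id: String) extends Action
final case class AddFile(path: String) extends Action

final class TxnSketch {
  private var committed = false // mirrors the committed internal flag
  private var newMetadata: Option[Metadata] = None

  def updateMetadata(m: Metadata): Unit = newMetadata = Some(m)
  def markCommitted(): Unit = committed = true

  def prepareCommit(actions: Seq[Action]): Seq[Action] = {
    assert(!committed, "Transaction already committed.")
    // The new metadata (if any) goes in front of the given actions
    val finalActions = newMetadata.toSeq ++ actions
    assert(
      finalActions.count(_.isInstanceOf[Metadata]) <= 1,
      "Cannot change the metadata more than once in a transaction.")
    finalActions
  }
}
```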

performCdcColumnMappingCheck

performCdcColumnMappingCheck(
  actions: Seq[Action],
  op: DeltaOperations.Operation): Unit

performCdcColumnMappingCheck...FIXME

runPostCommitHooks

runPostCommitHooks(
  version: Long,
  committedActions: Seq[Action]): Unit

runPostCommitHooks simply runs every post-commit hook registered (in the postCommitHooks internal registry).

runPostCommitHooks clears the active transaction (making all follow-up operations non-transactional).

Note

Hooks may create new transactions.

Handling Non-Fatal Exceptions

For non-fatal exceptions, runPostCommitHooks prints out the following ERROR message to the logs, records the delta event, and requests the post-commit hook to handle the error.

Error when executing post-commit hook [name] for commit [version]

AssertionError

runPostCommitHooks throws an AssertionError when the committed flag is disabled:

Can't call post commit hooks before committing
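The hook-running flow can be sketched with a hypothetical `PostCommitHook` trait that is loosely modelled on Delta's. The sketch leaves out clearing the active transaction and recording the delta event. An in-memory buffer stands in for the ERROR log messages. A failing hook does not stop the remaining hooks from running.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical hook interface, loosely modelled on PostCommitHook
trait PostCommitHook {
  def name: String
  def run(version: Long): Unit
  def handleError(error: Throwable, version: Long): Unit = ()
}

final class HookRunner {
  var committed = false
  val postCommitHooks = ArrayBuffer.empty[PostCommitHook]
  val errors = ArrayBuffer.empty[String] // stands in for the ERROR log messages

  def registerPostCommitHook(hook: PostCommitHook): Unit = postCommitHooks += hook

  def runPostCommitHooks(version: Long): Unit = {
    assert(committed, "Can't call post commit hooks before committing")
    postCommitHooks.foreach { hook =>
      try hook.run(version)
      catch {
        case NonFatal(e) =>
          errors += s"Error when executing post-commit hook ${hook.name} for commit $version"
          hook.handleError(e, version)
      }
    }
  }
}
```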

Next Possible Commit Version

getNextAttemptVersion(
  previousAttemptVersion: Long): Long

getNextAttemptVersion requests the DeltaLog to update (and give the latest state snapshot of the delta table).

In the end, getNextAttemptVersion requests the Snapshot for the version and increments it.

Note

The input previousAttemptVersion argument is not used.

Operation Metrics

getOperationMetrics(
  op: Operation): Option[Map[String, String]]

getOperationMetrics gives the metrics of the given Operation when the spark.databricks.delta.history.metricsEnabled configuration property is enabled. Otherwise, getOperationMetrics gives None.

CommitInfo

OptimisticTransactionImpl creates a CommitInfo when requested to commit with the spark.databricks.delta.commitInfo.enabled configuration property enabled.

OptimisticTransactionImpl uses the CommitInfo to recordDeltaEvent (as a CommitStats).

Attempting Commit

doCommit(
  attemptVersion: Long,
  currentTransactionInfo: CurrentTransactionInfo,
  attemptNumber: Int,
  isolationLevel: IsolationLevel): Boolean

doCommit returns whether or not this commit (attempt) should trigger checkpointing.

doCommit is used when:


doCommit requests the given CurrentTransactionInfo for the final actions to commit (Actions).

doCommit prints out the following DEBUG message to the logs:

Attempting to commit version [attemptVersion] with [n] actions with [isolationLevel] isolation level

Writing Out

doCommit requests the DeltaLog for the LogStore to write out the actions to a delta file (named after the attemptVersion) in the log directory, e.g.

00000000000000000001.json

doCommit writes the actions out in JSON format.
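The delta file naming above can be reproduced with a small helper. `DeltaFileNames` and its methods are hypothetical illustrations, not Delta's API. The version is zero-padded to 20 digits, the file lives in the table's `_delta_log` directory, and each action is serialized on a line of its own.

```scala
object DeltaFileNames {
  // A delta file is named after its version: zero-padded to 20 digits, with the .json extension
  def deltaFileName(version: Long): String = f"$version%020d.json"

  // Delta files live in the _delta_log directory of a table
  def deltaFilePath(tablePath: String, version: Long): String =
    s"$tablePath/_delta_log/${deltaFileName(version)}"

  // Each action is serialized as a single line of JSON
  def deltaFileContent(jsonActions: Seq[String]): String = jsonActions.mkString("\n")
}
```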

Note

LogStores must throw a java.nio.file.FileAlreadyExistsException if the delta file already exists. doCommit itself catches any FileAlreadyExistsException (to checkAndRetry).

lastCommitVersionInSession

doCommit sets the spark.databricks.delta.lastCommitVersionInSession configuration property to the given attemptVersion.

Post-Commit Snapshot

doCommit requests the DeltaLog to update.

Needs Checkpointing

doCommit determines whether or not this commit should trigger checkpointing based on the committed version (attemptVersion).

A commit triggers checkpointing when the following all hold:

  1. The committed version is any version greater than 0
  2. The committed version is a multiple of delta.checkpointInterval table property
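The two conditions above reduce to a one-line predicate. In this sketch, `checkpointInterval` stands for the delta.checkpointInterval table property, which defaults to 10. `CheckpointTrigger` is a hypothetical name.

```scala
object CheckpointTrigger {
  // checkpointInterval stands for the delta.checkpointInterval table property (10 by default)
  def needsCheckpoint(committedVersion: Long, checkpointInterval: Int): Boolean =
    committedVersion > 0 && committedVersion % checkpointInterval == 0
}
```

With the default interval, versions 10, 20, 30 and so on trigger a checkpoint, while version 0 never does.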

CommitStats

doCommit records a new CommitStats event.

writeCommitFile

writeCommitFile(
  attemptVersion: Long,
  jsonActions: Iterator[String],
  currentTransactionInfo: CurrentTransactionInfo): Option[VersionChecksum]

writeCommitFile...FIXME

Retrying Commit

checkAndRetry(
  checkVersion: Long,
  actions: Seq[Action],
  attemptNumber: Int): Long

checkAndRetry...FIXME

checkAndRetry is used when OptimisticTransactionImpl is requested to commit (and attempts a commit that failed with a FileAlreadyExistsException).

Verifying New Metadata

verifyNewMetadata(
  metadata: Metadata): Unit

verifyNewMetadata validates the given Metadata (and throws an exception if incorrect).

verifyNewMetadata is used when:


verifyNewMetadata asserts that there are no column duplicates in the schema (of the given Metadata). verifyNewMetadata throws a DeltaAnalysisException if there are duplicates.
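The duplicate check can be sketched over top-level column names. This sketch makes two assumptions: names are compared case-insensitively, and nested fields are ignored. It throws `IllegalArgumentException` with hypothetical wording in place of DeltaAnalysisException.

```scala
import java.util.Locale

object DuplicateColumns {
  // Names that occur more than once when compared case-insensitively (an assumption of this sketch)
  def duplicateColumns(columnNames: Seq[String]): Seq[String] =
    columnNames
      .groupBy(_.toLowerCase(Locale.ROOT))
      .collect { case (name, occurrences) if occurrences.size > 1 => name }
      .toSeq
      .sorted

  // Hypothetical error: IllegalArgumentException in place of Delta's DeltaAnalysisException
  def assertNoDuplicateColumns(columnNames: Seq[String]): Unit = {
    val duplicates = duplicateColumns(columnNames)
    if (duplicates.nonEmpty) {
      throw new IllegalArgumentException(s"Duplicate column(s) in the schema: ${duplicates.mkString(", ")}")
    }
  }
}
```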

verifyNewMetadata branches off based on the DeltaColumnMappingMode (of the given Metadata):

verifyNewMetadata validates generated columns if there are any (in the schema).

With spark.databricks.delta.schema.typeCheck.enabled configuration property enabled, verifyNewMetadata...FIXME

In the end, verifyNewMetadata checks the protocol requirements and, in case the protocol has been updated, records it in the newProtocol registry.

newProtocol

newProtocol: Option[Protocol]

OptimisticTransactionImpl defines newProtocol registry for a new Protocol.

newProtocol is undefined (None) by default.

newProtocol is defined when:

newProtocol is used for the protocol and to prepareCommit.

withGlobalConfigDefaults

withGlobalConfigDefaults(
  metadata: Metadata): Metadata

withGlobalConfigDefaults...FIXME

withGlobalConfigDefaults is used when:

Looking Up Transaction Version (by Streaming Query ID)

txnVersion(
  id: String): Long

txnVersion simply registers (adds) the given ID in the readTxn internal registry.

In the end, txnVersion requests the Snapshot for the transaction version for the given ID, or gives -1 when there is none.
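A minimal sketch of txnVersion follows. It assumes that the snapshot exposes a map from application (streaming query) ID to the latest committed transaction version. `SnapshotSketch` and `TxnVersionSketch` are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical stand-in: application ID -> latest committed transaction version in the snapshot
final case class SnapshotSketch(transactions: Map[String, Long])

final class TxnVersionSketch(snapshot: SnapshotSketch) {
  // Mirrors the readTxn internal registry
  val readTxn = mutable.ArrayBuffer.empty[String]

  def txnVersion(id: String): Long = {
    readTxn += id // remember the ID so conflicting idempotent transactions can be detected
    snapshot.transactions.getOrElse(id, -1L)
  }
}
```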

txnVersion is used when:

User-Defined Metadata

getUserMetadata(
  op: Operation): Option[String]

getUserMetadata returns the userMetadata of the given Operation (if defined) or the value of the spark.databricks.delta.commitInfo.userMetadata configuration property.
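The fallback amounts to `Option.orElse`. In this sketch, a plain map stands in for the Spark SQL configuration. `UserMetadata` and its parameters are hypothetical.

```scala
object UserMetadata {
  // opUserMetadata: the userMetadata of the Operation (if any)
  // sessionConf: a plain map standing in for the Spark SQL configuration
  def getUserMetadata(opUserMetadata: Option[String], sessionConf: Map[String, String]): Option[String] =
    opUserMetadata.orElse(sessionConf.get("spark.databricks.delta.commitInfo.userMetadata"))
}
```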

getUserMetadata is used when:

Post-Commit Hooks

postCommitHooks: ArrayBuffer[PostCommitHook]

OptimisticTransactionImpl uses postCommitHooks internal registry to manage PostCommitHooks to be executed right after a successful commit.

PostCommitHooks can be registered using registerPostCommitHook.

Blind-Append Transaction

var isBlindAppend: Boolean = false

OptimisticTransactionImpl defines isBlindAppend flag to mark this transaction as blind-append.

isBlindAppend is disabled (false) by default.

isBlindAppend can be enabled (true) only when this OptimisticTransactionImpl is requested to commitImpl and all of the following hold:

  1. There are AddFiles only in this transaction
  2. This transaction does not depend on files (i.e., the readPredicates and the readFiles are all empty)

commitImpl

commitImpl(
  actions: Seq[Action],
  op: DeltaOperations.Operation,
  canSkipEmptyCommits: Boolean,
  tags: Map[String, String]): Option[Long]

commitImpl...FIXME


commitImpl is used when:

commitIfNeeded

commitIfNeeded(
  actions: Seq[Action],
  op: DeltaOperations.Operation,
  tags: Map[String, String] = Map.empty): Unit
Procedure

commitIfNeeded is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).

commitIfNeeded executes commitImpl (with the canSkipEmptyCommits flag enabled).


commitIfNeeded is used when:

Internal Registries

newMetadata

newMetadata: Option[Metadata]

OptimisticTransactionImpl uses the newMetadata internal registry for a new Metadata that should be committed with this transaction.

newMetadata is initially undefined (None). It can be updated only once and before the transaction writes out any files.

newMetadata is used when OptimisticTransactionImpl is requested to prepareCommit and doCommit (for statistics).

newMetadata is available using the metadata method.

readFiles

readFiles: HashSet[AddFile]

OptimisticTransactionImpl uses readFiles registry to track AddFiles that have been seen (scanned) by this transaction (when requested to filterFiles).

Used to determine isBlindAppend and in checkForConflicts (to fail if files that the txn read have been deleted).

readPredicates

readPredicates: ArrayBuffer[Expression]

readPredicates holds predicate expressions for partitions the transaction is modifying.

A new predicate expression is added to readPredicates when OptimisticTransactionImpl is requested to filterFiles or readWholeTable.

readPredicates is used in checkAndRetry.

Internal Properties

committed

Controls whether the transaction has been committed or not (and prevents prepareCommit from being executed again)

Default: false

Enabled in postCommit

readTxn

Streaming query IDs that have been seen by this transaction

A new queryId is added when OptimisticTransactionImpl is requested for txnVersion

Used when OptimisticTransactionImpl is requested to checkAndRetry (to fail with a ConcurrentTransactionException for idempotent transactions that have conflicted)

snapshotMetadata

Metadata of the Snapshot

readWholeTable

readWholeTable(): Unit

readWholeTable simply adds a True literal to the readPredicates internal registry.

readWholeTable is used when:

updateMetadataForNewTable

updateMetadataForNewTable(
  metadata: Metadata): Unit

updateMetadataForNewTable...FIXME

updateMetadataForNewTable is used when:

Metadata

metadata: Metadata

metadata is part of the TransactionalWrite abstraction.

metadata is either the newMetadata (if defined) or the snapshotMetadata.

Updating Metadata

updateMetadata(
  _metadata: Metadata): Unit

updateMetadata asserts the following:

  • The current transaction has not written data out yet (and the hasWritten flag is still disabled since it is not allowed to update the metadata in a transaction that has already written data)
  • The metadata has not been changed already (and the newMetadata has not been assigned yet since it is not allowed to change the metadata more than once in a transaction)

In the end, updateMetadata executes updateMetadataInternal.
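The two assertions, together with the metadata accessor, can be sketched on a hypothetical `MetadataTxnSketch` class. Here updateMetadataInternal only records the new metadata, whereas the real one does more. The wording of the first assertion message is an assumption.

```scala
// Hypothetical, simplified Metadata
final case class Metadata(schemaString: String)

final class MetadataTxnSketch(snapshotMetadata: Metadata) {
  var hasWritten = false
  private var newMetadata: Option[Metadata] = None

  // The new metadata (if set) or the snapshot's metadata
  def metadata: Metadata = newMetadata.getOrElse(snapshotMetadata)

  def updateMetadata(m: Metadata): Unit = {
    assert(!hasWritten, "Cannot update the metadata in a transaction that has already written data.")
    assert(newMetadata.isEmpty, "Cannot change the metadata more than once in a transaction.")
    updateMetadataInternal(m)
  }

  // Simplified: only records the new metadata
  private def updateMetadataInternal(m: Metadata): Unit = newMetadata = Some(m)
}
```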


updateMetadata is used when:

updateMetadataInternal

updateMetadataInternal(
  _metadata: Metadata): Unit

updateMetadataInternal...FIXME

Files To Scan Matching Given Predicates

filterFiles(): Seq[AddFile] // no filters = all files
filterFiles(
  filters: Seq[Expression]): Seq[AddFile]

filterFiles gives the files to scan for the given predicates (filter expressions).

Internally, filterFiles requests the Snapshot for the filesForScan (for no projection attributes and the given filters).

filterFiles finds the partition predicates among the given filters (and the partition columns of the Metadata).

filterFiles registers (adds) the partition predicates (in the readPredicates internal registry) and the files to scan (in the readFiles internal registry).
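The bookkeeping can be sketched with a hypothetical predicate model: each predicate lists the columns it references and tests a file's partition values. A predicate counts as a partition predicate when it references partition columns only. This sketch registers the partition predicates as a single conjunction, which is always true when there are none. It ignores data skipping on non-partition filters.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of predicates and data files
final case class Predicate(references: Set[String], test: Map[String, String] => Boolean)
final case class AddFile(path: String, partitionValues: Map[String, String])

final class FilterFilesSketch(allFiles: Seq[AddFile], partitionColumns: Set[String]) {
  val readPredicates = mutable.ArrayBuffer.empty[Predicate]
  val readFiles = mutable.HashSet.empty[AddFile]

  def filterFiles(filters: Seq[Predicate] = Nil): Seq[AddFile] = {
    // Partition predicates reference partition columns only
    val partitionFilters = filters.filter(_.references.subsetOf(partitionColumns))
    val combined = Predicate(
      partitionFilters.flatMap(_.references).toSet,
      pv => partitionFilters.forall(_.test(pv)))
    val scanned = allFiles.filter(f => combined.test(f.partitionValues))
    readPredicates += combined // what this transaction depends on
    readFiles ++= scanned      // the files this transaction has seen
    scanned
  }
}
```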

filterFiles is used when:

lockCommitIfEnabled

lockCommitIfEnabled[T](
  body: => T): T

lockCommitIfEnabled executes the body with a lock on a delta table when isCommitLockEnabled. Otherwise, lockCommitIfEnabled does not acquire a lock.

lockCommitIfEnabled is used when:

isCommitLockEnabled

isCommitLockEnabled: Boolean

isCommitLockEnabled is the value of the spark.databricks.delta.commitLock.enabled configuration property (if defined) or, otherwise, isPartialWriteVisible of the LogStore (of the DeltaLog).

Note

isCommitLockEnabled is true by default given the following:

  1. spark.databricks.delta.commitLock.enabled configuration property is undefined by default
  2. isPartialWriteVisible is true by default
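The lock decision and the optional locking can be sketched as follows. In this sketch, a `java.util.concurrent.locks.ReentrantLock` stands in for the lock on the delta table, and a plain map stands in for the Spark SQL configuration. Both are assumptions, and `CommitLockSketch` is hypothetical.

```scala
import java.util.concurrent.locks.ReentrantLock

final class CommitLockSketch(sessionConf: Map[String, String], isPartialWriteVisible: Boolean) {
  // Stands in for the lock on the delta table (an assumption of this sketch)
  private val deltaLogLock = new ReentrantLock()

  // The configuration property wins if defined; otherwise fall back to isPartialWriteVisible
  def isCommitLockEnabled: Boolean =
    sessionConf
      .get("spark.databricks.delta.commitLock.enabled")
      .map(_.toBoolean)
      .getOrElse(isPartialWriteVisible)

  def lockCommitIfEnabled[T](body: => T): T =
    if (isCommitLockEnabled) {
      deltaLogLock.lockInterruptibly()
      try body finally deltaLogLock.unlock()
    } else body

  def isLockedNow: Boolean = deltaLogLock.isHeldByCurrentThread
}
```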

Registering Post-Commit Hook

registerPostCommitHook(
  hook: PostCommitHook): Unit
Procedure

registerPostCommitHook is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).

registerPostCommitHook registers (adds) the given PostCommitHook to the postCommitHooks internal registry.


registerPostCommitHook is used when:

setNewProtocolWithFeaturesEnabledByMetadata

setNewProtocolWithFeaturesEnabledByMetadata(
  metadata: Metadata): Unit

setNewProtocolWithFeaturesEnabledByMetadata executes upgradeProtocolFromMetadataForExistingTable (with the given Metadata and the Protocol) and makes the result this transaction's protocol.


setNewProtocolWithFeaturesEnabledByMetadata is used when:

Committing Large (Data Files)

commitLarge(
  spark: SparkSession,
  actions: Iterator[Action],
  op: DeltaOperations.Operation,
  context: Map[String, String],
  metrics: Map[String, String]): (Long, Snapshot)

commitLarge...FIXME


commitLarge is used for the following commands:

Metadata Can Be Updated

canUpdateMetadata: Boolean

canUpdateMetadata holds when this transaction has neither written data nor changed the metadata yet.

In other words, canUpdateMetadata holds true when both of the following hold:

  1. hasWritten is false
  2. newMetadata is undefined

canUpdateMetadata is used when:

Metadata Check for CDF Columns

performCdcMetadataCheck(): Unit
Procedure

performCdcMetadataCheck is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).

Noop

performCdcMetadataCheck does nothing (noop) when the newMetadata registry is empty or Change Data Feed is not enabled in the new metadata (isCDCEnabledOnTable is false).

For the new metadata and Change Data Feed feature enabled on the table, performCdcMetadataCheck takes the column names of the newly-assigned metadata and compares them with the reserved column names of CDF (CDF-aware read schema).

If there are any reserved CDF column names found in the new metadata, performCdcMetadataCheck throws a DeltaIllegalStateException for the following:

  • CDF was not enabled previously (in the initial metadata of the table snapshot) but reserved columns are present in the new schema
  • CDF was enabled but reserved columns are present in the new metadata (i.e., in the data)
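The check can be sketched over plain column names. The reserved names are the ones that Change Data Feed adds to the read schema (`_change_type`, `_commit_version`, `_commit_timestamp`). `CdcMetadataCheck` and its flags are hypothetical. The sketch throws `IllegalStateException` with hypothetical messages in place of Delta's exceptions.

```scala
object CdcMetadataCheck {
  // Column names reserved by the Change Data Feed read schema
  val cdcReservedColumns = Seq("_change_type", "_commit_version", "_commit_timestamp")

  def performCdcMetadataCheck(
      newSchemaColumns: Option[Seq[String]], // None: no new metadata in this transaction
      cdfEnabledInNewMetadata: Boolean,
      cdfEnabledInSnapshot: Boolean): Unit =
    newSchemaColumns.foreach { columns =>
      if (cdfEnabledInNewMetadata) {
        val reservedColumnsUsed = cdcReservedColumns.intersect(columns)
        if (reservedColumnsUsed.nonEmpty) {
          val used = reservedColumnsUsed.mkString(", ")
          if (!cdfEnabledInSnapshot)
            throw new IllegalStateException(s"Cannot enable CDF: schema uses reserved column(s) $used")
          else
            throw new IllegalStateException(s"Reserved CDF column(s) present in the data: $used")
        }
      }
    }
}
```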

performCdcMetadataCheck is used when:

  • OptimisticTransactionImpl is requested to commitImpl

Logging

OptimisticTransactionImpl is a Scala trait and logging is configured using the logger of the implementations.