
OptimisticTransactionImpl

OptimisticTransactionImpl is an extension of the TransactionalWrite abstraction for optimistic transactions that can modify a delta table (at a given version) and can be committed eventually.

In other words, OptimisticTransactionImpl is a set of actions as part of an Operation that changes the state of a delta table transactionally.

Contract

Clock

clock: Clock

DeltaLog

deltaLog: DeltaLog

DeltaLog (of a delta table) that this transaction is changing

deltaLog is part of the TransactionalWrite abstraction. OptimisticTransactionImpl seems to redefine it as a val (from def).

Snapshot

snapshot: Snapshot

Snapshot (of the delta table) that this transaction is changing

snapshot is part of the TransactionalWrite contract. OptimisticTransactionImpl seems to redefine it as a val (from def).

Implementations

Table Version at Reading Time

readVersion: Long

readVersion requests the Snapshot for the version.

readVersion is used when:

Transactional Commit

commit(
  actions: Seq[Action],
  op: DeltaOperations.Operation): Long

commit attempts to commit the transaction (with the Actions and the Operation) and gives the commit version.

Usage

commit is used when:

Preparing Commit

commit firstly prepares a commit (that gives the final actions to commit that may be different from the given actions).

Isolation Level

commit determines the isolation level based on FileActions (in the given actions) and their dataChange flag.

When every FileAction has the dataChange flag disabled (false), commit assumes that no data changed and chooses SnapshotIsolation. Otherwise, commit chooses Serializable.
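The rule above can be sketched as follows. This is a minimal illustration, not the Delta Lake implementation: the action and isolation-level types are simplified stand-ins defined locally, and chooseIsolationLevel is a hypothetical helper name.

```scala
object IsolationLevelSketch {
  // Hypothetical, simplified stand-ins for Delta's actions (not the real classes)
  sealed trait Action
  sealed trait FileAction extends Action { def dataChange: Boolean }
  final case class AddFile(path: String, dataChange: Boolean) extends FileAction
  final case class RemoveFile(path: String, dataChange: Boolean) extends FileAction
  final case class SetTransaction(appId: String, version: Long) extends Action

  sealed trait IsolationLevel
  case object Serializable extends IsolationLevel
  case object SnapshotIsolation extends IsolationLevel

  // SnapshotIsolation only when no FileAction reports a data change
  def chooseIsolationLevel(actions: Seq[Action]): IsolationLevel = {
    val fileActions = actions.collect { case f: FileAction => f }
    val noDataChanged = fileActions.forall(f => !f.dataChange)
    if (noDataChanged) SnapshotIsolation else Serializable
  }
}
```

Note that a commit with no FileActions at all also satisfies the "no data changed" condition in this sketch.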

Blind Append

commit is considered blind append when the following all hold:

  1. There are only AddFiles among FileActions in the actions (onlyAddFiles)
  2. It does not depend on files, i.e. the readPredicates and readFiles are empty (dependsOnFiles)
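The two conditions can be expressed as a small predicate. The sketch below uses simplified, locally defined types (read predicates are plain strings here, an assumption made only to keep the example self-contained), and isBlindAppend is written as a standalone function for illustration.

```scala
object BlindAppendSketch {
  // Hypothetical, simplified stand-ins (not the real Delta classes)
  sealed trait FileAction
  final case class AddFile(path: String) extends FileAction
  final case class RemoveFile(path: String) extends FileAction

  def isBlindAppend(
      fileActions: Seq[FileAction],
      readPredicates: Seq[String],
      readFiles: Set[AddFile]): Boolean = {
    // Condition 1: only AddFiles among the FileActions
    val onlyAddFiles = fileActions.forall(_.isInstanceOf[AddFile])
    // Condition 2: the transaction has read nothing from the table
    val dependsOnFiles = readPredicates.nonEmpty || readFiles.nonEmpty
    onlyAddFiles && !dependsOnFiles
  }
}
```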

CommitInfo

commit...FIXME

Registering Post-Commit Hook

commit registers the GenerateSymlinkManifest post-commit hook when there is a FileAction among the actions and the compatibility.symlinkFormatManifest.enabled table property is enabled.

Commit Version

commit executes doCommit with the next version, the actions, attempt number 0, and the selected isolation level.

commit prints out the following INFO message to the logs:

Committed delta #[commitVersion] to [logPath]

Performing Post-Commit Operations

commit executes postCommit (with the version committed and the actions).

Executing Post-Commit Hooks

In the end, commit runs post-commit hooks and returns the version of the successful commit.

doCommitRetryIteratively

doCommitRetryIteratively(
  attemptVersion: Long,
  actions: Seq[Action],
  isolationLevel: IsolationLevel): Long

doCommitRetryIteratively...FIXME

Checking Logical Conflicts with Concurrent Updates

checkForConflicts(
  checkVersion: Long,
  actions: Seq[Action],
  attemptNumber: Int,
  commitIsolationLevel: IsolationLevel): Long

checkForConflicts checks for logical conflicts (of the given actions) with concurrent updates (actions of the commits since the transaction has started).

checkForConflicts gives the next possible commit version unless any of the following happened between the time of read (checkVersion) and the time of this commit attempt:

  1. Client is no longer up to date with the table protocol for reading and writing (and hence not allowed to access the table)
  2. Protocol version has changed
  3. Metadata has changed
  4. AddFiles have been added that the txn should have read based on the given IsolationLevel (Concurrent Append)
  5. AddFiles that the txn read have been deleted (Concurrent Delete)
  6. Files have been deleted both by the txn and by another commit since the time of read (Concurrent Delete)
  7. Idempotent transactions have conflicted (Multiple Streaming Queries with the same checkpoint location)

checkForConflicts takes the next possible commit version.

For every commit since the time of read (checkVersion) and this commit attempt, checkForConflicts does the following:

  • FIXME

  • Prints out the following INFO message to the logs:

    Completed checking for conflicts Version: [version] Attempt: [attemptNumber] Time: [totalCheckAndRetryTime] ms
    

In the end, checkForConflicts prints out the following INFO message to the logs:

No logical conflicts with deltas [[checkVersion], [nextAttemptVersion]), retrying.

getPrettyPartitionMessage

getPrettyPartitionMessage(
  partitionValues: Map[String, String]): String

getPrettyPartitionMessage...FIXME

postCommit

postCommit(
  commitVersion: Long,
  commitActions: Seq[Action]): Unit

postCommit turns the committed flag on.

postCommit requests the DeltaLog to checkpoint when the given commitVersion is not 0 (first commit) and the checkpoint interval has been reached (based on the given commitVersion).

Note

commitActions argument is not used.

postCommit prints out the following WARN message to the logs in case of IllegalStateException:

Failed to checkpoint table state.
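The checkpoint condition in postCommit can be sketched as a predicate. Treating "the checkpoint interval has been reached" as a modulo check is an assumption of this sketch, and shouldCheckpoint is a hypothetical helper name, not a method of OptimisticTransactionImpl.

```scala
object PostCommitSketch {
  // Assumption: "interval reached" is modeled as commitVersion being
  // a multiple of the checkpoint interval. Version 0 never checkpoints.
  def shouldCheckpoint(commitVersion: Long, checkpointInterval: Int): Boolean =
    commitVersion != 0 && commitVersion % checkpointInterval == 0
}
```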

prepareCommit

prepareCommit(
  actions: Seq[Action],
  op: DeltaOperations.Operation): Seq[Action]

prepareCommit adds the newMetadata action (if available) to the given actions.

prepareCommit executes verifyNewMetadata on the new metadata (if there was one).

prepareCommit...FIXME

prepareCommit requests the DeltaLog to protocolWrite.

prepareCommit...FIXME

Multiple Metadata Changes Not Allowed

prepareCommit throws an AssertionError when there are multiple metadata changes in the transaction (by means of Metadata actions):

Cannot change the metadata more than once in a transaction.

Committing Transaction Allowed Once Only

prepareCommit throws an AssertionError when the committed internal flag is enabled:

Transaction already committed.

Registering Post-Commit Hook

registerPostCommitHook(
  hook: PostCommitHook): Unit

registerPostCommitHook registers (adds) the given PostCommitHook to the postCommitHooks internal registry.

runPostCommitHooks

runPostCommitHooks(
  version: Long,
  committedActions: Seq[Action]): Unit

runPostCommitHooks simply runs every post-commit hook registered (in the postCommitHooks internal registry).

runPostCommitHooks clears the active transaction (making all follow-up operations non-transactional).

Note

Hooks may create new transactions.

Handling Non-Fatal Exceptions

For non-fatal exceptions, runPostCommitHooks prints out the following ERROR message to the logs, records the delta event, and requests the post-commit hook to handle the error.

Error when executing post-commit hook [name] for commit [version]

AssertionError

runPostCommitHooks throws an AssertionError when the committed flag is disabled:

Can't call post commit hooks before committing
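The behaviour of runPostCommitHooks described above (the committed assertion, running every registered hook, and delegating non-fatal errors to the hook) might look roughly like the following. The PostCommitHook trait and the Txn class here are simplified, hypothetical stand-ins, println replaces the logger, and recording the delta event as well as clearing the active transaction are left out.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

object PostCommitHooksSketch {
  // Hypothetical, simplified hook contract (not the real PostCommitHook)
  trait PostCommitHook {
    def name: String
    def run(version: Long): Unit
    def handleError(error: Throwable, version: Long): Unit = ()
  }

  final class Txn {
    var committed = false
    val postCommitHooks = mutable.ArrayBuffer.empty[PostCommitHook]

    def registerPostCommitHook(hook: PostCommitHook): Unit = {
      postCommitHooks += hook
    }

    def runPostCommitHooks(version: Long): Unit = {
      assert(committed, "Can't call post commit hooks before committing")
      postCommitHooks.foreach { hook =>
        try hook.run(version)
        catch {
          // Non-fatal errors are logged and handed back to the hook
          case NonFatal(e) =>
            println(s"Error when executing post-commit hook ${hook.name} for commit $version")
            hook.handleError(e, version)
        }
      }
    }
  }
}
```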

Next Possible Commit Version

getNextAttemptVersion(
  previousAttemptVersion: Long): Long

getNextAttemptVersion requests the DeltaLog to update (and give the latest state snapshot of the delta table).

In the end, getNextAttemptVersion requests the Snapshot for the version and increments it.

Note

The input previousAttemptVersion argument is not used.

Operation Metrics

getOperationMetrics(
  op: Operation): Option[Map[String, String]]

getOperationMetrics gives the metrics of the given Operation when the spark.databricks.delta.history.metricsEnabled configuration property is enabled. Otherwise, getOperationMetrics gives None.

CommitInfo

OptimisticTransactionImpl creates a CommitInfo when requested to commit with spark.databricks.delta.commitInfo.enabled configuration enabled.

OptimisticTransactionImpl uses the CommitInfo to recordDeltaEvent (as a CommitStats).

Attempting Commit

doCommit(
  attemptVersion: Long,
  actions: Seq[Action],
  attemptNumber: Int,
  isolationLevel: IsolationLevel): Long

doCommit returns the given attemptVersion as the commit version if the commit was successful. Otherwise, doCommit executes checkAndRetry.

doCommit is used when:


Internally, doCommit prints out the following DEBUG message to the logs:

Attempting to commit version [attemptVersion] with [n] actions with [isolationLevel] isolation level

Writing Out

doCommit requests the DeltaLog for the LogStore to write out the given actions to a delta file in the log directory with the attemptVersion version, e.g.

00000000000000000001.json

doCommit writes the actions out in JSON format.
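The file name in the example above is the version zero-padded to 20 digits with a .json suffix. A minimal sketch of that naming follows, with deltaFileName as a hypothetical helper name.

```scala
object DeltaFileNameSketch {
  // Zero-pad the version to 20 digits, as in the example file name above
  def deltaFileName(version: Long): String = f"$version%020d.json"
}
```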

Note

LogStores must throw a java.nio.file.FileAlreadyExistsException exception if the delta file already exists. Any FileAlreadyExistsExceptions are caught by doCommit itself to checkAndRetry.

Post-Commit Snapshot

doCommit requests the DeltaLog to update.

IllegalStateException

doCommit throws an IllegalStateException when the version of the snapshot after update is smaller than the given attemptVersion version.

The committed version is [attemptVersion] but the current version is [version].

CommitStats

doCommit records a new CommitStats and returns the given attemptVersion as the commit version.

FileAlreadyExistsExceptions

doCommit catches FileAlreadyExistsExceptions and executes checkAndRetry.
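The optimistic write-then-retry idea can be illustrated with an in-memory store that rejects a write when the version is already taken. Everything here is hypothetical: InMemoryLogStore is not the LogStore API, actions are plain strings, and the retry simply moves to the next free version, whereas the real checkAndRetry checks for logical conflicts first.

```scala
import java.nio.file.FileAlreadyExistsException
import scala.collection.mutable

object DoCommitSketch {
  // Hypothetical in-memory log store: a write fails if the version is taken
  final class InMemoryLogStore {
    private val files = mutable.Map.empty[Long, Seq[String]]

    def write(version: Long, actions: Seq[String]): Unit = synchronized {
      if (files.contains(version)) {
        throw new FileAlreadyExistsException(version.toString)
      }
      files(version) = actions
    }

    def latestVersion: Long = synchronized {
      if (files.isEmpty) -1L else files.keys.max
    }
  }

  // Simplified: retries at the next version WITHOUT any conflict checking
  @scala.annotation.tailrec
  def doCommit(store: InMemoryLogStore, attemptVersion: Long, actions: Seq[String]): Long = {
    val written =
      try { store.write(attemptVersion, actions); true }
      catch { case _: FileAlreadyExistsException => false }
    if (written) attemptVersion
    else doCommit(store, store.latestVersion + 1, actions)
  }
}
```

The key design point is that the log store's "fail if exists" write acts as the mutual-exclusion primitive: whichever writer creates the delta file first wins that version.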

Retrying Commit

checkAndRetry(
  checkVersion: Long,
  actions: Seq[Action],
  attemptNumber: Int): Long

checkAndRetry...FIXME

checkAndRetry is used when OptimisticTransactionImpl is requested to commit (and attempts a commit that failed with a FileAlreadyExistsException).

verifyNewMetadata

verifyNewMetadata(
  metadata: Metadata): Unit

verifyNewMetadata...FIXME

verifyNewMetadata is used when:

withGlobalConfigDefaults

withGlobalConfigDefaults(
  metadata: Metadata): Metadata

withGlobalConfigDefaults...FIXME

withGlobalConfigDefaults is used when:

Looking Up Transaction Version For Given (Streaming Query) ID

txnVersion(
  id: String): Long

txnVersion simply registers (adds) the given ID in the readTxn internal registry.

In the end, txnVersion requests the Snapshot for the transaction version for the given ID or -1.
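A sketch of the two steps of txnVersion is shown below. The Txn class is a hypothetical stand-in, and the snapshot's transaction versions are modeled as a plain Map, which is an assumption of this example.

```scala
import scala.collection.mutable

object TxnVersionSketch {
  // snapshotTransactions stands in for the Snapshot's transaction versions
  final class Txn(snapshotTransactions: Map[String, Long]) {
    val readTxn = mutable.ArrayBuffer.empty[String]

    def txnVersion(id: String): Long = {
      readTxn += id                            // remember that this ID was read
      snapshotTransactions.getOrElse(id, -1L)  // -1 when the ID is unknown
    }
  }
}
```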

txnVersion is used when:

User-Defined Metadata

getUserMetadata(
  op: Operation): Option[String]

getUserMetadata returns the userMetadata of the given Operation (if defined) or the value of the spark.databricks.delta.commitInfo.userMetadata configuration property.
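The fallback order can be sketched with Option.orElse. The Operation case class and the confValue parameter (standing in for the value of the configuration property) are hypothetical simplifications for this example.

```scala
object UserMetadataSketch {
  // Hypothetical, simplified Operation (not the real DeltaOperations.Operation)
  final case class Operation(name: String, userMetadata: Option[String])

  // confValue stands in for the configuration property's value, if set
  def getUserMetadata(op: Operation, confValue: Option[String]): Option[String] =
    op.userMetadata.orElse(confValue)
}
```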

getUserMetadata is used when:

Internal Registries

Post-Commit Hooks

postCommitHooks: ArrayBuffer[PostCommitHook]

OptimisticTransactionImpl manages PostCommitHooks that will be executed right after a commit is successful.

Post-commit hooks can be registered, but only the GenerateSymlinkManifest post-commit hook is supported.

newMetadata

newMetadata: Option[Metadata]

OptimisticTransactionImpl uses the newMetadata internal registry for a new Metadata that should be committed with this transaction.

newMetadata is initially undefined (None). It can be updated only once and before the transaction writes out any files.

newMetadata is used when OptimisticTransactionImpl is requested to prepareCommit and doCommit (for statistics).

newMetadata is available using the metadata method.

readFiles

readFiles: HashSet[AddFile]

OptimisticTransactionImpl uses readFiles registry to track AddFiles that have been seen (scanned) by this transaction (when requested to filterFiles).

readFiles is used to determine isBlindAppend and when OptimisticTransactionImpl is requested to checkForConflicts (to fail if files that the txn read have been deleted).

readPredicates

readPredicates: ArrayBuffer[Expression]

readPredicates holds predicate expressions for partitions the transaction is modifying.

A new predicate expression is added to readPredicates when OptimisticTransactionImpl is requested to filterFiles and readWholeTable.

readPredicates is used when OptimisticTransactionImpl is requested to checkAndRetry.

Internal Properties

committed

Controls whether the transaction has been committed or not (and prevents prepareCommit from being executed again)

Default: false

Enabled in postCommit

readTxn

Streaming query IDs that have been seen by this transaction

A new queryId is added when OptimisticTransactionImpl is requested for txnVersion

Used when OptimisticTransactionImpl is requested to checkAndRetry (to fail with a ConcurrentTransactionException for idempotent transactions that have conflicted)

snapshotMetadata

Metadata of the Snapshot

readWholeTable

readWholeTable(): Unit

readWholeTable simply adds a True literal to the readPredicates internal registry.

readWholeTable is used when:

updateMetadataForNewTable

updateMetadataForNewTable(
  metadata: Metadata): Unit

updateMetadataForNewTable...FIXME

updateMetadataForNewTable is used when:

Metadata

metadata: Metadata

metadata is part of the TransactionalWrite abstraction.

metadata is either the newMetadata (if defined) or the snapshotMetadata.
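In Scala terms this is an Option.getOrElse. A minimal sketch with a hypothetical, simplified Metadata and Txn follows.

```scala
object MetadataSketch {
  final case class Metadata(name: String)

  final class Txn(val snapshotMetadata: Metadata) {
    var newMetadata: Option[Metadata] = None

    // The new metadata wins; otherwise fall back to the snapshot's metadata
    def metadata: Metadata = newMetadata.getOrElse(snapshotMetadata)
  }
}
```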

Updating Metadata

updateMetadata(
  metadata: Metadata): Unit

updateMetadata updates the newMetadata internal property based on the readVersion:

  • For -1, updateMetadata updates the configuration of the given metadata with a new metadata based on the SQLConf (of the active SparkSession), the configuration of the given metadata and a new Protocol

  • For other versions, updateMetadata leaves the given Metadata unchanged

updateMetadata is used when:

AssertionError

updateMetadata throws an AssertionError when the hasWritten flag is enabled:

Cannot update the metadata in a transaction that has already written data.

AssertionError

updateMetadata throws an AssertionError when the newMetadata is not empty:

Cannot change the metadata more than once in a transaction.
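The two guards of updateMetadata can be sketched as assertions on the transaction state. The Txn class below is a hypothetical stand-in, and the readVersion-dependent handling of the configuration is left out on purpose.

```scala
object UpdateMetadataSketch {
  final case class Metadata(configuration: Map[String, String])

  final class Txn {
    var hasWritten = false
    var newMetadata: Option[Metadata] = None

    def updateMetadata(metadata: Metadata): Unit = {
      assert(!hasWritten,
        "Cannot update the metadata in a transaction that has already written data.")
      assert(newMetadata.isEmpty,
        "Cannot change the metadata more than once in a transaction.")
      newMetadata = Some(metadata)
    }
  }
}
```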

Files To Scan Matching Given Predicates

filterFiles(): Seq[AddFile] // No filters = all files
filterFiles(
  filters: Seq[Expression]): Seq[AddFile]

filterFiles gives the files to scan for the given predicates (filter expressions).

Internally, filterFiles requests the Snapshot for the filesForScan (for no projection attributes and the given filters).

filterFiles finds the partition predicates among the given filters (and the partition columns of the Metadata).

filterFiles registers (adds) the partition predicates (in the readPredicates internal registry) and the files to scan (in the readFiles internal registry).
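The bookkeeping done by filterFiles can be illustrated with a toy model. This is only a sketch under strong assumptions: predicates are reduced to a hypothetical EqualTo on a single column (instead of Catalyst Expressions), the snapshot is a plain list of files, and non-partition filters are ignored when selecting files.

```scala
import scala.collection.mutable

object FilterFilesSketch {
  final case class AddFile(path: String, partitionValues: Map[String, String])
  // Hypothetical predicate: equality on a single column
  final case class EqualTo(column: String, value: String)

  final class Txn(allFiles: Seq[AddFile], partitionColumns: Set[String]) {
    val readPredicates = mutable.ArrayBuffer.empty[EqualTo]
    val readFiles = mutable.HashSet.empty[AddFile]

    def filterFiles(filters: Seq[EqualTo]): Seq[AddFile] = {
      // Keep only the predicates over partition columns
      val partitionFilters = filters.filter(f => partitionColumns.contains(f.column))
      val files = allFiles.filter { file =>
        partitionFilters.forall(f => file.partitionValues.get(f.column).contains(f.value))
      }
      // Track what this transaction has read
      readPredicates ++= partitionFilters
      readFiles ++= files
      files
    }
  }
}
```

Tracking both registries is what later allows conflict detection: a concurrent commit that touches files or partitions recorded here may invalidate what the transaction read.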

filterFiles is used when:


Last update: 2021-07-10