
OptimisticTransactionImpl

OptimisticTransactionImpl is an extension of the TransactionalWrite abstraction for optimistic transactions that can modify a delta table (at a given version) and can be committed eventually.

In other words, OptimisticTransactionImpl is a set of actions as part of an Operation that changes the state of a delta table transactionally.

Contract

Clock

clock: Clock

DeltaLog

deltaLog: DeltaLog

DeltaLog (of a delta table) that this transaction is changing

deltaLog is part of the TransactionalWrite abstraction. OptimisticTransactionImpl appears to narrow it from a def to a val.

Snapshot

snapshot: Snapshot

Snapshot (of the delta table) that this transaction is changing

snapshot is part of the TransactionalWrite abstraction. OptimisticTransactionImpl appears to narrow it from a def to a val.

Implementations

Table Version at Reading Time

readVersion: Long

readVersion requests the Snapshot for the version.

readVersion is used when:

Transactional Commit

commit(
  actions: Seq[Action],
  op: DeltaOperations.Operation): Long

commit commits the transaction (with the given Actions and Operation) and returns the version of the commit.

Usage

commit is used when:

Preparing Commit

commit first prepares the commit, which gives the final actions to commit (these may differ from the given actions).

Isolation Level

commit determines the isolation level based on FileActions (in the given actions) and their dataChange flag.

When every FileAction has the dataChange flag disabled (false), commit assumes that no data changed and chooses SnapshotIsolation. Otherwise, commit chooses Serializable.
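The rule above can be sketched in a few lines. This is a minimal illustration only: the action and isolation-level types below are simplified, hypothetical stand-ins (not the Delta Lake classes), and chooseIsolationLevel is a name made up for this example.

```scala
object IsolationLevelSketch {
  // Hypothetical, simplified stand-ins for the action types
  sealed trait Action
  sealed trait FileAction extends Action { def dataChange: Boolean }
  final case class AddFile(path: String, dataChange: Boolean) extends FileAction
  final case class RemoveFile(path: String, dataChange: Boolean) extends FileAction
  final case class SetTransaction(appId: String, version: Long) extends Action

  sealed trait IsolationLevel
  case object Serializable extends IsolationLevel
  case object SnapshotIsolation extends IsolationLevel

  // Only FileActions are inspected; any single dataChange = true forces Serializable
  def chooseIsolationLevel(actions: Seq[Action]): IsolationLevel = {
    val noDataChanged = actions.collect { case f: FileAction => f }.forall(!_.dataChange)
    if (noDataChanged) SnapshotIsolation else Serializable
  }
}
```

Note that, under this sketch, a commit with no FileActions at all also ends up with SnapshotIsolation, since the check is vacuously true.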

isBlindAppend

commit...FIXME

CommitInfo

commit...FIXME

Registering Post-Commit Hook

commit registers the GenerateSymlinkManifest post-commit hook when there is a FileAction among the actions and the compatibility.symlinkFormatManifest.enabled table property is enabled.
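As a rough sketch of that condition (with hypothetical, simplified types, and with table properties modeled as a plain Map[String, String], which is an assumption of this example):

```scala
object SymlinkHookConditionSketch {
  // Hypothetical, simplified stand-ins for the action types
  sealed trait Action
  sealed trait FileAction extends Action
  final case class AddFile(path: String) extends FileAction
  final case class CommitInfo(operation: String) extends Action

  // Property name as given in the text above
  val ManifestProperty = "compatibility.symlinkFormatManifest.enabled"

  // True only when both conditions hold: at least one FileAction and the property enabled
  def shouldRegisterManifestHook(
      actions: Seq[Action],
      tableProperties: Map[String, String]): Boolean = {
    val hasFileActions = actions.exists(_.isInstanceOf[FileAction])
    val enabled = tableProperties.get(ManifestProperty).exists(_.equalsIgnoreCase("true"))
    hasFileActions && enabled
  }
}
```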

Commit Version

commit calls doCommit with the next version, the actions, attempt number 0, and the selected isolation level.

commit prints out the following INFO message to the logs:

Committed delta #[commitVersion] to [logPath]

Performing Post-Commit Operations

commit calls postCommit (with the version committed and the actions).

Executing Post-Commit Hooks

In the end, commit runs post-commit hooks and returns the version of the successful commit.

doCommitRetryIteratively

doCommitRetryIteratively(
  attemptVersion: Long,
  actions: Seq[Action],
  isolationLevel: IsolationLevel): Long

doCommitRetryIteratively...FIXME

checkForConflicts

checkForConflicts(
  checkVersion: Long,
  actions: Seq[Action],
  attemptNumber: Int,
  commitIsolationLevel: IsolationLevel): Long

checkForConflicts...FIXME

getNextAttemptVersion

getNextAttemptVersion(
  previousAttemptVersion: Long): Long

getNextAttemptVersion...FIXME

getPrettyPartitionMessage

getPrettyPartitionMessage(
  partitionValues: Map[String, String]): String

getPrettyPartitionMessage...FIXME

getOperationMetrics

getOperationMetrics(
  op: Operation): Option[Map[String, String]]

getOperationMetrics...FIXME

postCommit

postCommit(
  commitVersion: Long,
  commitActions: Seq[Action]): Unit

postCommit...FIXME

prepareCommit

prepareCommit(
  actions: Seq[Action],
  op: DeltaOperations.Operation): Seq[Action]

prepareCommit adds the newMetadata action (if available) to the given actions.

prepareCommit calls verifyNewMetadata on the new metadata (if there is one).

prepareCommit...FIXME

prepareCommit requests the DeltaLog to protocolWrite.

prepareCommit...FIXME

Multiple Metadata Changes Not Allowed

prepareCommit throws an AssertionError when there are multiple metadata changes in the transaction (by means of Metadata actions):

Cannot change the metadata more than once in a transaction.

Committing Transaction Allowed Once Only

prepareCommit throws an AssertionError when the committed internal flag is enabled:

Transaction already committed.
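The two assertions, together with prepending newMetadata to the given actions, might look roughly like the sketch below. The Txn class and the action types are hypothetical simplifications, and the steps marked FIXME above (as well as verifyNewMetadata and protocolWrite) are deliberately left out.

```scala
object PrepareCommitSketch {
  // Hypothetical, simplified stand-ins for the action types
  sealed trait Action
  final case class Metadata(id: String) extends Action
  final case class AddFile(path: String) extends Action

  class Txn(newMetadata: Option[Metadata]) {
    var committed: Boolean = false

    def prepareCommit(actions: Seq[Action]): Seq[Action] = {
      assert(!committed, "Transaction already committed.")
      // The new metadata (if any) goes in front of the given actions
      val finalActions: Seq[Action] = newMetadata.toSeq ++ actions
      val metadataChanges = finalActions.collect { case m: Metadata => m }
      assert(
        metadataChanges.length <= 1,
        "Cannot change the metadata more than once in a transaction.")
      finalActions
    }
  }
}
```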

Registering Post-Commit Hook

registerPostCommitHook(
  hook: PostCommitHook): Unit

registerPostCommitHook registers (adds) the given PostCommitHook to the postCommitHooks internal registry.

runPostCommitHooks

runPostCommitHooks(
  version: Long,
  committedActions: Seq[Action]): Unit

runPostCommitHooks simply runs every post-commit hook registered (in the postCommitHooks internal registry).

runPostCommitHooks clears the active transaction (making all follow-up operations non-transactional).

Note

Hooks may create new transactions.

Handling Non-Fatal Exceptions

For non-fatal exceptions, runPostCommitHooks prints out the following ERROR message to the logs, records the delta event, and requests the post-commit hook to handle the error.

Error when executing post-commit hook [name] for commit [version]

AssertionError

runPostCommitHooks throws an AssertionError when the committed flag is disabled:

Can't call post commit hooks before committing
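A minimal sketch of the register-then-run cycle with non-fatal error handling follows. The PostCommitHook trait here is a trimmed-down, hypothetical version, errorLog stands in for the real logging and event recording, and clearing the active transaction is not modeled.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object PostCommitHooksSketch {
  // Hypothetical, trimmed-down hook interface
  trait PostCommitHook {
    def name: String
    def run(version: Long): Unit
    def handleError(error: Throwable, version: Long): Unit = ()
  }

  class Txn {
    private val postCommitHooks = ArrayBuffer.empty[PostCommitHook]
    var committed: Boolean = false
    // Stand-in for the ERROR log output
    val errorLog = ArrayBuffer.empty[String]

    def registerPostCommitHook(hook: PostCommitHook): Unit =
      postCommitHooks += hook

    def runPostCommitHooks(version: Long): Unit = {
      assert(committed, "Can't call post commit hooks before committing")
      postCommitHooks.foreach { hook =>
        try hook.run(version)
        catch {
          // A failing hook is reported and handled; the remaining hooks still run
          case NonFatal(e) =>
            errorLog += s"Error when executing post-commit hook ${hook.name} for commit $version"
            hook.handleError(e, version)
        }
      }
    }
  }
}
```

Catching only NonFatal exceptions means that serious JVM errors (for example OutOfMemoryError) still propagate to the caller.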

CommitInfo

OptimisticTransactionImpl creates a CommitInfo when requested to commit with spark.databricks.delta.commitInfo.enabled configuration enabled.

OptimisticTransactionImpl uses the CommitInfo to recordDeltaEvent (as a CommitStats).

Attempting Commit

doCommit(
  attemptVersion: Long,
  actions: Seq[Action],
  attemptNumber: Int,
  isolationLevel: IsolationLevel): Long

doCommit returns the given attemptVersion as the commit version if the write succeeds. Otherwise, doCommit falls back to checkAndRetry.

doCommit is used when:


Internally, doCommit prints out the following DEBUG message to the logs:

Attempting to commit version [attemptVersion] with [n] actions with [isolationLevel] isolation level

Writing Out

doCommit requests the DeltaLog for the LogStore to write out the given actions to a delta file in the log directory with the attemptVersion version, e.g.

00000000000000000001.json

doCommit writes the actions out in JSON format.

Note

LogStores must throw a java.nio.file.FileAlreadyExistsException if the delta file already exists. doCommit itself catches any FileAlreadyExistsException and calls checkAndRetry.
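The "create the file or fail if it exists" contract is what makes the commit optimistic. The sketch below illustrates it on a local file system with java.nio; it is not the LogStore API. deltaFileName and tryWrite are hypothetical helpers, and the conflict checking that checkAndRetry performs is not modeled (the sketch only reports whether this writer won).

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{FileAlreadyExistsException, Files, Path, StandardOpenOption}

object DeltaFileWriteSketch {
  // Zero-padded, 20-digit file name, matching the 00000000000000000001.json example
  def deltaFileName(version: Long): String = f"$version%020d.json"

  // Returns true if this writer created the delta file,
  // false if another writer had already created it
  def tryWrite(logDir: Path, version: Long, jsonActions: Seq[String]): Boolean = {
    val target = logDir.resolve(deltaFileName(version))
    val bytes = jsonActions.mkString("\n").getBytes(StandardCharsets.UTF_8)
    try {
      // CREATE_NEW fails with FileAlreadyExistsException instead of overwriting
      Files.write(target, bytes, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)
      true
    } catch {
      case _: FileAlreadyExistsException => false
    }
  }
}
```

A caller would treat a false result as the signal to check for conflicts before attempting a later version.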

Post-Commit Snapshot

doCommit requests the DeltaLog to update.

IllegalStateException

doCommit throws an IllegalStateException when the version of the snapshot after update is smaller than the given attemptVersion version.

The committed version is [attemptVersion] but the current version is [version].
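As a small sketch, the sanity check amounts to comparing two version numbers. checkPostCommitVersion is a hypothetical helper name, and both versions are passed in as plain Longs rather than read from a DeltaLog.

```scala
object PostCommitVersionCheckSketch {
  // Fails if the refreshed snapshot is older than the version just written
  def checkPostCommitVersion(attemptVersion: Long, currentVersion: Long): Long = {
    if (currentVersion < attemptVersion) {
      throw new IllegalStateException(
        s"The committed version is $attemptVersion but the current version is $currentVersion.")
    }
    attemptVersion
  }
}
```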

CommitStats

doCommit records a new CommitStats and returns the given attemptVersion as the commit version.

FileAlreadyExistsExceptions

doCommit catches FileAlreadyExistsExceptions and calls checkAndRetry.

Retrying Commit

checkAndRetry(
  checkVersion: Long,
  actions: Seq[Action],
  attemptNumber: Int): Long

checkAndRetry...FIXME

checkAndRetry is used when OptimisticTransactionImpl is requested to commit (and a commit attempt fails with a FileAlreadyExistsException).

verifyNewMetadata

verifyNewMetadata(
  metadata: Metadata): Unit

verifyNewMetadata...FIXME

verifyNewMetadata is used when:

withGlobalConfigDefaults

withGlobalConfigDefaults(
  metadata: Metadata): Metadata

withGlobalConfigDefaults...FIXME

withGlobalConfigDefaults is used when:

Looking Up Transaction Version For Given (Streaming Query) ID

txnVersion(
  id: String): Long

txnVersion simply registers (adds) the given ID in the readTxn internal registry.

In the end, txnVersion requests the Snapshot for the transaction version for the given ID, and returns -1 if there is none.
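A minimal sketch of this lookup, assuming a hypothetical Snapshot that simply holds a map from ID to last committed version:

```scala
import scala.collection.mutable.ArrayBuffer

object TxnVersionSketch {
  // Hypothetical snapshot: ID of a (streaming) writer -> last committed transaction version
  final case class Snapshot(transactions: Map[String, Long])

  class Txn(snapshot: Snapshot) {
    // IDs this transaction has asked about
    val readTxn = ArrayBuffer.empty[String]

    def txnVersion(id: String): Long = {
      readTxn += id
      snapshot.transactions.getOrElse(id, -1L)
    }
  }
}
```

Recording the ID in readTxn even when the lookup yields -1 lets a later conflict check know which IDs the transaction depended on.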

txnVersion is used when:

User-Defined Metadata

getUserMetadata(
  op: Operation): Option[String]

getUserMetadata returns the userMetadata of the given Operation (if defined) or the value of the spark.databricks.delta.commitInfo.userMetadata configuration property.
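The fallback can be expressed with Option.orElse. In this sketch the Operation case class is a hypothetical simplification, and the configuration is passed in as a plain Map[String, String] (the real method takes only the Operation).

```scala
object UserMetadataSketch {
  // Hypothetical, simplified operation carrying optional user metadata
  final case class Operation(name: String, userMetadata: Option[String] = None)

  val UserMetadataKey = "spark.databricks.delta.commitInfo.userMetadata"

  // Operation-level metadata wins; the configuration property is the fallback
  def getUserMetadata(op: Operation, conf: Map[String, String]): Option[String] =
    op.userMetadata.orElse(conf.get(UserMetadataKey))
}
```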

getUserMetadata is used when:

Internal Registries

Post-Commit Hooks

postCommitHooks: ArrayBuffer[PostCommitHook]

OptimisticTransactionImpl manages PostCommitHooks that will be executed right after a commit is successful.

Post-commit hooks can be registered, but only the GenerateSymlinkManifest post-commit hook is supported.

newMetadata

newMetadata: Option[Metadata]

OptimisticTransactionImpl uses the newMetadata internal registry for a new Metadata that should be committed with this transaction.

newMetadata is initially undefined (None). It can be updated only once and before the transaction writes out any files.

newMetadata is used in prepareCommit and doCommit (for statistics).

newMetadata is available using the metadata method.

readPredicates

readPredicates: ArrayBuffer[Expression]

readPredicates holds predicate expressions for partitions the transaction is modifying.

A new predicate expression is added to readPredicates in filterFiles and readWholeTable.

readPredicates is used in checkAndRetry.

Internal Properties

committed

Controls whether the transaction has been committed or not (and prevents prepareCommit from being executed again)

Default: false

Enabled in postCommit

dependsOnFiles

Flag that...FIXME

Default: false

Enabled (set to true) in filterFiles and readWholeTable

Used in commit and checkAndRetry

readFiles

readTxn

Streaming query IDs that have been seen by this transaction

A new queryId is added when OptimisticTransactionImpl is requested for txnVersion

Used when OptimisticTransactionImpl is requested to checkAndRetry (to fail with a ConcurrentTransactionException for idempotent transactions that have conflicted)

snapshotMetadata

Metadata of the Snapshot

readWholeTable

readWholeTable(): Unit

readWholeTable simply adds a True literal to the readPredicates internal registry.

readWholeTable is used when:

updateMetadataForNewTable

updateMetadataForNewTable(
  metadata: Metadata): Unit

updateMetadataForNewTable...FIXME

updateMetadataForNewTable is used when:

Metadata

metadata: Metadata

metadata is part of the TransactionalWrite abstraction.

metadata is either the newMetadata (if defined) or the snapshotMetadata.

Updating Metadata

updateMetadata(
  metadata: Metadata): Unit

updateMetadata updates the newMetadata internal property based on the readVersion:

  • For -1, updateMetadata derives a new configuration for the given metadata from the SQLConf (of the active SparkSession), the configuration of the given metadata, and a new Protocol

  • For other versions, updateMetadata leaves the given Metadata unchanged

updateMetadata is used when:

AssertionError

updateMetadata throws an AssertionError when the hasWritten flag is enabled:

Cannot update the metadata in a transaction that has already written data.

AssertionError

updateMetadata throws an AssertionError when the newMetadata is not empty:

Cannot change the metadata more than once in a transaction.
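The two guards and the metadata accessor can be sketched together as below. The Txn and Metadata types are hypothetical simplifications, and the readVersion-dependent configuration handling described above is intentionally omitted.

```scala
object UpdateMetadataSketch {
  // Hypothetical, simplified metadata
  final case class Metadata(configuration: Map[String, String] = Map.empty)

  class Txn(snapshotMetadata: Metadata) {
    private var newMetadata: Option[Metadata] = None
    var hasWritten: Boolean = false

    // The pending metadata if one was set, else the metadata of the snapshot
    def metadata: Metadata = newMetadata.getOrElse(snapshotMetadata)

    def updateMetadata(m: Metadata): Unit = {
      assert(
        !hasWritten,
        "Cannot update the metadata in a transaction that has already written data.")
      assert(
        newMetadata.isEmpty,
        "Cannot change the metadata more than once in a transaction.")
      newMetadata = Some(m)
    }
  }
}
```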

Files To Scan Matching Given Predicates

filterFiles(): Seq[AddFile] // Uses `true` literal to mean that all files match
filterFiles(
  filters: Seq[Expression]): Seq[AddFile]

filterFiles gives the files to scan based on the given predicates (filter expressions).

Internally, filterFiles requests the Snapshot for the filesForScan (for no projection attributes and the given filters).

filterFiles finds the partition predicates among the given filters (and the partition columns of the Metadata).

filterFiles registers (adds) the partition predicates (in the readPredicates internal registry) and the files to scan (in the readFiles internal registry).
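The bookkeeping described above can be illustrated with a toy model. Everything here is an assumption made for the example: Predicate is a hypothetical stand-in for a Catalyst Expression (it carries the column names it references and a test over partition values), files are matched on partition values only, and both registries are modeled as ArrayBuffers.

```scala
import scala.collection.mutable.ArrayBuffer

object FilterFilesSketch {
  // Hypothetical predicate: referenced columns plus a test over a file's partition values
  final case class Predicate(references: Set[String], matches: Map[String, String] => Boolean)
  final case class AddFile(path: String, partitionValues: Map[String, String])

  class Txn(partitionColumns: Set[String], allFiles: Seq[AddFile]) {
    val readPredicates = ArrayBuffer.empty[Predicate]
    val readFiles = ArrayBuffer.empty[AddFile]

    def filterFiles(filters: Seq[Predicate]): Seq[AddFile] = {
      // A filter is a partition predicate if it references partition columns only
      val partitionFilters = filters.filter(_.references.subsetOf(partitionColumns))
      val scanned = allFiles.filter(f => partitionFilters.forall(_.matches(f.partitionValues)))
      // Remember what was read, for conflict detection at commit time
      readPredicates ++= partitionFilters
      readFiles ++= scanned
      scanned
    }
  }
}
```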

filterFiles is used when:


Last update: 2021-03-28