OptimisticTransactionImpl

OptimisticTransactionImpl is an extension of the TransactionalWrite abstraction for optimistic transactions that can modify a Delta table (at a given version) and can eventually be committed.

In other words, OptimisticTransactionImpl represents a set of actions performed as part of an Operation.

Contract

clock

clock: Clock

deltaLog

deltaLog: DeltaLog

DeltaLog (of the delta table) that this transaction is changing

deltaLog is part of the TransactionalWrite contract. OptimisticTransactionImpl seems to change it to a val (from a def).

snapshot

snapshot: Snapshot

Snapshot (of the delta table) that this transaction is changing

snapshot is part of the TransactionalWrite contract. OptimisticTransactionImpl seems to change it to a val (from a def).

Implementations

OptimisticTransaction is the default and only known OptimisticTransactionImpl in Delta Lake.

metadata Method

metadata: Metadata

metadata is either the newMetadata (if defined) or the snapshotMetadata.

metadata is part of the TransactionalWrite abstraction.
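As a minimal sketch of that fallback, the following uses a simplified Metadata case class as a hypothetical stand-in for Delta Lake's own Metadata action. Only the resolution logic is meant to carry over.

```scala
// Hypothetical, simplified stand-in for Delta Lake's Metadata action
final case class Metadata(id: String, configuration: Map[String, String] = Map.empty)

object MetadataResolution {
  // The pending newMetadata (if any) wins over the snapshot's metadata
  def metadata(newMetadata: Option[Metadata], snapshotMetadata: Metadata): Metadata =
    newMetadata.getOrElse(snapshotMetadata)
}
```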

readVersion Method

readVersion: Long

readVersion simply requests the Snapshot for the version.

readVersion is used when:

Updating Metadata

updateMetadata(
  metadata: Metadata): Unit

updateMetadata updates the newMetadata internal property based on the readVersion:

  • For -1 (a table that does not exist yet), updateMetadata replaces the configuration of the given metadata with one merged from the SQLConf (of the active SparkSession), the configuration of the given metadata, and a new Protocol

  • For other versions, updateMetadata leaves the given Metadata unchanged

updateMetadata throws an AssertionError when the hasWritten flag is enabled (true):

Cannot update the metadata in a transaction that has already written data.

updateMetadata throws an AssertionError when the newMetadata is not empty:

Cannot change the metadata more than once in a transaction.
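The rules above can be sketched as follows. This is a simplified model rather than Delta Lake's code: Metadata, TxnSketch and sessionDefaults are hypothetical stand-ins (sessionDefaults plays the role of the table defaults taken from the SQLConf), and the Protocol argument is left out. Scala's assert throws an AssertionError, which matches the errors described above.

```scala
// Hypothetical, simplified stand-in for Delta Lake's Metadata action
final case class Metadata(configuration: Map[String, String] = Map.empty)

// Simplified transaction state; sessionDefaults is a hypothetical stand-in
// for the table defaults derived from the SQLConf (no Protocol modelled)
final class TxnSketch(val readVersion: Long, sessionDefaults: Map[String, String]) {
  var hasWritten: Boolean = false
  var newMetadata: Option[Metadata] = None

  def updateMetadata(metadata: Metadata): Unit = {
    assert(!hasWritten, "Cannot update the metadata in a transaction that has already written data.")
    assert(newMetadata.isEmpty, "Cannot change the metadata more than once in a transaction.")
    val updated =
      if (readVersion == -1) {
        // New table: merge the defaults into the table configuration
        // (in this sketch, explicit table properties override the defaults)
        metadata.copy(configuration = sessionDefaults ++ metadata.configuration)
      } else {
        metadata // existing table: keep the given metadata unchanged
      }
    newMetadata = Some(updated)
  }
}
```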

updateMetadata is used when:

Files To Scan Matching Given Predicates

filterFiles(): Seq[AddFile]
filterFiles(
  filters: Seq[Expression]): Seq[AddFile]

The no-argument filterFiles uses the true literal to mean that all files match.

filterFiles gives the files to scan based on the given predicates (filter expressions).

Internally, filterFiles requests the Snapshot for the filesForScan (for no projection attributes and the given filters).

filterFiles finds the partition predicates among the given filters (and the partition columns of the Metadata).

filterFiles registers (adds) the partition predicates (in the readPredicates internal registry) and the files to scan (in the readFiles internal registry).
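A minimal sketch of this flow, under heavy simplifying assumptions: predicates are hypothetical single-column equality checks (EqualTo), AddFile carries only a path and partition values, and the scan prunes by partition values only (no data skipping). Each call registers its partition predicates as one conjunction in readPredicates, and an empty conjunction stands for the true literal.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins: an equality predicate on one column and a
// stripped-down AddFile with only a path and its partition values
final case class EqualTo(column: String, value: String)
final case class AddFile(path: String, partitionValues: Map[String, String])

final class FilterFilesSketch(allFiles: Seq[AddFile], partitionColumns: Set[String]) {
  // Each entry is a conjunction of partition predicates; empty means "true"
  val readPredicates = ArrayBuffer.empty[Seq[EqualTo]]
  val readFiles = ArrayBuffer.empty[AddFile]

  def filterFiles(filters: Seq[EqualTo]): Seq[AddFile] = {
    // Only predicates on partition columns are treated as partition predicates
    val partitionFilters = filters.filter(f => partitionColumns.contains(f.column))
    val scan = allFiles.filter { file =>
      partitionFilters.forall(f => file.partitionValues.get(f.column).contains(f.value))
    }
    readPredicates += partitionFilters // registered for later conflict checks
    readFiles ++= scan
    scan
  }

  // No filters: every file matches (the true-literal case)
  def filterFiles(): Seq[AddFile] = filterFiles(Nil)
}
```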

filterFiles is used when:

readWholeTable Method

readWholeTable(): Unit

readWholeTable simply adds the true literal to the readPredicates internal registry.

readWholeTable is used when DeltaSink is requested to add a streaming micro-batch (and the batch reads the same Delta table as this sink is going to write to).

Committing Transaction

commit(
  actions: Seq[Action],
  op: DeltaOperations.Operation): Long

commit commits the transaction (the actions of an Operation).

commit firstly prepares a commit (that gives the final actions to commit that may be different from the given actions).

commit determines the isolation level for this commit by checking whether any FileAction (in the given actions) has the dataChange flag on (true). If no data is changed, commit uses SnapshotIsolation; otherwise, it uses Serializable.
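The isolation-level choice can be sketched as below. The action hierarchy here (Action, FileAction, AddFile, RemoveFile, SetTransaction) is a hypothetical, trimmed-down stand-in for Delta Lake's actions; only the dataChange check reflects the description above.

```scala
// Hypothetical, trimmed-down action hierarchy
sealed trait Action
sealed trait FileAction extends Action { def dataChange: Boolean }
final case class AddFile(path: String, dataChange: Boolean) extends FileAction
final case class RemoveFile(path: String, dataChange: Boolean) extends FileAction
final case class SetTransaction(appId: String, version: Long) extends Action

sealed trait IsolationLevel
case object Serializable extends IsolationLevel
case object SnapshotIsolation extends IsolationLevel

object IsolationChoice {
  def isolationLevelFor(actions: Seq[Action]): IsolationLevel = {
    // Only file actions carry the dataChange flag; other actions are ignored
    val noDataChanged = actions.collect { case f: FileAction => f.dataChange }.forall(_ == false)
    if (noDataChanged) SnapshotIsolation else Serializable
  }
}
```

A commit that only rearranges files (for example, compaction with dataChange = false) would therefore use SnapshotIsolation in this sketch.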

commit…FIXME

commit…FIXME

commit registers the GenerateSymlinkManifest post-commit hook when there is a FileAction among the actions and the compatibility.symlinkFormatManifest.enabled table property (from the Metadata) is enabled (true).

compatibility.symlinkFormatManifest.enabled table property defaults to false.
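The registration condition can be sketched as a predicate. This assumes the full table-property key carries Delta's "delta." prefix, and it uses hypothetical stand-ins for the actions and the Metadata; it only decides whether the hook would be registered.

```scala
// Hypothetical stand-ins for actions and table metadata
sealed trait Action
sealed trait FileAction extends Action
final case class AddFile(path: String) extends FileAction
final case class RemoveFile(path: String) extends FileAction
final case class SetTransaction(appId: String, version: Long) extends Action
final case class Metadata(configuration: Map[String, String])

object SymlinkHookCheck {
  // Assumption: the stored key includes the "delta." prefix
  val SymlinkManifestKey = "delta.compatibility.symlinkFormatManifest.enabled"

  def shouldRegisterSymlinkHook(actions: Seq[Action], metadata: Metadata): Boolean = {
    val hasFileAction = actions.exists {
      case _: FileAction => true
      case _ => false
    }
    // A missing property counts as false (the default)
    val enabled = metadata.configuration.get(SymlinkManifestKey).exists(_.equalsIgnoreCase("true"))
    hasFileAction && enabled
  }
}
```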

commit then calls doCommit with the next version, the actions, attempt number 0, and the selected isolation level.

commit prints out the following INFO message to the logs:

Committed delta #[commitVersion] to [logPath]

commit calls postCommit (with the committed version and the actions).

In the end, commit runs post-commit hooks and returns the version of the successful commit.

commit is used when:

Preparing Commit

prepareCommit(
  actions: Seq[Action],
  op: DeltaOperations.Operation): Seq[Action]

prepareCommit adds the newMetadata action (if available) to the given actions.

prepareCommit calls verifyNewMetadata with the new metadata (if there is one).

prepareCommit…FIXME

prepareCommit requests the DeltaLog to protocolWrite.

prepareCommit…FIXME

prepareCommit throws an AssertionError when the number of metadata changes in the transaction (by means of Metadata actions) is above 1:

Cannot change the metadata more than once in a transaction.

prepareCommit throws an AssertionError when the committed internal flag is turned on (true):

Transaction already committed.
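The two assertions can be sketched as follows, with the pending newMetadata prepended to the given actions. Action, Metadata, AddFile and PrepareCommitSketch are hypothetical stand-ins, and the FIXME steps above are not modelled.

```scala
// Hypothetical stand-ins for Delta Lake's actions
sealed trait Action
final case class Metadata(id: String) extends Action
final case class AddFile(path: String) extends Action

final class PrepareCommitSketch {
  var committed: Boolean = false
  var newMetadata: Option[Metadata] = None

  def prepareCommit(actions: Seq[Action]): Seq[Action] = {
    assert(!committed, "Transaction already committed.")
    // Add the pending metadata change (if any) to the actions to commit
    val finalActions: Seq[Action] = newMetadata.toSeq ++ actions
    val metadataChanges = finalActions.collect { case m: Metadata => m }
    assert(metadataChanges.length <= 1, "Cannot change the metadata more than once in a transaction.")
    finalActions
  }
}
```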

prepareCommit is used when OptimisticTransactionImpl is requested to commit (at the beginning).

Performing Post-Commit Operations

postCommit(
  commitVersion: Long,
  commitActions: Seq[Action]): Unit

postCommit…FIXME

postCommit is used when OptimisticTransactionImpl is requested to commit (at the end).

CommitInfo

OptimisticTransactionImpl creates a CommitInfo when requested to commit with the spark.databricks.delta.commitInfo.enabled configuration property enabled.

OptimisticTransactionImpl uses the CommitInfo to recordDeltaEvent (as a CommitStats).

Registering Post-Commit Hook

registerPostCommitHook(
  hook: PostCommitHook): Unit

registerPostCommitHook registers (adds) the given PostCommitHook to the postCommitHooks internal registry.

registerPostCommitHook adds the hook only once.
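A minimal sketch of the add-once registration, using a hypothetical PostCommitHook trait and an equality-based membership check on the registry.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for a post-commit hook
trait PostCommitHook { def name: String }
final case class NamedHook(name: String) extends PostCommitHook

final class HookRegistry {
  val postCommitHooks = ArrayBuffer.empty[PostCommitHook]

  def registerPostCommitHook(hook: PostCommitHook): Unit = {
    // Register each hook only once (equality-based in this sketch)
    if (!postCommitHooks.contains(hook)) postCommitHooks += hook
  }
}
```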

registerPostCommitHook is used when OptimisticTransactionImpl is requested to commit (to register the GenerateSymlinkManifest post-commit hook).

Running Post-Commit Hooks

runPostCommitHooks(
  version: Long,
  committedActions: Seq[Action]): Unit

runPostCommitHooks simply runs every post-commit hook registered (in the postCommitHooks internal registry).

runPostCommitHooks clears the active transaction (making all follow-up operations non-transactional).

Hooks may create new transactions.

For any non-fatal exception, runPostCommitHooks prints out the following ERROR message to the logs, records the delta event, and requests the post-commit hook to handle the error.

Error when executing post-commit hook [name] for commit [version]

runPostCommitHooks throws an AssertionError when the committed flag is turned off (false):

Can't call post commit hooks before committing
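The error handling can be sketched as below. The PostCommitHook trait is a hypothetical stand-in, the errorLog buffer stands in for the ERROR log, and event recording and clearing the active transaction are not modelled. As an assumption of this sketch, the default handleError swallows the error; a real hook may handle it differently (for example, by rethrowing).

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical hook interface; the default handleError is a sketch-only choice
trait PostCommitHook {
  def name: String
  def run(version: Long): Unit
  def handleError(error: Throwable, version: Long): Unit = ()
}

final class RunHooksSketch {
  var committed: Boolean = false
  val postCommitHooks = ArrayBuffer.empty[PostCommitHook]
  val errorLog = ArrayBuffer.empty[String] // stand-in for ERROR log messages

  def runPostCommitHooks(version: Long): Unit = {
    assert(committed, "Can't call post commit hooks before committing")
    postCommitHooks.foreach { hook =>
      try hook.run(version)
      catch {
        case NonFatal(e) =>
          errorLog += s"Error when executing post-commit hook ${hook.name} for commit $version"
          hook.handleError(e, version)
      }
    }
  }
}
```

Because failures are caught per hook, one failing hook does not prevent the remaining hooks from running in this sketch.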

runPostCommitHooks is used when OptimisticTransactionImpl is requested to commit.

Attempting Commit

doCommit(
  attemptVersion: Long,
  actions: Seq[Action],
  attemptNumber: Int): Long

doCommit returns the given attemptVersion as the commit version if successful; otherwise, it falls back to checkAndRetry.

Internally, doCommit prints out the following DEBUG message to the logs:

Attempting to commit version [attemptVersion] with [size] actions with [isolationLevel] isolation level

doCommit requests the LogStore (of the DeltaLog) to write out the given actions (serialized to JSON format) to a delta file (e.g. 00000000000000000001.json) in the log directory (of the DeltaLog) with the attemptVersion version.

LogStores must throw a java.nio.file.FileAlreadyExistsException if the delta file already exists. doCommit catches any FileAlreadyExistsException itself and calls checkAndRetry.

doCommit requests the DeltaLog to update.

doCommit throws an IllegalStateException if the version of the snapshot after the update is smaller than the given attemptVersion:

The committed version is [attemptVersion] but the current version is [version].

doCommit records a new CommitStats and returns the given attemptVersion as the commit version.

On a FileAlreadyExistsException, doCommit falls back to checkAndRetry.
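The write-or-retry pattern can be sketched with a hypothetical in-memory log store that, like a real LogStore, refuses to overwrite an existing delta file. The checkAndRetry function parameter and the "latest version" computation (standing in for DeltaLog.update) are assumptions of this sketch; statistics recording is omitted.

```scala
import java.nio.file.FileAlreadyExistsException
import scala.collection.mutable

// Hypothetical in-memory LogStore: writing an existing delta file must fail
final class InMemoryLogStore {
  val files = mutable.Map.empty[String, Seq[String]]

  def write(path: String, lines: Seq[String]): Unit = synchronized {
    if (files.contains(path)) throw new FileAlreadyExistsException(path)
    files(path) = lines
  }
}

object DoCommitSketch {
  // Delta files are named after the zero-padded (20-digit) version
  def deltaFile(version: Long): String = f"$version%020d.json"

  def doCommit(
      store: InMemoryLogStore,
      attemptVersion: Long,
      jsonActions: Seq[String],
      attemptNumber: Int,
      checkAndRetry: (Long, Int) => Long): Long =
    try {
      store.write(deltaFile(attemptVersion), jsonActions)
      // Stand-in for DeltaLog.update: the latest version is the highest delta file
      val latest = store.files.keys.map(_.stripSuffix(".json").toLong).max
      if (latest < attemptVersion)
        throw new IllegalStateException(
          s"The committed version is $attemptVersion but the current version is $latest.")
      attemptVersion
    } catch {
      case _: FileAlreadyExistsException =>
        // Another writer already committed this version: check conflicts and retry
        checkAndRetry(attemptVersion, attemptNumber)
    }
}
```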

doCommit is used when OptimisticTransactionImpl is requested to commit (and checkAndRetry).

Retrying Commit

checkAndRetry(
  checkVersion: Long,
  actions: Seq[Action],
  attemptNumber: Int): Long

checkAndRetry…FIXME

checkAndRetry is used when OptimisticTransactionImpl is requested to commit (and attempts a commit that fails with a FileAlreadyExistsException).

verifyNewMetadata Method

verifyNewMetadata(
  metadata: Metadata): Unit

verifyNewMetadata…FIXME

verifyNewMetadata is used when OptimisticTransactionImpl is requested to prepareCommit and updateMetadata.

Looking Up Transaction Version For Given (Streaming Query) ID

txnVersion(
  id: String): Long

txnVersion simply registers (adds) the given ID in the readTxn internal registry.

In the end, txnVersion requests the Snapshot for the transaction version for the given ID, or returns -1 if there is none.
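A minimal sketch of this lookup, where a plain Map from application (query) ID to version is a hypothetical stand-in for the transaction versions held by the Snapshot.

```scala
import scala.collection.mutable

// snapshotTransactions: hypothetical stand-in for the Snapshot's
// per-application transaction versions
final class TxnVersionSketch(snapshotTransactions: Map[String, Long]) {
  // Streaming query IDs seen by this transaction (the readTxn registry)
  val readTxn = mutable.ArrayBuffer.empty[String]

  def txnVersion(id: String): Long = {
    readTxn += id
    snapshotTransactions.getOrElse(id, -1L) // -1 when the ID is unknown
  }
}
```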

txnVersion is used when DeltaSink is requested to add a streaming micro-batch.

getOperationMetrics Method

getOperationMetrics(
  op: Operation): Option[Map[String, String]]

getOperationMetrics…FIXME

getOperationMetrics is used when…FIXME

User-Defined Metadata

getUserMetadata(
  op: Operation): Option[String]

getUserMetadata returns the userMetadata of the given Operation (if defined) or the value of spark.databricks.delta.commitInfo.userMetadata configuration property.
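The fallback can be sketched as an Option chain. Operation and the conf map (standing in for the session's SQLConf) are hypothetical simplifications.

```scala
// Hypothetical, simplified stand-in for a Delta operation
final case class Operation(name: String, userMetadata: Option[String])

object UserMetadataSketch {
  val UserMetadataKey = "spark.databricks.delta.commitInfo.userMetadata"

  // conf: stand-in for the session configuration (SQLConf)
  def getUserMetadata(op: Operation, conf: Map[String, String]): Option[String] =
    op.userMetadata.orElse(conf.get(UserMetadataKey))
}
```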

getUserMetadata is used when OptimisticTransactionImpl is requested to commit (and spark.databricks.delta.commitInfo.enabled configuration property is enabled).

getPrettyPartitionMessage Method

getPrettyPartitionMessage(
  partitionValues: Map[String, String]): String

getPrettyPartitionMessage…FIXME

getPrettyPartitionMessage is used when…FIXME

getNextAttemptVersion Internal Method

getNextAttemptVersion(
  previousAttemptVersion: Long): Long

getNextAttemptVersion…FIXME

getNextAttemptVersion is used when OptimisticTransactionImpl is requested to checkAndRetry.

Internal Registries

Post-Commit Hooks

postCommitHooks: ArrayBuffer[PostCommitHook]

OptimisticTransactionImpl manages PostCommitHooks that will be executed right after a commit is successful.

Post-commit hooks can be registered, but only the GenerateSymlinkManifest post-commit hook is supported (when…FIXME).

newMetadata

newMetadata: Option[Metadata]

OptimisticTransactionImpl uses the newMetadata internal registry for a new Metadata that should be committed with this transaction.

newMetadata is initially undefined (None). It can be updated only once and before the transaction writes out any files.

newMetadata is used in prepareCommit (and in doCommit for statistics).

newMetadata is available using the metadata method.

readPredicates

readPredicates: ArrayBuffer[Expression]

readPredicates holds predicate expressions for partitions the transaction is modifying.

A new predicate expression is added to readPredicates in filterFiles and readWholeTable.

readPredicates is used in checkAndRetry.

Internal Properties


committed

Flag that controls whether the transaction is committed or not (and prevents prepareCommit from being executed again)

Default: false

Enabled (set to true) exclusively in postCommit

dependsOnFiles

Flag that…FIXME

Default: false

Enabled (set to true) in filterFiles and readWholeTable

Used in commit and checkAndRetry

readFiles

readTxn

Streaming query IDs that have been seen by this transaction

A new queryId is added when OptimisticTransactionImpl is requested for txnVersion

Used when OptimisticTransactionImpl is requested to checkAndRetry (to fail with a ConcurrentTransactionException for idempotent transactions that have conflicted)

snapshotMetadata