
OptimisticTransactionImpl

OptimisticTransactionImpl is an <> of the TransactionalWrite.md[] abstraction for <> that can modify a <> (at a given <>) and can be <> eventually.

In other words, OptimisticTransactionImpl is a set of Action.md[actions] as part of an Operation.md[].

== [[contract]] Contract

=== [[clock]] clock

[source,scala]

clock: Clock

=== [[deltaLog]] deltaLog

[source,scala]

deltaLog: DeltaLog

DeltaLog.md[] (of the delta table) that this transaction is changing

deltaLog is part of the TransactionalWrite.md#deltaLog[TransactionalWrite] contract. OptimisticTransactionImpl seems to change it to a val (from a def).

=== [[snapshot]] snapshot

[source,scala]

snapshot: Snapshot

Snapshot.md[] (of the <>) that this transaction is changing

snapshot is part of the TransactionalWrite.md#snapshot[TransactionalWrite] contract. OptimisticTransactionImpl seems to change it to a val (from a def).

== [[implementations]] Implementations

OptimisticTransaction.md[] is the default and only known OptimisticTransactionImpl in Delta Lake.

== [[metadata]] metadata Method

[source, scala]

metadata: Metadata

metadata is either the <> (if defined) or the <>.

metadata is part of the TransactionalWrite.md#metadata[TransactionalWrite] abstraction.

== [[readVersion]] readVersion Method

[source, scala]

readVersion: Long

readVersion simply requests the <> for the <>.

readVersion is used when:

  • OptimisticTransactionImpl is requested for <>, to <> and <>

  • ConvertToDeltaCommand is requested to <>

  • WriteIntoDelta is requested to <>

  • ImplicitMetadataOperation is requested to <>

== [[updateMetadata]] Updating Metadata

[source, scala]

updateMetadata(metadata: Metadata): Unit


updateMetadata updates the <> internal property based on the <>:

  • For -1, updateMetadata updates the <> of the given metadata with a <> based on the SQLConf (of the active SparkSession), the <> of the given metadata and a new <>

  • For other versions, updateMetadata leaves the given <> unchanged

[[updateMetadata-AssertionError-hasWritten]] updateMetadata throws an AssertionError when the <> flag is enabled (true):

Cannot update the metadata in a transaction that has already written data.

updateMetadata throws an AssertionError when the <> is not empty:

Cannot change the metadata more than once in a transaction.

updateMetadata is used when:

  • <> is executed (and requested to <>)

  • ImplicitMetadataOperation is requested to <>
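The two assertions of updateMetadata can be modeled with a tiny transaction class. This is a minimal sketch, not Delta Lake's code: `ToyMetadata` and `ToyTxn` are hypothetical stand-ins that keep only the hasWritten flag and the newMetadata registry, and the readVersion -1 adjustment is left out.

```scala
// Hypothetical, simplified stand-in for Delta Lake's Metadata action
final case class ToyMetadata(schemaString: String)

// Hypothetical model of the transaction state that updateMetadata guards
final class ToyTxn {
  var hasWritten: Boolean = false              // set once data files were written
  var newMetadata: Option[ToyMetadata] = None  // metadata to commit, at most one

  def updateMetadata(metadata: ToyMetadata): Unit = {
    // Predef.assert throws java.lang.AssertionError
    // (its message is prefixed with "assertion failed: ")
    assert(!hasWritten,
      "Cannot update the metadata in a transaction that has already written data.")
    assert(newMetadata.isEmpty,
      "Cannot change the metadata more than once in a transaction.")
    newMetadata = Some(metadata)
  }
}
```

Calling updateMetadata a second time on the same ToyTxn, or after setting hasWritten to true, fails with an AssertionError.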

== Files To Scan Matching Given Predicates

[source,scala]

filterFiles(): Seq[AddFile] // Uses `true` literal to mean that all files match
filterFiles(
  filters: Seq[Expression]): Seq[AddFile]

filterFiles gives the files to scan based on the given predicates (filter expressions).

Internally, filterFiles requests the Snapshot for the filesForScan (for no projection attributes and the given filters).

filterFiles finds the partition predicates among the given filters (and the partition columns of the Metadata).

filterFiles registers (adds) the partition predicates (in the readPredicates internal registry) and the files to scan (in the readFiles internal registry).
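The bookkeeping of filterFiles can be sketched as follows. The sketch is hypothetical: `ToyAddFile` carries only a path and partition values, predicates are plain Scala functions instead of Catalyst `Expression`s, and every given predicate is treated as a partition predicate (the real method separates them using the partition columns of the Metadata).

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for AddFile: a path plus partition values
final case class ToyAddFile(path: String, partitionValues: Map[String, String])

// Hypothetical model of the read-tracking registries of a transaction
final class ToyReadTracker(allFiles: Seq[ToyAddFile]) {
  type Predicate = Map[String, String] => Boolean

  val readPredicates = ArrayBuffer.empty[Predicate]
  val readFiles = ArrayBuffer.empty[ToyAddFile]

  // Always-true predicate, playing the role of the `true` literal
  private val alwaysTrue: Predicate = _ => true

  def filterFiles(): Seq[ToyAddFile] = filterFiles(Seq(alwaysTrue))

  def filterFiles(filters: Seq[Predicate]): Seq[ToyAddFile] = {
    // Keep the files whose partition values satisfy every predicate
    val matching = allFiles.filter(f => filters.forall(p => p(f.partitionValues)))
    readPredicates ++= filters // remember the predicates read (for later checks)
    readFiles ++= matching     // remember the files read
    matching
  }
}
```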

filterFiles is used when:

== [[readWholeTable]] readWholeTable Method

[source, scala]

readWholeTable(): Unit

readWholeTable simply adds a True literal to the <> internal registry.

readWholeTable is used when DeltaSink is requested to DeltaSink.md#addBatch[add a streaming micro-batch] (and the batch reads the same Delta table as this sink is going to write to).

== Committing Transaction

[source,scala]

commit(
  actions: Seq[Action],
  op: DeltaOperations.Operation): Long

commit commits the transaction with the given Actions and Operation.

[[commit-prepareCommit]] commit first <> (which gives the final actions to commit, possibly different from the given <>).

[[commit-isolationLevelToUse]] commit determines the isolation level for this commit by checking whether any <> (in the given <>) has the <> flag on (true). If no action changes data, commit uses SnapshotIsolation; otherwise, it uses Serializable.
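The isolation-level decision boils down to a single `exists` check. In this minimal sketch, `ToyFileAction` and `ToyIsolation` are hypothetical: actions are reduced to a path and a dataChange flag, and only the two isolation levels named above are modeled.

```scala
// Hypothetical, simplified action: only the dataChange flag matters here
final case class ToyFileAction(path: String, dataChange: Boolean)

object ToyIsolation {
  sealed trait Level
  case object SnapshotIsolation extends Level
  case object Serializable extends Level

  // Any action that changes data -> Serializable; otherwise -> SnapshotIsolation
  def isolationLevelToUse(actions: Seq[ToyFileAction]): Level =
    if (actions.exists(_.dataChange)) Serializable else SnapshotIsolation
}
```

For example, a commit that only rewrites files with dataChange = false gets SnapshotIsolation in this model.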

[[commit-isBlindAppend]] commit...FIXME

[[commit-commitInfo]] commit...FIXME

[[commit-registerPostCommitHook]] commit <> the <> post-commit hook when there is a <> among the actions and the <> table property (<> the <>) is enabled (true).

NOTE: <> table property defaults to false.

[[commit-commitVersion]] commit <> with the next version, the actions, attempt number 0, and the selected isolation level.

commit prints out the following INFO message to the logs:

Committed delta #[commitVersion] to [logPath]

[[commit-postCommit]] commit <> (with the version committed and the actions).

[[commit-runPostCommitHooks]] In the end, commit <> and returns the version of the successful commit.

== [[prepareCommit]] Preparing Commit

[source, scala]

prepareCommit(actions: Seq[Action], op: DeltaOperations.Operation): Seq[Action]


prepareCommit adds the <> action (if available) to the given <>.

prepareCommit <> if there was one.

prepareCommit...FIXME

prepareCommit requests the <> to <>.

prepareCommit...FIXME

prepareCommit throws an AssertionError when the number of metadata changes in the transaction (by means of <> actions) is above 1:

Cannot change the metadata more than once in a transaction.

prepareCommit throws an AssertionError when the <> internal flag is turned on (true):

Transaction already committed.

prepareCommit is used when OptimisticTransactionImpl is requested to <> (at the beginning).
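The two prepareCommit assertions can be sketched as below. This is a hypothetical model, not Delta Lake's implementation: `ToyAction`, `ToyMetadataAction`, `ToyAddAction` and `ToyPreparer` are illustrative names, and the FIXME steps of prepareCommit are not modeled.

```scala
// Hypothetical, simplified action hierarchy
sealed trait ToyAction
final case class ToyMetadataAction(schemaString: String) extends ToyAction
final case class ToyAddAction(path: String) extends ToyAction

// Hypothetical model of the two prepareCommit guards
final class ToyPreparer(newMetadata: Option[ToyMetadataAction]) {
  var committed: Boolean = false

  def prepareCommit(actions: Seq[ToyAction]): Seq[ToyAction] = {
    assert(!committed, "Transaction already committed.")
    // Prepend the pending metadata change (if any) to the given actions
    val finalActions: Seq[ToyAction] = newMetadata.toSeq ++ actions
    val metadataChanges = finalActions.count {
      case _: ToyMetadataAction => true
      case _                    => false
    }
    assert(metadataChanges <= 1,
      "Cannot change the metadata more than once in a transaction.")
    finalActions
  }
}
```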

== [[postCommit]] Performing Post-Commit Operations

[source, scala]

postCommit(commitVersion: Long, commitActions: Seq[Action]): Unit


postCommit...FIXME

postCommit is used when OptimisticTransactionImpl is requested to <> (at the end).

== [[commitInfo]] CommitInfo

OptimisticTransactionImpl creates a CommitInfo.md[] when requested to <> with the DeltaSQLConf.md#commitInfo.enabled[spark.databricks.delta.commitInfo.enabled] configuration property enabled.

OptimisticTransactionImpl uses the CommitInfo to recordDeltaEvent (as a CommitStats).

== [[registerPostCommitHook]] Registering Post-Commit Hook

[source, scala]

registerPostCommitHook(hook: PostCommitHook): Unit


registerPostCommitHook registers (adds) the given <> to the <> internal registry.

NOTE: registerPostCommitHook adds the hook only once.

registerPostCommitHook is used when OptimisticTransactionImpl is requested to <> (to register the <> post-commit hook).
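Registering a hook only once amounts to a membership check before appending. A minimal sketch, assuming hooks are compared with `equals`; `ToyPostCommitHook` and `ToyHookRegistry` are hypothetical names.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for PostCommitHook
trait ToyPostCommitHook {
  def name: String
}

// Hypothetical model of the postCommitHooks registry
final class ToyHookRegistry {
  val postCommitHooks = ArrayBuffer.empty[ToyPostCommitHook]

  // Append the hook unless an equal hook is already registered
  def registerPostCommitHook(hook: ToyPostCommitHook): Unit =
    if (!postCommitHooks.contains(hook)) postCommitHooks += hook
}
```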

== [[runPostCommitHooks]] Running Post-Commit Hooks

[source, scala]

runPostCommitHooks(version: Long, committedActions: Seq[Action]): Unit


runPostCommitHooks simply <> every <> registered (in the <> internal registry).

runPostCommitHooks <> (making all follow-up operations non-transactional).

NOTE: Hooks may create new transactions.

For any non-fatal exception, runPostCommitHooks prints out the following ERROR message to the logs, records the delta event, and requests the post-commit hook to <>.

Error when executing post-commit hook [name] for commit [version]

runPostCommitHooks throws an AssertionError when the <> flag is turned off (false):

Can't call post commit hooks before committing

runPostCommitHooks is used when OptimisticTransactionImpl is requested to <>.
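The error handling of runPostCommitHooks follows the usual `NonFatal` pattern: a failing hook does not stop the remaining ones. In this sketch, `ToyHook`, its `handleError` method and `ToyHookRunner` are hypothetical; the ERROR log line is collected into a buffer, the delta event is not recorded, and the committedActions argument is dropped.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical, simplified hook: run after a commit, handleError on failure
trait ToyHook {
  def name: String
  def run(version: Long): Unit
  def handleError(error: Throwable, version: Long): Unit = ()
}

// Hypothetical model of runPostCommitHooks
final class ToyHookRunner(hooks: Seq[ToyHook]) {
  var committed: Boolean = false
  val errors = ArrayBuffer.empty[String] // stand-in for the ERROR log messages

  def runPostCommitHooks(version: Long): Unit = {
    assert(committed, "Can't call post commit hooks before committing")
    hooks.foreach { hook =>
      try hook.run(version)
      catch {
        case NonFatal(e) =>
          errors += s"Error when executing post-commit hook ${hook.name} for commit $version"
          hook.handleError(e, version)
      }
    }
  }
}
```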

== [[doCommit]] Attempting Commit

[source, scala]

doCommit(attemptVersion: Long, actions: Seq[Action], attemptNumber: Int): Long


doCommit returns the given attemptVersion as the commit version if successful or <>.

Internally, doCommit prints out the following DEBUG message to the logs:

Attempting to commit version [attemptVersion] with [size] actions with [isolationLevel] isolation level

[[doCommit-write]] doCommit requests the <> (of the <>) to <> the given <> (serialized to <>) to a <> (e.g. 00000000000000000001.json) in the <> (of the <>) with the attemptVersion version.

NOTE: <> must throw a java.nio.file.FileAlreadyExistsException if the delta file already exists. Any FileAlreadyExistsException is caught by <> itself to <>.
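On a local file system, the "fail if the delta file already exists" contract can be illustrated with `java.nio.file.StandardOpenOption.CREATE_NEW`, whose existence check and file creation are atomic. This is only a sketch for a local directory: `ToyDeltaFiles` is a hypothetical helper, not a LogStore, and real LogStore implementations differ per storage system.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardOpenOption}

// Hypothetical helper, not Delta Lake's LogStore
object ToyDeltaFiles {
  // Zero-padded 20-digit file name, e.g. version 1 -> 00000000000000000001.json
  def deltaFileName(version: Long): String = f"$version%020d.json"

  // One JSON line per action; CREATE_NEW throws FileAlreadyExistsException
  // when the file for this version has already been created
  def writeVersion(logDir: Path, version: Long, jsonActions: Seq[String]): Path = {
    val target = logDir.resolve(deltaFileName(version))
    val bytes = jsonActions.mkString("", "\n", "\n").getBytes(StandardCharsets.UTF_8)
    Files.write(target, bytes, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)
  }
}
```

The second of two writers racing for the same version gets the FileAlreadyExistsException, which is the signal for the retry path.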

[[doCommit-postCommitSnapshot]] doCommit requests the <> to <>.

[[doCommit-IllegalStateException]] doCommit throws an IllegalStateException if the version of the snapshot after the update is smaller than the given attemptVersion:

The committed version is [attemptVersion] but the current version is [version].

[[doCommit-stats]] doCommit records a new CommitStats and returns the given attemptVersion as the commit version.

[[doCommit-FileAlreadyExistsException]] doCommit catches FileAlreadyExistsExceptions and <>.

doCommit is used when OptimisticTransactionImpl is requested to <> (and <>).

== Retrying Commit

[source,scala]

checkAndRetry(
  checkVersion: Long,
  actions: Seq[Action],
  attemptNumber: Int): Long

checkAndRetry...FIXME

checkAndRetry is used when OptimisticTransactionImpl is requested to commit (and a commit attempt failed with a FileAlreadyExistsException).
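Since checkAndRetry is not described yet, the following only sketches the general shape of an optimistic commit loop: claim the attempt version atomically and, on a clash, move on to the next version. Everything here is hypothetical: `ToyLog` is an in-memory log whose `putIfAbsent` plays the role of the FileAlreadyExistsException, `maxAttempts` is an illustrative limit, and the conflict check against the winning commits that a real implementation performs is omitted.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical in-memory "log": version -> committed actions
final class ToyLog {
  private val versions = new ConcurrentHashMap[Long, Seq[String]]()

  // Atomically claims a version; false means another writer won it
  def tryWrite(version: Long, actions: Seq[String]): Boolean =
    versions.putIfAbsent(version, actions) == null
}

object ToyCommitter {
  // Try attemptVersion; on a clash, retry with the next version
  @annotation.tailrec
  def commit(log: ToyLog, attemptVersion: Long, actions: Seq[String],
             attemptNumber: Int = 0, maxAttempts: Int = 10): Long =
    if (log.tryWrite(attemptVersion, actions)) attemptVersion
    else if (attemptNumber + 1 >= maxAttempts)
      throw new IllegalStateException(s"Gave up after ${attemptNumber + 1} attempts")
    else {
      // A real implementation would check the winning commit for conflicts here
      commit(log, attemptVersion + 1, actions, attemptNumber + 1, maxAttempts)
    }
}
```

Two writers that both start from version 0 end up with versions 0 and 1 in this model.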

== [[verifyNewMetadata]] verifyNewMetadata Method

[source, scala]

verifyNewMetadata(metadata: Metadata): Unit


verifyNewMetadata...FIXME

verifyNewMetadata is used when OptimisticTransactionImpl is requested to <> and <>.

== [[txnVersion]] Looking Up Transaction Version For Given (Streaming Query) ID

[source, scala]

txnVersion(id: String): Long


txnVersion simply registers (adds) the given ID in the <> internal registry.

In the end, txnVersion requests the <> for the <> or assumes -1.

txnVersion is used when DeltaSink is requested to <>.
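The lookup-with-default behavior of txnVersion can be sketched with a plain `Map`. In this hypothetical model, `ToyTxnTracker` receives the latest committed version per application (streaming query) ID up front, standing in for the snapshot.

```scala
import scala.collection.mutable

// Hypothetical model: the snapshot's latest version per application ID
final class ToyTxnTracker(snapshotTransactions: Map[String, Long]) {
  val readTxn = mutable.ArrayBuffer.empty[String]

  def txnVersion(id: String): Long = {
    readTxn += id                            // remember that this ID was seen
    snapshotTransactions.getOrElse(id, -1L)  // -1 when the ID has never committed
  }
}
```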

== getOperationMetrics Method

[source,scala]

getOperationMetrics(
  op: Operation): Option[Map[String, String]]

getOperationMetrics...FIXME

getOperationMetrics is used when OptimisticTransactionImpl is requested to commit.

== [[getUserMetadata]] User-Defined Metadata

[source,scala]

getUserMetadata(op: Operation): Option[String]


getUserMetadata returns the Operation.md#userMetadata[userMetadata] of the given Operation.md[] (if defined) or the value of DeltaSQLConf.md#DELTA_USER_METADATA[spark.databricks.delta.commitInfo.userMetadata] configuration property.

getUserMetadata is used when OptimisticTransactionImpl is requested to <> (and DeltaSQLConf.md#DELTA_COMMIT_INFO_ENABLED[spark.databricks.delta.commitInfo.enabled] configuration property is enabled).
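The precedence rule of getUserMetadata is an `Option.orElse`. In this sketch, `ToyOperation` and `ToyUserMetadata` are hypothetical, and a plain `Map` stands in for the SQL configuration of the active SparkSession.

```scala
// Hypothetical stand-in for an Operation with optional user metadata
final case class ToyOperation(name: String, userMetadata: Option[String] = None)

object ToyUserMetadata {
  val ConfKey = "spark.databricks.delta.commitInfo.userMetadata"

  // The operation's own userMetadata wins; otherwise fall back to the configuration
  def getUserMetadata(op: ToyOperation, conf: Map[String, String]): Option[String] =
    op.userMetadata.orElse(conf.get(ConfKey))
}
```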

== [[getPrettyPartitionMessage]] getPrettyPartitionMessage Method

[source,scala]

getPrettyPartitionMessage(partitionValues: Map[String, String]): String


getPrettyPartitionMessage...FIXME

getPrettyPartitionMessage is used when...FIXME

== [[getNextAttemptVersion]] getNextAttemptVersion Internal Method

[source,scala]

getNextAttemptVersion(previousAttemptVersion: Long): Long


getNextAttemptVersion...FIXME

getNextAttemptVersion is used when OptimisticTransactionImpl is requested to <>.

== [[internal-registries]] Internal Registries

=== [[postCommitHooks]] Post-Commit Hooks

[source, scala]

postCommitHooks: ArrayBuffer[PostCommitHook]

OptimisticTransactionImpl manages PostCommitHook.md[]s that will be <> right after a <> is successful.

Post-commit hooks can be <>, but only the <> post-commit hook is supported (when...FIXME).

=== [[newMetadata]] newMetadata

[source, scala]

newMetadata: Option[Metadata]

OptimisticTransactionImpl uses the newMetadata internal registry for a new <> that should be committed with this transaction.

newMetadata is initially undefined (None). It can be <> only once and before the transaction <>.

newMetadata is used when <> (and <> for statistics).

newMetadata is available using the <> method.

=== [[readPredicates]] readPredicates

[source,scala]

readPredicates: ArrayBuffer[Expression]

readPredicates holds predicate expressions for partitions the transaction is modifying.

A new predicate expression is added to readPredicates when <> and <>.

readPredicates is used when <>.

== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| committed a| [[committed]] Flag that controls whether the transaction is <> or not (and prevents <> from being executed again)

Default: false

Enabled (set to true) exclusively in <>

| dependsOnFiles a| [[dependsOnFiles]] Flag that...FIXME

Default: false

Enabled (set to true) in <>, <>

Used in <> and <>

| readFiles a| [[readFiles]]

| readTxn a| [[readTxn]] Streaming query IDs that have been seen by this transaction

A new queryId is added when OptimisticTransactionImpl is requested for <>

Used when OptimisticTransactionImpl is requested to <> (to fail with a ConcurrentTransactionException for idempotent transactions that have conflicted)

| snapshotMetadata a| [[snapshotMetadata]] <> of the <>

|===


Last update: 2020-10-05