OptimisticTransactionImpl¶
OptimisticTransactionImpl is an extension of the TransactionalWrite abstraction for optimistic transactions that can modify a delta table (at a given version) and can be committed eventually.
In other words, OptimisticTransactionImpl is a set of actions as part of an Operation that changes the state of a delta table transactionally.
Contract¶
Clock¶
clock: Clock
DeltaLog¶
deltaLog: DeltaLog
DeltaLog (of a delta table) that this transaction is changing
deltaLog is part of the TransactionalWrite abstraction, and OptimisticTransactionImpl seems to redefine it as a val (rather than a def).
Snapshot¶
snapshot: Snapshot
Snapshot (of the delta table) that this transaction is changing
snapshot is part of the TransactionalWrite contract, and OptimisticTransactionImpl seems to redefine it as a val (rather than a def).
Implementations¶
Table Version at Reading Time¶
readVersion: Long
readVersion requests the Snapshot for the version.
readVersion is used when:
- OptimisticTransactionImpl is requested to updateMetadata and commit
- AlterDeltaTableCommand, ConvertToDeltaCommand, CreateDeltaTableCommand commands are executed
- DeltaCommand is requested to commitLarge
- WriteIntoDelta is requested to write
- ImplicitMetadataOperation is requested to updateMetadata
Transactional Commit¶
commit(
actions: Seq[Action],
op: DeltaOperations.Operation): Long
commit commits the transaction (with the Actions and a given Operation).
Usage¶
commit is used when:
- DeltaLog is requested to upgrade the protocol
- ALTER delta table commands (AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, AlterTableAddColumnsDeltaCommand, AlterTableChangeColumnDeltaCommand, AlterTableReplaceColumnsDeltaCommand, AlterTableAddConstraintDeltaCommand, AlterTableDropConstraintDeltaCommand) are executed
- ConvertToDeltaCommand command is executed
- CreateDeltaTableCommand command is executed
- DeleteCommand command is executed
- MergeIntoCommand command is executed
- UpdateCommand command is executed
- WriteIntoDelta command is executed
- DeltaSink is requested to addBatch
Preparing Commit¶
commit first prepares a commit, which gives the final actions to commit (they may differ from the given actions).
Isolation Level¶
commit determines the isolation level based on FileActions (in the given actions) and their dataChange flag.
When every FileAction has the dataChange flag disabled (false), commit assumes no data changed and chooses SnapshotIsolation. Otherwise, it chooses Serializable.
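The rule above can be sketched in a few lines of Scala. This is a minimal model, not Delta Lake's code: the Action, FileAction and IsolationLevel types below are simplified stand-ins defined only for this example, and IsolationLevelSketch.choose is a hypothetical helper name.

```scala
object IsolationLevelSketch {
  // Simplified stand-ins for Delta Lake's actions (the real classes carry more fields).
  sealed trait Action
  sealed trait FileAction extends Action { def dataChange: Boolean }
  final case class AddFile(path: String, dataChange: Boolean) extends FileAction
  final case class RemoveFile(path: String, dataChange: Boolean) extends FileAction
  final case class SetTransaction(appId: String, version: Long) extends Action

  sealed trait IsolationLevel
  case object Serializable extends IsolationLevel
  case object SnapshotIsolation extends IsolationLevel

  // Only FileActions are inspected; all other actions are ignored.
  def choose(actions: Seq[Action]): IsolationLevel = {
    val noDataChanged =
      actions.collect { case f: FileAction => f.dataChange }.forall(_ == false)
    if (noDataChanged) SnapshotIsolation else Serializable
  }
}
```

Note that, in this sketch, a commit with no FileActions at all also ends up with SnapshotIsolation, since the condition holds vacuously.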
isBlindAppend¶
commit
...FIXME
CommitInfo¶
commit
...FIXME
Registering Post-Commit Hook¶
commit registers the GenerateSymlinkManifest post-commit hook when there is a FileAction among the actions and the compatibility.symlinkFormatManifest.enabled table property is enabled.
Commit Version¶
commit runs doCommit with the next version, the actions, attempt number 0, and the selected isolation level.
commit prints out the following INFO message to the logs:
Committed delta #[commitVersion] to [logPath]
Performing Post-Commit Operations¶
commit runs postCommit (with the version committed and the actions).
Executing Post-Commit Hooks¶
In the end, commit runs post-commit hooks and returns the version of the successful commit.
doCommitRetryIteratively¶
doCommitRetryIteratively(
attemptVersion: Long,
actions: Seq[Action],
isolationLevel: IsolationLevel): Long
doCommitRetryIteratively
...FIXME
checkForConflicts¶
checkForConflicts(
checkVersion: Long,
actions: Seq[Action],
attemptNumber: Int,
commitIsolationLevel: IsolationLevel): Long
checkForConflicts
...FIXME
getNextAttemptVersion¶
getNextAttemptVersion(
previousAttemptVersion: Long): Long
getNextAttemptVersion
...FIXME
getPrettyPartitionMessage¶
getPrettyPartitionMessage(
partitionValues: Map[String, String]): String
getPrettyPartitionMessage
...FIXME
getOperationMetrics¶
getOperationMetrics(
op: Operation): Option[Map[String, String]]
getOperationMetrics
...FIXME
postCommit¶
postCommit(
commitVersion: Long,
commitActions: Seq[Action]): Unit
postCommit
...FIXME
prepareCommit¶
prepareCommit(
actions: Seq[Action],
op: DeltaOperations.Operation): Seq[Action]
prepareCommit adds the newMetadata action (if available) to the given actions.
prepareCommit runs verifyNewMetadata if there was one.
prepareCommit ...FIXME
prepareCommit requests the DeltaLog to protocolWrite.
prepareCommit ...FIXME
Multiple Metadata Changes Not Allowed¶
prepareCommit throws an AssertionError when there are multiple metadata changes in the transaction (by means of Metadata actions):
Cannot change the metadata more than once in a transaction.
Committing Transaction Allowed Once Only¶
prepareCommit throws an AssertionError when the committed internal flag is enabled:
Transaction already committed.
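The two guards above could look roughly as follows. This is a sketch under simplifying assumptions: Metadata and AddFile are reduced to one-field case classes, Txn and markCommitted are hypothetical names, and the rest of prepareCommit is left out.

```scala
object PrepareCommitSketch {
  sealed trait Action
  final case class Metadata(description: String) extends Action
  final case class AddFile(path: String) extends Action

  final class Txn {
    private var committed = false

    // Scala's assert throws java.lang.AssertionError when the condition is false.
    def prepareCommit(actions: Seq[Action]): Seq[Action] = {
      assert(!committed, "Transaction already committed.")
      val metadataChanges = actions.collect { case m: Metadata => m }
      assert(metadataChanges.length <= 1,
        "Cannot change the metadata more than once in a transaction.")
      actions
    }

    // Hypothetical hook for this example to flip the committed flag.
    def markCommitted(): Unit = committed = true
  }
}
```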
Registering Post-Commit Hook¶
registerPostCommitHook(
hook: PostCommitHook): Unit
registerPostCommitHook registers (adds) the given PostCommitHook to the postCommitHooks internal registry.
runPostCommitHooks¶
runPostCommitHooks(
version: Long,
committedActions: Seq[Action]): Unit
runPostCommitHooks simply runs every post-commit hook registered (in the postCommitHooks internal registry).
runPostCommitHooks clears the active transaction (making all follow-up operations non-transactional).
Note
Hooks may create new transactions.
Handling Non-Fatal Exceptions¶
For non-fatal exceptions, runPostCommitHooks prints out the following ERROR message to the logs, records the delta event, and requests the post-commit hook to handle the error.
Error when executing post-commit hook [name] for commit [version]
AssertionError¶
runPostCommitHooks throws an AssertionError when the committed flag is disabled:
Can't call post commit hooks before committing
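The following sketch puts registration, execution and error handling of post-commit hooks together. It is an illustration only: the PostCommitHook trait here is a hypothetical, trimmed-down shape (the real hook API takes more arguments), logging is replaced with println, and clearing the active transaction and recording the delta event are omitted.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object PostCommitHookSketch {
  // Hypothetical, simplified hook interface for this example.
  trait PostCommitHook {
    def name: String
    def run(version: Long): Unit
    def handleError(error: Throwable, version: Long): Unit = ()
  }

  final class Txn {
    private val postCommitHooks = ArrayBuffer.empty[PostCommitHook]
    var committed = false

    def registerPostCommitHook(hook: PostCommitHook): Unit = {
      postCommitHooks += hook
    }

    def runPostCommitHooks(version: Long): Unit = {
      assert(committed, "Can't call post commit hooks before committing")
      postCommitHooks.foreach { hook =>
        try hook.run(version)
        catch {
          // Non-fatal failures are reported and handed back to the hook itself.
          case NonFatal(e) =>
            println(s"Error when executing post-commit hook ${hook.name} for commit $version")
            hook.handleError(e, version)
        }
      }
    }
  }
}
```

In this sketch a failing hook does not stop the remaining hooks from running, because the exception is caught per hook.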
CommitInfo¶
OptimisticTransactionImpl creates a CommitInfo when requested to commit with spark.databricks.delta.commitInfo.enabled configuration enabled.
OptimisticTransactionImpl uses the CommitInfo to recordDeltaEvent (as a CommitStats).
Attempting Commit¶
doCommit(
attemptVersion: Long,
actions: Seq[Action],
attemptNumber: Int,
isolationLevel: IsolationLevel): Long
doCommit returns the given attemptVersion as the commit version if successful, or falls back to checkAndRetry otherwise.
doCommit is used when:
- OptimisticTransactionImpl is requested to commit (and checkAndRetry).
Internally, doCommit prints out the following DEBUG message to the logs:
Attempting to commit version [attemptVersion] with [n] actions with [isolationLevel] isolation level
Writing Out¶
doCommit requests the DeltaLog for the LogStore to write out the given actions to a delta file in the log directory with the attemptVersion version, e.g.
00000000000000000001.json
doCommit writes the actions out in JSON format.
Note
LogStores must throw a java.nio.file.FileAlreadyExistsException if the delta file already exists. Any FileAlreadyExistsException is caught by doCommit itself, which then runs checkAndRetry.
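To illustrate the write-or-fail contract, here is a sketch that uses plain java.nio on a local directory instead of a LogStore. The names DoCommitSketch, deltaFileName and the naive "try the next version" fallback are assumptions made for this example; the real checkAndRetry also checks for conflicts before retrying.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{FileAlreadyExistsException, Files, Path, StandardOpenOption}

object DoCommitSketch {
  // Zero-padded, 20-digit file name, e.g. 00000000000000000001.json
  def deltaFileName(version: Long): String = f"$version%020d.json"

  def doCommit(logDir: Path, attemptVersion: Long, jsonActions: Seq[String]): Long =
    try {
      val file = logDir.resolve(deltaFileName(attemptVersion))
      // CREATE_NEW makes the write fail if another writer already created the file.
      Files.write(
        file,
        jsonActions.mkString("\n").getBytes(StandardCharsets.UTF_8),
        StandardOpenOption.CREATE_NEW,
        StandardOpenOption.WRITE)
      attemptVersion
    } catch {
      case _: FileAlreadyExistsException =>
        // Stand-in for checkAndRetry: naively retry with the next version.
        doCommit(logDir, attemptVersion + 1, jsonActions)
    }
}
```

Relying on an atomic "create only if absent" write is what lets concurrent writers agree on who won a given version without any extra locking.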
Post-Commit Snapshot¶
doCommit requests the DeltaLog to update.
IllegalStateException¶
doCommit throws an IllegalStateException when the version of the snapshot after update is smaller than the given attemptVersion:
The committed version is [attemptVersion] but the current version is [version].
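The sanity check above amounts to a single comparison. A minimal sketch, with verifyPostCommitVersion being a hypothetical helper name used only here:

```scala
object PostCommitSnapshotSketch {
  // currentVersion is the version of the snapshot after the DeltaLog update.
  def verifyPostCommitVersion(attemptVersion: Long, currentVersion: Long): Unit =
    if (currentVersion < attemptVersion) {
      throw new IllegalStateException(
        s"The committed version is $attemptVersion but the current version is $currentVersion.")
    }
}
```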
CommitStats¶
doCommit records a new CommitStats and returns the given attemptVersion as the commit version.
FileAlreadyExistsExceptions¶
doCommit catches FileAlreadyExistsExceptions and runs checkAndRetry.
Retrying Commit¶
checkAndRetry(
checkVersion: Long,
actions: Seq[Action],
attemptNumber: Int): Long
checkAndRetry
...FIXME
checkAndRetry is used when OptimisticTransactionImpl is requested to commit (and attempts a commit that failed with a FileAlreadyExistsException).
verifyNewMetadata¶
verifyNewMetadata(
metadata: Metadata): Unit
verifyNewMetadata
...FIXME
verifyNewMetadata is used when:
- OptimisticTransactionImpl is requested to prepareCommit and updateMetadata
withGlobalConfigDefaults¶
withGlobalConfigDefaults(
metadata: Metadata): Metadata
withGlobalConfigDefaults
...FIXME
withGlobalConfigDefaults is used when:
- OptimisticTransactionImpl is requested to updateMetadata and updateMetadataForNewTable
Looking Up Transaction Version For Given (Streaming Query) ID¶
txnVersion(
id: String): Long
txnVersion simply registers (adds) the given ID in the readTxn internal registry.
In the end, txnVersion requests the Snapshot for the transaction version for the given ID or -1.
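A rough model of this lookup, assuming the snapshot exposes transaction versions as a plain map (the Snapshot case class and its transactions field are simplified stand-ins for this example):

```scala
import scala.collection.mutable.ArrayBuffer

object TxnVersionSketch {
  // Hypothetical snapshot: last committed version per application (streaming query) ID.
  final case class Snapshot(transactions: Map[String, Long])

  final class Txn(snapshot: Snapshot) {
    val readTxn = ArrayBuffer.empty[String]

    def txnVersion(id: String): Long = {
      readTxn += id // remember that this transaction depends on the ID
      snapshot.transactions.getOrElse(id, -1L)
    }
  }
}
```

A caller such as a streaming sink can compare the returned version with its batch ID to decide whether the batch was already committed; -1 signals that the ID has not been seen.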
txnVersion is used when:
- DeltaSink is requested to add a streaming micro-batch
User-Defined Metadata¶
getUserMetadata(
op: Operation): Option[String]
getUserMetadata returns the userMetadata of the given Operation (if defined) or the value of spark.databricks.delta.commitInfo.userMetadata configuration property.
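The fallback order can be expressed with Option.orElse. In this sketch the Operation case class is a simplified stand-in and the configuration is modeled as a plain Map rather than a SQLConf:

```scala
object UserMetadataSketch {
  // Simplified stand-in for an operation that may carry its own user metadata.
  final case class Operation(name: String, userMetadata: Option[String] = None)

  val UserMetadataKey = "spark.databricks.delta.commitInfo.userMetadata"

  // The operation's own metadata wins; the configuration property is the fallback.
  def getUserMetadata(op: Operation, conf: Map[String, String]): Option[String] =
    op.userMetadata.orElse(conf.get(UserMetadataKey))
}
```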
getUserMetadata is used when:
- OptimisticTransactionImpl is requested to commit (and spark.databricks.delta.commitInfo.enabled configuration property is enabled)
- ConvertToDeltaCommand is executed (and in turn requests DeltaCommand to commitLarge)
Internal Registries¶
Post-Commit Hooks¶
postCommitHooks: ArrayBuffer[PostCommitHook]
OptimisticTransactionImpl manages PostCommitHooks that will be executed right after a commit is successful.
Post-commit hooks can be registered, but only the GenerateSymlinkManifest post-commit hook is supported.
newMetadata¶
newMetadata: Option[Metadata]
OptimisticTransactionImpl uses the newMetadata internal registry for a new Metadata that should be committed with this transaction.
newMetadata is initially undefined (None). It can be updated only once and before the transaction writes out any files.
newMetadata is used in prepareCommit and doCommit (for statistics).
newMetadata is available using the metadata method.
readPredicates¶
readPredicates: ArrayBuffer[Expression]
readPredicates holds predicate expressions for partitions the transaction is modifying.
A new predicate expression is added to readPredicates by filterFiles and readWholeTable.
readPredicates is used by checkAndRetry.
Internal Properties¶
committed¶
Controls whether the transaction has been committed or not (and prevents prepareCommit from being executed again)
Default: false
Enabled in postCommit
dependsOnFiles¶
Flag that...FIXME
Default: false
Enabled (set to true) in filterFiles and readWholeTable
Used in commit and checkAndRetry
readFiles¶
readTxn¶
Streaming query IDs that have been seen by this transaction
A new queryId is added when OptimisticTransactionImpl is requested for txnVersion
Used when OptimisticTransactionImpl is requested to checkAndRetry (to fail with a ConcurrentTransactionException for idempotent transactions that have conflicted)
snapshotMetadata¶
readWholeTable¶
readWholeTable(): Unit
readWholeTable simply adds the True literal to the readPredicates internal registry.
readWholeTable is used when:
- DeltaSink is requested to add a streaming micro-batch (and the batch reads the same Delta table as this sink is going to write to)
updateMetadataForNewTable¶
updateMetadataForNewTable(
metadata: Metadata): Unit
updateMetadataForNewTable
...FIXME
updateMetadataForNewTable is used when:
- ConvertToDeltaCommand and CreateDeltaTableCommand are executed
Metadata¶
metadata: Metadata
metadata is part of the TransactionalWrite abstraction.
metadata is either the newMetadata (if defined) or the snapshotMetadata.
Updating Metadata¶
updateMetadata(
metadata: Metadata): Unit
updateMetadata updates the newMetadata internal property based on the readVersion:
- For -1, updateMetadata updates the configuration of the given metadata with a new metadata based on the SQLConf (of the active SparkSession), the configuration of the given metadata and a new Protocol
- For other versions, updateMetadata leaves the given Metadata unchanged
updateMetadata is used when:
- OptimisticTransactionImpl is requested to updateMetadataForNewTable
- AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, AlterTableAddColumnsDeltaCommand, AlterTableChangeColumnDeltaCommand, AlterTableReplaceColumnsDeltaCommand are executed
- ConvertToDeltaCommand is executed
- ImplicitMetadataOperation is requested to updateMetadata
AssertionError¶
updateMetadata throws an AssertionError when the hasWritten flag is enabled:
Cannot update the metadata in a transaction that has already written data.
AssertionError¶
updateMetadata throws an AssertionError when the newMetadata is not empty:
Cannot change the metadata more than once in a transaction.
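The interplay between metadata, newMetadata and the two assertions can be modeled as below. This is a simplified sketch: Metadata is reduced to a configuration map, Txn is a hypothetical class, and the readVersion-dependent handling of new tables is left out.

```scala
object UpdateMetadataSketch {
  final case class Metadata(configuration: Map[String, String] = Map.empty)

  final class Txn(snapshotMetadata: Metadata) {
    private var newMetadata: Option[Metadata] = None
    var hasWritten = false

    // The pending metadata (if any) takes precedence over the snapshot's.
    def metadata: Metadata = newMetadata.getOrElse(snapshotMetadata)

    def updateMetadata(m: Metadata): Unit = {
      assert(!hasWritten,
        "Cannot update the metadata in a transaction that has already written data.")
      assert(newMetadata.isEmpty,
        "Cannot change the metadata more than once in a transaction.")
      newMetadata = Some(m)
    }
  }
}
```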
Files To Scan Matching Given Predicates¶
filterFiles(): Seq[AddFile] // Uses `true` literal to mean that all files match
filterFiles(
filters: Seq[Expression]): Seq[AddFile]
filterFiles gives the files to scan based on the given predicates (filter expressions).
Internally, filterFiles requests the Snapshot for the filesForScan (for no projection attributes and the given filters).
filterFiles finds the partition predicates among the given filters (and the partition columns of the Metadata).
filterFiles registers (adds) the partition predicates (in the readPredicates internal registry) and the files to scan (in the readFiles internal registry).
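The bookkeeping done by filterFiles can be sketched with a toy predicate model. Everything here is an assumption made for illustration: predicates are reduced to a hypothetical EqualTo(column, value) instead of Catalyst expressions, the snapshot is a plain list of files, and file pruning uses partition values only.

```scala
import scala.collection.mutable.ArrayBuffer

object FilterFilesSketch {
  // Hypothetical, minimal predicate: column == value
  final case class EqualTo(column: String, value: String)
  final case class AddFile(path: String, partitionValues: Map[String, String])

  final class Txn(allFiles: Seq[AddFile], partitionColumns: Set[String]) {
    val readPredicates = ArrayBuffer.empty[EqualTo]
    val readFiles = ArrayBuffer.empty[AddFile]

    def filterFiles(filters: Seq[EqualTo]): Seq[AddFile] = {
      // Keep only the predicates that refer to partition columns.
      val partitionFilters = filters.filter(f => partitionColumns.contains(f.column))
      val files = allFiles.filter { file =>
        partitionFilters.forall(f => file.partitionValues.get(f.column).contains(f.value))
      }
      // Record what was read so a later conflict check can use it.
      readPredicates ++= partitionFilters
      readFiles ++= files
      files
    }
  }
}
```

Tracking the predicates and files read is what allows a retry to tell whether a concurrent commit touched data this transaction depends on.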
filterFiles is used when:
- WriteIntoDelta is requested to write
- DeltaSink is requested to add a streaming micro-batch (with Complete output mode)
- DeleteCommand, MergeIntoCommand and UpdateCommand are executed
- CreateDeltaTableCommand is executed