OptimisticTransaction¶
OptimisticTransaction is an OptimisticTransactionImpl (and seems to add little beyond a concrete class name).
OptimisticTransaction is created for changes to a delta table at a given version.
When OptimisticTransaction (as an OptimisticTransactionImpl) is about to be committed (doCommit internally), the LogStore (of the delta table) is requested to write actions to a delta file (e.g. _delta_log/00000000000000000001.json for the attempt version 1). If the write succeeds, the commit is considered successful. If a FileAlreadyExistsException is thrown (the delta file for that version already exists), the commit is retried.
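The write-or-retry behaviour described above can be sketched with plain java.nio. This is only an illustration under stated assumptions: CommitSketch, deltaFile and commit are hypothetical names, actions are plain strings, and the sketch simply retries with the next version, leaving out whatever checks the real doCommit and LogStore perform before a retry.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{FileAlreadyExistsException, Files, Path, StandardOpenOption}

// Hypothetical sketch, not Delta Lake code
object CommitSketch {
  // Delta file name for a version: zero-padded to 20 digits plus .json
  def deltaFile(logDir: Path, version: Long): Path =
    logDir.resolve(f"$version%020d.json")

  // Tries to create the delta file for attemptVersion.
  // CREATE_NEW makes the write fail if the file already exists.
  // On FileAlreadyExistsException the commit is retried with the next version.
  // Returns the version that was actually written.
  @annotation.tailrec
  def commit(
      logDir: Path,
      attemptVersion: Long,
      actions: Seq[String],
      maxAttempts: Int = 10): Long = {
    require(maxAttempts > 0, "out of commit attempts")
    val written =
      try {
        Files.write(
          deltaFile(logDir, attemptVersion),
          actions.mkString("\n").getBytes(StandardCharsets.UTF_8),
          StandardOpenOption.CREATE_NEW,
          StandardOpenOption.WRITE)
        true
      } catch {
        case _: FileAlreadyExistsException => false
      }
    if (written) attemptVersion
    else commit(logDir, attemptVersion + 1, actions, maxAttempts - 1)
  }
}
```

Calling CommitSketch.commit twice with the same attempt version against the same directory should give the first caller that version and push the second caller to the next one.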
OptimisticTransaction can be associated with a thread as an active transaction.
Demo¶
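import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations}
import org.apache.spark.sql.delta.actions.AddFile
val dir = "/tmp/delta/users"
val log = DeltaLog.forTable(spark, dir)
val txn = log.startTransaction()
// ...changes to a delta table...
val addFile = AddFile("foo", Map.empty, 1L, System.currentTimeMillis(), dataChange = true)
val removeFile = addFile.remove
val actions = addFile :: removeFile :: Nil
// The operation recorded with the commit (ManualUpdate is an assumption for this demo)
val op = DeltaOperations.ManualUpdate
txn.commit(actions, op)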
Alternatively, let DeltaLog create and manage the transaction with withNewTransaction.
log.withNewTransaction { txn =>
// ...transactional changes to a delta table
}
Creating Instance¶
OptimisticTransaction takes the following to be created:
DeltaLog
Snapshot
Note
The DeltaLog and Snapshot are part of the OptimisticTransactionImpl abstraction (which in turn inherits them as a TransactionalWrite and simply redeclares them as val instead of def).
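The def-to-val change mentioned in the note is a plain Scala feature and can be sketched as follows. Write, TxnImpl and Txn are hypothetical stand-ins for TransactionalWrite, OptimisticTransactionImpl and OptimisticTransaction, and a String stands in for the DeltaLog.

```scala
// Hypothetical stand-in for TransactionalWrite: the member is declared with def
trait Write {
  def log: String
}

// Hypothetical stand-in for OptimisticTransactionImpl: same member, redeclared as val
trait TxnImpl extends Write {
  val log: String
}

// Hypothetical stand-in for OptimisticTransaction: supplies the value via the constructor
class Txn(override val log: String) extends TxnImpl
```

With the member declared as a val, every access returns the same value fixed at construction time, whereas a def leaves implementations free to compute it on each call.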
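OptimisticTransaction is created when DeltaLog is used for the following:
Starting a new transaction (startTransaction)
Executing an operation in a new transaction (withNewTransaction)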
Active Thread-Local OptimisticTransaction¶
active: ThreadLocal[OptimisticTransaction]
active is a Java ThreadLocal with the OptimisticTransaction of the current thread.
ThreadLocal
ThreadLocal provides thread-local variables. These variables differ from their normal counterparts in that each thread that accesses one (via its get or set method) has its own, independently initialized copy of the variable.
ThreadLocal instances are typically private static fields in classes that wish to associate state with a thread (e.g., a user ID or Transaction ID).
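A minimal sketch of the per-thread behaviour described above, using only java.lang.ThreadLocal. ThreadLocalSketch and its members are hypothetical names, and a String ID stands in for the state associated with a thread.

```scala
// Hypothetical sketch of thread-local state
object ThreadLocalSketch {
  // One independent slot per thread; None until that thread sets a value
  private val current = new ThreadLocal[Option[String]] {
    override def initialValue(): Option[String] = None
  }

  def set(id: String): Unit = current.set(Some(id))
  def get: Option[String] = current.get

  // Runs body on a new thread, waits for it, and returns what it produced
  def onNewThread[T](body: => T): T = {
    var result: Option[T] = None
    val t = new Thread(() => { result = Some(body) })
    t.start()
    t.join()
    result.get
  }
}
```

A value set on one thread is not visible through onNewThread, and a value set inside onNewThread does not change what the calling thread reads afterwards.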
active is assigned to the current thread using the setActive utility and cleared using clearActive.
active is available using the getActive utility.
There can only be one active OptimisticTransaction per thread (or an IllegalStateException is thrown).
setActive¶
setActive(
txn: OptimisticTransaction): Unit
setActive associates the given OptimisticTransaction as active with the current thread.
setActive throws an IllegalStateException if there is an active OptimisticTransaction already associated:
Cannot set a new txn as active when one is already active
setActive is used when:
DeltaLog is requested to execute an operation in a new transaction
clearActive¶
clearActive(): Unit
clearActive clears the active transaction (so no transaction is associated with the current thread).
clearActive is used when:
DeltaLog is requested to execute an operation in a new transaction
getActive¶
getActive(): Option[OptimisticTransaction]
getActive returns the active transaction (if available).
getActive seems unused.
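The three utilities above can be sketched together as follows. This is a hedged illustration, not the Delta Lake source: ActiveTxnSketch, Txn and withNewTxn are hypothetical names, and withNewTxn only mimics how an operation executed in a new transaction could set and clear the active transaction.

```scala
// Hypothetical sketch of an active-transaction registry
object ActiveTxnSketch {
  // Hypothetical stand-in for OptimisticTransaction
  final class Txn(val readVersion: Long)

  // Holds the active transaction of the current thread (null when there is none)
  private val active = new ThreadLocal[Txn]

  def getActive(): Option[Txn] = Option(active.get)

  def setActive(txn: Txn): Unit = {
    if (active.get != null) {
      throw new IllegalStateException(
        "Cannot set a new txn as active when one is already active")
    }
    active.set(txn)
  }

  def clearActive(): Unit = active.remove()

  // Marks a new transaction as active while thunk runs and always clears it afterwards
  def withNewTxn[T](readVersion: Long)(thunk: Txn => T): T = {
    val txn = new Txn(readVersion)
    setActive(txn)
    try thunk(txn)
    finally clearActive()
  }
}
```

In this sketch, nesting withNewTxn on the same thread fails with the IllegalStateException, while the same call on another thread succeeds because each thread has its own slot.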
Logging¶
Enable ALL logging level for org.apache.spark.sql.delta.OptimisticTransaction logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.delta.OptimisticTransaction=ALL
Refer to Logging.