= OptimisticTransaction

OptimisticTransaction is an OptimisticTransactionImpl (which seems to be little more than a class name change).

When an OptimisticTransaction (as an OptimisticTransactionImpl) is committed (which uses doCommit internally), the LogStore (of the DeltaLog) is requested to write the actions to a delta file, e.g. _delta_log/00000000000000000001.json for attempt version 1. A commit is considered unsuccessful only when a FileAlreadyExistsException is thrown, in which case it is retried.
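The commit behaviour described above can be sketched without Delta Lake at all. The following is a minimal, self-contained illustration and not Delta Lake's implementation: `CommitSketch`, `deltaFile`, `tryCommit` and `commitWithRetry` are hypothetical names, and the local file system (through `java.nio.file.Files`) stands in for the LogStore. It assumes that an atomic "create new file" write fails with a FileAlreadyExistsException when another writer has already claimed the version, and it simply retries with the next version. Any conflict checking that happens before a retry is left out.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{FileAlreadyExistsException, Files, Path, StandardOpenOption}

object CommitSketch {
  // Hypothetical helper: zero-pads the version to 20 digits,
  // e.g. version 1 -> 00000000000000000001.json
  def deltaFile(logDir: Path, version: Long): Path =
    logDir.resolve(f"$version%020d.json")

  // Tries to claim a version by creating its file.
  // CREATE_NEW makes the write fail if the file already exists.
  def tryCommit(logDir: Path, version: Long, actions: Seq[String]): Boolean =
    try {
      Files.write(
        deltaFile(logDir, version),
        actions.mkString("\n").getBytes(StandardCharsets.UTF_8),
        StandardOpenOption.CREATE_NEW,
        StandardOpenOption.WRITE)
      true
    } catch {
      // Someone else committed this version first
      case _: FileAlreadyExistsException => false
    }

  // Retries with the next version until a write succeeds
  // and returns the version that was committed.
  @annotation.tailrec
  def commitWithRetry(logDir: Path, attemptVersion: Long, actions: Seq[String]): Long =
    if (tryCommit(logDir, attemptVersion, actions)) attemptVersion
    else commitWithRetry(logDir, attemptVersion + 1, actions)
}
```

Calling `commitWithRetry` twice with the same attempt version against the same directory shows the idea: the first call claims that version, and the second one finds the file already present and moves on to the next version.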

OptimisticTransaction can be associated with a thread as an active transaction.

== Creating Instance

OptimisticTransaction takes the following to be created:

  • [[deltaLog]] DeltaLog
  • [[snapshot]] Snapshot
  • [[clock]] Clock

NOTE: The deltaLog and snapshot are part of the OptimisticTransactionImpl contract (which in turn inherits them as a TransactionalWrite and changes them from def to val).

OptimisticTransaction is created for changes to a delta table (the DeltaLog) at a given version (the Snapshot).

OptimisticTransaction is created when DeltaLog is used for the following:

  • DeltaLog.md#startTransaction[Starting a new transaction]

  • DeltaLog.md#withNewTransaction[Executing a single-threaded operation (in a new transaction)] (for <>, <>, <>, and <> commands as well as for <> for <>)

== [[active]] Active Thread-Local OptimisticTransaction

[source, scala]

active: ThreadLocal[OptimisticTransaction]

active is a Java https://docs.oracle.com/javase/8/docs/api/java/lang/ThreadLocal.html[ThreadLocal] with the OptimisticTransaction of the current thread.

ThreadLocal provides thread-local variables. These variables differ from their normal counterparts in that each thread that accesses one (via its get or set method) has its own, independently initialized copy of the variable.

ThreadLocal instances are typically private static fields in classes that wish to associate state with a thread (e.g., a user ID or Transaction ID).

active is assigned to the current thread using the setActive utility and cleared using clearActive.

active is available using the getActive utility.

There can only be one active OptimisticTransaction per thread (otherwise an IllegalStateException is thrown).
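The per-thread isolation that ThreadLocal gives can be seen in a small sketch. It uses a plain `ThreadLocal[String]` rather than the real `active` field, and the `ThreadLocalSketch` object and its method names are hypothetical, chosen only for this illustration.

```scala
object ThreadLocalSketch {
  // One ThreadLocal instance, but a separate value per thread
  // (the initial value in every thread is null)
  private val current = new ThreadLocal[String]

  // Sets a value in the calling thread and reports what a second thread sees
  def valueSeenByOtherThread(): Option[String] = {
    current.set("txn-of-calling-thread")
    var seen: Option[String] = None
    val other = new Thread(() => { seen = Option(current.get) })
    other.start()
    other.join() // after join, the write to `seen` is visible here
    seen
  }

  // Reads the value of the calling thread
  def valueSeenByCallingThread(): Option[String] = Option(current.get)
}
```

The second thread never called `set`, so it sees no value, while the calling thread still sees its own. This is why an active transaction set in one thread does not leak into another.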

== [[utilities]] Utilities

=== [[setActive]] setActive

[source, scala]

setActive(txn: OptimisticTransaction): Unit

setActive simply associates the given OptimisticTransaction as active with the current thread.

setActive throws an IllegalStateException if there is an active OptimisticTransaction already associated:

Cannot set a new txn as active when one is already active

setActive is used when DeltaLog is requested to execute a single-threaded operation in a new transaction (withNewTransaction).

=== [[clearActive]] clearActive

[source, scala]

clearActive(): Unit

clearActive simply clears the active transaction (so no transaction is associated with the current thread).

clearActive is used when DeltaLog is requested to execute a single-threaded operation in a new transaction (withNewTransaction).

=== [[getActive]] getActive

[source, scala]

getActive(): Option[OptimisticTransaction]

getActive simply returns the active transaction (if there is one).

getActive seems unused.
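The three utilities fit together as in the following sketch. It is an approximation based on the descriptions above, not a copy of Delta Lake's source: `Txn` is a hypothetical stand-in for OptimisticTransaction (which needs a DeltaLog and a Snapshot), `TxnUtils` is a hypothetical holder object, and its `withNewTransaction` only assumes that the real method activates the transaction for the duration of the given function and clears it afterwards.

```scala
// Stand-in for OptimisticTransaction (hypothetical, for illustration only)
final class Txn(val version: Long)

object TxnUtils {
  private val active = new ThreadLocal[Txn]

  // Returns the transaction of the current thread, if any
  def getActive(): Option[Txn] = Option(active.get)

  // Rejects a second active transaction on the same thread
  def setActive(txn: Txn): Unit = {
    if (active.get != null) {
      throw new IllegalStateException(
        "Cannot set a new txn as active when one is already active")
    }
    active.set(txn)
  }

  // Leaves the current thread with no active transaction
  def clearActive(): Unit = active.set(null)

  // Hypothetical counterpart of DeltaLog.withNewTransaction:
  // the transaction is active only while `thunk` runs
  def withNewTransaction[T](version: Long)(thunk: Txn => T): T = {
    val txn = new Txn(version)
    setActive(txn)
    try thunk(txn)
    finally clearActive()
  }
}
```

Clearing in a `finally` block means the thread is left without an active transaction even when the function throws, so a later transaction on the same thread is not rejected by setActive.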

== [[logging]] Logging

Enable the ALL logging level for the org.apache.spark.sql.delta.OptimisticTransaction logger to see what happens inside.

Add the following line to conf/log4j.properties:

[source,plaintext]

log4j.logger.org.apache.spark.sql.delta.OptimisticTransaction=ALL

Refer to Logging.

== [[demo]] Demo

[source,scala]

import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.actions.AddFile

val dir = "/tmp/delta/users"
val log = DeltaLog.forTable(spark, dir)

val txn = log.startTransaction()

// ...changes to a delta table...
val addFile = AddFile("foo", Map.empty, 1L, System.currentTimeMillis(), dataChange = true)
val removeFile = addFile.remove
val actions = addFile :: removeFile :: Nil

// op is the DeltaOperations.Operation that describes the change
// (it is not defined in this demo and has to be provided)
txn.commit(actions, op)

// You could do the following instead

log.withNewTransaction { txn =>
  // ...transactional changes to a delta table
}



Last update: 2020-09-30