DeltaLog

DeltaLog is a transaction log (aka change log) of changes to the state of a delta table.

DeltaLog uses the _delta_log directory for the files of the transaction log of a delta table (whose location is given when the DeltaLog.forTable utility is used to create an instance).

DeltaLog is created (indirectly via DeltaLog.apply utility) when DeltaLog.forTable utility is used.

import org.apache.spark.sql.SparkSession
assert(spark.isInstanceOf[SparkSession])

val dataPath = "/tmp/delta/t1"
import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, dataPath)

import org.apache.hadoop.fs.Path
val expected = new Path(s"file:$dataPath/_delta_log/_last_checkpoint")
assert(deltaLog.LAST_CHECKPOINT == expected)

A common idiom (if not the only way) to know the current version of the delta table is to request the DeltaLog for the current state (snapshot) and then for the version.

import org.apache.spark.sql.delta.DeltaLog
assert(deltaLog.isInstanceOf[DeltaLog])

val deltaVersion = deltaLog.snapshot.version
scala> println(deltaVersion)
5

While being created, DeltaLog immediately updates the current state (snapshot) of the delta table, so the snapshot is at the latest version available.

In other words, the version of (the DeltaLog of) a delta table is at version 0 at the very minimum.

assert(deltaLog.snapshot.version >= 0)

DeltaLog is a LogStoreProvider.

Enable ALL logging level for org.apache.spark.sql.delta.DeltaLog logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.delta.DeltaLog=ALL

Refer to Logging.

FileFormats

DeltaLog defines two FileFormats:

  • JsonFileFormat for indices of delta files

  • ParquetFileFormat for indices of checkpoint files

The FileFormats are used to create DeltaLogFileIndices for Snapshots that in turn use them for stateReconstruction.

_delta_log Directory

DeltaLog uses the _delta_log metadata directory under the data path directory (that is specified when the DeltaLog.forTable utility is used).

The _delta_log directory is resolved (in the DeltaLog.apply utility) using the application-wide Hadoop Configuration.

The DeltaLog.apply utility uses the given SparkSession to create a Hadoop Configuration instance.

spark.sessionState.newHadoopConf()

Once resolved and turned into a qualified path, the _delta_log directory of the delta table (under the data path directory) is cached for later reuse.
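The resolution boils down to the following (a minimal sketch of the path qualification, reusing the spark and dataPath values from the earlier example):

import org.apache.hadoop.fs.Path
val rawPath = new Path(dataPath, "_delta_log")
// Resolve the file system using the session's Hadoop configuration
val fs = rawPath.getFileSystem(spark.sessionState.newHadoopConf())
// Turn the directory into a fully-qualified path (e.g. file:/tmp/delta/t1/_delta_log)
val qualifiedLogPath = fs.makeQualified(rawPath)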

LogStore

DeltaLog uses a LogStore for…​FIXME

Transaction Logs (DeltaLogs) per Fully-Qualified Path — deltaLogCache Internal Registry

deltaLogCache: Cache[Path, DeltaLog]

deltaLogCache is part of the DeltaLog Scala object, which makes it an application-wide cache. There is exactly one deltaLogCache for as long as the application that uses it runs.

deltaLogCache is a registry of DeltaLogs by their fully-qualified _delta_log directories. A new instance of DeltaLog is added when DeltaLog.apply utility is used and the instance hasn’t been created before for a path.
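The caching is observable from the public API: requesting a DeltaLog for the same path twice gives the very same instance.

import org.apache.spark.sql.delta.DeltaLog
val l1 = DeltaLog.forTable(spark, "/tmp/delta/t1")
val l2 = DeltaLog.forTable(spark, "/tmp/delta/t1")
// Reference equality: both point at the same cached DeltaLog
assert(l1 eq l2)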

deltaLogCache is invalidated using the invalidateCache and clearCache utilities.

Creating DeltaLog Instance

DeltaLog takes the following to be created:

  • Log directory (Hadoop Path)

  • Data directory (Hadoop Path)

  • Clock

DeltaLog initializes the internal properties.

Looking Up Or Creating DeltaLog Instance — apply Utility

apply(
  spark: SparkSession,
  rawPath: Path,
  clock: Clock = new SystemClock): DeltaLog

rawPath is a Hadoop Path to the _delta_log directory at the root of the data of a delta table.

apply…​FIXME

apply is used when DeltaLog is requested to forTable.

Executing Single-Threaded Operation (Block) in New Transaction — withNewTransaction Method

withNewTransaction[T](
  thunk: OptimisticTransaction => T): T

withNewTransaction starts a new transaction (that is active for the whole thread) and executes the given thunk block.

In the end, withNewTransaction makes the transaction no longer active.
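A minimal sketch of the idiom (assuming a transaction that merely reads the table version it operates on):

val readVersion = deltaLog.withNewTransaction { txn =>
  // the version of the delta table this transaction reads from
  txn.readVersion
}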

withNewTransaction is used when:

Starting New Transaction — startTransaction Method

startTransaction(): OptimisticTransaction

startTransaction updates (the current state of the delta table) and creates a new OptimisticTransaction (for this DeltaLog).

startTransaction is a subset of withNewTransaction.
startTransaction is used when ConvertToDeltaCommand is executed.
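For example:

import org.apache.spark.sql.delta.OptimisticTransaction
val txn = deltaLog.startTransaction()
assert(txn.isInstanceOf[OptimisticTransaction])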

Throwing UnsupportedOperationException For appendOnly Table Property Enabled — assertRemovable Method

assertRemovable(): Unit

assertRemovable throws an UnsupportedOperationException when the appendOnly table property (in the Metadata) is enabled (true):

This table is configured to only allow appends. If you would like to permit updates or deletes, use 'ALTER TABLE <table_name> SET TBLPROPERTIES (appendOnly=false)'.
assertRemovable is used when…​FIXME
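The property can be demonstrated with plain SQL (assuming the delta.-prefixed property name and a Spark setup where ALTER TABLE works against a delta table by path):

spark.sql("""
  ALTER TABLE delta.`/tmp/delta/t1`
  SET TBLPROPERTIES (delta.appendOnly = true)
""")
// Any subsequent DELETE or UPDATE on the table should now fail
// with the UnsupportedOperationException above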

metadata Method

metadata: Metadata

metadata is part of the Checkpoints Contract to…​FIXME.

metadata requests the current Snapshot for the metadata or creates a new one (if the current Snapshot is not initialized).
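For example, to inspect the schema of a delta table via the metadata of the current Snapshot:

val metadata = deltaLog.snapshot.metadata
println(metadata.schema.treeString)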

Creating DeltaLog Instance — forTable Utility

forTable(
  spark: SparkSession,
  dataPath: File): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: File,
  clock: Clock): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: Path): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: Path,
  clock: Clock): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: String): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: String,
  clock: Clock): DeltaLog

forTable creates a DeltaLog for the _delta_log directory under the given dataPath directory.
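All the variants resolve to the same cached DeltaLog instance, e.g.:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.DeltaLog
val byString = DeltaLog.forTable(spark, "/tmp/delta/t1")
val byPath = DeltaLog.forTable(spark, new Path("/tmp/delta/t1"))
assert(byString eq byPath)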

forTable is used when:

update Method

update(
  stalenessAcceptable: Boolean = false): Snapshot

update branches off based on a combination of the given stalenessAcceptable flag and the isSnapshotStale flag.

When staleness is not acceptable (the default) and the snapshot is not stale, update simply acquires the deltaLogLock lock and updateInternal (with the isAsync flag off).

For all other cases, update…​FIXME
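For example, to force a synchronous refresh of the current state:

val freshSnapshot = deltaLog.update()
assert(freshSnapshot.version == deltaLog.snapshot.version)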

update is used when:

Current State Snapshot — snapshot Method

snapshot: Snapshot

snapshot returns the current snapshot.

snapshot is used when:

Current State Snapshot — currentSnapshot Internal Registry

currentSnapshot: Snapshot

currentSnapshot is a Snapshot based on the latest checkpoint (per the _last_checkpoint metadata file) if available or a new Snapshot instance (with version -1).

For a new Snapshot instance (with version -1), DeltaLog immediately updates the state.

Internally, currentSnapshot…​FIXME

currentSnapshot is available using the snapshot method.
currentSnapshot is used when DeltaLog is requested to updateInternal, update, tryUpdate, and isValid.

Creating Insertable HadoopFsRelation For Batch Queries — createRelation Method

createRelation(
  partitionFilters: Seq[Expression] = Nil,
  timeTravel: Option[DeltaTimeTravelSpec] = None): BaseRelation

createRelation…​FIXME

createRelation creates a TahoeLogFileIndex for the data path, the given partitionFilters and a version (if defined).

createRelation…​FIXME

In the end, createRelation creates a HadoopFsRelation for the TahoeLogFileIndex and…​FIXME. The HadoopFsRelation is also an InsertableRelation.

createRelation is used when DeltaDataSource is requested for a relation as a CreatableRelationProvider and a RelationProvider (for batch queries).

insert Method

insert(
  data: DataFrame,
  overwrite: Boolean): Unit

insert is part of the InsertableRelation contract to…​FIXME.

insert…​FIXME

Retrieving State Of Delta Table At Given Version — getSnapshotAt Method

getSnapshotAt(
  version: Long,
  commitTimestamp: Option[Long] = None,
  lastCheckpointHint: Option[CheckpointInstance] = None): Snapshot

getSnapshotAt…​FIXME
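For example, to time travel to the very first version of a delta table (assuming at least one commit has been made):

val v0 = deltaLog.getSnapshotAt(0)
assert(v0.version == 0)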

getSnapshotAt is used when:

tryUpdate Method

tryUpdate(
  isAsync: Boolean = false): Snapshot

tryUpdate…​FIXME

tryUpdate is used exclusively when DeltaLog is requested to update.

ensureLogDirectoryExist Method

ensureLogDirectoryExist(): Unit

ensureLogDirectoryExist…​FIXME

ensureLogDirectoryExist is used when…​FIXME

protocolWrite Method

protocolWrite(
  protocol: Protocol,
  logUpgradeMessage: Boolean = true): Unit

protocolWrite…​FIXME

protocolWrite is used when…​FIXME

checkpointInterval Method

checkpointInterval: Int

checkpointInterval gives the value of checkpointInterval table property (from the Metadata).
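The interval can be changed per table with ALTER TABLE (assuming the delta.-prefixed property name):

spark.sql("""
  ALTER TABLE delta.`/tmp/delta/t1`
  SET TBLPROPERTIES (delta.checkpointInterval = 20)
""")
// After the metadata is refreshed, checkpointInterval reflects the new value
println(deltaLog.checkpointInterval)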

checkpointInterval is used when…​FIXME

Changes (Actions) Of Delta Version And Later — getChanges Method

getChanges(
  startVersion: Long): Iterator[(Long, Seq[Action])]

getChanges gives all actions (changes) per delta log file for the given startVersion of a delta table and later.

val dataPath = "/tmp/delta/users"
import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, dataPath)
assert(deltaLog.isInstanceOf[DeltaLog])
val changesPerVersion = deltaLog.getChanges(startVersion = 0)

Internally, getChanges requests the LogStore for files that are lexicographically greater than or equal to the delta log file for the given startVersion (in the logPath) and keeps only delta log files (i.e. files with numbers only as the file name and the .json file extension).

For every delta file, getChanges requests the LogStore to read the JSON content (every line is an action), and then deserializes it to an action.
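The changes can then be consumed per version, e.g.:

changesPerVersion.foreach { case (version, actions) =>
  println(s"version $version: ${actions.size} action(s)")
}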

getChanges is used when DeltaSource is requested for the indexed file additions (AddFile actions).

Creating DataFrame For Given AddFiles — createDataFrame Method

createDataFrame(
  snapshot: Snapshot,
  addFiles: Seq[AddFile],
  isStreaming: Boolean = false,
  actionTypeOpt: Option[String] = None): DataFrame

createDataFrame takes the action name (to build the result DataFrame for) from the given actionTypeOpt (if defined) or uses one of the following based on the isStreaming flag:

  • streaming when isStreaming flag is enabled (true)

  • batch when isStreaming flag is disabled (false)

createDataFrame creates a new TahoeBatchFileIndex (for the action type, and the given AddFiles and Snapshot).

createDataFrame creates a HadoopFsRelation with the TahoeBatchFileIndex and the other properties based on the given Snapshot (and its Metadata).

Read up on HadoopFsRelation in The Internals of Spark SQL online book.

In the end, createDataFrame creates a DataFrame with a logical query plan with a LogicalRelation over the HadoopFsRelation.

Read up on LogicalRelation in The Internals of Spark SQL online book.
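A sketch of the method in action (collecting the AddFiles of the current state just for demonstration):

val snapshot = deltaLog.snapshot
// allFiles is a Dataset[AddFile]; collecting is fine for small tables only
val addFiles = snapshot.allFiles.collect().toSeq
val df = deltaLog.createDataFrame(snapshot, addFiles)
println(df.schema.treeString)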

createDataFrame is used when:

Acquiring Interruptible Lock on Log — lockInterruptibly Method

lockInterruptibly[T](body: => T): T

lockInterruptibly…​FIXME

lockInterruptibly is used when…​FIXME

minFileRetentionTimestamp Method

minFileRetentionTimestamp: Long

minFileRetentionTimestamp is the timestamp that is tombstoneRetentionMillis before the current time (per the Clock).
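Conceptually (a sketch, not the exact implementation, which uses the Clock given at creation time):

val minTimestamp = System.currentTimeMillis() - deltaLog.tombstoneRetentionMillis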

minFileRetentionTimestamp is used when:

tombstoneRetentionMillis Method

tombstoneRetentionMillis: Long

tombstoneRetentionMillis gives the value of deletedFileRetentionDuration table property (from the Metadata).
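The retention can be changed per table with ALTER TABLE (assuming the delta.-prefixed property name and the interval syntax):

spark.sql("""
  ALTER TABLE delta.`/tmp/delta/t1`
  SET TBLPROPERTIES (delta.deletedFileRetentionDuration = 'interval 2 weeks')
""")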

tombstoneRetentionMillis is used when:

updateInternal Internal Method

updateInternal(
  isAsync: Boolean): Snapshot

updateInternal…​FIXME

updateInternal is used when DeltaLog is requested to update (directly or via tryUpdate).

Invalidating Cached DeltaLog Instance By Path — invalidateCache Utility

invalidateCache(
  spark: SparkSession,
  dataPath: Path): Unit

invalidateCache…​FIXME

invalidateCache is a public API and does not seem to be used at all.

Removing (Clearing) All Cached DeltaLog Instances — clearCache Utility

clearCache(): Unit

clearCache…​FIXME

clearCache is a public API and is used exclusively in tests.
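For example:

import org.apache.spark.sql.delta.DeltaLog
// Drop all cached DeltaLog instances (e.g. between tests)
DeltaLog.clearCache()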

upgradeProtocol Method

upgradeProtocol(
  newVersion: Protocol = Protocol()): Unit

upgradeProtocol…​FIXME

upgradeProtocol seems to be used exclusively in tests.

protocolRead Method

protocolRead(
  protocol: Protocol): Unit

protocolRead…​FIXME

protocolRead is used when:

isValid Method

isValid(): Boolean

isValid…​FIXME

isValid is used when DeltaLog utility is used to get or create a transaction log for a delta table.

Internal Properties

  • deltaLogLock — Lock

    Used when…​FIXME