

DeltaLog is a transaction log (change log) of changes to the state of a Delta table (in the given data directory).

Creating Instance

DeltaLog takes the following to be created:

  • Log directory (Hadoop Path)
  • Data directory (Hadoop Path)
  • Clock

DeltaLog is created (indirectly via DeltaLog.apply utility) when:

_delta_log Metadata Directory

DeltaLog uses the _delta_log metadata directory for the transaction log of a Delta table.

The _delta_log directory is in the given data path directory (when created using the DeltaLog.forTable utility).

The _delta_log directory is resolved (in the DeltaLog.apply utility) using the application-wide Hadoop Configuration.

Once resolved and turned into a qualified path, the _delta_log directory is cached.

DeltaLog.forTable Utility

forTable(
  spark: SparkSession,
  table: CatalogTable): DeltaLog
forTable(
  spark: SparkSession,
  table: CatalogTable,
  clock: Clock): DeltaLog
forTable(
  spark: SparkSession,
  deltaTable: DeltaTableIdentifier): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: File): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: File,
  clock: Clock): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: Path): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: Path,
  clock: Clock): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: String): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: String,
  clock: Clock): DeltaLog
forTable(
  spark: SparkSession,
  tableName: TableIdentifier): DeltaLog
forTable(
  spark: SparkSession,
  tableName: TableIdentifier,
  clock: Clock): DeltaLog

forTable creates a DeltaLog with the _delta_log directory (in the given dataPath directory).

forTable is used when:

Looking Up Or Creating DeltaLog Instance

apply(
  spark: SparkSession,
  rawPath: Path,
  clock: Clock = new SystemClock): DeltaLog


rawPath is a Hadoop Path to the _delta_log directory at the root of the data of a delta table.



tableExists: Boolean

tableExists requests the current Snapshot for the version and checks whether it is 0 or higher.

tableExists is used when:

Demo: Creating DeltaLog

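// DeltaLog lives in the org.apache.spark.sql.delta package
import org.apache.spark.sql.delta.DeltaLog

// spark is the SparkSession (e.g. the one available in spark-shell)
val dataPath = "/tmp/delta/t1"
val deltaLog = DeltaLog.forTable(spark, dataPath)

import org.apache.hadoop.fs.Path
// _last_checkpoint is in the _delta_log directory under the data path
val expected = new Path(s"file:$dataPath/_delta_log/_last_checkpoint")
assert(deltaLog.LAST_CHECKPOINT == expected)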

Accessing Current Version

A common idiom (if not the only way) to know the current version of the delta table is to request the DeltaLog for the current state (snapshot) and then for the version.


val deltaVersion = deltaLog.snapshot.version
println(deltaVersion)


When created, DeltaLog does the following:

  1. Creates the LogStore based on configuration property

  2. Initializes the current snapshot

  3. Updates the state of the Delta table when there is no metadata checkpoint (i.e. the version of the state is -1)

In other words, the version of (the DeltaLog of) an existing Delta table is 0 at the very minimum (version -1 means that there is no table yet).

assert(deltaLog.snapshot.version >= 0)
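The initialization steps and the version semantics above can be modelled with a minimal, dependency-free sketch. FakeSnapshot and FakeLog are hypothetical stand-ins (not Delta classes), and "updating" is assumed to simply jump to the latest commit version on disk. It shows why a log with no checkpoint starts at -1, is updated right away, and why tableExists amounts to version >= 0.

```scala
// Hypothetical stand-in for Snapshot: only the version matters here
final case class FakeSnapshot(version: Long)

// Hypothetical, heavily simplified model of how a DeltaLog initializes its state.
// lastCheckpointVersion: version recorded in _last_checkpoint (if any)
// latestCommitVersion: highest delta (commit) file version on disk (if any)
final class FakeLog(lastCheckpointVersion: Option[Long], latestCommitVersion: Option[Long]) {
  // Start from the checkpoint if available, else an empty snapshot at version -1
  @volatile private var current: FakeSnapshot =
    FakeSnapshot(lastCheckpointVersion.getOrElse(-1L))

  // No checkpoint: update the state immediately to pick up existing commits
  if (current.version == -1L) update()

  // Simplification (assumption): updating jumps straight to the latest commit on disk
  def update(): FakeSnapshot = {
    current = FakeSnapshot(latestCommitVersion.getOrElse(-1L))
    current
  }

  def snapshot: FakeSnapshot = current

  // Same check as tableExists: an existing table is at version 0 or higher
  def tableExists: Boolean = snapshot.version >= 0
}
```

A directory with no commits stays at version -1 (no table), while a single commit brings the snapshot to version 0.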

filterFileList Utility

filterFileList(
  partitionSchema: StructType,
  files: DataFrame,
  partitionFilters: Seq[Expression],
  partitionColumnPrefixes: Seq[String] = Nil): DataFrame


filterFileList is used when:


DeltaLog defines two FileFormats (Spark SQL):

  • ParquetFileFormat for indices of checkpoint files

  • JsonFileFormat for indices of delta files

These FileFormats are used to create DeltaLogFileIndexes for Snapshots that in turn use them for stateReconstruction.


DeltaLog uses a LogStore for...FIXME

Transaction Logs (DeltaLogs) per Fully-Qualified Path

deltaLogCache: Cache[Path, DeltaLog]

deltaLogCache is part of the DeltaLog Scala object, which makes it an application-wide cache "for free". Once created, there is only one deltaLogCache until the application that uses it stops.

deltaLogCache is a registry of DeltaLogs by their fully-qualified _delta_log directories. A new DeltaLog instance is added when the DeltaLog.apply utility is used and no instance has been created for the path before.
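A minimal sketch of an application-wide, per-path registry in the spirit of deltaLogCache, together with the forTable/apply split (the _delta_log directory is resolved under the data path, qualified, and used as the cache key). FakeDeltaLog and FakeDeltaLogCache are hypothetical names. Delta itself uses a Guava Cache and Hadoop's qualified Path; a TrieMap and java.nio's absolute, normalized Path are swapped in here to keep the sketch dependency-free.

```scala
import java.nio.file.{Path, Paths}
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for DeltaLog: it only remembers its _delta_log directory
final class FakeDeltaLog(val logPath: Path)

// Hypothetical application-wide registry (a Scala object, so one per JVM)
object FakeDeltaLogCache {
  private val cache = TrieMap.empty[Path, FakeDeltaLog]

  // Stand-in for qualifying a path: absolute and normalized (".." segments removed)
  private def qualify(p: Path): Path = p.toAbsolutePath.normalize

  // Like apply: look up by the qualified _delta_log path, creating on first use
  def apply(rawLogPath: Path): FakeDeltaLog = {
    val logPath = qualify(rawLogPath)
    cache.getOrElseUpdate(logPath, new FakeDeltaLog(logPath))
  }

  // Like forTable: the _delta_log directory lives in the data directory
  def forTable(dataPath: String): FakeDeltaLog =
    apply(Paths.get(dataPath).resolve("_delta_log"))

  // Like invalidateCache: drop the entry so the next lookup creates a new instance
  def invalidate(dataPath: String): Unit =
    cache.remove(qualify(Paths.get(dataPath).resolve("_delta_log")))
}
```

Qualifying the path before the lookup is what lets two spellings of the same directory share a single instance.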

deltaLogCache is invalidated:

Executing Single-Threaded Operation in New Transaction

withNewTransaction[T](
  thunk: OptimisticTransaction => T): T

withNewTransaction starts a new transaction (that is active for the whole thread) and executes the given thunk block.

In the end, withNewTransaction makes the transaction no longer active.
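The "active for the whole thread, then no longer active" behaviour can be sketched with a ThreadLocal and try/finally. FakeTxn and FakeTxnScope are hypothetical stand-ins for OptimisticTransaction and its active-transaction bookkeeping, and the sketch skips the state update that precedes the new transaction.

```scala
// Hypothetical stand-in for OptimisticTransaction
final class FakeTxn(val readVersion: Long)

object FakeTxnScope {
  // At most one active transaction per thread
  private val active = new ThreadLocal[FakeTxn]

  def activeTxn: Option[FakeTxn] = Option(active.get)

  def withNewTransaction[T](readVersion: Long)(thunk: FakeTxn => T): T = {
    val txn = new FakeTxn(readVersion)
    active.set(txn) // the transaction is active for the rest of this thread's work
    try thunk(txn)
    finally active.remove() // no longer active, even if thunk throws
  }
}
```

The finally block is the important design choice: a failing thunk must not leave a stale "active" transaction behind for later code on the same thread.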

withNewTransaction is used when:

Starting New Transaction

startTransaction(): OptimisticTransaction

startTransaction updates the state of the Delta table (update) and creates a new OptimisticTransaction (for this DeltaLog).


startTransaction is a "subset" of withNewTransaction.

startTransaction is used when:

Throwing UnsupportedOperationException for Append-Only Tables

assertRemovable(): Unit

assertRemovable throws an UnsupportedOperationException when the appendOnly table property (in the Metadata) is enabled (true):

This table is configured to only allow appends. If you would like to permit updates or deletes, use 'ALTER TABLE <table_name> SET TBLPROPERTIES (delta.appendOnly=false)'.
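A minimal sketch of the check, assuming the appendOnly table property is stored in the table configuration under the delta.appendOnly key and that a missing property means the table is not append-only. FakeMetadata and AppendOnlyKey are hypothetical names, and the exception message is abbreviated.

```scala
// Hypothetical stand-in for the Metadata action: only the table properties matter here
final case class FakeMetadata(configuration: Map[String, String])

// Assumption: the appendOnly table property is stored under this key
val AppendOnlyKey = "delta.appendOnly"

def assertRemovable(metadata: FakeMetadata): Unit = {
  // A missing property means the table is not append-only
  val appendOnly = metadata.configuration.get(AppendOnlyKey).exists(_.trim.toBoolean)
  if (appendOnly) {
    // Message abbreviated; see the full text above
    throw new UnsupportedOperationException(
      "This table is configured to only allow appends.")
  }
}
```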

assertRemovable is used when:


metadata: Metadata

metadata is part of the Checkpoints abstraction.

metadata requests the current Snapshot for the metadata or creates a new one (if the current Snapshot is not initialized).


update(
  stalenessAcceptable: Boolean = false): Snapshot

update branches off based on a combination of flags: the given stalenessAcceptable and isSnapshotStale flags.

When stalenessAcceptable is disabled (default) or the snapshot is stale, update simply acquires the deltaLogLock lock and executes updateInternal (with the isAsync flag off).

For all other cases, update...FIXME
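A minimal sketch of the branching, assuming update takes the asynchronous path only when staleness is acceptable and the snapshot is not stale (otherwise it updates synchronously under the lock), and that a snapshot counts as stale once a staleness limit has passed since the last update. UpdatePath, SyncUpdate, AsyncUpdate, choosePath and stalenessLimitMillis are hypothetical names.

```scala
// Hypothetical model of the two code paths update can take
sealed trait UpdatePath
case object SyncUpdate extends UpdatePath  // lock + updateInternal(isAsync = false)
case object AsyncUpdate extends UpdatePath // return the current snapshot, refresh in the background

// Assumption: stale when never updated or when the staleness limit has elapsed
def isSnapshotStale(nowMillis: Long, lastUpdateMillis: Long, stalenessLimitMillis: Long): Boolean =
  lastUpdateMillis < 0 || nowMillis - lastUpdateMillis >= stalenessLimitMillis

def choosePath(stalenessAcceptable: Boolean, stale: Boolean): UpdatePath =
  if (stalenessAcceptable && !stale) AsyncUpdate else SyncUpdate
```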

update is used when:


  isAsync: Boolean = false): Snapshot


Current State Snapshot

snapshot: Snapshot

snapshot returns the current snapshot.

snapshot is used when:

Current State Snapshot

currentSnapshot: Snapshot

currentSnapshot is a Snapshot based on the metadata checkpoint if available or a new Snapshot instance (with version being -1).


For a new Snapshot instance (with version being -1) DeltaLog immediately updates the state.

Internally, currentSnapshot...FIXME

currentSnapshot is available using the snapshot method.

currentSnapshot is used when:

Creating Insertable HadoopFsRelation For Batch Queries

createRelation(
  partitionFilters: Seq[Expression] = Nil,
  snapshotToUseOpt: Option[Snapshot] = None,
  isTimeTravelQuery: Boolean = false,
  cdcOptions: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty): BaseRelation


createRelation creates a TahoeLogFileIndex for the data path, the given partitionFilters and a version (if defined).


In the end, createRelation creates a HadoopFsRelation for the TahoeLogFileIndex and...FIXME. The HadoopFsRelation is also an InsertableRelation.

createRelation is used when:


insert(
  data: DataFrame,
  overwrite: Boolean): Unit


insert is part of the InsertableRelation (Spark SQL) abstraction.

Retrieving State Of Delta Table At Given Version

getSnapshotAt(
  version: Long,
  commitTimestamp: Option[Long] = None,
  lastCheckpointHint: Option[CheckpointInstance] = None): Snapshot


getSnapshotAt is used when:


checkpointInterval: Int

checkpointInterval gives the value of the checkpointInterval table property (from the Metadata).

checkpointInterval is used when...FIXME
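A minimal sketch of how an interval like this is typically applied, under loudly stated assumptions: the property is stored under the delta.checkpointInterval key with a default of 10, and a checkpoint follows every commit whose version is a non-zero multiple of the interval. Both helper names are hypothetical.

```scala
// Hypothetical helper: read the table property, falling back to an assumed default of 10
def checkpointInterval(configuration: Map[String, String]): Int =
  configuration.get("delta.checkpointInterval").map(_.trim.toInt).getOrElse(10)

// Assumption: checkpoint after every commit whose version is a non-zero multiple of the interval
def shouldCheckpoint(committedVersion: Long, interval: Int): Boolean =
  committedVersion != 0 && committedVersion % interval == 0
```

With the default interval, commits 0 through 25 would produce checkpoints at versions 10 and 20 only.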

Changes (Actions) Of Delta Version And Later

getChanges(
  startVersion: Long): Iterator[(Long, Seq[Action])]

getChanges gives all actions (changes) per delta log file for the given startVersion of a delta table and later.

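import org.apache.spark.sql.delta.DeltaLog
val dataPath = "/tmp/delta/users"
val deltaLog = DeltaLog.forTable(spark, dataPath)
val changesPerVersion = deltaLog.getChanges(startVersion = 0) // Iterator[(Long, Seq[Action])]
changesPerVersion.foreach { case (v, actions) => println(s"$v: ${actions.size} action(s)") }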

Internally, getChanges requests the LogStore for the files that are lexicographically greater than or equal to the delta log file for the given startVersion (in the logPath) and keeps only delta log files (i.e. files with a numeric-only name and the .json file extension).

For every delta file, getChanges requests the LogStore to read the JSON content (every line is an action) and deserializes every line to an action.
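The file-selection step can be sketched over plain file names. The sketch assumes delta files are named with the version zero-padded to 20 digits plus .json, which is what makes lexicographic order match version order. deltaFileName, DeltaFile and deltaVersionsFrom are hypothetical helpers, and reading and deserializing the JSON lines is left out.

```scala
// Assumption: delta files are named with the zero-padded (20 digits) version and .json
def deltaFileName(version: Long): String = f"$version%020d.json"

// Numeric-only name with the .json extension (Scala regex patterns match the whole string)
val DeltaFile = """(\d+)\.json""".r

// Keep names >= the start file, then only delta files, and extract their versions
def deltaVersionsFrom(startVersion: Long, fileNames: Seq[String]): Seq[Long] = {
  val startName = deltaFileName(startVersion)
  fileNames.sorted
    .filter(_.compareTo(startName) >= 0)
    .collect { case DeltaFile(v) => v.toLong }
}
```

Checkpoint files such as 00000000000000000002.checkpoint.parquet and the _last_checkpoint file pass the lexicographic filter but are dropped by the name pattern.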

getChanges is used when:

Creating DataFrame For Given AddFiles

createDataFrame(
  snapshot: Snapshot,
  addFiles: Seq[AddFile],
  isStreaming: Boolean = false,
  actionTypeOpt: Option[String] = None): DataFrame

createDataFrame uses the given actionTypeOpt action type (if defined) or one of the following based on the isStreaming flag:

  • streaming when isStreaming flag is enabled (true)
  • batch when isStreaming flag is disabled (false)


actionTypeOpt never seems to be defined.

createDataFrame creates a new TahoeBatchFileIndex (for the action type, and the given AddFiles and Snapshot).

createDataFrame creates a HadoopFsRelation (Spark SQL) with the TahoeBatchFileIndex and the other properties based on the given Snapshot (and the associated Metadata).

In the end, createDataFrame creates a DataFrame with a logical query plan with a LogicalRelation (Spark SQL) over the HadoopFsRelation.

createDataFrame is used when:

minFileRetentionTimestamp Method

minFileRetentionTimestamp: Long

minFileRetentionTimestamp is the timestamp that is tombstoneRetentionMillis before the current time (per the given Clock).
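The arithmetic is a single subtraction. In this minimal sketch, java.time.Clock stands in for Delta's own Clock abstraction, and the deletedFileRetentionDuration table property is assumed to be already parsed into a java.time.Duration (the name of the function is reused for illustration only).

```scala
import java.time.{Clock, Duration, Instant, ZoneOffset}

// Sketch: "now" per the clock minus the tombstone retention, in epoch milliseconds
def minFileRetentionTimestamp(clock: Clock, tombstoneRetention: Duration): Long =
  clock.millis() - tombstoneRetention.toMillis
```

Using a fixed clock makes the result deterministic: with a one-week retention the timestamp is exactly 604800000 ms before the clock's instant.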

minFileRetentionTimestamp is used when:

tombstoneRetentionMillis Method

tombstoneRetentionMillis: Long

tombstoneRetentionMillis gives the value of the deletedFileRetentionDuration table property (from the Metadata) in milliseconds.

tombstoneRetentionMillis is used when:

updateInternal Internal Method

updateInternal(
  isAsync: Boolean): Snapshot


updateInternal is used when:

Invalidating Cached DeltaLog Instance By Path

invalidateCache(
  spark: SparkSession,
  dataPath: Path): Unit


invalidateCache is a public API and does not seem to be used at all.


protocolRead(
  protocol: Protocol): Unit


protocolRead is used when:


upgradeProtocol(
  newVersion: Protocol = Protocol()): Unit


upgradeProtocol is used when:


DeltaLog is a LogStoreProvider.


Enable ALL logging level for logger to see what happens inside.

Add the following line to conf/

Refer to Logging.

Last update: 2021-03-26