DeltaLog

DeltaLog is a transaction log (change log) of changes to the state of a delta table.

DeltaLog uses the _delta_log directory for (the files of) the transaction log of a delta table (whose location is given when the DeltaLog.forTable utility is used to create an instance).

DeltaLog is created (indirectly via DeltaLog.apply utility) when DeltaLog.forTable utility is used.

import org.apache.spark.sql.SparkSession
assert(spark.isInstanceOf[SparkSession])

val dataPath = "/tmp/delta/t1"
import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, dataPath)

import org.apache.hadoop.fs.Path
val expected = new Path(s"file:$dataPath/_delta_log/_last_checkpoint")
assert(deltaLog.LAST_CHECKPOINT == expected)

A common idiom (if not the only way) to know the current version of the delta table is to request the DeltaLog for the current state (snapshot) and then for the version.

import org.apache.spark.sql.delta.DeltaLog
assert(deltaLog.isInstanceOf[DeltaLog])

val deltaVersion = deltaLog.snapshot.version
println(deltaVersion) // e.g. 5

When created, DeltaLog does the following:

  • Creates the <> based on <> configuration property (default: <>)

  • Initializes the <>

  • <> when there is no <> (e.g. the version of the <> is -1)

In other words, the version of (the DeltaLog of) a delta table is 0 at the very minimum.

assert(deltaLog.snapshot.version >= 0)

DeltaLog is a LogStoreProvider.

filterFileList Utility

filterFileList(
  partitionSchema: StructType,
  files: DataFrame,
  partitionFilters: Seq[Expression],
  partitionColumnPrefixes: Seq[String] = Nil): DataFrame

filterFileList...FIXME

filterFileList is used when:

Creating DeltaLog Instance

forTable(
  spark: SparkSession,
  dataPath: File): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: File,
  clock: Clock): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: Path): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: Path,
  clock: Clock): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: String): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: String,
  clock: Clock): DeltaLog

forTable creates a DeltaLog with the _delta_log directory (under the given dataPath directory).
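All overloads resolve the given dataPath to the same fully-qualified _delta_log directory, so they return the same cached instance (see deltaLogCache below). A minimal sketch:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.DeltaLog

val byString = DeltaLog.forTable(spark, "/tmp/delta/t1")
val byPath = DeltaLog.forTable(spark, new Path("/tmp/delta/t1"))
// Same cached instance per fully-qualified _delta_log directory
assert(byString eq byPath)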

forTable is used when:

  • <> utility is used to create a <>

  • <>, <>, <> are requested to run

  • DeltaDataSource is requested to <>, <>, and create a relation (as <> and <>)

  • <> utility is used

  • DeltaTableIdentifier is requested to getDeltaLog

  • <> is created

== [[FileFormats]] FileFormats

DeltaLog defines two FileFormats:

  • [[CHECKPOINT_FILE_FORMAT]] ParquetFileFormat for indices of checkpoint files

  • [[COMMIT_FILE_FORMAT]] JsonFileFormat for indices of delta files

The FileFormats are used to create <> for <> that in turn use them for <>.
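A sketch of the two definitions (not the actual field declarations in the source):

[source, scala]

import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

val CHECKPOINT_FILE_FORMAT = new ParquetFileFormat() // checkpoint files (parquet)
val COMMIT_FILE_FORMAT = new JsonFileFormat()        // delta (commit) files (json)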

== [[_delta_log]] _delta_log Directory

DeltaLog uses the _delta_log metadata directory under the <> directory (that is specified using the <> utility).

The _delta_log directory is resolved (in the <> utility) using the application-wide Hadoop https://hadoop.apache.org/docs/current/api/org/apache/hadoop/conf/Configuration.html[Configuration].

[NOTE]

<> utility uses the given SparkSession to create a Hadoop Configuration instance.

[source, scala]

spark.sessionState.newHadoopConf()

====

Once resolved and turned into a qualified path, the _delta_log directory of the delta table (under the <> directory) is <> for later reuse.
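The resolution can be sketched as follows (assuming a SparkSession in scope as spark):

[source, scala]

import org.apache.hadoop.fs.Path

val rawLogPath = new Path("/tmp/delta/t1", "_delta_log")
val hadoopConf = spark.sessionState.newHadoopConf()
val fs = rawLogPath.getFileSystem(hadoopConf)
val qualifiedLogPath = fs.makeQualified(rawLogPath)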

== [[store]] LogStore

DeltaLog uses an <> for...FIXME

== [[deltaLogCache]] Transaction Logs (DeltaLogs) per Fully-Qualified Path -- deltaLogCache Internal Registry

[source, scala]

deltaLogCache: Cache[Path, DeltaLog]

NOTE: deltaLogCache is part of the DeltaLog Scala object, which makes it an application-wide cache. Once created, deltaLogCache lives until the application that uses it stops.

deltaLogCache is a registry of DeltaLogs by their fully-qualified <<_delta_log, _delta_log>> directories. A new instance of DeltaLog is added when <> utility is used and the instance hasn't been created before for a path.
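A sketch of such a registry, assuming Guava's CacheBuilder (the actual cache is configured with eviction settings not shown here):

[source, scala]

import com.google.common.cache.{Cache, CacheBuilder}
import org.apache.hadoop.fs.Path

// Keyed by the fully-qualified _delta_log directory of a delta table
val deltaLogCache: Cache[Path, DeltaLog] =
  CacheBuilder.newBuilder().build[Path, DeltaLog]()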

deltaLogCache is invalidated:

  • For a delta table using <> utility (and <> when the cached reference is no longer <>)

  • For all delta tables using <> utility

== [[creating-instance]] Creating DeltaLog Instance

DeltaLog takes the following to be created:

  • [[logPath]] Log directory (Hadoop Path)
  • [[dataPath]] Data directory (Hadoop Path)
  • [[clock]] Clock

DeltaLog initializes the <>.

== [[apply]] Looking Up Or Creating DeltaLog Instance -- apply Utility

[source, scala]

apply(
  spark: SparkSession,
  rawPath: Path,
  clock: Clock = new SystemClock): DeltaLog


NOTE: rawPath is a Hadoop Path to the <<_delta_log, _delta_log>> directory at the root of the data of a delta table.

apply...FIXME

NOTE: apply is used when DeltaLog is requested to <>.

Executing Single-Threaded Operation in New Transaction

withNewTransaction[T](
  thunk: OptimisticTransaction => T): T

withNewTransaction starts a new transaction (that is active for the whole thread) and executes the given thunk block.

In the end, withNewTransaction makes the transaction no longer active.
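A usage sketch (the commit call is indicative only and commented out):

deltaLog.withNewTransaction { txn =>
  // txn is the OptimisticTransaction that is active for the whole thread
  val snapshot = txn.snapshot
  // ...prepare actions based on the snapshot, then...
  // txn.commit(actions, operation)
}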

withNewTransaction is used when:

Starting New Transaction

startTransaction(): OptimisticTransaction

startTransaction <> and creates a new OptimisticTransaction.md[OptimisticTransaction] (for this DeltaLog).

NOTE: startTransaction is a subset of <>.

startTransaction is used when:

  • DeltaLog is requested to <>

  • AlterDeltaTableCommand is requested to AlterDeltaTableCommand.md#startTransaction[startTransaction]

  • ConvertToDeltaCommandBase is ConvertToDeltaCommand.md#run[executed]

  • CreateDeltaTableCommand is CreateDeltaTableCommand.md#run[executed]

== [[assertRemovable]] Throwing UnsupportedOperationException For appendOnly Table Property Enabled -- assertRemovable Method

[source, scala]

assertRemovable(): Unit

assertRemovable throws an UnsupportedOperationException when the <> table property (<> the <>) is enabled (true):

This table is configured to only allow appends. If you would like to permit updates or deletes, use 'ALTER TABLE <table_name> SET TBLPROPERTIES (appendOnly=false)'.
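A sketch of the check (DeltaConfigs.IS_APPEND_ONLY and DeltaErrors.modifyAppendOnlyTableException are assumptions about the exact names):

[source, scala]

def assertRemovable(): Unit = {
  // Disallow removing files from an append-only table
  if (DeltaConfigs.IS_APPEND_ONLY.fromMetaData(metadata)) {
    throw DeltaErrors.modifyAppendOnlyTableException
  }
}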

NOTE: assertRemovable is used when...FIXME

== [[metadata]] metadata Method

[source, scala]

metadata: Metadata

NOTE: metadata is part of the <> to...FIXME.

metadata requests the <> for the <> or creates a new <> (if the <> is not initialized).

== [[update]] update Method

[source, scala]

update(
  stalenessAcceptable: Boolean = false): Snapshot


update branches off based on a combination of two flags: the given stalenessAcceptable flag and the <> flag.

When staleness is not acceptable (the default) and the <>, update simply acquires the <> lock and <> (with the isAsync flag off).

For all other cases, update...FIXME
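A usage sketch: force a synchronous refresh of the cached state and read its version:

[source, scala]

val refreshed = deltaLog.update()
assert(refreshed.version == deltaLog.snapshot.version)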

[NOTE]

update is used when:

  • DeltaHistoryManager is requested to <>, <>, and <>

  • DeltaLog is <> (with no <> created), and requested to <> and <>

  • OptimisticTransactionImpl is requested to <> and <>

  • ConvertToDeltaCommand is requested to <> and <>

  • VacuumCommand utility is used to <>

  • TahoeLogFileIndex is requested for the <>

  • DeltaDataSource is requested for a <>

== [[snapshot]] Current State Snapshot -- snapshot Method

[source, scala]

snapshot: Snapshot

snapshot returns the <>.

[NOTE]

snapshot is used when:

  • <> is created

  • Checkpoints is requested to <>

  • DeltaLog is requested for the <>, to <>, <>, <>

  • OptimisticTransactionImpl is requested to <>

  • <>, <>, <>, <>, <> are executed

  • DeltaCommand is requested to <>

  • TahoeFileIndex is requested for the <>, <>

  • TahoeLogFileIndex is requested for the <>

  • DeltaDataSource is requested for the <>

  • <> is created and requested for the <>, <>

== [[currentSnapshot]] Current State Snapshot -- currentSnapshot Internal Registry

[source, scala]

currentSnapshot: Snapshot

currentSnapshot is a <> based on the <> if available or a new Snapshot instance (with version being -1).

NOTE: For a new Snapshot instance (with version being -1) DeltaLog immediately <>.

Internally, currentSnapshot...FIXME

NOTE: currentSnapshot is available using <> method.

NOTE: currentSnapshot is used when DeltaLog is requested to <>, <>, <>, and <>.

Creating Insertable HadoopFsRelation For Batch Queries

createRelation(
  partitionFilters: Seq[Expression] = Nil,
  timeTravel: Option[DeltaTimeTravelSpec] = None): BaseRelation

createRelation...FIXME

createRelation creates a TahoeLogFileIndex for the data path, the given partitionFilters and a version (if defined).

createRelation...FIXME

In the end, createRelation creates a HadoopFsRelation for the TahoeLogFileIndex and...FIXME. The HadoopFsRelation is also an <>.

createRelation is used when DeltaDataSource is requested for a relation as a CreatableRelationProvider and a RelationProvider (for batch queries).

insert Method

insert(
  data: DataFrame,
  overwrite: Boolean): Unit

insert...FIXME

insert is part of the InsertableRelation abstraction.

== [[getSnapshotAt]] Retrieving State Of Delta Table At Given Version -- getSnapshotAt Method

[source, scala]

getSnapshotAt(
  version: Long,
  commitTimestamp: Option[Long] = None,
  lastCheckpointHint: Option[CheckpointInstance] = None): Snapshot


getSnapshotAt...FIXME

[NOTE]

getSnapshotAt is used when:

  • DeltaLog is requested for a <>, and to <>

  • DeltaSource is requested to <>

  • TahoeLogFileIndex is requested for <>

== [[tryUpdate]] tryUpdate Method

[source, scala]

tryUpdate(
  isAsync: Boolean = false): Snapshot


tryUpdate...FIXME

NOTE: tryUpdate is used exclusively when DeltaLog is requested to <>.

== [[ensureLogDirectoryExist]] ensureLogDirectoryExist Method

[source, scala]

ensureLogDirectoryExist(): Unit

ensureLogDirectoryExist...FIXME

NOTE: ensureLogDirectoryExist is used when...FIXME

== [[protocolWrite]] protocolWrite Method

[source, scala]

protocolWrite(
  protocol: Protocol,
  logUpgradeMessage: Boolean = true): Unit


protocolWrite...FIXME

NOTE: protocolWrite is used when...FIXME

== [[checkpointInterval]] checkpointInterval Method

[source, scala]

checkpointInterval: Int

checkpointInterval gives the value of <> table property (<> the <>).
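A sketch of the lookup (DeltaConfigs.CHECKPOINT_INTERVAL is an assumption about the exact name):

[source, scala]

def checkpointInterval: Int =
  DeltaConfigs.CHECKPOINT_INTERVAL.fromMetaData(metadata)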

NOTE: checkpointInterval is used when...FIXME

== [[getChanges]] Changes (Actions) Of Delta Version And Later -- getChanges Method

[source, scala]

getChanges(
  startVersion: Long): Iterator[(Long, Seq[Action])]


getChanges gives all <> (changes) per delta log file for the given startVersion of a delta table and later.

[source,scala]

val dataPath = "/tmp/delta/users"
import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, dataPath)
assert(deltaLog.isInstanceOf[DeltaLog])

val changesPerVersion = deltaLog.getChanges(startVersion = 0)


Internally, getChanges requests the <> for <> that are lexicographically greater than or equal to the <> for the given startVersion (in the <>) and leaves only <> (e.g. files with numbers only as the file name and the .json file extension).

For every delta file, getChanges requests the <> to <> (every line is an <>), and then <>.
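The delta (commit) file naming convention assumed above can be sketched as follows:

[source, scala]

// Zero-padded 20-digit version with a .json extension
def deltaFileName(version: Long): String = f"$version%020d.json"

assert(deltaFileName(5) == "00000000000000000005.json")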

NOTE: getChanges is used when DeltaSource is requested for the <>.

Creating DataFrame For Given AddFiles

createDataFrame(
  snapshot: Snapshot,
  addFiles: Seq[AddFile],
  isStreaming: Boolean = false,
  actionTypeOpt: Option[String] = None): DataFrame

createDataFrame uses the given action type (if defined) or chooses one based on the isStreaming flag:

  • streaming when isStreaming flag is enabled (true)
  • batch when isStreaming flag is disabled (false)

Note

actionTypeOpt seems never to be defined.
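The selection logic can be sketched as follows (parameter names as in the signature above):

val actionType = actionTypeOpt.getOrElse(if (isStreaming) "streaming" else "batch")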

createDataFrame creates a new TahoeBatchFileIndex (for the action type, and the given AddFiles and Snapshot).

createDataFrame creates a HadoopFsRelation with the TahoeBatchFileIndex and the other properties based on the given Snapshot (and the associated Metadata).

Tip

Learn more on HadoopFsRelation in The Internals of Spark SQL online book.

In the end, createDataFrame creates a DataFrame with a logical query plan with a LogicalRelation over the HadoopFsRelation.

Tip

Learn more on LogicalRelation in The Internals of Spark SQL online book.

createDataFrame is used when:

== [[lockInterruptibly]] Acquiring Interruptible Lock on Log -- lockInterruptibly Method

[source, scala]

lockInterruptibly[T](body: => T): T

lockInterruptibly...FIXME

NOTE: lockInterruptibly is used when...FIXME

== [[minFileRetentionTimestamp]] minFileRetentionTimestamp Method

[source, scala]

minFileRetentionTimestamp: Long

minFileRetentionTimestamp is the timestamp that is <> before the current time (per the <>).
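A sketch of the computation, using the clock and tombstoneRetentionMillis described on this page:

[source, scala]

def minFileRetentionTimestamp: Long =
  clock.getTimeMillis() - tombstoneRetentionMillis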

[NOTE]

minFileRetentionTimestamp is used when:

  • DeltaLog is requested for the <>, to <>, and to <>

  • VacuumCommand is requested for <>

== [[tombstoneRetentionMillis]] tombstoneRetentionMillis Method

[source, scala]

tombstoneRetentionMillis: Long

tombstoneRetentionMillis gives the value of <> table property (<> the <>).
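A sketch of the lookup (the DeltaConfigs.TOMBSTONE_RETENTION name and the millisecond conversion helper are assumptions):

[source, scala]

def tombstoneRetentionMillis: Long =
  DeltaConfigs.getMilliSeconds(
    DeltaConfigs.TOMBSTONE_RETENTION.fromMetaData(metadata))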

[NOTE]

tombstoneRetentionMillis is used when:

  • DeltaLog is requested for <>

  • VacuumCommand is requested for <>

== [[updateInternal]] updateInternal Internal Method

[source, scala]

updateInternal(
  isAsync: Boolean): Snapshot


updateInternal...FIXME

NOTE: updateInternal is used when DeltaLog is requested to <> (directly or via <>).

== [[invalidateCache]] Invalidating Cached DeltaLog Instance By Path -- invalidateCache Utility

[source, scala]

invalidateCache(
  spark: SparkSession,
  dataPath: Path): Unit


invalidateCache...FIXME

NOTE: invalidateCache is a public API and does not seem to be used at all.

== [[clearCache]] Removing (Clearing) All Cached DeltaLog Instances -- clearCache Utility

[source, scala]

clearCache(): Unit

clearCache...FIXME

NOTE: clearCache is a public API and is used exclusively in tests.

== [[upgradeProtocol]] upgradeProtocol Method

[source, scala]

upgradeProtocol(
  newVersion: Protocol = Protocol()): Unit


upgradeProtocol...FIXME

NOTE: upgradeProtocol seems to be used exclusively in tests.

== [[protocolRead]] protocolRead Method

[source, scala]

protocolRead(
  protocol: Protocol): Unit


protocolRead...FIXME

[NOTE]

protocolRead is used when:

  • OptimisticTransactionImpl is requested to <>

  • <> is created

  • DeltaSource is requested to <>

== [[isValid]] isValid Method

[source, scala]

isValid(): Boolean

isValid...FIXME

NOTE: isValid is used when DeltaLog utility is used to <>.

Logging

Enable ALL logging level for org.apache.spark.sql.delta.DeltaLog logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.delta.DeltaLog=ALL

Refer to Logging.

