DeltaLog¶
DeltaLog is a transaction log (change log) of all the changes to (the state of) a delta table.
Creating Instance¶
DeltaLog takes the following to be created:
DeltaLog is created (indirectly via DeltaLog.apply utility) when:
- DeltaLog.forTable utility is used
 
DeltaHistoryManager¶
history: DeltaHistoryManager
DeltaLog creates a DeltaHistoryManager (when requested for one the very first time).
Lazy Value
history is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.
Learn more in the Scala Language Specification.
DeltaLog uses spark.databricks.delta.history.maxKeysPerList configuration property for the maxKeysPerList.
_delta_log Metadata Directory¶
DeltaLog uses _delta_log metadata directory for the transaction log of a Delta table.
The _delta_log directory is in the given data path directory (when created using DeltaLog.forTable utility).
The _delta_log directory is resolved (in the DeltaLog.apply utility) using the application-wide Hadoop Configuration.
Once resolved and turned into a qualified path, the _delta_log directory is cached.
DeltaLog Cache¶
deltaLogCache: Cache[(Path, Map[String, String]), DeltaLog]
DeltaLog uses Guava's Cache as a cache of DeltaLogs by their HDFS-qualified _delta_log directories (with theirfs.-prefixed file system options).
deltaLogCache is part of DeltaLog Scala object and so becomes an application-wide cache by design (an object in Scala is available as a single instance).
Caching DeltaLog Instance¶
A new instance of DeltaLog is added when DeltaLog.apply utility is used and the instance is not available for a path (and file system options).
Cache Size¶
The size of the cache is controlled by delta.log.cacheSize system property.
DeltaLog Instance Expiration¶
DeltaLogs expire and are automatically removed from the deltaLogCache after 60 minutes (non-configurable) of inactivity. Upon expiration, deltaLogCache requests the Snapshot of the DeltaLog to uncache.
Cache Clearance¶
deltaLogCache is invalidated:
-  
For a delta table using DeltaLog.invalidateCache utility
 -  
For all delta tables using DeltaLog.clearCache utility
 
DeltaLog.forTable¶
// There are many forTable's
forTable(...): DeltaLog
forTable is an utility that creates a DeltaLog with _delta_log directory (in the given dataPath directory).
Demo: Creating DeltaLog¶
import org.apache.spark.sql.SparkSession
assert(spark.isInstanceOf[SparkSession])
val dataPath = "/tmp/delta/t1"
import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, dataPath)
import org.apache.hadoop.fs.Path
val expected = new Path(s"file:$dataPath/_delta_log/_last_checkpoint")
assert(deltaLog.LAST_CHECKPOINT == expected)
tableExists¶
tableExists: Boolean
tableExists requests the current Snapshot for the version and checks out whether it is 0 or higher.
tableExists is used when:
DeltaTableutility is used to isDeltaTable- DeltaUnsupportedOperationsCheck logical check rule is executed
 DeltaTableV2is requested to toBaseRelation
Accessing Current Version¶
A common idiom (if not the only way) to know the current version of the delta table is to request the DeltaLog for the current state (snapshot) and then for the version.
import org.apache.spark.sql.delta.DeltaLog
assert(deltaLog.isInstanceOf[DeltaLog])
val deltaVersion = deltaLog.snapshot.version
scala> println(deltaVersion)
5
Initialization¶
When created, DeltaLog does the following:
-  
Creates the LogStore based on spark.delta.logStore.class configuration property
 -  
Initializes the current snapshot
 -  
Updates state of the delta table when there is no metadata checkpoint (e.g. the version of the state is
-1) 
In other words, the version of (the DeltaLog of) a delta table is at version 0 at the very minimum.
assert(deltaLog.snapshot.version >= 0)
filterFileList¶
filterFileList(
  partitionSchema: StructType,
  files: DataFrame,
  partitionFilters: Seq[Expression],
  partitionColumnPrefixes: Seq[String] = Nil): DataFrame
filterFileList...FIXME
filterFileList is used when:
OptimisticTransactionImplis requested to checkAndRetryPartitionFilteringis requested to filesForScanWriteIntoDeltais requested to writeSnapshotIteratoris requested to iteratorTahoeBatchFileIndexis requested to matchingFilesDeltaDataSourceutility is requested to verifyAndCreatePartitionFilters
FileFormats¶
DeltaLog defines two FileFormats (Spark SQL):
-  
ParquetFileFormatfor indices of delta files -  
JsonFileFormatfor indices of checkpoint files 
These FileFormats are used to create DeltaLogFileIndexes for Snapshots that in turn used them for stateReconstruction.
LogStore¶
DeltaLog uses a LogStore for...FIXME
Executing Single-Threaded Operation in New Transaction¶
withNewTransaction[T](
  thunk: OptimisticTransaction => T): T
withNewTransaction starts a new transaction (that is active for the whole thread) and executes the given thunk block.
In the end, withNewTransaction makes the transaction no longer active.
withNewTransaction is used when:
- DeleteCommand, MergeIntoCommand, RestoreTableCommand, UpdateCommand, and WriteIntoDelta commands are executed
 DeltaSinkis requested to add a streaming micro-batch
Starting New Transaction¶
startTransaction(): OptimisticTransaction
startTransaction updates and creates a new OptimisticTransaction (for this DeltaLog).
Note
startTransaction is a "subset" of withNewTransaction.
startTransaction is used when:
-  
DeltaLogis requested to upgradeProtocol -  
AlterDeltaTableCommandis requested to startTransaction -  
ConvertToDeltaCommand and CreateDeltaTableCommand are executed
 
metadata¶
metadata: Metadata
metadata is part of the Checkpoints abstraction.
metadata requests the current Snapshot for the metadata or creates a new one (if the current Snapshot is not initialized).
update¶
update(
  stalenessAcceptable: Boolean = false): Snapshot
update branches off based on a combination of flags: the given stalenessAcceptable and isSnapshotStale.
For the stalenessAcceptable not acceptable (default) and the snapshot not stale, update simply acquires the deltaLogLock lock and updateInternal (with isAsync flag off).
For all other cases, update...FIXME
update is used when:
-  
DeltaHistoryManageris requested to getHistory, getActiveCommitAtTime, and checkVersionExists -  
DeltaLogis created (with no checkpoint created), and requested to startTransaction and withNewTransaction -  
OptimisticTransactionImplis requested to doCommit and checkAndRetry -  
ConvertToDeltaCommandis requested to run and streamWrite -  
VacuumCommandutility is used to gc -  
TahoeLogFileIndexis requested for the (historical or latest) snapshot -  
DeltaDataSourceis requested for a relation 
tryUpdate¶
tryUpdate(
  isAsync: Boolean = false): Snapshot
tryUpdate...FIXME
Snapshot¶
snapshot: Snapshot
snapshot returns the current snapshot.
snapshot is used when:
-  
OptimisticTransaction is created
 -  
Checkpointsis requested to checkpoint -  
DeltaLogis requested for the metadata, to upgradeProtocol, getSnapshotAt, createRelation -  
OptimisticTransactionImplis requested to getNextAttemptVersion -  
DeleteCommand, DeltaGenerateCommand, DescribeDeltaDetailCommand, UpdateCommand commands are executed
 -  
GenerateSymlinkManifest is executed
 -  
DeltaCommandis requested to buildBaseRelation -  
TahoeFileIndexis requested for the table version, partitionSchema -  
TahoeLogFileIndexis requested for the table size -  
DeltaDataSourceis requested for the schema of the streaming delta source -  
DeltaSource is created and requested for the getStartingOffset, getBatch
 
Current State Snapshot¶
currentSnapshot: Snapshot
currentSnapshot is a Snapshot based on the metadata checkpoint if available or a new Snapshot instance (with version being -1).
Note
For a new Snapshot instance (with version being -1) DeltaLog immediately updates the state.
Internally, currentSnapshot...FIXME
currentSnapshot is available using snapshot method.
currentSnapshot is used when:
DeltaLogis requested to updateInternal, update and tryUpdate
Creating Insertable HadoopFsRelation For Batch Queries¶
createRelation(
  partitionFilters: Seq[Expression] = Nil,
  snapshotToUseOpt: Option[Snapshot] = None,
  isTimeTravelQuery: Boolean = false,
  cdcOptions: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty): BaseRelation
createRelation...FIXME
With non-empty cdcOptions, createRelation creates a CDC-aware relation (that represents data change between two snapshots of the table).
When cdcOptions is non-empty
cdcOptions is empty by default and can only be specified when DeltaTableV2 is created.
createRelation creates a TahoeLogFileIndex for the data path, the given partitionFilters and a version (if defined).
createRelation...FIXME
In the end, createRelation creates a HadoopFsRelation for the TahoeLogFileIndex and...FIXME.
Note
The returned HadoopFsRelation is also an InsertableRelation.
createRelation is used when:
DeltaTableV2is requested to toBaseRelationWriteIntoDeltaBuilderis requested to buildForV1WriteWriteIntoDeltais requested to removeFilesDeltaDataSourceis requested for a writable relation
insert¶
insert(
  data: DataFrame,
  overwrite: Boolean): Unit
insert...FIXME
insert is part of the InsertableRelation (Spark SQL) abstraction.
Retrieving State Of Delta Table At Given Version¶
getSnapshotAt(
  version: Long,
  commitTimestamp: Option[Long] = None,
  lastCheckpointHint: Option[CheckpointInstance] = None): Snapshot
getSnapshotAt...FIXME
getSnapshotAt is used when:
-  
DeltaLogis requested for a relation, and to updateInternal -  
DeltaSourceis requested for the snapshot of a delta table at a given version -  
TahoeLogFileIndexis requested for historicalSnapshotOpt 
Checkpoint Interval¶
checkpointInterval: Int
checkpointInterval is the current value of checkpointInterval table property (from the Metadata).
checkpointInterval is used when:
OptimisticTransactionImplis requested to postCommit
Changes (Actions) Of Delta Version And Later¶
getChanges(
  startVersion: Long): Iterator[(Long, Seq[Action])]
getChanges gives all the Actions (changes) per delta log file for the given startVersion (inclusive) of a delta table and later.
val dataPath = "/tmp/delta/users"
import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, dataPath)
assert(deltaLog.isInstanceOf[DeltaLog])
val changesPerVersion = deltaLog.getChanges(startVersion = 0)
Internally, getChanges requests the LogStore for files that are lexicographically greater or equal to the delta log file for the given startVersion (in the logPath) and leaves only delta log files (e.g. files with numbers only as file name and .json file extension).
For every delta file, getChanges requests the LogStore to read the JSON content (every line is an action), and then deserializes it to an action.
Creating DataFrame (From AddFiles)¶
createDataFrame(
  snapshot: SnapshotDescriptor,
  addFiles: Seq[AddFile],
  isStreaming: Boolean = false,
  actionTypeOpt: Option[String] = None): DataFrame
createDataFrame uses the action type based on the optional action type (if defined) or uses the following based on the isStreaming flag:
- streaming when 
isStreamingflag is enabled (true) - batch when 
isStreamingflag is disabled (false) 
createDataFrame creates a new TahoeBatchFileIndex (for the action type, and the given AddFiles, this DeltaLog, and Snapshot).
createDataFrame creates a HadoopFsRelation (Spark SQL) with the TahoeBatchFileIndex and the other properties based on the given Snapshot (and the associated Metadata).
In the end, createDataFrame creates a DataFrame with (a logical query plan with) a LogicalRelation (Spark SQL) over the HadoopFsRelation.
createDataFrame is used when:
- AlterTableAddConstraintDeltaCommand is executed
 - MergeIntoCommand is executed (and requested to buildTargetPlanWithFiles)
 OptimizeExecutoris requested to runOptimizeBinJobDeltaSourceBaseis requested to createDataFrameStatisticsCollectionutility is used to recompute
Demo: DeltaLog.createDataFrame¶
Create a delta table with some data to work with. We need data files for this demo.
sql("DROP TABLE IF EXISTS delta_demo")
spark.range(5).write.format("delta").saveAsTable("delta_demo")
Review the data (parquet) files created. These are our AddFiles.
$ tree spark-warehouse/delta_demo/
spark-warehouse/delta_demo/
├── _delta_log
│   └── 00000000000000000000.json
├── part-00000-993a2fad-3643-48f5-b2be-d1b9036fb29d-c000.snappy.parquet
├── part-00003-74b48672-e869-47fc-818b-e422062c1427-c000.snappy.parquet
├── part-00006-91497579-5f25-42e6-82c9-1dc8416fe987-c000.snappy.parquet
├── part-00009-6f3e75fd-828d-4e1b-9d38-7aa65f928a9e-c000.snappy.parquet
├── part-00012-309fbcfe-4d34-45f7-b414-034f676480c6-c000.snappy.parquet
└── part-00015-5d72e873-e4df-493a-8bcf-2a3af9dfd636-c000.snappy.parquet
1 directory, 7 files
Let's load the delta table.
// FIXME I feel there should be a better way to access a DeltaLog
val tableName = "delta_demo"
val tableId = spark.sessionState.sqlParser.parseTableIdentifier(tableName)
val tbl = spark.sessionState.catalog.getTableMetadata(tableId)
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.hadoop.fs.Path
val table: DeltaTableV2 = DeltaTableV2(
  spark, new Path(tbl.location), Some(tbl), Some(tableName))
We've finally got the DeltaTableV2 so we can proceed.
val txn = table.deltaLog.startTransaction()
// FIXME Create a fake collection of AddFiles
// We could avoid transactions and the other extra steps
// that blur what is demo'ed
import org.apache.spark.sql.delta.actions.AddFile
val fakeAddFile = AddFile(
  path = "/a/fake/file/path",
  partitionValues = Map.empty,
  size = 10,
  modificationTime = 0,
  dataChange = false)
// val addFiles: Seq[AddFile] = txn.filterFiles()
val addFiles = Seq(fakeAddFile)
val actionType = Some("createDataFrame Demo")
val df = txn.snapshot.deltaLog.createDataFrame(
  txn.snapshot,
  addFiles,
  actionTypeOpt = actionType)
Up to this point, all should work just fine (since no addfiles were checked whether they are available or not).
Let's trigger an action to see what happens when Spark SQL (with Delta Lake) decides to access the data.
df.show
The above show action will surely lead to an exception (since the fake file does not really exist).
scala> df.show
22/07/13 14:00:39 ERROR Executor: Exception in task 0.0 in stage 19.0 (TID 179)
java.io.FileNotFoundException:
File /a/fake/file/path does not exist
It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:506)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:130)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:522)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
22/07/13 14:00:39 WARN TaskSetManager: Lost task 0.0 in stage 19.0 (TID 179) (localhost executor driver): java.io.FileNotFoundException:
File /a/fake/file/path does not exist
minFileRetentionTimestamp¶
minFileRetentionTimestamp: Long
minFileRetentionTimestamp is the timestamp that is tombstoneRetentionMillis before the current time (per the given Clock).
minFileRetentionTimestamp is used when:
-  
DeltaLogis requested for the currentSnapshot, to updateInternal, and to getSnapshotAt -  
VacuumCommandis requested for garbage collecting of a delta table 
tombstoneRetentionMillis¶
tombstoneRetentionMillis: Long
tombstoneRetentionMillis gives the value of deletedFileRetentionDuration table property (from the Metadata).
tombstoneRetentionMillis is used when:
-  
DeltaLogis requested for minFileRetentionTimestamp -  
VacuumCommandis requested for garbage collecting of a delta table 
updateInternal¶
updateInternal(
  isAsync: Boolean): Snapshot
updateInternal...FIXME
updateInternal is used when:
Invalidating Cached DeltaLog Instance By Path¶
invalidateCache(
  spark: SparkSession,
  dataPath: Path): Unit
invalidateCache...FIXME
invalidateCache is a public API and does not seem to be used at all.
protocolRead¶
protocolRead(
  protocol: Protocol): Unit
protocolRead...FIXME
protocolRead is used when:
-  
OptimisticTransactionImplis requested to validate and retry a commit -  
Snapshot is created
 -  
DeltaSourceis requested to verifyStreamHygieneAndFilterAddFiles 
upgradeProtocol¶
upgradeProtocol(
  newVersion: Protocol = Protocol()): Unit
upgradeProtocol...FIXME
upgradeProtocol is used when:
DeltaTableis requested to upgradeTableProtocol
LogStoreProvider¶
DeltaLog is a LogStoreProvider.
Looking Up Cached Or Creating New DeltaLog Instance¶
apply(
  spark: SparkSession,
  rawPath: Path,
  clock: Clock = new SystemClock): DeltaLog // (1)!
apply(
  spark: SparkSession,
  rawPath: Path,
  options: Map[String, String],
  clock: Clock): DeltaLog
- Uses empty 
options 
Note
rawPath is a Hadoop Path to the _delta_log directory at the root of the data of a delta table.
apply creates a Hadoop Configuration (perhaps with fs.-prefixed options when spark.databricks.delta.loadFileSystemConfigsFromDataFrameOptions configuration property is enabled).
apply resolves the raw path to be HDFS-qualified (using the given Hadoop Path to get a Hadoop FileSystem).
In the end, apply looks up a DeltaLog for the HDFS-qualified path (with the file system options) in the deltaLogCache or creates (and caches) a new DeltaLog.
buildHadoopFsRelationWithFileIndex¶
buildHadoopFsRelationWithFileIndex(
  snapshot: SnapshotDescriptor,
  fileIndex: TahoeFileIndex,
  bucketSpec: Option[BucketSpec]): HadoopFsRelation
buildHadoopFsRelationWithFileIndex...FIXME
buildHadoopFsRelationWithFileIndex is used when:
DeltaLogis requested to create a DataFrame and create a BaseRelation
assertTableFeaturesMatchMetadata¶
assertTableFeaturesMatchMetadata(
  targetProtocol: Protocol,
  targetMetadata: Metadata): Unit
assertTableFeaturesMatchMetadata...FIXME
assertTableFeaturesMatchMetadata is used when:
Snapshotis requested to init
Assert Table is Append-Only¶
assertRemovable(): Unit
Procedure
assertRemovable is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).
With delta.appendOnly table property enabled, assertRemovable throws a DeltaUnsupportedOperationException.
assertRemovable is used when:
OptimisticTransactionImplis requested to prepareCommit- Delete, Update, WriteIntoDelta (in 
Overwritesave mode) commands are executed DeltaSinkis requested to addBatch inCompleteoutput mode
Logging¶
Enable ALL logging level for org.apache.spark.sql.delta.DeltaLog logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.delta.DeltaLog=ALL
Refer to Logging.