DeltaLog¶
DeltaLog is a transaction log (change log) of changes to the state of a Delta table (in the given data directory).
Creating Instance¶
DeltaLog takes the following to be created:
- Log directory (Hadoop Path of the _delta_log directory)
- Data directory (Hadoop Path of the data of the Delta table)
- Clock
DeltaLog is created (indirectly via DeltaLog.apply utility) when:
- DeltaLog.forTable utility is used
_delta_log Metadata Directory¶
DeltaLog uses the _delta_log metadata directory for the transaction log of a Delta table.
The _delta_log directory is in the given data path directory (when created using the DeltaLog.forTable utility).
The _delta_log directory is resolved (in the DeltaLog.apply utility) using the application-wide Hadoop Configuration.
Once resolved and turned into a qualified path, the _delta_log directory is cached.
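As an illustration, the files of the transaction log can be listed with the Hadoop FileSystem API (a minimal sketch; the /tmp/delta/t1 path is just an example).
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical data directory of a Delta table
val dataPath = new Path("/tmp/delta/t1")
val logPath = new Path(dataPath, "_delta_log")

// Resolve the directory using a Hadoop Configuration (as DeltaLog.apply does)
val fs = logPath.getFileSystem(new Configuration())
fs.listStatus(logPath).map(_.getPath.getName).sorted.foreach(println)
// e.g. 00000000000000000000.json, 00000000000000000010.checkpoint.parquet, _last_checkpoint
```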
DeltaLog.forTable Utility¶
```scala
forTable(
  spark: SparkSession,
  table: CatalogTable): DeltaLog
forTable(
  spark: SparkSession,
  table: CatalogTable,
  clock: Clock): DeltaLog
forTable(
  spark: SparkSession,
  deltaTable: DeltaTableIdentifier): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: File): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: File,
  clock: Clock): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: Path): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: Path,
  clock: Clock): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: String): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: String,
  clock: Clock): DeltaLog
forTable(
  spark: SparkSession,
  tableName: TableIdentifier): DeltaLog
forTable(
  spark: SparkSession,
  tableName: TableIdentifier,
  clock: Clock): DeltaLog
```
forTable creates a DeltaLog with the _delta_log directory (in the given dataPath directory).
forTable is used when:
- AlterTableSetLocationDeltaCommand, ConvertToDeltaCommand, VacuumTableCommand, CreateDeltaTableCommand, DeltaGenerateCommand, DescribeDeltaDetailCommand, DescribeDeltaHistoryCommand commands are executed
- DeltaDataSource is requested for the source schema, a source, and a relation
- DeltaTable.isDeltaTable utility is used
- DeltaTableUtils.combineWithCatalogMetadata utility is used
- DeltaTableIdentifier is requested to getDeltaLog
- DeltaCatalog is requested to createDeltaTable
- DeltaTableV2 is requested for the DeltaLog
- DeltaSink is created
Looking Up Or Creating DeltaLog Instance¶
```scala
apply(
  spark: SparkSession,
  rawPath: Path,
  clock: Clock = new SystemClock): DeltaLog
```
Note
rawPath is a Hadoop Path to the _delta_log directory at the root of the data of a delta table.
apply ...FIXME
tableExists¶
```scala
tableExists: Boolean
```
tableExists requests the current Snapshot for the version and checks whether it is 0 or higher.
tableExists is used when:
- DeltaTable utility is used to isDeltaTable
- DeltaUnsupportedOperationsCheck logical check rule is executed
- DeltaTableV2 is requested to toBaseRelation
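For illustration, the same check can be expressed directly against the current snapshot (a minimal sketch, assuming a deltaLog instance is in scope).
```scala
// tableExists is equivalent to the current snapshot being at version 0 or higher
val exists = deltaLog.snapshot.version >= 0
assert(exists == deltaLog.tableExists)
```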
Demo: Creating DeltaLog¶
```scala
import org.apache.spark.sql.SparkSession
assert(spark.isInstanceOf[SparkSession])

val dataPath = "/tmp/delta/t1"
import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, dataPath)

import org.apache.hadoop.fs.Path
val expected = new Path(s"file:$dataPath/_delta_log/_last_checkpoint")
assert(deltaLog.LAST_CHECKPOINT == expected)
```
Accessing Current Version¶
A common idiom (if not the only way) to know the current version of the delta table is to request the DeltaLog for the current state (snapshot) and then for the version.
```scala
import org.apache.spark.sql.delta.DeltaLog
assert(deltaLog.isInstanceOf[DeltaLog])

val deltaVersion = deltaLog.snapshot.version
scala> println(deltaVersion)
5
```
Initialization¶
When created, DeltaLog does the following:
- Creates the LogStore based on the spark.delta.logStore.class configuration property
- Initializes the current snapshot
- Updates the state of the delta table when there is no metadata checkpoint (e.g. the version of the state is -1)
In other words, the version of (the DeltaLog of) a delta table is at version 0 at the very minimum.
```scala
assert(deltaLog.snapshot.version >= 0)
```
filterFileList Utility¶
```scala
filterFileList(
  partitionSchema: StructType,
  files: DataFrame,
  partitionFilters: Seq[Expression],
  partitionColumnPrefixes: Seq[String] = Nil): DataFrame
```
filterFileList ...FIXME
filterFileList is used when:
- OptimisticTransactionImpl is requested to checkAndRetry
- PartitionFiltering is requested to filesForScan
- WriteIntoDelta is requested to write
- SnapshotIterator is requested to iterator
- TahoeBatchFileIndex is requested to matchingFiles
- DeltaDataSource utility is requested to verifyAndCreatePartitionFilters
FileFormats¶
DeltaLog defines two FileFormats (Spark SQL):
- JsonFileFormat for indices of delta files
- ParquetFileFormat for indices of checkpoint files
These FileFormats are used to create DeltaLogFileIndexes for Snapshots that in turn use them for stateReconstruction.
LogStore¶
DeltaLog uses a LogStore for...FIXME
Transaction Logs (DeltaLogs) per Fully-Qualified Path¶
```scala
deltaLogCache: Cache[Path, DeltaLog]
```
deltaLogCache is part of the DeltaLog Scala object, which makes it an application-wide cache "for free". There is only one deltaLogCache and it lives until the application that uses it stops.
deltaLogCache is a registry of DeltaLogs by their fully-qualified _delta_log directories. A new instance of DeltaLog is added when the DeltaLog.apply utility is used and no instance has been created for the path before.
deltaLogCache is invalidated:
- For a delta table using the DeltaLog.invalidateCache utility
- For all delta tables using the DeltaLog.clearCache utility
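As a consequence, requesting a DeltaLog for the same path twice returns the cached instance (a minimal sketch, assuming the /tmp/delta/t1 table from the demo above exists).
```scala
import org.apache.spark.sql.delta.DeltaLog

val first = DeltaLog.forTable(spark, "/tmp/delta/t1")
val second = DeltaLog.forTable(spark, "/tmp/delta/t1")
// Same fully-qualified _delta_log directory, so the same cached DeltaLog instance
assert(first eq second)

// Remove all cached instances (e.g. between tests)
DeltaLog.clearCache()
```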
Executing Single-Threaded Operation in New Transaction¶
```scala
withNewTransaction[T](
  thunk: OptimisticTransaction => T): T
```
withNewTransaction starts a new transaction (that is active for the whole thread) and executes the given thunk block.
In the end, withNewTransaction makes the transaction no longer active.
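The following sketch shows the idiom (assuming an in-scope deltaLog; newData is a hypothetical DataFrame with the table's schema and the operation is illustrative only).
```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.delta.DeltaOperations

// A minimal sketch: write new files and commit them within a single transaction
deltaLog.withNewTransaction { txn =>
  // txn is the OptimisticTransaction that is active for the current thread
  val actions = txn.writeFiles(newData)
  txn.commit(actions, DeltaOperations.Write(SaveMode.Append))
}
```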
withNewTransaction is used when:
- DeleteCommand, MergeIntoCommand, UpdateCommand, and WriteIntoDelta commands are executed
- DeltaSink is requested to add a streaming micro-batch
Starting New Transaction¶
```scala
startTransaction(): OptimisticTransaction
```
startTransaction updates (the current state snapshot) and creates a new OptimisticTransaction (for this DeltaLog).
Note
startTransaction is a "subset" of withNewTransaction.
startTransaction is used when:
- DeltaLog is requested to upgradeProtocol
- AlterDeltaTableCommand is requested to startTransaction
- ConvertToDeltaCommand and CreateDeltaTableCommand are executed
Throwing UnsupportedOperationException for Append-Only Tables¶
```scala
assertRemovable(): Unit
```
assertRemovable throws an UnsupportedOperationException when the appendOnly table property (in the Metadata) is enabled (true):
```text
This table is configured to only allow appends. If you would like to permit updates or deletes, use 'ALTER TABLE <table_name> SET TBLPROPERTIES (appendOnly=false)'.
```
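For example, marking a table as append-only (and hence triggering assertRemovable on deletes) could look as follows (a sketch; the table path is an example and the SQL assumes the Delta SQL extensions are enabled).
```scala
// Illustrative only: enable the append-only table property on a Delta table
spark.sql("ALTER TABLE delta.`/tmp/delta/t1` SET TBLPROPERTIES ('delta.appendOnly' = 'true')")

// A subsequent DELETE (or UPDATE, or an Overwrite write) is expected to fail
// with the UnsupportedOperationException above
spark.sql("DELETE FROM delta.`/tmp/delta/t1` WHERE id = 0")
```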
assertRemovable is used when:
- OptimisticTransactionImpl is requested to prepareCommit
- DeleteCommand, UpdateCommand, WriteIntoDelta (with Overwrite mode) commands are executed
- DeltaSink is requested to addBatch (with Complete output mode)
metadata¶
```scala
metadata: Metadata
```
metadata is part of the Checkpoints abstraction.
metadata requests the current Snapshot for the metadata or creates a new one (if the current Snapshot is not initialized).
update¶
```scala
update(
  stalenessAcceptable: Boolean = false): Snapshot
```
update branches off based on a combination of two flags: the given stalenessAcceptable flag and the isSnapshotStale flag.
When staleness is not acceptable (the default) and the snapshot is not stale, update simply acquires the deltaLogLock lock and updateInternal (with the isAsync flag off).
For all other cases, update ...FIXME
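A typical use is to refresh the cached state before reading it (a minimal sketch, assuming an in-scope deltaLog).
```scala
// Force a refresh of the current state snapshot and read the latest table version
val latestSnapshot = deltaLog.update()
println(s"Latest version: ${latestSnapshot.version}")
```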
update is used when:
- DeltaHistoryManager is requested to getHistory, getActiveCommitAtTime, and checkVersionExists
- DeltaLog is created (with no checkpoint created), and requested to startTransaction and withNewTransaction
- OptimisticTransactionImpl is requested to doCommit and checkAndRetry
- ConvertToDeltaCommand is requested to run and streamWrite
- VacuumCommand utility is used to gc
- TahoeLogFileIndex is requested for the (historical or latest) snapshot
- DeltaDataSource is requested for a relation
tryUpdate¶
```scala
tryUpdate(
  isAsync: Boolean = false): Snapshot
```
tryUpdate ...FIXME
Current State Snapshot¶
```scala
snapshot: Snapshot
```
snapshot returns the current snapshot.
snapshot is used when:
- OptimisticTransaction is created
- Checkpoints is requested to checkpoint
- DeltaLog is requested for the metadata, to upgradeProtocol, getSnapshotAt and createRelation
- OptimisticTransactionImpl is requested to getNextAttemptVersion
- DeleteCommand, DeltaGenerateCommand, DescribeDeltaDetailCommand, UpdateCommand commands are executed
- GenerateSymlinkManifest is executed
- DeltaCommand is requested to buildBaseRelation
- TahoeFileIndex is requested for the table version and partitionSchema
- TahoeLogFileIndex is requested for the table size
- DeltaDataSource is requested for the schema of the streaming delta source
- DeltaSource is created and requested for the getStartingOffset and getBatch
currentSnapshot¶
```scala
currentSnapshot: Snapshot
```
currentSnapshot is a Snapshot based on the metadata checkpoint if available or a new Snapshot instance (with version being -1).
Note
For a new Snapshot instance (with version being -1) DeltaLog immediately updates the state.
Internally, currentSnapshot ...FIXME
currentSnapshot is available using the snapshot method.
currentSnapshot is used when:
- DeltaLog is requested to updateInternal, update and tryUpdate
Creating Insertable HadoopFsRelation For Batch Queries¶
```scala
createRelation(
  partitionFilters: Seq[Expression] = Nil,
  snapshotToUseOpt: Option[Snapshot] = None,
  isTimeTravelQuery: Boolean = false,
  cdcOptions: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty): BaseRelation
```
createRelation ...FIXME
createRelation creates a TahoeLogFileIndex for the data path, the given partitionFilters and a version (if defined).
createRelation ...FIXME
In the end, createRelation creates a HadoopFsRelation for the TahoeLogFileIndex and...FIXME. The HadoopFsRelation is also an InsertableRelation.
createRelation is used when:
- DeltaTableV2 is requested to toBaseRelation
- WriteIntoDeltaBuilder is requested to buildForV1Write
- DeltaDataSource is requested for a writable relation
insert¶
```scala
insert(
  data: DataFrame,
  overwrite: Boolean): Unit
```
insert ...FIXME
insert is part of the InsertableRelation (Spark SQL) abstraction.
Retrieving State Of Delta Table At Given Version¶
```scala
getSnapshotAt(
  version: Long,
  commitTimestamp: Option[Long] = None,
  lastCheckpointHint: Option[CheckpointInstance] = None): Snapshot
```
getSnapshotAt ...FIXME
getSnapshotAt is used when:
- DeltaLog is requested for a relation, and to updateInternal
- DeltaSource is requested for the snapshot of a delta table at a given version
- TahoeLogFileIndex is requested for historicalSnapshotOpt
checkpointInterval¶
```scala
checkpointInterval: Int
```
checkpointInterval gives the value of the checkpointInterval table property (from the Metadata).
checkpointInterval is used when...FIXME
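For reference, the underlying table property can be set with SQL (an illustrative sketch; the table path is an example).
```scala
// Illustrative only: checkpoint every 5 commits instead of the default 10
spark.sql("ALTER TABLE delta.`/tmp/delta/t1` SET TBLPROPERTIES ('delta.checkpointInterval' = '5')")
```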
Changes (Actions) Of Delta Version And Later¶
```scala
getChanges(
  startVersion: Long): Iterator[(Long, Seq[Action])]
```
getChanges gives all actions (changes) per delta log file for the given startVersion of a delta table and later.
val dataPath = "/tmp/delta/users"
import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, dataPath)
assert(deltaLog.isInstanceOf[DeltaLog])
val changesPerVersion = deltaLog.getChanges(startVersion = 0)
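The iterator can then be consumed per version, for example (a minimal sketch continuing the snippet above).
```scala
// Print the number of actions recorded in every delta log file from version 0 on
changesPerVersion.foreach { case (version, actions) =>
  println(s"version $version: ${actions.size} actions")
}
```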
Internally, getChanges requests the LogStore for files that are lexicographically greater than or equal to the delta log file for the given startVersion (in the logPath) and leaves only delta log files (e.g. files with numbers only as file name and .json file extension).
For every delta file, getChanges requests the LogStore to read the JSON content (every line is an action), and then deserializes it to an action.
getChanges is used when:
- DeltaSource is requested for the indexed file additions (AddFile actions)
Creating DataFrame For Given AddFiles¶
```scala
createDataFrame(
  snapshot: Snapshot,
  addFiles: Seq[AddFile],
  isStreaming: Boolean = false,
  actionTypeOpt: Option[String] = None): DataFrame
```
createDataFrame uses the action type based on the optional action type (if defined) or uses the following based on the isStreaming flag:
- streaming when the isStreaming flag is enabled (true)
- batch when the isStreaming flag is disabled (false)
Note
actionTypeOpt seems not to be defined ever.
createDataFrame creates a new TahoeBatchFileIndex (for the action type, and the given AddFiles and Snapshot).
createDataFrame creates a HadoopFsRelation (Spark SQL) with the TahoeBatchFileIndex and the other properties based on the given Snapshot (and the associated Metadata).
In the end, createDataFrame creates a DataFrame with a logical query plan with a LogicalRelation (Spark SQL) over the HadoopFsRelation.
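For illustration, one could build a batch DataFrame over all files of the current snapshot (a sketch meant to show the shape of a call, assuming snapshot.allFiles is a Dataset[AddFile]).
```scala
// A sketch: a DataFrame over all the AddFiles of the current state snapshot
val snapshot = deltaLog.snapshot
val allAddFiles = snapshot.allFiles.collect().toSeq
val df = deltaLog.createDataFrame(snapshot, allAddFiles)
```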
createDataFrame is used when:
- MergeIntoCommand is executed
- DeltaSource is requested for a DataFrame for data between start and end offsets
minFileRetentionTimestamp Method¶
```scala
minFileRetentionTimestamp: Long
```
minFileRetentionTimestamp is the timestamp that is tombstoneRetentionMillis before the current time (per the given Clock).
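In other words (a minimal sketch of the arithmetic; the retention value is just an example):
```scala
// A sketch: "now" (per the DeltaLog's Clock) minus the tombstone retention period
val tombstoneRetentionMillis = 7L * 24 * 60 * 60 * 1000  // e.g. one week
val minFileRetentionTimestamp = System.currentTimeMillis() - tombstoneRetentionMillis
```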
minFileRetentionTimestamp is used when:
- DeltaLog is requested for the currentSnapshot, to updateInternal, and to getSnapshotAt
- VacuumCommand is requested for garbage collecting of a delta table
tombstoneRetentionMillis Method¶
```scala
tombstoneRetentionMillis: Long
```
tombstoneRetentionMillis gives the value of the deletedFileRetentionDuration table property (from the Metadata).
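For reference, the underlying table property can be set with SQL (an illustrative sketch; the path and interval are examples).
```scala
// Illustrative only: keep tombstones (removed files) for 14 days instead of the default of one week
spark.sql("ALTER TABLE delta.`/tmp/delta/t1` SET TBLPROPERTIES ('delta.deletedFileRetentionDuration' = 'interval 14 days')")
```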
tombstoneRetentionMillis is used when:
- DeltaLog is requested for minFileRetentionTimestamp
- VacuumCommand is requested for garbage collecting of a delta table
updateInternal Internal Method¶
```scala
updateInternal(
  isAsync: Boolean): Snapshot
```
updateInternal ...FIXME
updateInternal is used when:
- DeltaLog is requested to update and tryUpdate
Invalidating Cached DeltaLog Instance By Path¶
```scala
invalidateCache(
  spark: SparkSession,
  dataPath: Path): Unit
```
invalidateCache ...FIXME
invalidateCache is a public API and does not seem to be used at all.
protocolRead¶
```scala
protocolRead(
  protocol: Protocol): Unit
```
protocolRead ...FIXME
protocolRead is used when:
- OptimisticTransactionImpl is requested to validate and retry a commit
- Snapshot is created
- DeltaSource is requested to verifyStreamHygieneAndFilterAddFiles
upgradeProtocol¶
```scala
upgradeProtocol(
  newVersion: Protocol = Protocol()): Unit
```
upgradeProtocol ...FIXME
upgradeProtocol is used when:
- DeltaTable is requested to upgradeTableProtocol
LogStoreProvider¶
DeltaLog is a LogStoreProvider.
Logging¶
Enable ALL logging level for the org.apache.spark.sql.delta.DeltaLog logger to see what happens inside.
Add the following line to conf/log4j.properties:
```text
log4j.logger.org.apache.spark.sql.delta.DeltaLog=ALL
```
Refer to Logging.