DeltaLog

DeltaLog is a transaction log (change log) of changes to the state of a delta table.

DeltaLog uses the _delta_log directory for (the files of) the transaction log of a delta table (the directory is given when the DeltaLog.forTable utility is used to create a DeltaLog instance).

DeltaLog is created (indirectly via the DeltaLog.apply utility) when the DeltaLog.forTable utility is used.
```scala
import org.apache.spark.sql.SparkSession
assert(spark.isInstanceOf[SparkSession])

val dataPath = "/tmp/delta/t1"

import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, dataPath)

import org.apache.hadoop.fs.Path
val expected = new Path(s"file:$dataPath/_delta_log/_last_checkpoint")
assert(deltaLog.LAST_CHECKPOINT == expected)
```
A common idiom (if not the only way) to know the current version of a delta table is to request the DeltaLog for the current state (snapshot) and then for the version.

```scala
import org.apache.spark.sql.delta.DeltaLog
assert(deltaLog.isInstanceOf[DeltaLog])

val deltaVersion = deltaLog.snapshot.version
```

```text
scala> println(deltaVersion)
5
```
When created, DeltaLog does the following:

- Creates the LogStore (based on the spark.delta.logStore.class configuration property, default: HDFSLogStore)
- Initializes the current snapshot
- Updates the state of the delta table when there is no checkpoint available (e.g. the version of the current snapshot is -1)

In other words, (the DeltaLog of) a delta table is at version 0 at the very minimum.

```scala
assert(deltaLog.snapshot.version >= 0)
```
DeltaLog is a LogStoreProvider.
filterFileList Utility

```scala
filterFileList(
  partitionSchema: StructType,
  files: DataFrame,
  partitionFilters: Seq[Expression],
  partitionColumnPrefixes: Seq[String] = Nil): DataFrame
```

filterFileList ...FIXME

filterFileList is used when:

- OptimisticTransactionImpl is requested to checkAndRetry
- PartitionFiltering is requested to filesForScan
- WriteIntoDelta is requested to write
- SnapshotIterator is requested to iterator
- TahoeBatchFileIndex is requested to matchingFiles
- DeltaDataSource utility is requested to verifyAndCreatePartitionFilters
Creating DeltaLog Instance (forTable Utility)

```scala
forTable(
  spark: SparkSession,
  dataPath: File): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: File,
  clock: Clock): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: Path): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: Path,
  clock: Clock): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: String): DeltaLog
forTable(
  spark: SparkSession,
  dataPath: String,
  clock: Clock): DeltaLog
```
forTable creates a DeltaLog with the _delta_log directory (in the given dataPath directory).
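
For example (a small sketch reusing the /tmp/delta/t1 table from the earlier examples), the overloads differ only in how the data path is expressed:

```scala
import java.io.File
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.DeltaLog

// All three point at the same table directory,
// so they all resolve to the same _delta_log directory.
val byString = DeltaLog.forTable(spark, "/tmp/delta/t1")
val byPath   = DeltaLog.forTable(spark, new Path("/tmp/delta/t1"))
val byFile   = DeltaLog.forTable(spark, new File("/tmp/delta/t1"))
```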
forTable is used when:

- DeltaTable utility is used to create a DeltaTable
- Delta commands are requested to run
- DeltaDataSource is requested to create a relation (as a CreatableRelationProvider and a RelationProvider), among other operations
- DeltaTableIdentifier is requested to getDeltaLog
- DeltaSink is created
FileFormats

DeltaLog defines two FileFormats:

- CHECKPOINT_FILE_FORMAT: ParquetFileFormat for checkpoint files
- COMMIT_FILE_FORMAT: JsonFileFormat for delta (commit) files

The FileFormats are used to create DeltaLogFileIndexes (over the files of the transaction log).
_delta_log Directory

DeltaLog uses the _delta_log metadata directory (under the data path of a delta table) for the files of the transaction log.

The _delta_log directory is resolved (in the DeltaLog.apply utility) into a fully-qualified path using the Hadoop FileSystem API.

Note

The SparkSession is used to create the Hadoop Configuration instance for the resolution:

```scala
spark.sessionState.newHadoopConf()
```

Once resolved and turned into a qualified path, the _delta_log directory of the delta table is used as the key in the deltaLogCache registry (of DeltaLogs per fully-qualified _delta_log directory).
LogStore

DeltaLog uses a LogStore (for reading and writing physical log files and checkpoints).
Transaction Logs (DeltaLogs) per Fully-Qualified Path (deltaLogCache Internal Registry)

```scala
deltaLogCache: Cache[Path, DeltaLog]
```

Note

deltaLogCache is part of the DeltaLog Scala object, which makes it an application-wide cache. There is just one deltaLogCache for the lifetime of the application that uses it.

deltaLogCache is a registry of DeltaLogs by their fully-qualified _delta_log directories. A new instance of DeltaLog is added when one is looked up and not available in the cache yet.

deltaLogCache is invalidated:

- For a single delta table, using the invalidateCache utility (and also when the cached reference is no longer valid)
- For all delta tables, using the clearCache utility
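
A quick way to observe the cache (a sketch; it assumes the /tmp/delta/t1 table from the earlier examples and that the cached entry is still valid):

```scala
import org.apache.spark.sql.delta.DeltaLog

val l1 = DeltaLog.forTable(spark, "/tmp/delta/t1")
val l2 = DeltaLog.forTable(spark, "/tmp/delta/t1")
// Same fully-qualified _delta_log directory, so the same cached instance is returned.
assert(l1 eq l2)

// clearCache drops all cached DeltaLogs, so the next lookup creates a new instance.
DeltaLog.clearCache()
val l3 = DeltaLog.forTable(spark, "/tmp/delta/t1")
assert(!(l1 eq l3))
```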
Creating DeltaLog Instance

DeltaLog takes the following to be created:

- Log directory (Hadoop Path of the _delta_log directory)
- Data directory (Hadoop Path of the data of the delta table)
- Clock

DeltaLog initializes the current snapshot.
Looking Up Or Creating DeltaLog Instance (apply Utility)

```scala
apply(
  spark: SparkSession,
  rawPath: Path,
  clock: Clock = new SystemClock): DeltaLog
```

Note

rawPath is a Hadoop Path to the _delta_log directory at the root of the data of a delta table.

apply ...FIXME

Note

apply is used when the forTable utility is used.
Executing Single-Threaded Operation in New Transaction

```scala
withNewTransaction[T](
  thunk: OptimisticTransaction => T): T
```

withNewTransaction starts a new transaction (that is active for the whole thread) and executes the given thunk block. In the end, withNewTransaction makes the transaction no longer active.
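
A minimal sketch of the idiom (assuming the /tmp/delta/t1 table from the earlier examples; the empty commit and DeltaOperations.ManualUpdate are used only to show the shape of the call):

```scala
import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations}

val deltaLog = DeltaLog.forTable(spark, "/tmp/delta/t1")
deltaLog.withNewTransaction { txn =>
  // Read the table and prepare actions using the transaction (txn), then commit them.
  // An empty list of actions is used here only to illustrate the call shape.
  txn.commit(Seq.empty, DeltaOperations.ManualUpdate)
}
```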
withNewTransaction is used when:

- DeleteCommand, MergeIntoCommand, UpdateCommand, and WriteIntoDelta commands are executed
- DeltaSink is requested to add a streaming micro-batch
Starting New Transaction

```scala
startTransaction(): OptimisticTransaction
```

startTransaction updates (the state of the delta table) and creates a new OptimisticTransaction.

Note

startTransaction is a subset of withNewTransaction.
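
For example (a sketch; unlike withNewTransaction above, the transaction is managed manually here):

```scala
val txn = deltaLog.startTransaction()
// The transaction is pinned to the version of the snapshot it was started at.
println(txn.readVersion)
```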
startTransaction is used when:

- DeltaLog is requested to withNewTransaction
- AlterDeltaTableCommand is requested to startTransaction
- ConvertToDeltaCommandBase is executed
- CreateDeltaTableCommand is executed
Throwing UnsupportedOperationException For appendOnly Table Property Enabled (assertRemovable Method)

```scala
assertRemovable(): Unit
```

assertRemovable throws an UnsupportedOperationException when the appendOnly table property is enabled (true) for the delta table:

```text
This table is configured to only allow appends. If you would like to permit updates or deletes, use 'ALTER TABLE <table_name> SET TBLPROPERTIES (appendOnly=false)'.
```
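
The table property itself can be changed with ALTER TABLE ... SET TBLPROPERTIES, e.g. (a sketch; it assumes a delta table at /tmp/delta/t1 and the delta.appendOnly property key):

```scala
// Enable append-only protection; deletes and updates will then be rejected.
spark.sql("ALTER TABLE delta.`/tmp/delta/t1` SET TBLPROPERTIES ('delta.appendOnly' = 'true')")
```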
Note

assertRemovable is used when...FIXME
metadata Method

```scala
metadata: Metadata
```

Note

metadata is part of the Checkpoints abstraction.

metadata requests the current Snapshot for the (table) metadata, or creates a new empty Metadata when the current snapshot is not available.
update Method

```scala
update(
  stalenessAcceptable: Boolean = false): Snapshot
```

update branches off based on a combination of flags: the given stalenessAcceptable flag and whether the current state is considered stale.

For staleness not acceptable (the default), update simply acquires the interruptible lock and updates the internal state (updateInternal with the isAsync flag off).

For all other cases, update ...FIXME
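
For example (a sketch reusing the deltaLog from the earlier examples):

```scala
// Synchronously re-read the transaction log and return the freshest snapshot.
val latest = deltaLog.update()
println(s"Latest version: ${latest.version}")
```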
Note

update is used when:

- DeltaHistoryManager is requested for table history operations
- DeltaLog is created (with no checkpoint available), and requested to startTransaction and withNewTransaction
- OptimisticTransactionImpl is requested to check and retry a commit
- ConvertToDeltaCommand is executed
- VacuumCommand utility is used to gc
- TahoeLogFileIndex is requested for the snapshot
- DeltaDataSource is requested for a relation
Current State Snapshot (snapshot Method)

```scala
snapshot: Snapshot
```

snapshot returns the current snapshot (currentSnapshot).
Note

snapshot is used when:

- OptimisticTransaction is created
- Checkpoints is requested to checkpoint
- DeltaLog is requested for the metadata, to upgrade the protocol, and in other operations
- OptimisticTransactionImpl is requested to commit
- DeleteCommand, MergeIntoCommand, UpdateCommand and WriteIntoDelta (among other commands) are executed
- DeltaCommand, TahoeFileIndex, TahoeLogFileIndex, DeltaDataSource and DeltaSource are requested for the current table state
Current State Snapshot (currentSnapshot Internal Registry)

```scala
currentSnapshot: Snapshot
```

currentSnapshot is the cached Snapshot of the delta table, loaded from the latest checkpoint when available or created as a new Snapshot instance (with version being -1).

Note

For a new Snapshot instance (with version being -1) DeltaLog immediately updates the state.

Internally, currentSnapshot ...FIXME

Note

currentSnapshot is available using the snapshot method.

Note

currentSnapshot is used when DeltaLog is requested for the snapshot and to update the state.
Creating Insertable HadoopFsRelation For Batch Queries

```scala
createRelation(
  partitionFilters: Seq[Expression] = Nil,
  timeTravel: Option[DeltaTimeTravelSpec] = None): BaseRelation
```

createRelation ...FIXME

createRelation creates a TahoeLogFileIndex for the data path, the given partitionFilters and a version (if defined).

createRelation ...FIXME

In the end, createRelation creates a HadoopFsRelation for the TahoeLogFileIndex and...FIXME. The HadoopFsRelation is also an InsertableRelation (with the insert method described below).

createRelation is used when DeltaDataSource is requested for a relation as a CreatableRelationProvider and a RelationProvider (for batch queries).
insert Method

```scala
insert(
  data: DataFrame,
  overwrite: Boolean): Unit
```

insert ...FIXME

insert is part of the InsertableRelation abstraction.
Retrieving State Of Delta Table At Given Version (getSnapshotAt Method)

```scala
getSnapshotAt(
  version: Long,
  commitTimestamp: Option[Long] = None,
  lastCheckpointHint: Option[CheckpointInstance] = None): Snapshot
```

getSnapshotAt ...FIXME
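
For example (a sketch reusing the deltaLog from the earlier examples):

```scala
// Load the state of the delta table as of version 0 (the very first commit).
val v0 = deltaLog.getSnapshotAt(0)
assert(v0.version == 0)
```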
Note

getSnapshotAt is used when:

- DeltaLog is requested for a snapshot and to update the state
- DeltaSource is requested for the snapshot of a delta table at a given version
- TahoeLogFileIndex is requested for the snapshot
tryUpdate Method

```scala
tryUpdate(
  isAsync: Boolean = false): Snapshot
```

tryUpdate ...FIXME

Note

tryUpdate is used exclusively when DeltaLog is requested to update.
ensureLogDirectoryExist Method

```scala
ensureLogDirectoryExist(): Unit
```

ensureLogDirectoryExist ...FIXME

Note

ensureLogDirectoryExist is used when...FIXME
protocolWrite Method

```scala
protocolWrite(
  protocol: Protocol,
  logUpgradeMessage: Boolean = true): Unit
```

protocolWrite ...FIXME

Note

protocolWrite is used when...FIXME
checkpointInterval Method

```scala
checkpointInterval: Int
```

checkpointInterval gives the value of the checkpointInterval table property (from the Metadata of the delta table).
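
For example (a sketch; it assumes the deltaLog from the earlier examples and the delta.checkpointInterval table property key):

```scala
// Current checkpoint interval (number of commits between checkpoints) of the table
println(deltaLog.checkpointInterval)

// Change the table property; the new value is picked up once the table state is updated
spark.sql("ALTER TABLE delta.`/tmp/delta/t1` SET TBLPROPERTIES ('delta.checkpointInterval' = '5')")
```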
Note

checkpointInterval is used when...FIXME
Changes (Actions) Of Delta Version And Later (getChanges Method)

```scala
getChanges(
  startVersion: Long): Iterator[(Long, Seq[Action])]
```

getChanges gives all the actions (changes) per delta log file for the given startVersion (of a delta table) and later.

```scala
val dataPath = "/tmp/delta/users"

import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, dataPath)
assert(deltaLog.isInstanceOf[DeltaLog])

val changesPerVersion = deltaLog.getChanges(startVersion = 0)
```
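
The changes can then be traversed version by version, e.g. (a sketch continuing the example above):

```scala
changesPerVersion.foreach { case (version, actions) =>
  val actionTypes = actions.map(_.getClass.getSimpleName).mkString(", ")
  println(s"version $version: $actionTypes")
}
```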
Internally, getChanges requests the LogStore for the files (in the log directory) that are lexicographically greater than or equal to the delta file for the given startVersion, and keeps only delta files (files with the .json file extension).

For every delta file, getChanges requests the LogStore to read the JSON-encoded actions (together with the version of the delta file).

Note

getChanges is used when DeltaSource is requested for the changes (of a delta table).
Creating DataFrame For Given AddFiles

```scala
createDataFrame(
  snapshot: Snapshot,
  addFiles: Seq[AddFile],
  isStreaming: Boolean = false,
  actionTypeOpt: Option[String] = None): DataFrame
```

createDataFrame uses the action type based on the optional action type (if defined) or based on the isStreaming flag:

- streaming when the isStreaming flag is enabled (true)
- batch when the isStreaming flag is disabled (false)
Note

actionTypeOpt seems not to be defined ever.

createDataFrame creates a new TahoeBatchFileIndex (for the action type, and the given AddFiles and Snapshot).

createDataFrame creates a HadoopFsRelation with the TahoeBatchFileIndex and the other properties based on the given Snapshot (and the associated Metadata).

Tip

Learn more on HadoopFsRelation in The Internals of Spark SQL online book.

In the end, createDataFrame creates a DataFrame with a logical query plan with a LogicalRelation over the HadoopFsRelation.
Tip
Learn more on LogicalRelation in The Internals of Spark SQL online book.
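
A minimal sketch of using createDataFrame (it assumes the deltaLog from the earlier examples; collecting allFiles to the driver is only reasonable for small tables):

```scala
val snapshot = deltaLog.snapshot
// Build a (batch) DataFrame over all the data files of the current snapshot.
val df = deltaLog.createDataFrame(snapshot, snapshot.allFiles.collect().toSeq)
df.show(truncate = false)
```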
createDataFrame is used when:

- MergeIntoCommand is executed
- DeltaSource is requested for a DataFrame for data between start and end offsets
Acquiring Interruptible Lock on Log (lockInterruptibly Method)

```scala
lockInterruptibly[T](body: => T): T
```

lockInterruptibly ...FIXME

Note

lockInterruptibly is used when...FIXME
minFileRetentionTimestamp Method

```scala
minFileRetentionTimestamp: Long
```

minFileRetentionTimestamp is the timestamp that is tombstoneRetentionMillis milliseconds earlier than the current time (per the given Clock).
Note

minFileRetentionTimestamp is used when:

- DeltaLog is requested for the current snapshot, to update the state, and for the snapshot at a given version
- VacuumCommand utility is used to gc
tombstoneRetentionMillis Method

```scala
tombstoneRetentionMillis: Long
```

tombstoneRetentionMillis gives the value of the deletedFileRetentionDuration table property (from the Metadata of the delta table).

Note

tombstoneRetentionMillis is used when:

- DeltaLog is requested for the minFileRetentionTimestamp
- VacuumCommand utility is used to gc
updateInternal Internal Method

```scala
updateInternal(
  isAsync: Boolean): Snapshot
```

updateInternal ...FIXME

Note

updateInternal is used when DeltaLog is requested to update and tryUpdate.
Invalidating Cached DeltaLog Instance By Path (invalidateCache Utility)

```scala
invalidateCache(
  spark: SparkSession,
  dataPath: Path): Unit
```

invalidateCache ...FIXME

Note

invalidateCache is a public API and does not seem to be used at all.
Removing (Clearing) All Cached DeltaLog Instances (clearCache Utility)

```scala
clearCache(): Unit
```

clearCache ...FIXME

Note

clearCache is a public API and is used exclusively in tests.
upgradeProtocol Method

```scala
upgradeProtocol(
  newVersion: Protocol = Protocol()): Unit
```

upgradeProtocol ...FIXME

Note

upgradeProtocol seems to be used exclusively in tests.
protocolRead Method

```scala
protocolRead(
  protocol: Protocol): Unit
```

protocolRead ...FIXME
Note

protocolRead is used when:

- OptimisticTransactionImpl is requested to commit
- Snapshot is created
- DeltaSource is requested to read data
isValid Method

```scala
isValid(): Boolean
```

isValid ...FIXME

Note

isValid is used when the DeltaLog utility is used to look up or create a DeltaLog instance (to check whether a cached DeltaLog is still valid).
Logging

Enable ALL logging level for the org.apache.spark.sql.delta.DeltaLog logger to see what happens inside.

Add the following line to conf/log4j.properties:

```text
log4j.logger.org.apache.spark.sql.delta.DeltaLog=ALL
```

Refer to Logging.