TransactionalWrite¶
TransactionalWrite is an abstraction of optimistic transactional writers that can write a structured query out to a Delta table.
Contract¶
DeltaLog¶
deltaLog: DeltaLog
DeltaLog (of a delta table) that this transaction is changing
Used when:

- OptimisticTransactionImpl is requested to prepare a commit, doCommit, checkAndRetry, and perform post-commit operations (and execute delta log checkpoint)
- ConvertToDeltaCommand is executed
- DeltaCommand is requested to buildBaseRelation and commitLarge
- MergeIntoCommand is executed
- TransactionalWrite is requested to write a structured query out to a delta table
- GenerateSymlinkManifest post-commit hook is executed
- ImplicitMetadataOperation is requested to updateMetadata
- DeltaSink is requested to addBatch
Metadata¶
metadata: Metadata
Metadata (of the delta table) that this transaction is changing
Protocol¶
protocol: Protocol
Protocol (of the delta table) that this transaction is changing
Used when:

- OptimisticTransactionImpl is requested to updateMetadata, verifyNewMetadata and prepareCommit
- ConvertToDeltaCommand is executed
Snapshot¶
snapshot: Snapshot
Snapshot (of the delta table) that this transaction is reading at
Implementations¶
spark.databricks.delta.history.metricsEnabled¶
With spark.databricks.delta.history.metricsEnabled configuration property enabled, TransactionalWrite
creates a BasicWriteJobStatsTracker
(Spark SQL) and registers SQL metrics (when requested to write data out).
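A minimal, hedged example of enabling the property for the current Spark session (only the property name above is assumed):

```scala
// Enable SQL metrics collection for Delta commits in this session
// (spark is an existing SparkSession; nothing else is assumed)
spark.conf.set("spark.databricks.delta.history.metricsEnabled", "true")
```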
hasWritten Flag¶
hasWritten: Boolean = false
TransactionalWrite uses the hasWritten internal registry to prevent OptimisticTransactionImpl from updating metadata after data is written out.
hasWritten
is initially false
and changes to true
after data is written out.
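A minimal sketch of the guard pattern (not the actual Delta Lake sources; the class and method names below are illustrative only):

```scala
// Illustrative sketch: a hasWritten-style flag rejects metadata updates
// once the transaction has written data out.
class ExampleTransaction {
  protected var hasWritten: Boolean = false

  def updateMetadata(): Unit = {
    assert(!hasWritten, "Cannot update the metadata in a transaction that has already written data")
    // ...record the new metadata...
  }

  def writeFiles(): Unit = {
    hasWritten = true
    // ...write data out...
  }
}
```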
Write Data Out¶
writeFiles(
data: Dataset[_]): Seq[FileAction] // (1)!
writeFiles(
data: Dataset[_],
writeOptions: Option[DeltaOptions]): Seq[FileAction] // (3)!
writeFiles(
inputData: Dataset[_],
writeOptions: Option[DeltaOptions],
additionalConstraints: Seq[Constraint]): Seq[FileAction] // (4)!
writeFiles(
inputData: Dataset[_],
writeOptions: Option[DeltaOptions],
isOptimize: Boolean,
additionalConstraints: Seq[Constraint]): Seq[FileAction]
writeFiles(
data: Dataset[_],
additionalConstraints: Seq[Constraint]): Seq[FileAction] // (2)!
1. Uses no Constraints
2. Uses no write-related DeltaOptions
3. Uses no Constraints
4. isOptimize disabled
writeFiles writes the given data (as a Dataset) to a delta table and returns AddFiles with AddCDCFiles (from the DelayedCommitProtocol).
writeFiles is used when:

- DeleteCommand is requested to rewriteFiles
- DeltaSink is requested to add a streaming micro-batch
- MergeIntoCommandBase is requested to write data out
- OptimizeExecutor is requested to runOptimizeBinJob
- RemoveColumnMappingCommand is requested to write data out
- UpdateCommand is requested to rewriteFiles
- WriteIntoDelta is requested to writeAndReturnCommitData (and writeFiles)
writeFiles
creates a DeltaInvariantCheckerExec and a DelayedCommitProtocol to write out files to the data path (of the DeltaLog).
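The following is a hedged sketch of how writeFiles is typically driven from an optimistic transaction. It assumes an existing Delta table at the hypothetical path /tmp/delta/demo and uses internal Delta Lake classes whose exact signatures may differ across versions:

```scala
// Sketch only: write files within an optimistic transaction and commit the returned actions.
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations}

val deltaLog = DeltaLog.forTable(spark, "/tmp/delta/demo") // hypothetical table path
deltaLog.withNewTransaction { txn =>
  val data = spark.range(100).toDF("id")
  // writeFiles comes from TransactionalWrite (mixed into OptimisticTransaction)
  val addedFiles = txn.writeFiles(data)
  txn.commit(addedFiles, DeltaOperations.Write(SaveMode.Append))
}
```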
FileFormatWriter

writeFiles uses the FileFormatWriter (Spark SQL) utility to write out the result of the structured query.

writeFiles is executed within SQLExecution.withNewExecutionId.
SQLAppStatusListener
writeFiles
can be tracked using web UI or SQLAppStatusListener
(using SparkListenerSQLExecutionStart
and SparkListenerSQLExecutionEnd
events).
Learn about SQLAppStatusListener in The Internals of Spark SQL online book.
In the end, writeFiles
returns the addedStatuses of the DelayedCommitProtocol committer.
Step 1. Mark Write Executed¶
Even though it happens so early in the process, writeFiles turns the hasWritten flag on (true).
Note
After writeFiles
, no metadata updates in the transaction are permitted.
writeFiles
performs CDC augmentation (for the delta table with Change Data Feed enabled).
writeFiles
normalizes the output dataset (based on the given DeltaOptions).
Step 2. Partitioning Columns¶
writeFiles
determines the partitioning columns of the data(set) to be written out.
Step 3. DelayedCommitProtocol Committer¶
writeFiles
creates a DelayedCommitProtocol committer for the data path of the delta table.
Step 4. DeltaJobStatisticsTracker¶
writeFiles creates a DeltaJobStatisticsTracker only with the stats.collect configuration property enabled.
Step 5. Constraints¶
writeFiles
collects Constraints:
- From the table metadata (CHECK constraints and Column Invariants)
- Generated Columns (from normalization)
- The given additional Constraints
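A hedged example of where such Constraints come from: a CHECK constraint added to a Delta table (the table name below is hypothetical) becomes one of the Constraints enforced when data is written out:

```scala
// CHECK constraints become Constraints that writeFiles enforces at write time.
spark.sql("CREATE TABLE IF NOT EXISTS demo_constraints (id LONG) USING delta")
spark.sql("ALTER TABLE demo_constraints ADD CONSTRAINT id_is_positive CHECK (id > 0)")

spark.sql("INSERT INTO demo_constraints VALUES (1)")    // satisfies the constraint
// spark.sql("INSERT INTO demo_constraints VALUES (-1)") // would fail the CHECK constraint during the write
```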
Step 6. New deltaTransactionalWrite Execution ID¶
writeFiles
requests a new execution ID (Spark SQL) with deltaTransactionalWrite
name to execute a Spark write job.
FileFormatWriter
Delta Lake uses Spark SQL infrastructure to write data out.
Step 6.1 No Custom Partitioning¶
writeFiles
uses a FileFormatWriter.OutputSpec
with no custom partition locations.
Step 6.2 DeltaInvariantCheckerExec¶
writeFiles
creates a DeltaInvariantCheckerExec unary physical operator (with the executed plan of the normalized query execution as the child operator and the constraints).
Note
The DeltaInvariantCheckerExec physical operator is later used as the physical plan for the data to be written out.
Step 6.3 DeltaOptimizedWriterExec¶
writeFiles creates a DeltaOptimizedWriterExec physical operator as the parent of the DeltaInvariantCheckerExec unary physical operator when all of the following hold true:

- isOptimize is disabled (false)
- shouldOptimizeWrite
Otherwise, writeFiles
leaves the DeltaInvariantCheckerExec unary physical operator intact.
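A hedged example of influencing shouldOptimizeWrite; the property and table-property names below are assumptions and may differ across Delta Lake versions (the table name is hypothetical):

```scala
// Assumption: optimized writes can be requested per session or per table.
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.sql("ALTER TABLE demo SET TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true')")
```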
Step 6.4 BasicWriteJobStatsTracker¶
writeFiles creates a BasicWriteJobStatsTracker (Spark SQL) only with the history.metricsEnabled configuration property enabled.

With history.metricsEnabled enabled (and the BasicWriteJobStatsTracker created), writeFiles registers the following metrics to be collected (tracked):

| Metric Name | UI Description |
|---|---|
| numFiles | number of written files |
| numOutputBytes | written output |
| numOutputRows | number of output rows |
| numParts | number of dynamic part |
| jobCommitTime | job commit time |
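When collected, these metrics surface as the operationMetrics of the corresponding commit in the table history. A hedged illustration (the table path is hypothetical and history.metricsEnabled is assumed to be on):

```scala
// The write metrics (numFiles, numOutputBytes, numOutputRows, ...) show up
// in the operationMetrics column of DESCRIBE HISTORY.
spark.sql("DESCRIBE HISTORY delta.`/tmp/delta/demo`")
  .selectExpr("version", "operation", "operationMetrics")
  .show(truncate = false)
```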
Step 6.5 Write Options¶
writeFiles makes sure (by filtering the given writeOptions) that only an allow-listed subset of write options, if specified, is passed on to the file format writer.
writeFiles adds one Uniform (Iceberg compatibility-specific) option:

| Option | Value |
|---|---|
| writePartitionColumns | isAnyEnabled |
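As a hedged illustration, write options usually originate from the DataFrameWriter options of the write that started the transaction; whether a given option reaches the file format writer depends on the filtering described above (the path below is hypothetical):

```scala
// Options set on the writer become DeltaOptions; only a filtered subset is
// passed on to the underlying Parquet writer.
spark.range(1000).toDF("id")
  .write
  .format("delta")
  .option("compression", "snappy")       // a Parquet write option
  .option("maxRecordsPerFile", "100000") // a Spark SQL write option
  .mode("append")
  .save("/tmp/delta/options_demo")
```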
Step 6.6 DeltaFileFormatWriter¶
As the very last step within the scope of the new execution ID, writeFiles
writes out the data.
writeFiles uses the following (among others):
- DeltaInvariantCheckerExec (possibly with DeltaOptimizedWriterExec parent) as the physical plan
- The partitioning columns
- No bucketing
- DeltaJobStatisticsTracker and BasicWriteJobStatsTracker
Step 7. Collect AddFiles and AddCDCFiles¶
writeFiles
requests the DelayedCommitProtocol for the AddFiles.
With a DeltaJobStatisticsTracker, writeFiles
adds the recordedStats to every AddFile (by path).
With Iceberg Compatibility V2 enabled, writeFiles
adds ICEBERG_COMPAT_VERSION
tag with the value of 2
to every AddFile.
With at least one AddFile and the given isOptimize
flag disabled, writeFiles
registers the AutoCompact post-commit hook.
In the end, writeFiles
returns the AddFiles and AddCDCFiles (from the DelayedCommitProtocol).
getOptionalStatsTrackerAndStatsCollection¶
getOptionalStatsTrackerAndStatsCollection(
output: Seq[Attribute],
outputPath: Path,
partitionSchema: StructType,
data: DataFrame): (Option[DeltaJobStatisticsTracker], Option[StatisticsCollection])
Noop with spark.databricks.delta.stats.collect disabled
getOptionalStatsTrackerAndStatsCollection
returns neither DeltaJobStatisticsTracker nor StatisticsCollection with spark.databricks.delta.stats.collect disabled.
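A hedged example of the property that controls this behavior (only the property name above is assumed):

```scala
// With stats.collect disabled, no DeltaJobStatisticsTracker
// (and no StatisticsCollection) is created for the write.
spark.conf.set("spark.databricks.delta.stats.collect", "false")
```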
getOptionalStatsTrackerAndStatsCollection determines the stats schema (getStatsSchema) for the given output and partitionSchema.
getOptionalStatsTrackerAndStatsCollection
reads the value of delta.dataSkippingNumIndexedCols table property (from the Metadata).
getOptionalStatsTrackerAndStatsCollection
creates a StatisticsCollection (with the tableDataSchema based on spark.databricks.delta.stats.collect.using.tableSchema configuration property).
getOptionalStatsTrackerAndStatsCollection builds the stats column expression (getStatsColExpr) for the statsDataSchema and the StatisticsCollection.
In the end, getOptionalStatsTrackerAndStatsCollection creates a DeltaJobStatisticsTracker and returns it along with the StatisticsCollection.
getStatsColExpr¶
getStatsColExpr(
statsDataSchema: Seq[Attribute],
statsCollection: StatisticsCollection): Expression
getStatsColExpr
creates a Dataset
for a LocalRelation
(Spark SQL) logical operator with the given statsDataSchema
.
getStatsColExpr
uses Dataset.select
to execute to_json
standard function with statsCollector column.
In the end, getStatsColExpr
takes the first Expression
(from the expressions) in the analyzed
logical query plan.
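A hedged sketch of the same pattern with plain Spark SQL APIs (the stats struct below is illustrative only, not the statsCollector column that StatisticsCollection builds):

```scala
// Build a single JSON "stats" expression by selecting to_json over a struct
// and taking the first expression of the analyzed logical plan.
import org.apache.spark.sql.functions.{count, lit, max, min, struct, to_json}

val df = spark.range(10).toDF("id")
val statsCollector = struct(
  count(lit(1)).as("numRecords"),
  struct(min("id")).as("minValues"),
  struct(max("id")).as("maxValues"))
val statsExpr = df.select(to_json(statsCollector)).queryExecution.analyzed.expressions.head
```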
Creating FileCommitProtocol Committer¶
getCommitter(
outputPath: Path): DelayedCommitProtocol
getCommitter
creates a new DelayedCommitProtocol with the delta
job ID and the given outputPath
(and no random prefix length).
Note
The DelayedCommitProtocol is used for FileFormatWriter
(Spark SQL) to write data out and, in the end, for the addedStatuses and changeFiles.
getCommitter
is used when:
TransactionalWrite
is requested to write data out
getPartitioningColumns¶
getPartitioningColumns(
partitionSchema: StructType,
output: Seq[Attribute],
colsDropped: Boolean): Seq[Attribute]
getPartitioningColumns
...FIXME
normalizeData¶
normalizeData(
deltaLog: DeltaLog,
data: Dataset[_]): (QueryExecution, Seq[Attribute], Seq[Constraint], Set[String])
normalizeData
normalizes the column names (using the table schema of the Metadata and the given data
).
normalizeData checks whether the table has columns with default expressions (tableHasDefaultExpr) using the Protocol and the Metadata.
normalizeData
...FIXME
normalizeData
is used when:
TransactionalWrite
is requested to write data out
makeOutputNullable¶
makeOutputNullable(
output: Seq[Attribute]): Seq[Attribute]
makeOutputNullable
...FIXME
performCDCPartition¶
performCDCPartition(
inputData: Dataset[_]): (DataFrame, StructType)
performCDCPartition returns the given inputData with or without the __is_cdc extra partition column, based on whether Change Data Feed is enabled for the table and the _change_type column is available in the schema of the given inputData.

The value of the __is_cdc extra column is as follows:

- true for non-null _change_types
- false otherwise
The schema (the StructType
of the tuple to be returned) includes the __is_cdc extra column as the first column (followed by the physicalPartitionSchema).