TransactionalWrite¶
TransactionalWrite is an abstraction of optimistic transactional writers that can write a structured query out to a Delta table.
Contract¶
DeltaLog¶
deltaLog: DeltaLog
DeltaLog (of a delta table) that this transaction is changing
Used when:
- OptimisticTransactionImpl is requested to prepare a commit, doCommit, checkAndRetry, and perform post-commit operations (and execute delta log checkpoint)
- ConvertToDeltaCommand is executed
- DeltaCommand is requested to buildBaseRelation and commitLarge
- MergeIntoCommand is executed
- TransactionalWrite is requested to write a structured query out to a delta table
- GenerateSymlinkManifest post-commit hook is executed
- ImplicitMetadataOperation is requested to updateMetadata
- DeltaSink is requested to addBatch
Metadata¶
metadata: Metadata
Metadata (of the delta table) that this transaction is changing
Protocol¶
protocol: Protocol
Protocol (of the delta table) that this transaction is changing
Used when:
- OptimisticTransactionImpl is requested to updateMetadata, verifyNewMetadata and prepareCommit
- ConvertToDeltaCommand is executed
Snapshot¶
snapshot: Snapshot
Snapshot (of the delta table) that this transaction is reading at
Implementations¶
spark.databricks.delta.history.metricsEnabled¶
With spark.databricks.delta.history.metricsEnabled configuration property enabled, TransactionalWrite creates a BasicWriteJobStatsTracker (Spark SQL) and registers SQL metrics (when requested to write data out).
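For illustration (assuming an active SparkSession available as spark), the property can be set at the session level:

```scala
// Enable per-commit operation metrics in the Delta table history
// (session-scoped; the property may already be on by default in your release).
spark.conf.set("spark.databricks.delta.history.metricsEnabled", "true")
```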
hasWritten Flag¶
hasWritten: Boolean = false
TransactionalWrite uses the hasWritten internal registry to prevent OptimisticTransactionImpl from updating metadata after writing data out.
hasWritten is initially false and changes to true after data is written out.
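A minimal sketch (not the actual Delta Lake code; TransactionalWriteSketch is a made-up name) of the guard that hasWritten provides:

```scala
// Once writeFiles has flipped the flag, any further metadata update is rejected.
trait TransactionalWriteSketch {
  protected var hasWritten: Boolean = false

  def writeFiles(): Unit = {
    hasWritten = true
    // ...write the data out...
  }

  def updateMetadata(): Unit = {
    assert(!hasWritten, "Cannot update metadata after data has been written out")
    // ...record the new metadata...
  }
}
```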
Write Data Out¶
writeFiles(
  data: Dataset[_]): Seq[FileAction]  // (1)!
writeFiles(
  data: Dataset[_],
  writeOptions: Option[DeltaOptions]): Seq[FileAction]  // (3)!
writeFiles(
  inputData: Dataset[_],
  writeOptions: Option[DeltaOptions],
  additionalConstraints: Seq[Constraint]): Seq[FileAction] // (4)!
writeFiles(
  inputData: Dataset[_],
  writeOptions: Option[DeltaOptions],
  isOptimize: Boolean,
  additionalConstraints: Seq[Constraint]): Seq[FileAction]
writeFiles(
  data: Dataset[_],
  additionalConstraints: Seq[Constraint]): Seq[FileAction]  // (2)!
1. Uses no Constraints
2. Uses no write-related DeltaOptions
3. Uses no Constraints
4. isOptimize disabled
writeFiles writes the given data (as a Dataset) to a delta table and returns AddFiles with AddCDCFiles (from the DelayedCommitProtocol).
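For illustration only, a hedged sketch of writeFiles used directly through an OptimisticTransaction (which extends TransactionalWrite), assuming an active SparkSession (spark) and an existing delta table; the table path and the dataset are made up:

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations}

val deltaLog = DeltaLog.forTable(spark, "/tmp/delta/t1")
val txn = deltaLog.startTransaction()

// Write the data out and collect the resulting FileActions (AddFiles)
val data = spark.range(5).toDF("id")
val actions = txn.writeFiles(data)

// Commit the collected actions to the delta log
txn.commit(actions, DeltaOperations.Write(SaveMode.Append))
```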
writeFiles is used when:
- DeleteCommand is requested to rewriteFiles
- DeltaSink is requested to add a streaming micro-batch
- MergeIntoCommandBase is requested to write data out
- OptimizeExecutor is requested to runOptimizeBinJob
- RemoveColumnMappingCommand is requested to write data out
- UpdateCommand is requested to rewriteFiles
- WriteIntoDelta is requested to writeAndReturnCommitData (and writeFiles)
writeFiles creates a DeltaInvariantCheckerExec and a DelayedCommitProtocol to write out files to the data path (of the DeltaLog).
FileFormatWriter
writeFiles uses the FileFormatWriter (Spark SQL) utility to write out the result of a structured query.
writeFiles is executed within SQLExecution.withNewExecutionId.
SQLAppStatusListener
writeFiles can be tracked using web UI or SQLAppStatusListener (using SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd events).
Learn about SQLAppStatusListener in The Internals of Spark SQL online book.
In the end, writeFiles returns the addedStatuses of the DelayedCommitProtocol committer.
Step 1. Mark Write Executed¶
As the very first step, writeFiles turns the hasWritten flag on (true).
Note
After writeFiles, no metadata updates in the transaction are permitted.
writeFiles performs CDC augmentation (for the delta table with Change Data Feed enabled).
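CDC augmentation only applies when Change Data Feed is enabled on the table, e.g. (the table path below is made up for illustration):

```scala
spark.sql("""
  ALTER TABLE delta.`/tmp/delta/t1`
  SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
""")
```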
writeFiles normalizes the output dataset (based on the given DeltaOptions).
Step 2. Partitioning Columns¶
writeFiles determines the partitioning columns of the data(set) to be written out.
Step 3. DelayedCommitProtocol Committer¶
writeFiles creates a DelayedCommitProtocol committer for the data path of the delta table.
Step 4. DeltaJobStatisticsTracker¶
writeFiles may or may not create a DeltaJobStatisticsTracker based on stats.collect configuration property.
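For illustration, statistics collection (and hence the tracker) can be turned off at the session level:

```scala
// Disabling spark.databricks.delta.stats.collect skips per-file statistics
// collection, so no DeltaJobStatisticsTracker is created.
spark.conf.set("spark.databricks.delta.stats.collect", "false")
```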
Step 5. Constraints¶
writeFiles collects Constraints:
- From the table metadata (CHECK constraints and Column Invariants)
- Generated Columns (from normalization)
- The given additional Constraints
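For illustration, a CHECK constraint defined on a (made-up) table ends up among the Constraints enforced on every write:

```scala
spark.sql("""
  ALTER TABLE delta.`/tmp/delta/t1`
  ADD CONSTRAINT id_is_positive CHECK (id > 0)
""")
```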
Step 6. New deltaTransactionalWrite Execution ID¶
writeFiles requests a new execution ID (Spark SQL) with deltaTransactionalWrite name to execute a Spark write job.
FileFormatWriter
Delta Lake uses Spark SQL infrastructure to write data out.
Step 6.1 No Custom Partitioning¶
writeFiles uses a FileFormatWriter.OutputSpec with no custom partition locations.
Step 6.2 DeltaInvariantCheckerExec¶
writeFiles creates a DeltaInvariantCheckerExec unary physical operator (with the executed plan of the normalized query execution as the child operator and the constraints).
Note
The DeltaInvariantCheckerExec physical operator is later used as the physical plan for the data to be written out.
Step 6.3 DeltaOptimizedWriterExec¶
writeFiles creates a DeltaOptimizedWriterExec physical operator as the parent of the DeltaInvariantCheckerExec unary physical operator when all of the following hold true:
- isOptimize is disabled (false)
- shouldOptimizeWrite
Otherwise, writeFiles leaves the DeltaInvariantCheckerExec unary physical operator intact.
Step 6.4 BasicWriteJobStatsTracker¶
writeFiles may or may not create a BasicWriteJobStatsTracker (Spark SQL) based on history.metricsEnabled configuration property.
With history.metricsEnabled enabled (and BasicWriteJobStatsTracker created), writeFiles registers the following metrics to be collected (tracked):
| Metric Name | UI Description | 
|---|---|
| numFiles | number of written files | 
| numOutputBytes | written output | 
| numOutputRows | number of output rows | 
| numParts | number of dynamic part | 
| jobCommitTime | job commit time | 
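With the metrics registered, they surface in the operationMetrics column of the table history, e.g. (the table path is made up for illustration):

```scala
spark.sql("DESCRIBE HISTORY delta.`/tmp/delta/t1`")
  .select("version", "operation", "operationMetrics")
  .show(truncate = false)
```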
Step 6.5 Write Options¶
writeFiles filters the given writeOptions (if specified) so that only the supported write-related options are passed on to the underlying file format write.
writeFiles then adds one Uniform (Iceberg compatibility)-specific option:
| Option | Value | 
|---|---|
| writePartitionColumns | isAnyEnabled | 
Step 6.6 DeltaFileFormatWriter¶
As the very last step within the scope of the new execution ID, writeFiles writes out the data.
writeFiles uses the following (among others):
- DeltaInvariantCheckerExec (possibly with DeltaOptimizedWriterExec parent) as the physical plan
- The partitioning columns
- No bucketing
- DeltaJobStatisticsTracker and BasicWriteJobStatsTracker
Step 7. Collect AddFiles and AddCDCFiles¶
writeFiles requests the DelayedCommitProtocol for the AddFiles.
With a DeltaJobStatisticsTracker, writeFiles adds the recordedStats to every AddFile (by path).
With Iceberg Compatibility V2 enabled, writeFiles adds ICEBERG_COMPAT_VERSION tag with the value of 2 to every AddFile.
With at least one AddFile and the given isOptimize flag disabled, writeFiles registers the AutoCompact post-commit hook.
In the end, writeFiles returns the AddFiles and AddCDCFiles (from the DelayedCommitProtocol).
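For illustration (assuming the delta.autoOptimize.autoCompact table property is supported in the Delta Lake version in use), auto compaction can be enabled per table so the AutoCompact post-commit hook has work to do:

```scala
spark.sql("""
  ALTER TABLE delta.`/tmp/delta/t1`
  SET TBLPROPERTIES ('delta.autoOptimize.autoCompact' = 'true')
""")
```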
getOptionalStatsTrackerAndStatsCollection¶
getOptionalStatsTrackerAndStatsCollection(
  output: Seq[Attribute],
  outputPath: Path,
  partitionSchema: StructType,
  data: DataFrame): (Option[DeltaJobStatisticsTracker], Option[StatisticsCollection])
Noop with spark.databricks.delta.stats.collect disabled
getOptionalStatsTrackerAndStatsCollection returns neither DeltaJobStatisticsTracker nor StatisticsCollection with spark.databricks.delta.stats.collect disabled.
getOptionalStatsTrackerAndStatsCollection determines the stats schema (getStatsSchema) for the given output and partitionSchema.
getOptionalStatsTrackerAndStatsCollection reads the value of delta.dataSkippingNumIndexedCols table property (from the Metadata).
getOptionalStatsTrackerAndStatsCollection creates a StatisticsCollection (with the tableDataSchema based on spark.databricks.delta.stats.collect.using.tableSchema configuration property).
getOptionalStatsTrackerAndStatsCollection builds the stats column expression (getStatsColExpr) for the statsDataSchema and the StatisticsCollection.
In the end, getOptionalStatsTrackerAndStatsCollection creates a DeltaJobStatisticsTracker and the StatisticsCollection.
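For illustration, the number of columns indexed for data skipping is controlled by the delta.dataSkippingNumIndexedCols table property (the table path is made up):

```scala
spark.sql("""
  ALTER TABLE delta.`/tmp/delta/t1`
  SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '5')
""")
```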
getStatsColExpr¶
getStatsColExpr(
  statsDataSchema: Seq[Attribute],
  statsCollection: StatisticsCollection): Expression
getStatsColExpr creates a Dataset for a LocalRelation (Spark SQL) logical operator with the given statsDataSchema.
getStatsColExpr uses Dataset.select to execute to_json standard function with statsCollector column.
In the end, getStatsColExpr takes the first Expression (from the expressions) in the analyzed logical query plan.
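A minimal sketch (not the actual Delta Lake code) of the same technique with public APIs: select to_json over a throwaway Dataset and pull the analyzed Expression back out of the query plan. The columns are made up for illustration.

```scala
import org.apache.spark.sql.functions.{struct, to_json}

val df = spark.range(1).selectExpr("id", "id * 2 AS value")
val statsDf = df.select(to_json(struct(df("id"), df("value"))).as("stats"))

// The first expression of the analyzed plan is the to_json expression,
// which is what getStatsColExpr hands over to the stats tracker.
val statsExpr = statsDf.queryExecution.analyzed.expressions.head
```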
Creating FileCommitProtocol Committer¶
getCommitter(
  outputPath: Path): DelayedCommitProtocol
getCommitter creates a new DelayedCommitProtocol with the delta job ID and the given outputPath (and no random prefix length).
Note
The DelayedCommitProtocol is used for FileFormatWriter (Spark SQL) to write data out and, in the end, for the addedStatuses and changeFiles.
getCommitter is used when:
- TransactionalWrite is requested to write data out
getPartitioningColumns¶
getPartitioningColumns(
  partitionSchema: StructType,
  output: Seq[Attribute],
  colsDropped: Boolean): Seq[Attribute]
getPartitioningColumns...FIXME
normalizeData¶
normalizeData(
  deltaLog: DeltaLog,
  data: Dataset[_]): (QueryExecution, Seq[Attribute], Seq[Constraint], Set[String])
normalizeData normalizes the column names (using the table schema of the Metadata and the given data).
normalizeData checks whether the table has default value expressions (tableHasDefaultExpr) using the Protocol and the Metadata.
normalizeData...FIXME
normalizeData is used when:
- TransactionalWrite is requested to write data out
makeOutputNullable¶
makeOutputNullable(
  output: Seq[Attribute]): Seq[Attribute]
makeOutputNullable...FIXME
performCDCPartition¶
performCDCPartition(
  inputData: Dataset[_]): (DataFrame, StructType)
performCDCPartition returns the given inputData with an extra __is_cdc partition column when Change Data Feed is enabled for the table and the _change_type column is available in the schema of the inputData; otherwise, the inputData is returned unchanged.
The value of the __is_cdc extra column is as follows:
- true for non-null _change_types
- false otherwise
The schema (the StructType of the tuple to be returned) includes the __is_cdc extra column as the first column (followed by the physicalPartitionSchema).
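A minimal sketch (not the actual Delta Lake code) of the extra partition column, assuming a hypothetical inputData DataFrame that already carries a _change_type column:

```scala
import org.apache.spark.sql.functions.col

val withCdcPartition = inputData.withColumn(
  "__is_cdc",                      // extra partition column
  col("_change_type").isNotNull)   // true for CDC rows, false otherwise
```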