TransactionalWrite¶
TransactionalWrite
is an abstraction of optimistic transactional writers that can write a structured query out to a Delta table.
Contract¶
DeltaLog¶
deltaLog: DeltaLog
DeltaLog (of a delta table) that this transaction is changing
Used when:

- OptimisticTransactionImpl is requested to prepare a commit, doCommit, checkAndRetry, and perform post-commit operations (and execute delta log checkpoint)
- ConvertToDeltaCommand is executed
- DeltaCommand is requested to buildBaseRelation and commitLarge
- MergeIntoCommand is executed
- TransactionalWrite is requested to write a structured query out to a delta table
- GenerateSymlinkManifest post-commit hook is executed
- ImplicitMetadataOperation is requested to updateMetadata
- DeltaSink is requested to addBatch
Metadata¶
metadata: Metadata
Metadata (of the delta table) that this transaction is changing
Protocol¶
protocol: Protocol
Protocol (of the delta table) that this transaction is changing
Used when:

- OptimisticTransactionImpl is requested to updateMetadata, verifyNewMetadata and prepareCommit
- ConvertToDeltaCommand is executed
Snapshot¶
snapshot: Snapshot
Snapshot (of the delta table) that this transaction is reading at
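The four members above can be summarized in the following condensed Scala sketch (TransactionalWriteContract is a made-up name for illustration; the actual TransactionalWrite trait defines more members, e.g. writeFiles and the hasWritten registry):

```scala
import org.apache.spark.sql.delta.{DeltaLog, Snapshot}
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}

// A condensed sketch of the contract (illustration only, not the actual sources)
trait TransactionalWriteContract {
  def deltaLog: DeltaLog // DeltaLog (of the delta table) this transaction is changing
  def metadata: Metadata // Metadata (of the delta table) this transaction is changing
  def protocol: Protocol // Protocol (of the delta table) this transaction is changing
  def snapshot: Snapshot // Snapshot (of the delta table) this transaction is reading at
}
```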
Implementations¶
OptimisticTransactionImpl
spark.databricks.delta.history.metricsEnabled¶
With spark.databricks.delta.history.metricsEnabled configuration property enabled, TransactionalWrite
creates a BasicWriteJobStatsTracker
(Spark SQL) and registers SQL metrics (when requested to write data out).
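For example, the property can be enabled at the session level before writing (a usage sketch; spark is assumed to be an existing SparkSession):

```scala
// Enable per-operation SQL metrics in the table history (session-scoped).
// `spark` is assumed to be an existing SparkSession.
spark.conf.set("spark.databricks.delta.history.metricsEnabled", "true")
```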
hasWritten Flag¶
hasWritten: Boolean = false
TransactionalWrite uses the hasWritten internal registry to prevent OptimisticTransactionImpl from updating the metadata after files have been written out.
hasWritten is initially false and flips to true after data has been written out.
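A minimal sketch of such a guard (hypothetical code, not the actual OptimisticTransactionImpl sources):

```scala
// Hypothetical sketch of the hasWritten guard (not the actual sources)
class TransactionSketch {
  protected var hasWritten: Boolean = false

  def writeFiles(): Unit = {
    hasWritten = true // flips to true once data has been written out
    // ... write the data out ...
  }

  def updateMetadata(): Unit = {
    // metadata updates are only allowed before any files have been written out
    assert(!hasWritten, "Cannot update the metadata after files have been written out")
    // ... record the new metadata ...
  }
}
```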
Writing Data Out¶
writeFiles(
data: Dataset[_]): Seq[FileAction] // (1)!
writeFiles(
data: Dataset[_],
writeOptions: Option[DeltaOptions]): Seq[FileAction]
writeFiles(
inputData: Dataset[_],
writeOptions: Option[DeltaOptions],
additionalConstraints: Seq[Constraint]): Seq[FileAction]
writeFiles(
data: Dataset[_],
additionalConstraints: Seq[Constraint]): Seq[FileAction] // (2)!
1. Uses no additionalConstraints
2. Uses no writeOptions
writeFiles
writes the given data
to a delta table and returns AddFiles with AddCDCFiles (from the DelayedCommitProtocol).
writeFiles
is used when:
- WriteIntoDelta is requested to write
- DeleteCommand is requested to rewriteFiles
- MergeIntoCommand is requested to writeInsertsOnlyWhenNoMatchedClauses and writeAllChanges
- OptimizeExecutor is requested to runOptimizeBinJob
- UpdateCommand is requested to rewriteFiles
- DeltaSink is requested to add a streaming micro-batch
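As an illustration, such callers typically execute writeFiles inside an optimistic transaction and commit the returned FileActions (a hedged sketch; appendToDeltaTable is a made-up helper and the exact DeltaOperations operation varies per caller):

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations}

// A sketch of the typical caller pattern: write the data out first,
// then commit the returned FileActions.
def appendToDeltaTable(spark: SparkSession, tablePath: String, data: DataFrame): Unit = {
  val deltaLog = DeltaLog.forTable(spark, tablePath)
  deltaLog.withNewTransaction { txn =>
    val newFiles = txn.writeFiles(data) // Seq[FileAction] (AddFiles and AddCDCFiles)
    txn.commit(newFiles, DeltaOperations.Write(SaveMode.Append))
  }
}
```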
writeFiles
creates a DeltaInvariantCheckerExec and a DelayedCommitProtocol to write out files to the data path (of the DeltaLog).
FileFormatWriter
writeFiles uses Spark SQL's FileFormatWriter utility to write out the result of a (batch or streaming) structured query.
Learn about FileFormatWriter in The Internals of Spark SQL online book.
writeFiles is executed within SQLExecution.withNewExecutionId.
SQLAppStatusListener
writeFiles can be tracked using the web UI or SQLAppStatusListener (using SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd events).
Learn about SQLAppStatusListener in The Internals of Spark SQL online book.
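For instance, a custom SparkListener can log these events (a sketch using Spark's listener API; SQLExecutionLogger is a made-up name):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

// Logs SQL execution start/end events, incl. the execution that writeFiles runs under
class SQLExecutionLogger extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case start: SparkListenerSQLExecutionStart =>
      println(s"SQL execution ${start.executionId} started: ${start.description}")
    case end: SparkListenerSQLExecutionEnd =>
      println(s"SQL execution ${end.executionId} ended at ${end.time}")
    case _ => // other events are ignored
  }
}

// Register it on an existing SparkSession:
// spark.sparkContext.addSparkListener(new SQLExecutionLogger)
```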
In the end, writeFiles
returns the addedStatuses of the DelayedCommitProtocol committer.
Internally, writeFiles
turns the hasWritten flag on (true
).
Note
After writeFiles
, no metadata updates in the transaction are permitted.
writeFiles performCDCPartition (which may return a CDF-aware DataFrame augmented with an extra __is_cdc column, and a corresponding schema).
writeFiles normalizes the (possibly-augmented CDF-aware) DataFrame.
writeFiles gets the partitioning columns based on the (possibly-augmented CDF-aware) partition schema.
DelayedCommitProtocol Committer¶
writeFiles
creates a DelayedCommitProtocol committer for the data path (of the DeltaLog).
DeltaJobStatisticsTracker¶
writeFiles
creates a DeltaJobStatisticsTracker if spark.databricks.delta.stats.collect configuration property is enabled.
Constraints¶
writeFiles collects constraints:

- From the table metadata (CHECK constraints and Column Invariants), e.g. defined as shown below
- From generated columns (after normalizeData)
- The given additionalConstraints
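For example, a CHECK constraint becomes part of the table metadata when defined with SQL (a usage sketch; demo_table and the constraint name are made-up, and spark is assumed to be an existing SparkSession):

```scala
// Define a CHECK constraint on an existing delta table; writeFiles later
// enforces it (via DeltaInvariantCheckerExec) when writing data out.
spark.sql("ALTER TABLE demo_table ADD CONSTRAINT id_is_positive CHECK (id > 0)")
```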
deltaTransactionalWrite Execution ID¶
writeFiles requests a new Execution ID (that is used to track all Spark jobs of FileFormatWriter.write in Spark SQL) with the physical query plan (after normalizeData) and the deltaTransactionalWrite name.
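A sketch of the pattern (runTracked is a made-up helper; SQLExecution.withNewExecutionId is the Spark SQL utility referenced above):

```scala
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}

// Run a body under a new execution ID named "deltaTransactionalWrite" so that
// all FileFormatWriter.write jobs are grouped under a single SQL execution.
// `queryExecution` is assumed to be the QueryExecution produced by normalizeData.
def runTracked[T](queryExecution: QueryExecution)(body: => T): T =
  SQLExecution.withNewExecutionId(queryExecution, Some("deltaTransactionalWrite")) {
    body
  }
```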
DeltaInvariantCheckerExec¶
writeFiles
creates a DeltaInvariantCheckerExec unary physical operator (with the executed plan of the normalized query execution as the child operator).
BasicWriteJobStatsTracker¶
writeFiles
creates a BasicWriteJobStatsTracker
(Spark SQL) if spark.databricks.delta.history.metricsEnabled configuration property is enabled.
Write Options¶
writeFiles filters out all the write options (from the given writeOptions) except a small allow-list that is passed on to the file format writer.
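A hedged sketch of the filtering (the retained keys, compression and maxRecordsPerFile, are an assumption based on the Delta Lake sources; the actual code works on DeltaOptions):

```scala
// A sketch of the allow-list filtering of write options (an assumption:
// only compression and maxRecordsPerFile are passed on to the file writer)
def filterWriteOptions(writeOptions: Option[Map[String, String]]): Map[String, String] =
  writeOptions match {
    case None => Map.empty
    case Some(options) =>
      options.filter { case (key, _) =>
        key.equalsIgnoreCase("compression") || key.equalsIgnoreCase("maxRecordsPerFile")
      }
  }
```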
FileFormatWriter¶
As the last step under the new execution ID, writeFiles writes out the data (using FileFormatWriter).
Tip
Enable ALL logging level for the org.apache.spark.sql.execution.datasources.FileFormatWriter logger to see what happens inside.
AddFiles and AddCDCFiles¶
In the end, writeFiles
returns AddFiles and AddCDCFiles (from the DelayedCommitProtocol).
Creating FileCommitProtocol Committer¶
getCommitter(
outputPath: Path): DelayedCommitProtocol
getCommitter
creates a new DelayedCommitProtocol with the delta
job ID and the given outputPath
(and no random prefix length).
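Based on the description above, the call amounts to the following sketch (for illustration only; the actual constructor may take more parameters depending on the Delta Lake version):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.files.DelayedCommitProtocol

// A sketch of getCommitter: a DelayedCommitProtocol with the "delta" job ID,
// the given output path, and no random prefix length
def getCommitterSketch(outputPath: Path): DelayedCommitProtocol =
  new DelayedCommitProtocol("delta", outputPath.toString, None)
```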
Note
The DelayedCommitProtocol is used for FileFormatWriter
(Spark SQL) to write data out and, in the end, for the addedStatuses and changeFiles.
getCommitter
is used when:
TransactionalWrite
is requested to write data out
getPartitioningColumns¶
getPartitioningColumns(
partitionSchema: StructType,
output: Seq[Attribute],
colsDropped: Boolean): Seq[Attribute]
getPartitioningColumns
...FIXME
normalizeData¶
normalizeData(
deltaLog: DeltaLog,
data: Dataset[_]): (QueryExecution, Seq[Attribute], Seq[Constraint], Set[String])
normalizeData
normalizes the column names (using the table schema of the Metadata and the given data
).
normalizeData
tableHasDefaultExpr (using the Protocol and the Metadata).
normalizeData
...FIXME
normalizeData
is used when:
TransactionalWrite
is requested to write data out
makeOutputNullable¶
makeOutputNullable(
output: Seq[Attribute]): Seq[Attribute]
makeOutputNullable
...FIXME
performCDCPartition¶
performCDCPartition(
inputData: Dataset[_]): (DataFrame, StructType)
performCDCPartition returns the given inputData with or without the __is_cdc extra column, based on whether Change Data Feed is enabled for the table and the _change_type column is available in the schema of the given inputData.
The value of the __is_cdc extra column is as follows:

- true for non-null _change_type values
- false otherwise
The schema (the StructType
of the tuple to be returned) includes the __is_cdc extra column as the first column (followed by the physicalPartitionSchema).
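A sketch of the augmentation (for illustration only; withIsCdcColumn is a made-up helper and the actual performCDCPartition also builds the returned schema separately):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// The __is_cdc extra column is true for rows with a non-null _change_type and
// false otherwise; performCDCPartition also returns a schema with __is_cdc first,
// followed by the physical partition columns.
def withIsCdcColumn(inputData: DataFrame): DataFrame =
  inputData.withColumn("__is_cdc", col("_change_type").isNotNull)
```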