
TransactionalWrite

TransactionalWrite is an abstraction of optimistic transactional writers that can write a structured query out to a Delta table.

Contract

DeltaLog

deltaLog: DeltaLog

DeltaLog (of a delta table) that this transaction is changing

Used when:

Metadata

metadata: Metadata

Metadata (of the delta table) that this transaction is changing

Protocol

protocol: Protocol

Protocol (of the delta table) that this transaction is changing

Used when:

Snapshot

snapshot: Snapshot

Snapshot (of the delta table) that this transaction is reading at

Implementations

spark.databricks.delta.history.metricsEnabled

With spark.databricks.delta.history.metricsEnabled configuration property enabled, TransactionalWrite creates a BasicWriteJobStatsTracker (Spark SQL) and registers SQL metrics (when requested to write data out).
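
As an illustration (a minimal sketch, assuming a spark SparkSession in scope, e.g. in spark-shell), the configuration property can be enabled per session:

// Enable per-operation write metrics in the table history (session-scoped)
spark.conf.set("spark.databricks.delta.history.metricsEnabled", "true")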

hasWritten Flag

hasWritten: Boolean = false

TransactionalWrite uses the hasWritten internal registry to prevent OptimisticTransactionImpl from updating the metadata after files have been written out.

hasWritten is initially false and becomes true once data has been written out.

Writing Data Out

writeFiles(
  data: Dataset[_]): Seq[FileAction]  // (1)!
writeFiles(
  data: Dataset[_],
  writeOptions: Option[DeltaOptions]): Seq[FileAction]
writeFiles(
  inputData: Dataset[_],
  writeOptions: Option[DeltaOptions],
  additionalConstraints: Seq[Constraint]): Seq[FileAction]
writeFiles(
  data: Dataset[_],
  additionalConstraints: Seq[Constraint]): Seq[FileAction]  // (2)!
  1. Uses no additionalConstraints
  2. Uses no writeOptions

writeFiles writes the given data to a delta table and returns AddFiles with AddCDCFiles (from the DelayedCommitProtocol).
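
A regular batch write to a Delta table, like the following minimal sketch (the path is illustrative), ends up triggering writeFiles under the covers:

import org.apache.spark.sql.functions.col

spark.range(5)
  .withColumn("even", col("id") % 2 === 0)
  .write
  .format("delta")
  .mode("overwrite")
  .save("/tmp/delta/writeFiles-demo")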


writeFiles is used when:


writeFiles creates a DeltaInvariantCheckerExec and a DelayedCommitProtocol to write out files to the data path (of the DeltaLog).

FileFormatWriter

writeFiles uses Spark SQL's FileFormatWriter utility to write out a result of a structured query.

Learn about FileFormatWriter in The Internals of Spark SQL online book.

writeFiles is executed within SQLExecution.withNewExecutionId.

SQLAppStatusListener

writeFiles can be tracked using web UI or SQLAppStatusListener (using SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd events).

Learn about SQLAppStatusListener in The Internals of Spark SQL online book.
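
For example (a minimal sketch of a custom listener, not part of Delta Lake), the events can be observed with a SparkListener:

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

spark.sparkContext.addSparkListener(new SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      println(s"SQL execution ${e.executionId} started: ${e.description}")
    case e: SparkListenerSQLExecutionEnd =>
      println(s"SQL execution ${e.executionId} finished")
    case _ => // ignore other events
  }
})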

In the end, writeFiles returns the addedStatuses of the DelayedCommitProtocol committer.


Internally, writeFiles turns the hasWritten flag on (true).

Note

After writeFiles, no metadata updates in the transaction are permitted.

writeFiles performs performCDCPartition on the given data, producing a possibly-augmented CDF-aware DataFrame and a corresponding schema (with an additional __is_cdc column).

writeFiles normalizes the (possibly-augmented CDF-aware) DataFrame.

writeFiles gets the partitioning columns based on the (possibly-augmented CDF-aware) partition schema.
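
For example (a minimal sketch; the path is illustrative), partition columns declared at write time end up in the partition schema that writeFiles derives the partitioning columns from:

import org.apache.spark.sql.functions.col

spark.range(10)
  .withColumn("part", col("id") % 3)
  .write
  .format("delta")
  .mode("overwrite")
  .partitionBy("part")
  .save("/tmp/delta/partitioned-demo")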

DelayedCommitProtocol Committer

writeFiles creates a DelayedCommitProtocol committer for the data path (of the DeltaLog).

DeltaJobStatisticsTracker

writeFiles creates a DeltaJobStatisticsTracker if spark.databricks.delta.stats.collect configuration property is enabled.

Constraints

writeFiles collects constraints:

  1. From the table metadata (CHECK constraints and Column Invariants)
  2. Generated columns (after normalizeData)
  3. The given additionalConstraints
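
For example (a minimal sketch; the table name is illustrative), a CHECK constraint added to a Delta table becomes one of the constraints enforced when writeFiles writes out new files:

spark.sql("CREATE TABLE IF NOT EXISTS demo (id BIGINT) USING delta")
spark.sql("ALTER TABLE demo ADD CONSTRAINT id_is_positive CHECK (id > 0)")
spark.sql("INSERT INTO demo VALUES (1)")   // OK
// spark.sql("INSERT INTO demo VALUES (-1)")  // would fail the CHECK constraint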

deltaTransactionalWrite Execution ID

writeFiles requests a new execution ID (that is used to track all the Spark jobs of FileFormatWriter.write in Spark SQL) with the physical query plan (after normalizeData) and the deltaTransactionalWrite name.

DeltaInvariantCheckerExec

writeFiles creates a DeltaInvariantCheckerExec unary physical operator (with the executed plan of the normalized query execution as the child operator).

BasicWriteJobStatsTracker

writeFiles creates a BasicWriteJobStatsTracker (Spark SQL) if spark.databricks.delta.history.metricsEnabled configuration property is enabled.
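
With the property enabled, the write metrics show up in the operationMetrics column of the table history, for example (a sketch; the path is illustrative):

spark.sql("DESCRIBE HISTORY delta.`/tmp/delta/writeFiles-demo`")
  .select("version", "operation", "operationMetrics")
  .show(truncate = false)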

Write Options

writeFiles filters out all the write options (from the given writeOptions) except the following:

  1. maxRecordsPerFile
  2. compression
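
For example (a minimal sketch; the path is illustrative), these two options of a batch write are passed on to the underlying file format writer while other DeltaOptions are filtered out at this step:

spark.range(100000)
  .write
  .format("delta")
  .option("maxRecordsPerFile", "10000")
  .option("compression", "snappy")
  .mode("overwrite")
  .save("/tmp/delta/options-demo")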

FileFormatWriter

As the last step under the new execution ID, writeFiles writes out the data (using FileFormatWriter).

Tip

Enable ALL logging level for org.apache.spark.sql.execution.datasources.FileFormatWriter logger to see what happens inside.

AddFiles and AddCDCFiles

In the end, writeFiles returns AddFiles and AddCDCFiles (from the DelayedCommitProtocol).

Creating FileCommitProtocol Committer

getCommitter(
  outputPath: Path): DelayedCommitProtocol

getCommitter creates a new DelayedCommitProtocol with the delta job ID and the given outputPath (and no random prefix length).

Note

The DelayedCommitProtocol is used for FileFormatWriter (Spark SQL) to write data out and, in the end, for the addedStatuses and changeFiles.

getCommitter is used when:

getPartitioningColumns

getPartitioningColumns(
  partitionSchema: StructType,
  output: Seq[Attribute],
  colsDropped: Boolean): Seq[Attribute]

getPartitioningColumns...FIXME

normalizeData

normalizeData(
  deltaLog: DeltaLog,
  data: Dataset[_]): (QueryExecution, Seq[Attribute], Seq[Constraint], Set[String])

normalizeData normalizes the column names (using the table schema of the Metadata and the given data).

normalizeData checks whether the table has default expressions (tableHasDefaultExpr), using the Protocol and the Metadata.

normalizeData...FIXME


normalizeData is used when:

makeOutputNullable

makeOutputNullable(
  output: Seq[Attribute]): Seq[Attribute]

makeOutputNullable...FIXME

performCDCPartition

performCDCPartition(
  inputData: Dataset[_]): (DataFrame, StructType)

performCDCPartition returns the given inputData with or without an extra __is_cdc column, based on whether Change Data Feed is enabled for the table and the _change_type column is available in the schema of the given inputData.

The value of the __is_cdc extra column is as follows:

  • true for rows with a non-null _change_type
  • false otherwise

The schema (the StructType of the tuple to be returned) includes the __is_cdc extra column as the first column (followed by the physicalPartitionSchema).
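
For example (a minimal sketch; the table name is illustrative), enabling Change Data Feed on a table makes CDC-producing operations (e.g. UPDATE, DELETE, MERGE) carry the _change_type column, so performCDCPartition adds the __is_cdc column and writeFiles emits AddCDCFiles:

spark.sql("""CREATE TABLE IF NOT EXISTS cdf_demo (id BIGINT, value STRING) USING delta
  TBLPROPERTIES (delta.enableChangeDataFeed = true)""")
spark.sql("INSERT INTO cdf_demo VALUES (1, 'a')")
spark.sql("UPDATE cdf_demo SET value = 'updated' WHERE id = 1")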