
TransactionalWrite

TransactionalWrite is an abstraction of optimistic transactional writers that can write a structured query out to a Delta table.

Contract

DeltaLog

deltaLog: DeltaLog

DeltaLog (of a delta table) that this transaction is changing

Used when:

Metadata

metadata: Metadata

Metadata (of the delta table) that this transaction is changing

Protocol

protocol: Protocol

Protocol (of the delta table) that this transaction is changing

Used when:

Snapshot

snapshot: Snapshot

Snapshot (of the delta table) that this transaction is reading at

Implementations

spark.databricks.delta.history.metricsEnabled

With spark.databricks.delta.history.metricsEnabled configuration property enabled, TransactionalWrite creates a BasicWriteJobStatsTracker (Spark SQL) and registers SQL metrics (when requested to write data out).
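
For example, the configuration property can be enabled on the active SparkSession before writing (a minimal sketch, assuming a SparkSession available as spark):

// Enable per-commit operation metrics (runtime SQL configuration)
spark.conf.set("spark.databricks.delta.history.metricsEnabled", "true")
// The collected metrics are reported as operationMetrics in the table history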

hasWritten Flag

hasWritten: Boolean = false

TransactionalWrite uses the hasWritten internal registry to prevent OptimisticTransactionImpl from updating metadata after writing data out.

hasWritten is initially false and changes to true after data is written out.
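
The guard pattern can be illustrated with a minimal, self-contained sketch (not Delta Lake's actual code; names simplified for illustration):

// A simplified illustration of the hasWritten guard (hypothetical code)
trait WriteGuard {
  protected var hasWritten: Boolean = false

  // Metadata updates are only allowed before any data has been written out
  def updateMetadata(newMetadata: String): Unit = {
    assert(!hasWritten, "Cannot update the metadata after writing data out")
    println(s"Metadata updated: $newMetadata")
  }

  // writeFiles flips the flag before performing the actual write
  def writeFiles(): Unit = {
    hasWritten = true
  }
}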

Writing Data Out

writeFiles(
  data: Dataset[_]): Seq[FileAction]  // (1)!
writeFiles(
  data: Dataset[_],
  writeOptions: Option[DeltaOptions]): Seq[FileAction]  // (3)!
writeFiles(
  inputData: Dataset[_],
  writeOptions: Option[DeltaOptions],
  additionalConstraints: Seq[Constraint]): Seq[FileAction]
writeFiles(
  data: Dataset[_],
  additionalConstraints: Seq[Constraint]): Seq[FileAction]  // (2)!
  1. Uses no Constraints
  2. Uses no write-related DeltaOptions
  3. Uses no Constraints

writeFiles writes the given data (as a Dataset) to a delta table and returns AddFiles with AddCDCFiles (from the DelayedCommitProtocol).


writeFiles is used when:


writeFiles creates a DeltaInvariantCheckerExec and a DelayedCommitProtocol to write out files to the data path (of the DeltaLog).
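
For illustration, writeFiles is typically executed from within a transaction on a DeltaLog and followed by a commit (a hedged sketch, assuming a SparkSession spark and an existing delta table at the given path; writeFiles alone does not commit anything):

// A sketch of writeFiles inside an optimistic transaction
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations}

val deltaLog = DeltaLog.forTable(spark, "/tmp/delta/demo")
deltaLog.withNewTransaction { txn =>
  // writeFiles writes the data out and returns the FileActions (e.g. AddFiles)
  val actions = txn.writeFiles(spark.range(5).toDF)
  // The FileActions become part of the table only after the commit
  txn.commit(actions, DeltaOperations.Write(SaveMode.Append))
}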

FileFormatWriter

writeFiles uses the FileFormatWriter (Spark SQL) utility to write out the result of a structured query.

writeFiles is executed within SQLExecution.withNewExecutionId.

SQLAppStatusListener

writeFiles can be tracked using web UI or SQLAppStatusListener (using SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd events).

Learn about SQLAppStatusListener in The Internals of Spark SQL online book.
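
A hedged sketch of observing these events with a custom SparkListener (the event classes are Spark SQL's; the listener itself is made up for illustration):

// A minimal listener that prints SQL execution events
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

class WriteFilesListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case s: SparkListenerSQLExecutionStart =>
      println(s"SQL execution ${s.executionId} started: ${s.description}")
    case e: SparkListenerSQLExecutionEnd =>
      println(s"SQL execution ${e.executionId} finished")
    case _ => // ignore other events
  }
}

spark.sparkContext.addSparkListener(new WriteFilesListener)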

In the end, writeFiles returns the addedStatuses of the DelayedCommitProtocol committer.

Step 1. Mark Write Executed

Right at the beginning, writeFiles turns the hasWritten flag on (true).

Note

After writeFiles, no metadata updates in the transaction are permitted.

writeFiles performs CDC augmentation (for delta tables with Change Data Feed enabled).

writeFiles normalizes the output dataset (based on the given DeltaOptions).

Step 2. Partitioning Columns

writeFiles determines the partitioning columns of the data(set) to be written out.

Step 3. DelayedCommitProtocol Committer

writeFiles creates a DelayedCommitProtocol committer for the data path of the delta table.

Step 4. DeltaJobStatisticsTracker

writeFiles creates a DeltaJobStatisticsTracker only when the stats.collect configuration property is enabled.

Step 5. Constraints

writeFiles collects Constraints:

  1. From the table metadata (CHECK constraints and Column Invariants)
  2. Generated Columns (from normalization)
  3. The given additional Constraints

Step 6. New deltaTransactionalWrite Execution ID

writeFiles requests a new execution ID (Spark SQL) with deltaTransactionalWrite name to execute a Spark write job.

FileFormatWriter

Delta Lake uses Spark SQL infrastructure to write data out.

Step 6.1 No Custom Partitioning

writeFiles uses a FileFormatWriter.OutputSpec with no custom partition locations.

Step 6.2 DeltaInvariantCheckerExec

writeFiles creates a DeltaInvariantCheckerExec unary physical operator (with the executed plan of the normalized query execution as the child operator and the constraints).

Note

The DeltaInvariantCheckerExec physical operator is later used as the physical plan for the data to be written out.

Step 6.3 BasicWriteJobStatsTracker

writeFiles creates a BasicWriteJobStatsTracker (Spark SQL) only when the history.metricsEnabled configuration property is enabled.

With history.metricsEnabled enabled (and BasicWriteJobStatsTracker created), writeFiles registers the following metrics to be collected (tracked):

Metric Name    | UI Description
---------------|-------------------------
numFiles       | number of written files
numOutputBytes | written output
numOutputRows  | number of output rows
numParts       | number of dynamic part
jobCommitTime  | job commit time

Step 6.4 Write Options

writeFiles filters the given writeOptions down so that only the following write options (if specified) are used:

Step 6.5 FileFormatWriter

As the very last step within the scope of the new execution ID, writeFiles writes out the data (using Spark SQL infrastructure).

Logging

Enable ALL logging level for org.apache.spark.sql.execution.datasources.FileFormatWriter logger to see what happens inside.

writeFiles uses the following (among others):

Step 7. AddFiles and AddCDCFiles

In the end, writeFiles returns AddFiles and AddCDCFiles (from the DelayedCommitProtocol).
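
For illustration, the returned FileActions can be split by type (a small sketch using the action classes of the Delta Lake codebase):

// Separating data files from CDC files in the result of writeFiles
import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile, FileAction}

def splitFileActions(actions: Seq[FileAction]): (Seq[AddFile], Seq[AddCDCFile]) = {
  val addFiles = actions.collect { case a: AddFile => a }
  val cdcFiles = actions.collect { case c: AddCDCFile => c }
  (addFiles, cdcFiles)
}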

getOptionalStatsTrackerAndStatsCollection

getOptionalStatsTrackerAndStatsCollection(
  output: Seq[Attribute],
  outputPath: Path,
  partitionSchema: StructType,
  data: DataFrame): (Option[DeltaJobStatisticsTracker], Option[StatisticsCollection])

Noop with spark.databricks.delta.stats.collect disabled

getOptionalStatsTrackerAndStatsCollection returns neither DeltaJobStatisticsTracker nor StatisticsCollection with spark.databricks.delta.stats.collect disabled.

getOptionalStatsTrackerAndStatsCollection determines the stats schema (getStatsSchema) for the given output and partitionSchema.

getOptionalStatsTrackerAndStatsCollection reads the value of delta.dataSkippingNumIndexedCols table property (from the Metadata).

getOptionalStatsTrackerAndStatsCollection creates a StatisticsCollection (with the tableDataSchema based on spark.databricks.delta.stats.collect.using.tableSchema configuration property).

getOptionalStatsTrackerAndStatsCollection builds the statistics column expression (getStatsColExpr) for the statsDataSchema and the StatisticsCollection.

In the end, getOptionalStatsTrackerAndStatsCollection creates a DeltaJobStatisticsTracker and returns it together with the StatisticsCollection.
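
For example, the two configuration properties that influence this step can be set on the active SparkSession (a minimal sketch, assuming a SparkSession spark):

// Enable (or disable) statistics collection for writes
spark.conf.set("spark.databricks.delta.stats.collect", "true")

// Base the stats schema (tableDataSchema) on the table schema
spark.conf.set("spark.databricks.delta.stats.collect.using.tableSchema", "true")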

getStatsColExpr

getStatsColExpr(
  statsDataSchema: Seq[Attribute],
  statsCollection: StatisticsCollection): Expression

getStatsColExpr creates a Dataset for a LocalRelation (Spark SQL) logical operator with the given statsDataSchema.

getStatsColExpr uses Dataset.select to execute to_json standard function with statsCollector column.

In the end, getStatsColExpr takes the first Expression (from the expressions) in the analyzed logical query plan.
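
A hedged sketch of the technique: select a to_json column over a Dataset and take the analyzed Expression from the query plan (the column below is a made-up stand-in for the real statsCollector column):

// Extracting an analyzed Expression out of a Dataset.select
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.functions.{col, max, min, struct, to_json}

val df = spark.range(10).toDF("id")
// A stand-in for StatisticsCollection.statsCollector
val statsCollector = struct(min(col("id")).as("minValues"), max(col("id")).as("maxValues"))

val statsExpr: Expression = df
  .select(to_json(statsCollector))
  .queryExecution
  .analyzed
  .expressions
  .head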

Creating FileCommitProtocol Committer

getCommitter(
  outputPath: Path): DelayedCommitProtocol

getCommitter creates a new DelayedCommitProtocol with the delta job ID and the given outputPath (and no random prefix length).

Note

The DelayedCommitProtocol is used for FileFormatWriter (Spark SQL) to write data out and, in the end, for the addedStatuses and changeFiles.

getCommitter is used when:

getPartitioningColumns

getPartitioningColumns(
  partitionSchema: StructType,
  output: Seq[Attribute],
  colsDropped: Boolean): Seq[Attribute]

getPartitioningColumns...FIXME

normalizeData

normalizeData(
  deltaLog: DeltaLog,
  data: Dataset[_]): (QueryExecution, Seq[Attribute], Seq[Constraint], Set[String])

normalizeData normalizes the column names (using the table schema of the Metadata and the given data).

normalizeData checks whether the table has columns with default expressions (tableHasDefaultExpr) using the Protocol and the Metadata.

normalizeData...FIXME


normalizeData is used when:

makeOutputNullable

makeOutputNullable(
  output: Seq[Attribute]): Seq[Attribute]

makeOutputNullable...FIXME

performCDCPartition

performCDCPartition(
  inputData: Dataset[_]): (DataFrame, StructType)

performCDCPartition returns the given inputData with an extra __is_cdc partition column when Change Data Feed is enabled for the table and the _change_type column is available in the schema of the given inputData. Otherwise, performCDCPartition leaves the inputData unchanged.

The value of the __is_cdc extra column is as follows:

  • true for non-null _change_types
  • false otherwise

The schema (the StructType of the tuple to be returned) includes the __is_cdc extra column as the first column (followed by the physicalPartitionSchema).
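
A hedged sketch of the idea (column names as on this page; the actual implementation also uses the physicalPartitionSchema and works on the physical schema of the data):

// Adding the __is_cdc partition column based on _change_type (conceptual sketch)
import org.apache.spark.sql.functions.col
import spark.implicits._

val rows = Seq((1L, Some("update_preimage")), (2L, Option.empty[String])).toDF("id", "_change_type")

// true for rows with a non-null _change_type, false otherwise
val withIsCdc = rows.withColumn("__is_cdc", col("_change_type").isNotNull)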