
TransactionalWrite

TransactionalWrite is an abstraction of optimistic transactional writers that can write the result of a structured query out to a Delta table.

Contract

DeltaLog

deltaLog: DeltaLog

DeltaLog (of a delta table) that this transaction is changing

Metadata

metadata: Metadata

Metadata (of the delta table) that this transaction is changing

Protocol

protocol: Protocol

Protocol (of the delta table) that this transaction is changing

Snapshot

snapshot: Snapshot

Snapshot (of the delta table) that this transaction is reading at
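
Put together, the contract can be summarized as the following Scala trait (a condensed, illustrative sketch, not the exact source):

// Illustrative sketch only; the real trait lives in org.apache.spark.sql.delta.files
trait TransactionalWrite {
  def deltaLog: DeltaLog   // transaction log of the delta table being changed
  def metadata: Metadata   // table metadata (schema, partition columns, configuration)
  def protocol: Protocol   // reader/writer protocol of the delta table
  def snapshot: Snapshot   // state of the delta table the transaction reads at
}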

Implementations

OptimisticTransaction is the default and only known TransactionalWrite in Delta Lake.

spark.databricks.delta.history.metricsEnabled

With the spark.databricks.delta.history.metricsEnabled configuration property enabled, TransactionalWrite creates a BasicWriteJobStatsTracker and registers SQL metrics (when requested to writeFiles).
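
For example (a sketch; the table path is made up), with the property enabled the write metrics (e.g. numFiles, numOutputBytes, numOutputRows) end up in the operationMetrics column of the table history:

// Enable per-operation metrics in the transaction log
spark.conf.set("spark.databricks.delta.history.metricsEnabled", "true")

spark.range(5).write.format("delta").save("/tmp/delta/demo")

// The collected write metrics show up in operationMetrics of DESCRIBE HISTORY
spark.sql("DESCRIBE HISTORY delta.`/tmp/delta/demo`")
  .select("version", "operationMetrics")
  .show(truncate = false)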

hasWritten Flag

hasWritten: Boolean = false

TransactionalWrite uses the hasWritten internal flag to prevent OptimisticTransactionImpl from updating the metadata after files have been written out.

hasWritten is initially false and changes to true after having written out files.
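
The behaviour can be sketched with a stand-alone example (illustrative only; the real assertion lives in OptimisticTransactionImpl):

// Minimal stand-alone sketch of the hasWritten guard (not the actual source)
class WriteOnceTxn {
  private var hasWritten = false

  def updateMetadata(newMetadata: String): Unit = {
    assert(!hasWritten,
      "Cannot update the metadata in a transaction that has already written out files")
    println(s"metadata updated to $newMetadata")
  }

  def writeFiles(): Unit = {
    hasWritten = true // no metadata updates are permitted from now on
  }
}

val txn = new WriteOnceTxn
txn.updateMetadata("v1") // OK
txn.writeFiles()
txn.updateMetadata("v2") // throws java.lang.AssertionError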

Writing Data Out (Result Of Structured Query)

writeFiles(
  data: Dataset[_]): Seq[FileAction]
writeFiles(
  data: Dataset[_],
  writeOptions: Option[DeltaOptions]): Seq[FileAction]

writeFiles creates a DeltaInvariantCheckerExec and a DelayedCommitProtocol to write out files to the data path (of the DeltaLog).
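
As an illustration, a command could call writeFiles within a transaction as follows (a sketch over Delta Lake's internal APIs, which are subject to change; the table path is made up):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations}

val deltaLog = DeltaLog.forTable(spark, "/tmp/delta/demo")
deltaLog.withNewTransaction { txn =>
  // writeFiles returns the AddFile actions of the files written out
  val addFiles = txn.writeFiles(spark.range(5).toDF)
  txn.commit(addFiles, DeltaOperations.Write(SaveMode.Append))
}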

FileFormatWriter

writeFiles uses Spark SQL's FileFormatWriter utility to write out the result of the structured query.

Learn about FileFormatWriter in The Internals of Spark SQL online book.

writeFiles is executed within SQLExecution.withNewExecutionId.

SQLAppStatusListener

The execution of writeFiles can be tracked in the web UI or with SQLAppStatusListener (via SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd events).

Learn about SQLAppStatusListener in The Internals of Spark SQL online book.
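
For example, a custom SparkListener can watch the two events (a minimal sketch):

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

spark.sparkContext.addSparkListener(new SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case s: SparkListenerSQLExecutionStart =>
      println(s"SQL execution ${s.executionId} started: ${s.description}")
    case e: SparkListenerSQLExecutionEnd =>
      println(s"SQL execution ${e.executionId} finished")
    case _ => // ignore other events
  }
})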

Internally, writeFiles turns the hasWritten flag on (true).

Note

After writeFiles, no metadata updates in the transaction are permitted.

writeFiles normalizes the given data dataset (based on the partitionColumns of the Metadata).

writeFiles determines the partitioning columns (getPartitioningColumns) based on the partitionSchema of the Metadata.
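
For a partitioned delta table, the partition columns (from the table metadata) drive the directory layout of the files written out. For example (illustrative path):

import spark.implicits._

spark.range(10)
  .withColumn("part", $"id" % 2)
  .write
  .format("delta")
  .partitionBy("part")
  .save("/tmp/delta/partitioned")

// Data files land in Hive-style partition directories under the data path:
//   /tmp/delta/partitioned/part=0/part-...snappy.parquet
//   /tmp/delta/partitioned/part=1/part-...snappy.parquet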

DelayedCommitProtocol Committer

writeFiles creates a DelayedCommitProtocol committer for the data path (of the DeltaLog).

Constraints

writeFiles collects the constraints from the table metadata and the generated columns.
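
For example, a CHECK constraint recorded in the table metadata makes a later writeFiles reject offending rows (a sketch; the table path and constraint name are made up):

spark.sql("CREATE TABLE delta.`/tmp/delta/constrained` (id BIGINT) USING delta")
spark.sql("ALTER TABLE delta.`/tmp/delta/constrained` ADD CONSTRAINT id_non_negative CHECK (id >= 0)")

// Fails at runtime with InvariantViolationException (the -1 row violates the constraint)
spark.range(-1, 2).toDF("id")
  .write.format("delta").mode("append")
  .save("/tmp/delta/constrained")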

DeltaInvariantCheckerExec

writeFiles requests a new Execution ID (that is used to track all Spark jobs of FileFormatWriter.write in Spark SQL) with a physical query plan of a new DeltaInvariantCheckerExec unary physical operator (with the executed plan of the normalized query execution as the child operator).

In the end, writeFiles returns the addedStatuses of the DelayedCommitProtocol committer.

Creating Committer

getCommitter(
  outputPath: Path): DelayedCommitProtocol

getCommitter creates a new DelayedCommitProtocol with the delta job ID and the given outputPath (and no random prefix).
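
Based on the description above, getCommitter amounts to the following (a sketch; the constructor arguments follow DelayedCommitProtocol):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.files.DelayedCommitProtocol

// "delta" is the job ID; None means no random prefix for file names
def getCommitter(outputPath: Path): DelayedCommitProtocol =
  new DelayedCommitProtocol("delta", outputPath.toString, None)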

getPartitioningColumns

getPartitioningColumns(
  partitionSchema: StructType,
  output: Seq[Attribute],
  colsDropped: Boolean): Seq[Attribute]

getPartitioningColumns...FIXME

normalizeData

normalizeData(
  data: Dataset[_],
  partitionCols: Seq[String]): (QueryExecution, Seq[Attribute])

normalizeData...FIXME

makeOutputNullable

makeOutputNullable(
  output: Seq[Attribute]): Seq[Attribute]

makeOutputNullable...FIXME

Usage

writeFiles is used when delta commands (e.g., WriteIntoDelta, DeleteCommand, UpdateCommand, MergeIntoCommand) write data out to a delta table.

