DelayedCommitProtocol

DelayedCommitProtocol is a FileCommitProtocol (Apache Spark) that writes out data to a directory and returns the files added.

DelayedCommitProtocol is used to model a distributed write that is orchestrated by the Spark driver with the write itself happening on executors.

Note

FileCommitProtocol tracks a write job (with one write task per partition) and informs the driver when all the write tasks have finished successfully (and were committed), so that the write job can be considered completed.

TaskCommitMessage (Spark Core) is used to "transfer" the names of the files added (written out) on the executors to the driver for the optimistic transactional writer.

Creating Instance

DelayedCommitProtocol takes the following to be created:

  • Job ID
  • Data Path
  • Length of Random Prefix (optional)

DelayedCommitProtocol is created when:

Job ID

DelayedCommitProtocol is given a job ID that is always delta.

Unused

The Job ID does not seem to be used.

Data Path

DelayedCommitProtocol is given a path when created.

The path is the data directory of the delta table that this DelayedCommitProtocol coordinates a write to.

Length of Random Prefix

DelayedCommitProtocol can be given a randomPrefixLength when created.

The randomPrefixLength is always undefined (None).

Change Data Feed Partition Handling

DelayedCommitProtocol defines 3 values to support Change Data Feed:

  • __is_cdc=false
  • __is_cdc=true
  • A Regex to match on __is_cdc=true text

DelayedCommitProtocol uses them in newTaskTempFile (to create temporary files in the _change_data directory instead, based on the regular expression).

Added Files (on Executors)

addedFiles: ArrayBuffer[(Map[String, String], String)]

DelayedCommitProtocol uses the addedFiles internal registry to track the partition values (if the write is to a partitioned table) and the relative paths of the files that were added by a write task.

addedFiles is used on executors only.

addedFiles is initialized (as an empty collection) when setting up a task.


addedFiles is used when:

  • DelayedCommitProtocol is requested to commit a task (on an executor) and creates a TaskCommitMessage with the files added while the task was writing data out

Added Statuses (on Driver)

addedStatuses: ArrayBuffer[AddFile]

DelayedCommitProtocol uses the addedStatuses internal registry to track the AddFile files added by write tasks (on executors) once they have all finished successfully and the write job is committed (on the driver).

addedStatuses is used on the driver only.

addedStatuses is used when:

AddCDCFiles

changeFiles: ArrayBuffer[AddCDCFile]

DelayedCommitProtocol uses the changeFiles internal registry to track the AddCDCFile files added by write tasks (on executors) once they have all finished successfully and the write job is committed (on the driver).

changeFiles is used on the driver only.

changeFiles is used when:

Committing Job

Signature
commitJob(
  jobContext: JobContext,
  taskCommits: Seq[TaskCommitMessage]): Unit

commitJob is part of the FileCommitProtocol (Apache Spark) abstraction.

commitJob partitions the given TaskCommitMessages into a collection of AddFiles and AddCDCFiles.

In the end, commitJob adds the AddFiles to the addedStatuses registry and the AddCDCFiles to the changeFiles registry.
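
The split that commitJob performs can be sketched as follows. This is a minimal illustration, not the actual Delta Lake code: FileAction, AddFile, AddCDCFile and TaskCommitMessage are simplified, hypothetical stand-ins that carry only a path (the real classes have more fields), and the payload of every message is assumed to be a sequence of file actions.

```scala
import scala.collection.mutable.ArrayBuffer

object CommitJobSketch {
  // Simplified stand-ins for Delta Lake's file actions (hypothetical, illustration only)
  sealed trait FileAction { def path: String }
  final case class AddFile(path: String) extends FileAction
  final case class AddCDCFile(path: String) extends FileAction

  // Stand-in for Spark's TaskCommitMessage, which wraps an arbitrary payload
  final case class TaskCommitMessage(obj: Any)

  // Driver-side registries
  val addedStatuses = ArrayBuffer.empty[AddFile]
  val changeFiles = ArrayBuffer.empty[AddCDCFile]

  def commitJob(taskCommits: Seq[TaskCommitMessage]): Unit = {
    // Unwrap every message into the file actions it carries (assumed payload type)
    val actions = taskCommits.flatMap(_.obj.asInstanceOf[Seq[FileAction]])
    // Split the actions by type and record them in the matching registry
    addedStatuses ++= actions.collect { case a: AddFile => a }
    changeFiles ++= actions.collect { case c: AddCDCFile => c }
  }
}
```

With this sketch, a message that carries one AddFile and one AddCDCFile ends up contributing one entry to each registry, while an empty message contributes nothing.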

Setting Up Task

FileCommitProtocol
setupTask(
  taskContext: TaskAttemptContext): Unit

setupTask is part of the FileCommitProtocol (Apache Spark) abstraction.

setupTask initializes the addedFiles internal registry to be empty.

New Task Temp File

FileCommitProtocol
newTaskTempFile(
  taskContext: TaskAttemptContext,
  dir: Option[String],
  ext: String): String

newTaskTempFile is part of the FileCommitProtocol (Apache Spark) abstraction.

Note

The given dir defines a partition directory if a query writes out to a partitioned table.

newTaskTempFile parses the partition values in the given dir, if available, or assumes no partition values (partitionValues).
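
To make the parsing step concrete, here is a simplified sketch of turning a partition directory into partition values. The parsePartitions name and its logic are assumptions made for illustration: the sketch only splits name=value path segments and skips concerns such as unescaping special characters.

```scala
object PartitionValuesSketch {
  // Parses "year=2024/month=01" into Map("year" -> "2024", "month" -> "01").
  // Hypothetical helper: no unescaping or type inference is attempted here.
  def parsePartitions(dir: Option[String]): Map[String, String] =
    dir match {
      case None => Map.empty // no partition directory, so no partition values
      case Some(d) =>
        d.split("/").iterator
          .filter(_.contains("="))
          .map { segment =>
            val idx = segment.indexOf('=')
            segment.substring(0, idx) -> segment.substring(idx + 1)
          }
          .toMap
    }
}
```

For a Change Data Feed write, the directory starts with the __is_cdc segment, so the resulting map also contains the __is_cdc key.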

newTaskTempFile creates a file name to write data out (for the given TaskAttemptContext, extension and the partition values).

Change Data Feed

While creating a file name, DelayedCommitProtocol uses the __is_cdc virtual column name to split (divide) CDF data from the main table data. If the __is_cdc column contains true, the file name uses the cdc- prefix (not part-).

newTaskTempFile builds a relative directory path (using the randomPrefixLength or the optional dir if either is defined).

randomPrefixLength always undefined

randomPrefixLength is always undefined (None) so we can safely skip this branch.

  • For the directory to be exactly __is_cdc=false, newTaskTempFile returns the file name (with no further changes).

  • For the directory with the __is_cdc=true path prefix, newTaskTempFile replaces the prefix with _change_data and uses the changed directory as the parent of the file name.

    val subDir = "__is_cdc=true/a/b/c"
    
    val cdcPartitionTrue = "__is_cdc=true"
    val cdcPartitionTrueRegex = cdcPartitionTrue.r
    val path = cdcPartitionTrueRegex.replaceFirstIn(subDir, "_change_data")
    
    assert(path == "_change_data/a/b/c")
    

    Change Data Feed

    This is when DelayedCommitProtocol "redirects" writing out the __is_cdc=true-partitioned CDF data files to the _change_data directory.

  • For the directory with the __is_cdc=false path prefix, newTaskTempFile removes the prefix and uses the changed directory as the parent of the file name.

  • For other cases, newTaskTempFile uses the directory as the parent of the file name.

When neither the randomPrefixLength nor the partition directory (dir) is defined, newTaskTempFile uses the file name (with no further changes).
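
The directory cases above can be put together in one small sketch. It is an illustration under stated assumptions: relativePath is a hypothetical helper name, plain string concatenation stands in for Hadoop's Path, and the randomPrefixLength branch is left out since it is always undefined.

```scala
object CdcDirSketch {
  val cdcPartitionFalse = "__is_cdc=false"
  val cdcPartitionTrue = "__is_cdc=true"
  val cdcPartitionTrueRegex = cdcPartitionTrue.r
  val cdcLocation = "_change_data"

  // Hypothetical helper: builds the relative path of a file for an optional partition directory
  def relativePath(dir: Option[String], fileName: String): String =
    dir match {
      // No partition directory: the file name is used as is
      case None => fileName
      // Exactly __is_cdc=false: the file name is used as is
      case Some(d) if d == cdcPartitionFalse => fileName
      // __is_cdc=true prefix: redirect to the _change_data directory
      case Some(d) if d.startsWith(cdcPartitionTrue) =>
        s"${cdcPartitionTrueRegex.replaceFirstIn(d, cdcLocation)}/$fileName"
      // __is_cdc=false prefix: drop the prefix together with the slash that follows it
      case Some(d) if d.startsWith(cdcPartitionFalse) =>
        s"${d.stripPrefix(cdcPartitionFalse + "/")}/$fileName"
      // Any other directory is used unchanged
      case Some(d) => s"$d/$fileName"
    }
}
```

For example, CdcDirSketch.relativePath(Some("__is_cdc=false/year=2024"), "part-0.parquet") gives year=2024/part-0.parquet, so the virtual __is_cdc column never shows up in the paths of the main table data.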

newTaskTempFile adds the partition values and the relative path to the addedFiles internal registry.

In the end, newTaskTempFile returns the absolute path of the file (the relative path resolved against the data path).

File Name

getFileName(
  taskContext: TaskAttemptContext,
  ext: String,
  partitionValues: Map[String, String]): String

getFileName returns a file name of the format:

[prefix]-[split]-[uuid][ext]

The file name is created as follows:

  1. The prefix part is one of the following:
    • cdc when the given partitionValues contain the __is_cdc partition column with the value true
    • part otherwise
  2. The split part is the task ID from the given TaskAttemptContext (Apache Hadoop)
  3. The uuid part is a random UUID
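
The naming scheme can be sketched as below. This is a simplified illustration: the task ID is passed in directly as a hypothetical split parameter (instead of being read from a TaskAttemptContext), and the zero-padding of the split to five digits is an assumption of this sketch that the description above does not state.

```scala
import java.util.UUID

object FileNameSketch {
  // split stands in for the task ID taken from the TaskAttemptContext (hypothetical parameter)
  def getFileName(split: Int, ext: String, partitionValues: Map[String, String]): String = {
    val uuid = UUID.randomUUID.toString
    // cdc prefix for CDF data, part prefix for the main table data
    val prefix = if (partitionValues.get("__is_cdc").contains("true")) "cdc" else "part"
    // Assumption: the split is zero-padded to five digits
    f"$prefix-$split%05d-$uuid$ext"
  }
}
```

Since the UUID is random, two calls with the same arguments produce different file names that share only the prefix, the split and the extension.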

Committing Task

FileCommitProtocol
commitTask(
  taskContext: TaskAttemptContext): TaskCommitMessage

commitTask is part of the FileCommitProtocol (Apache Spark) abstraction.

commitTask creates a TaskCommitMessage with a FileAction (an AddCDCFile or an AddFile, based on the __is_cdc virtual partition column) for every file added, if any were added. Otherwise, commitTask creates an empty TaskCommitMessage.

Note

A file is added (to the addedFiles internal registry) when DelayedCommitProtocol is requested for a new file (path).
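
A minimal sketch of this step follows. All the types are simplified, hypothetical stand-ins (the real AddFile and AddCDCFile also carry file sizes and other metadata read from the file system), and buildAction is an illustrative helper that picks the action type based on the __is_cdc value and drops that virtual column.

```scala
object CommitTaskSketch {
  // Simplified stand-ins (hypothetical, illustration only)
  sealed trait FileAction
  final case class AddFile(path: String, partitionValues: Map[String, String]) extends FileAction
  final case class AddCDCFile(path: String, partitionValues: Map[String, String]) extends FileAction
  final case class TaskCommitMessage(obj: Any)

  // Turns a (partition values, relative path) pair into a file action
  def buildAction(f: (Map[String, String], String)): FileAction = {
    val (partitionValues, path) = f
    // The virtual __is_cdc column is not part of the table's partition values
    val cleaned = partitionValues - "__is_cdc"
    if (partitionValues.get("__is_cdc").contains("true")) AddCDCFile(path, cleaned)
    else AddFile(path, cleaned)
  }

  // One file action per added file, or an empty message when nothing was added
  def commitTask(addedFiles: Seq[(Map[String, String], String)]): TaskCommitMessage =
    if (addedFiles.nonEmpty) TaskCommitMessage(addedFiles.map(buildAction))
    else TaskCommitMessage(Nil)
}
```

The message created this way is what the driver later receives in commitJob.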

buildActionFromAddedFile

buildActionFromAddedFile(
  f: (Map[String, String], String),
  stat: FileStatus,
  taskContext: TaskAttemptContext): FileAction

Note

The f argument is a pair of the partition values and the path of one of the files added.

buildActionFromAddedFile removes the __is_cdc virtual partition column and creates a FileAction: an AddCDCFile when __is_cdc is true, and an AddFile otherwise.

Aborting Job

FileCommitProtocol
abortJob(
  jobContext: JobContext): Unit

abortJob is part of the FileCommitProtocol (Apache Spark) abstraction.

abortJob does nothing (is a noop).

Aborting Task

FileCommitProtocol
abortTask(
  taskContext: TaskAttemptContext): Unit

abortTask is part of the FileCommitProtocol (Apache Spark) abstraction.

abortTask does nothing (is a noop).

Setting Up Job

FileCommitProtocol
setupJob(
  jobContext: JobContext): Unit

setupJob is part of the FileCommitProtocol (Apache Spark) abstraction.

setupJob does nothing (is a noop).

New Temp File (Absolute Path)

FileCommitProtocol
newTaskTempFileAbsPath(
  taskContext: TaskAttemptContext,
  absoluteDir: String,
  ext: String): String

newTaskTempFileAbsPath is part of the FileCommitProtocol (Apache Spark) abstraction.

newTaskTempFileAbsPath throws an UnsupportedOperationException:

[this] does not support adding files with an absolute path

Logging

Enable ALL logging level for org.apache.spark.sql.delta.files.DelayedCommitProtocol logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.delta.files.DelayedCommitProtocol=ALL

Refer to Logging.