
DelayedCommitProtocol

DelayedCommitProtocol is a FileCommitProtocol (Apache Spark) that writes data out to a directory and returns the files added.

DelayedCommitProtocol is used to model a distributed write that is orchestrated by the Spark driver with the write itself happening on executors.

Note

FileCommitProtocol tracks a write job (with a write task per partition) and informs the driver when all the write tasks have finished successfully (and were committed), so the write job can be considered completed.

TaskCommitMessage (Spark Core) is used to "transfer" the names of the files added (written out) on the executors to the driver for the optimistic transactional writer.

Creating Instance

DelayedCommitProtocol takes the following to be created:

  • Job ID
  • Data Path
  • Length of Random Prefix

DelayedCommitProtocol is created when TransactionalWrite is requested for a committer.

Job ID

DelayedCommitProtocol is given a job ID that is always delta.

Data Path

DelayedCommitProtocol is given a path when created.

The path is the data directory of the delta table that this DelayedCommitProtocol coordinates a write to.

Length of Random Prefix

DelayedCommitProtocol can be given a randomPrefixLength when created.

The randomPrefixLength is always undefined (None).

Change Data Feed Partition Handling

DelayedCommitProtocol defines 3 values to support Change Data Feed:

  • __is_cdc=false
  • __is_cdc=true
  • A Regex to match on __is_cdc=true text

DelayedCommitProtocol uses them in newTaskTempFile to write CDC files out to the _change_data directory instead (based on the regular expression).
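The three values can be sketched as plain Scala definitions (the names mirror the snippet shown further down this page and are illustrative, not necessarily the field names in the source):

```scala
// The two partition-directory markers for Change Data Feed rows,
// and a regex over the "true" marker used to rewrite the directory.
val cdcPartitionFalse = "__is_cdc=false"
val cdcPartitionTrue = "__is_cdc=true"
val cdcPartitionTrueRegex = cdcPartitionTrue.r

// newTaskTempFile can use the regex to redirect CDC files to _change_data
val redirected = cdcPartitionTrueRegex.replaceFirstIn("__is_cdc=true/year=2024", "_change_data")
```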

addedFiles

addedFiles: ArrayBuffer[(Map[String, String], String)]

DelayedCommitProtocol uses the addedFiles internal registry to track the partition values (when writing to a partitioned table) and the relative paths of the files that were added by a write task.

addedFiles is used on executors only.

addedFiles is initialized (as an empty collection) when setting up a task.

addedFiles is used when:

  • DelayedCommitProtocol is requested to commit a task (on an executor, to create a TaskCommitMessage with the files added while the task was writing data out)

AddFiles

addedStatuses: ArrayBuffer[AddFile]

DelayedCommitProtocol uses the addedStatuses internal registry to track the AddFile files added by write tasks (on executors) once they all finish successfully and the write job is committed (on the driver).

addedStatuses is used on the driver only.

addedStatuses is used when:

  • DelayedCommitProtocol is requested to commit a job (on the driver, to register the AddFiles from the task commit messages)

AddCDCFiles

changeFiles: ArrayBuffer[AddCDCFile]

DelayedCommitProtocol uses the changeFiles internal registry to track the AddCDCFile files added by write tasks (on executors) once they all finish successfully and the write job is committed (on the driver).

changeFiles is used on the driver only.

changeFiles is used when:

  • DelayedCommitProtocol is requested to commit a job (on the driver, to register the AddCDCFiles from the task commit messages)

Setting Up Job

setupJob(
  jobContext: JobContext): Unit

setupJob is part of the FileCommitProtocol (Apache Spark) abstraction.

setupJob is a noop.

Committing Job

commitJob(
  jobContext: JobContext,
  taskCommits: Seq[TaskCommitMessage]): Unit

commitJob is part of the FileCommitProtocol (Apache Spark) abstraction.


commitJob partitions the given TaskCommitMessages into a collection of AddFiles and AddCDCFiles.

In the end, commitJob adds the AddFiles to the addedStatuses registry and the AddCDCFiles to the changeFiles registry.
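The partitioning step can be modelled with simplified stand-ins for Delta's action classes (the case classes and sample paths below are illustrative, not Delta's actual definitions):

```scala
// A simplified model of how commitJob splits the FileActions received
// from executors into the two driver-side registries.
import scala.collection.mutable.ArrayBuffer

sealed trait FileAction { def path: String }
case class AddFile(path: String) extends FileAction
case class AddCDCFile(path: String) extends FileAction

val addedStatuses = ArrayBuffer.empty[AddFile]
val changeFiles = ArrayBuffer.empty[AddCDCFile]

// every TaskCommitMessage carries the actions of one write task
val taskCommits: Seq[Seq[FileAction]] = Seq(
  Seq(AddFile("part-00000.parquet")),
  Seq(AddCDCFile("_change_data/cdc-00001.parquet")))

// split by action type, then register on the matching registry
val (changes, adds) = taskCommits.flatten.partition(_.isInstanceOf[AddCDCFile])
addedStatuses ++= adds.collect { case a: AddFile => a }
changeFiles ++= changes.collect { case c: AddCDCFile => c }
```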

Aborting Job

abortJob(
  jobContext: JobContext): Unit

abortJob is part of the FileCommitProtocol (Apache Spark) abstraction.

abortJob is a noop.

Setting Up Task

setupTask(
  taskContext: TaskAttemptContext): Unit

setupTask is part of the FileCommitProtocol (Apache Spark) abstraction.

setupTask initializes the addedFiles internal registry to be empty.

New Temp File

newTaskTempFile(
  taskContext: TaskAttemptContext,
  dir: Option[String],
  ext: String): String

newTaskTempFile is part of the FileCommitProtocol (Apache Spark) abstraction.


Note

The given dir defines a partition directory if a query is written out to a partitioned table.

newTaskTempFile parses the partition values out of the given dir or falls back to an empty partitionValues.
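Delta relies on Spark's partition-path parsing for this; the following is only a minimal illustration of the idea for a Hive-style partition directory such as `year=2024/month=01` (the real parser also handles escaping and type inference):

```scala
// A minimal sketch (not Delta's actual parser) of recovering partition
// values from a partition directory, falling back to an empty map.
def parsePartitionValues(dir: Option[String]): Map[String, String] =
  dir.fold(Map.empty[String, String]) { d =>
    d.split("/").iterator
      .map(_.split("=", 2))          // "year=2024" -> Array("year", "2024")
      .collect { case Array(k, v) => k -> v }
      .toMap
  }
```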

newTaskTempFile creates a file name (for the given TaskAttemptContext, ext and the partition values).

newTaskTempFile builds a relative directory path (using the randomPrefixLength or the optional dir if either is defined).

randomPrefixLength always undefined

randomPrefixLength is always undefined (None) so we can safely skip this branch.

  • For the directory to be exactly __is_cdc=false, newTaskTempFile returns the file name (with no further changes).

  • For the directory with the __is_cdc=true path prefix, newTaskTempFile replaces the prefix with _change_data and uses the changed directory as the parent of the file name.

    val subDir = "__is_cdc=true/a/b/c"
    
    val cdcPartitionTrue = "__is_cdc=true"
    val cdcPartitionTrueRegex = cdcPartitionTrue.r
    val path = cdcPartitionTrueRegex.replaceFirstIn(subDir, "_change_data")
    
    assert(path == "_change_data/a/b/c")
    
  • For the directory with the __is_cdc=false path prefix, newTaskTempFile removes the prefix and uses the changed directory as the parent of the file name.

  • For other cases, newTaskTempFile uses the directory as the parent of the file name.

When neither the randomPrefixLength nor the partition directory (dir) is defined, newTaskTempFile uses the file name (with no further changes).
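The decision ladder above can be sketched as a single function (a hedged sketch, not the actual implementation, with randomPrefixLength assumed to be None throughout):

```scala
// Relative-path decision for a new temp file, given the optional
// partition directory and the already-generated file name.
def relativePath(dir: Option[String], filename: String): String = dir match {
  case Some("__is_cdc=false") =>
    filename                                            // CDC "false" marker alone: drop it
  case Some(d) if d.startsWith("__is_cdc=true") =>
    "_change_data" + d.stripPrefix("__is_cdc=true") + "/" + filename
  case Some(d) if d.startsWith("__is_cdc=false/") =>
    d.stripPrefix("__is_cdc=false/") + "/" + filename   // remove the CDC "false" prefix
  case Some(d) =>
    d + "/" + filename                                  // regular partition directory
  case None =>
    filename                                            // unpartitioned write
}
```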

newTaskTempFile adds the partition values and the relative path to the addedFiles internal registry.

In the end, newTaskTempFile returns the absolute path of the (relative) path in the directory.

File Name

getFileName(
  taskContext: TaskAttemptContext,
  ext: String,
  partitionValues: Map[String, String]): String

getFileName returns a file name of the format:

[prefix]-[split]-[uuid][ext]

The file name is created as follows:

  1. The prefix part is one of the following:
    • cdc for the given partitionValues with the __is_cdc partition column with true value
    • part otherwise
  2. The split part is the task ID from the given TaskAttemptContext (Apache Hadoop)
  3. The uuid part is a random UUID
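The naming scheme can be sketched as follows; deriving the split number from the TaskAttemptContext is elided (it is a plain parameter here), and the zero-padding width is an assumption:

```scala
import java.util.UUID

// A sketch of the [prefix]-[split]-[uuid][ext] file-name scheme.
def getFileName(split: Int, ext: String, partitionValues: Map[String, String]): String = {
  // "cdc" for Change Data Feed files, "part" for regular data files
  val prefix = if (partitionValues.get("__is_cdc").contains("true")) "cdc" else "part"
  f"$prefix-$split%05d-${UUID.randomUUID()}$ext"
}
```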

New Temp File (Absolute Path)

newTaskTempFileAbsPath(
  taskContext: TaskAttemptContext,
  absoluteDir: String,
  ext: String): String

newTaskTempFileAbsPath is part of the FileCommitProtocol (Apache Spark) abstraction.

newTaskTempFileAbsPath throws an UnsupportedOperationException:

[this] does not support adding files with an absolute path

Committing Task

commitTask(
  taskContext: TaskAttemptContext): TaskCommitMessage

commitTask is part of the FileCommitProtocol (Apache Spark) abstraction.


commitTask creates a TaskCommitMessage with a FileAction (an AddCDCFile or an AddFile) for every file added (if any were added successfully). Otherwise, commitTask creates an empty TaskCommitMessage.

Note

A file is added (to the addedFiles internal registry) when DelayedCommitProtocol is requested for a new file (path).
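The task-commit step can be modelled as mapping each addedFiles entry to an action (the case classes, the `_change_data/` check, and the sample paths below are simplified stand-ins, not Delta's actual code):

```scala
import scala.collection.mutable.ArrayBuffer

sealed trait FileAction
case class AddFile(path: String, partitionValues: Map[String, String]) extends FileAction
case class AddCDCFile(path: String, partitionValues: Map[String, String]) extends FileAction

// addedFiles entries: (partition values, relative path) per written file
val addedFiles = ArrayBuffer[(Map[String, String], String)](
  (Map("year" -> "2024"), "year=2024/part-00000-uuid.parquet"),
  (Map.empty, "_change_data/cdc-00001-uuid.parquet"))

// one FileAction per added file: CDC files vs regular data files
val actions: Seq[FileAction] = addedFiles.toSeq.map {
  case (partitionValues, path) if path.startsWith("_change_data/") =>
    AddCDCFile(path, partitionValues)
  case (partitionValues, path) =>
    AddFile(path, partitionValues)
}
```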

buildActionFromAddedFile

buildActionFromAddedFile(
  f: (Map[String, String], String),
  stat: FileStatus,
  taskContext: TaskAttemptContext): FileAction

buildActionFromAddedFile removes the __is_cdc virtual partition column and creates a FileAction (an AddCDCFile or an AddFile) for the file.
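The cleanup of the virtual partition column amounts to dropping one key from the partition values (the sample values below are illustrative):

```scala
// The __is_cdc column is virtual and must not end up in the committed
// FileAction's partitionValues, so it is removed before building the action.
def cleanPartitionValues(partitionValues: Map[String, String]): Map[String, String] =
  partitionValues - "__is_cdc"
```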

Aborting Task

abortTask(
  taskContext: TaskAttemptContext): Unit

abortTask is part of the FileCommitProtocol (Apache Spark) abstraction.

abortTask is a noop.

Logging

Enable ALL logging level for org.apache.spark.sql.delta.files.DelayedCommitProtocol logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.delta.files.DelayedCommitProtocol=ALL

Refer to Logging.