DelayedCommitProtocol

DelayedCommitProtocol is a FileCommitProtocol (Apache Spark) that writes out data to a directory and returns the files added.

DelayedCommitProtocol is used to model a distributed write that is orchestrated by the Spark driver with the write itself happening on executors.

Note

FileCommitProtocol allows tracking a write job (with a write task per partition) and informing the driver once all the write tasks have finished successfully (and been committed) so the write job can be considered completed. TaskCommitMessage (Spark Core) "transfers" the files added (written out) on the executors to the driver for the optimistic transactional writer.

DelayedCommitProtocol is Serializable.

Creating Instance

DelayedCommitProtocol takes the following to be created:

  • Job ID
  • Path (to write files to)
  • (optional) Length of Random Prefix

DelayedCommitProtocol is created when:

  • TransactionalWrite is requested for a committer (to write data out)

addedFiles

addedFiles: ArrayBuffer[(Map[String, String], String)]

DelayedCommitProtocol uses addedFiles internal registry to track the files added by a Spark write task.

addedFiles is used on the executors only.

addedFiles is initialized (as an empty collection) when setting up a task.

addedFiles is used when:

  • DelayedCommitProtocol is requested to commit a task (on an executor, to create a TaskCommitMessage with the files added while the task was writing out a partition of the streaming query)

addedStatuses

addedStatuses: ArrayBuffer[AddFile]

DelayedCommitProtocol uses the addedStatuses internal registry to track the files that were added by write tasks (on executors) once they all finish successfully and the write job is committed (on the driver).

addedStatuses is used on the driver only.

addedStatuses is used when:

  • DelayedCommitProtocol is requested to commit a job (on the driver, to register the AddFiles from the task commits)

Setting Up Job

setupJob(
  jobContext: JobContext): Unit

setupJob is part of the FileCommitProtocol (Apache Spark) abstraction.

setupJob is a noop.

Committing Job

commitJob(
  jobContext: JobContext,
  taskCommits: Seq[TaskCommitMessage]): Unit

commitJob is part of the FileCommitProtocol (Apache Spark) abstraction.

commitJob adds the AddFiles (carried by the given taskCommits, one per commitTask) to the addedStatuses internal registry.
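The driver-side aggregation can be sketched in plain Scala. AddFile and TaskCommitMessage below are simplified stand-ins for the real Delta and Spark classes, not their actual definitions:

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins for Delta's AddFile action and Spark's
// FileCommitProtocol.TaskCommitMessage.
final case class AddFile(path: String)
final case class TaskCommitMessage(addedFiles: Seq[AddFile])

val addedStatuses = ArrayBuffer.empty[AddFile]

// commitJob flattens the AddFiles carried by every task commit
// into the driver-side addedStatuses registry.
def commitJob(taskCommits: Seq[TaskCommitMessage]): Unit =
  addedStatuses ++= taskCommits.flatMap(_.addedFiles)
```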

Aborting Job

abortJob(
  jobContext: JobContext): Unit

abortJob is part of the FileCommitProtocol (Apache Spark) abstraction.

abortJob is a noop.

Setting Up Task

setupTask(
  taskContext: TaskAttemptContext): Unit

setupTask is part of the FileCommitProtocol (Apache Spark) abstraction.

setupTask initializes the addedFiles internal registry to be empty.

New Temp File (Relative Path)

newTaskTempFile(
  taskContext: TaskAttemptContext,
  dir: Option[String],
  ext: String): String

newTaskTempFile is part of the FileCommitProtocol (Apache Spark) abstraction.

newTaskTempFile creates a file name for the given TaskAttemptContext and ext.

newTaskTempFile tries to parsePartitions with the given dir or falls back to an empty partitionValues.

Note

The given dir defines a partition directory if the streaming query (and hence the write) is partitioned.

newTaskTempFile builds a relative path (based on the given randomPrefixLength or the dir, if defined; otherwise it uses the file name directly).

Fixme

When are the optional dir and the randomPrefixLength defined?

newTaskTempFile adds the partition values and the relative path to the addedFiles internal registry.

In the end, newTaskTempFile returns the absolute path of the (relative) path in the directory.
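The relative-path logic can be sketched in plain Scala, without the Hadoop Path type. The helper name and the precedence of randomPrefixLength over dir are assumptions for illustration:

```scala
import scala.util.Random

// Hypothetical sketch: a random prefix (when defined) replaces the
// partition directory; otherwise the partition directory (when defined)
// prefixes the file name; otherwise the file name is used directly.
def relativePath(
    fileName: String,
    dir: Option[String],
    randomPrefixLength: Option[Int]): String = {
  val parent = randomPrefixLength
    .map(n => Random.alphanumeric.take(n).mkString)
    .orElse(dir)
  parent.map(p => s"$p/$fileName").getOrElse(fileName)
}
```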

File Name

getFileName(
  taskContext: TaskAttemptContext,
  ext: String,
  partitionValues: Map[String, String]): String

getFileName takes the task ID from the given TaskAttemptContext (Apache Spark) (for the split part below).

getFileName generates a random UUID (for the uuid part below).

In the end, getFileName returns a file name of the format:

part-[split]-[uuid][ext]
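The file-name format can be reproduced in plain Scala, assuming (as in Spark's default commit protocol) that the split part is the task ID zero-padded to five digits:

```scala
import java.util.UUID

// Builds a part-file name of the format part-[split]-[uuid][ext],
// with the split (task) ID zero-padded to five digits.
def fileName(split: Int, ext: String): String = {
  val uuid = UUID.randomUUID().toString
  f"part-$split%05d-$uuid$ext"
}

// fileName(3, ".snappy.parquet") yields, e.g.,
// part-00003-<random uuid>.snappy.parquet
```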

New Temp File (Absolute Path)

newTaskTempFileAbsPath(
  taskContext: TaskAttemptContext,
  absoluteDir: String,
  ext: String): String

newTaskTempFileAbsPath is part of the FileCommitProtocol (Apache Spark) abstraction.

newTaskTempFileAbsPath throws an UnsupportedOperationException:

[this] does not support adding files with an absolute path

Committing Task

commitTask(
  taskContext: TaskAttemptContext): TaskCommitMessage

commitTask is part of the FileCommitProtocol (Apache Spark) abstraction.

commitTask creates a TaskCommitMessage with an AddFile for every file added if there are any. Otherwise, commitTask creates an empty TaskCommitMessage.
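The executor-side handshake can be sketched similarly, with simplified stand-in types; partition values travel with each path, as recorded in addedFiles:

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins for Delta's AddFile action and Spark's
// FileCommitProtocol.TaskCommitMessage.
final case class AddFile(path: String, partitionValues: Map[String, String])
final case class TaskCommitMessage(addedFiles: Seq[AddFile])

// addedFiles pairs partition values with a relative path,
// as registered by newTaskTempFile.
val addedFiles = ArrayBuffer.empty[(Map[String, String], String)]

// commitTask wraps whatever this task added into a message for the driver.
def commitTask(): TaskCommitMessage =
  TaskCommitMessage(addedFiles.toSeq.map { case (pv, path) => AddFile(path, pv) })
```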

Note

A file is added (to the addedFiles internal registry) when DelayedCommitProtocol is requested for a new file (path).

Aborting Task

abortTask(
  taskContext: TaskAttemptContext): Unit

abortTask is part of the FileCommitProtocol (Apache Spark) abstraction.

abortTask is a noop.

Logging

Enable ALL logging level for org.apache.spark.sql.delta.files.DelayedCommitProtocol logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.delta.files.DelayedCommitProtocol=ALL

Refer to Logging.


Last update: 2021-05-22