DelayedCommitProtocol

DelayedCommitProtocol is used to model a distributed write that is orchestrated by the Spark driver with the write itself happening on executors.

DelayedCommitProtocol is a concrete FileCommitProtocol (Spark Core) to write out a result of a structured query to a directory and return a list of files added.

FileCommitProtocol (Spark Core) allows Spark to track a write job (with one write task per partition) and to inform the driver when all the write tasks have finished successfully (and been committed), so the write job can be considered complete. TaskCommitMessage (Spark Core) allows the files added (written out) on the executors to be "transferred" to the driver for the optimistic transactional writer.

DelayedCommitProtocol is created exclusively when TransactionalWrite is requested for a committer to write a structured query to the directory.

Enable ALL logging level for org.apache.spark.sql.delta.files.DelayedCommitProtocol logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.delta.files.DelayedCommitProtocol=ALL

Refer to Logging.

Creating DelayedCommitProtocol Instance

DelayedCommitProtocol takes the following to be created:

  • Job ID (seems always delta)

  • Directory (to write files to)

  • Optional length of a random prefix (seems always empty)

DelayedCommitProtocol initializes the internal properties.

setupTask Method

setupTask(
  taskContext: TaskAttemptContext): Unit
setupTask is part of the FileCommitProtocol contract to set up a task for a writing job.

setupTask simply initializes the addedFiles internal registry to be empty.

newTaskTempFile Method

newTaskTempFile(
  taskContext: TaskAttemptContext,
  dir: Option[String],
  ext: String): String
newTaskTempFile is part of the FileCommitProtocol contract to inform the committer to add a new file.

newTaskTempFile creates a file name for the given TaskAttemptContext and ext.

newTaskTempFile tries to parsePartitions with the given dir or falls back to an empty partitionValues.

The given dir defines a partition directory if the streaming query (and hence the write) is partitioned.

newTaskTempFile builds a relative path from the given randomPrefixLength or dir (when either is defined), or uses the file name alone.

FIXME When would the optional dir and the randomPrefixLength be defined?

newTaskTempFile adds the partition values and the relative path to the addedFiles internal registry.

In the end, newTaskTempFile returns the absolute path of the (relative) path in the directory.
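The flow above could be sketched as follows (an illustrative Python re-implementation, not Delta's actual Scala API; the `/delta/table` directory, function names and the in-memory `added_files` list are assumptions for the example):

```python
import os
import uuid

# "/delta/table" stands in for the protocol's target directory (path).
TABLE_DIR = "/delta/table"
added_files = []  # (partition values, relative path) pairs, as in addedFiles

def new_task_temp_file(dir_part, ext, split):
    # 1. Create a file name for the task (see getFileName).
    file_name = f"part-{split:05d}-{uuid.uuid4()}{ext}"
    # 2. Parse the partition directory (if any) into partition values,
    #    falling back to empty partition values.
    partition_values = (
        dict(p.split("=", 1) for p in dir_part.split("/")) if dir_part else {}
    )
    # 3. Build the relative path (partition directory plus file name).
    relative_path = os.path.join(dir_part, file_name) if dir_part else file_name
    # 4. Record the addition for the later commitTask.
    added_files.append((partition_values, relative_path))
    # 5. Return the absolute path under the table directory.
    return os.path.join(TABLE_DIR, relative_path)

path = new_task_temp_file("year=2021", ".parquet", 3)
```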

Committing Task (After Successful Write) — commitTask Method

commitTask(
  taskContext: TaskAttemptContext): TaskCommitMessage
commitTask is part of the FileCommitProtocol contract to commit a task after the writes succeed.

commitTask simply creates a TaskCommitMessage with an AddFile for every file added if there were any. Otherwise, the TaskCommitMessage is empty.

A file is added (to addedFiles internal registry) when DelayedCommitProtocol is requested for a new file (path).
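A minimal sketch of this step (illustrative Python, not Delta's actual API; the dict stands in for an AddFile action with assumed fields):

```python
# commitTask turns the task's addedFiles entries into AddFile-like
# records inside a TaskCommitMessage.
def commit_task(added_files):
    # One record per file added by this task; no files means an empty message.
    return [
        {"path": path, "partitionValues": pv, "dataChange": True}
        for pv, path in added_files
    ]

msg = commit_task([({"year": "2021"}, "year=2021/part-00000-abc.parquet")])
empty = commit_task([])  # an empty TaskCommitMessage
```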

Committing Spark Job (After Successful Write) — commitJob Method

commitJob(
  jobContext: JobContext,
  taskCommits: Seq[TaskCommitMessage]): Unit
commitJob is part of the FileCommitProtocol contract to commit a job after the writes succeed.

commitJob simply adds the AddFiles (from the given taskCommits from every commitTask) to the addedStatuses internal registry.
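Conceptually, that aggregation step looks like this (an illustrative sketch, not Delta's actual API):

```python
# commitJob flattens the per-task commit messages into the
# driver-side addedStatuses registry.
def commit_job(task_commits):
    added_statuses = []
    for task_commit in task_commits:
        added_statuses.extend(task_commit)  # AddFiles from every commitTask
    return added_statuses

statuses = commit_job([[{"path": "a"}], [{"path": "b"}, {"path": "c"}]])
```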

parsePartitions Method

parsePartitions(
  dir: String): Map[String, String]

parsePartitions...FIXME

parsePartitions is used exclusively when DelayedCommitProtocol is requested to newTaskTempFile.
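Assuming Hive-style partition directories, the parsing could be sketched as (an illustrative guess at the behavior, not Delta's actual implementation):

```python
# Parse a Hive-style partition directory such as "year=2021/month=01"
# into column -> value pairs.
def parse_partitions(dir_part):
    return dict(part.split("=", 1) for part in dir_part.split("/") if part)

parse_partitions("year=2021/month=01")  # {'year': '2021', 'month': '01'}
```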

setupJob Method

setupJob(
  jobContext: JobContext): Unit
setupJob is part of the FileCommitProtocol contract to set up a Spark job.

setupJob does nothing.

abortJob Method

abortJob(
  jobContext: JobContext): Unit
abortJob is part of the FileCommitProtocol contract to abort a Spark job.

abortJob does nothing.

getFileName Method

getFileName(
  taskContext: TaskAttemptContext,
  ext: String): String

getFileName takes the task ID from the given TaskAttemptContext (for the split part below).

getFileName generates a random UUID (for the uuid part below).

In the end, getFileName returns a file name of the format:

part-[split]%05d-[uuid][ext]
getFileName is used exclusively when DelayedCommitProtocol is requested to newTaskTempFile.
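The format above can be sketched as follows (illustrative Python, not Delta's actual API; the task ID is zero-padded to 5 digits per the `%05d` format):

```python
import uuid

# Build a file name in the part-[split]%05d-[uuid][ext] format.
def get_file_name(split, ext):
    return f"part-{split:05d}-{uuid.uuid4()}{ext}"

name = get_file_name(3, ".snappy.parquet")
```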

addedFiles Internal Registry

addedFiles: ArrayBuffer[(Map[String, String], String)]

addedFiles tracks the files added by a Spark write task (that runs on an executor).

addedFiles is initialized (as an empty collection) in setupTask.

addedFiles is used when DelayedCommitProtocol is requested to commit a task (on an executor) to create a TaskCommitMessage with the files added while the task was writing out a partition of a streaming query.

addedStatuses Internal Registry

addedStatuses = new ArrayBuffer[AddFile]

addedStatuses tracks the files that were added by write tasks (on executors) once they all finish successfully and the write job is committed (on the driver).

addedStatuses is used when: