
= DelayedCommitProtocol

DelayedCommitProtocol is used to model a distributed write that is orchestrated by the Spark driver with the write itself happening on executors.

DelayedCommitProtocol is a concrete FileCommitProtocol (Spark Core) to write out the result of a structured query to a <<path, directory>> and return the <<addedStatuses, files added>>.

FileCommitProtocol (Spark Core) allows tracking a write job (with a write task per partition) and informing the driver when all the write tasks have finished successfully (and were <<commitTask, committed>>) to consider the write job <<commitJob, committed>>. TaskCommitMessage (Spark Core) allows "transferring" the files added (written out) on the executors to the driver for the <<commitJob, job commit>>.
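The driver/executor call sequence that FileCommitProtocol models can be simulated in plain Scala. This is a minimal sketch under assumptions: the class, the `String` file names standing in for TaskCommitMessages, and the method bodies are all stand-ins, not Spark's API.

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch (assumption): simulating the commit-protocol call sequence
// in-process. Real write tasks run on executors; here everything is local.
class CommitProtocolSketch {
  val committedFiles = ArrayBuffer[String]()
  def setupJob(): Unit = ()                                    // on the driver
  def setupTask(): ArrayBuffer[String] = ArrayBuffer[String]() // per-task registry (executor)
  def commitJob(taskCommits: Seq[Seq[String]]): Unit =         // on the driver
    taskCommits.foreach(committedFiles ++= _)
}

val protocol = new CommitProtocolSketch
protocol.setupJob()
val taskCommits = (0 until 2).map { task => // one "write task" per partition
  val added = protocol.setupTask()
  added += s"part-0000$task.parquet"        // stands in for newTaskTempFile
  added.toSeq                               // stands in for commitTask's message
}
protocol.commitJob(taskCommits)
```

Only after commitJob has seen every task's commit message does the driver consider the write job committed.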

TIP: Read up on https://books.japila.pl/apache-spark-internals/apache-spark-internals/2.4.4/spark-internal-io-FileCommitProtocol.html[FileCommitProtocol] in https://books.japila.pl/apache-spark-internals[The Internals Of Apache Spark] online book.

DelayedCommitProtocol is <<creating-instance, created>> exclusively when TransactionalWrite is requested for a committer to write out the result of a structured query to the <<path, directory>> of a delta table.

[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.sql.delta.files.DelayedCommitProtocol logger to see what happens inside.

Add the following line to conf/log4j.properties:

[source]
----
log4j.logger.org.apache.spark.sql.delta.files.DelayedCommitProtocol=ALL
----

Refer to Logging.
====

== [[creating-instance]] Creating DelayedCommitProtocol Instance

DelayedCommitProtocol takes the following to be created:

* [[jobId]] Job ID (seems always delta)
* [[path]] Directory (to write files to)
* [[randomPrefixLength]] Optional length of a random prefix (seems always undefined)

DelayedCommitProtocol initializes the internal registries.

== [[setupTask]] setupTask Method

[source, scala]
----
setupTask(
  taskContext: TaskAttemptContext): Unit
----

NOTE: setupTask is part of the FileCommitProtocol contract to set up a task for a writing job.

setupTask simply initializes the <<addedFiles, addedFiles>> internal registry to be empty.

== [[newTaskTempFile]] newTaskTempFile Method

[source, scala]
----
newTaskTempFile(
  taskContext: TaskAttemptContext,
  dir: Option[String],
  ext: String): String
----

NOTE: newTaskTempFile is part of the FileCommitProtocol contract to inform the committer to add a new file.

newTaskTempFile <<getFileName, creates the file name>> for the given TaskAttemptContext and ext.

newTaskTempFile tries to <<parsePartitions, parse the partition values>> from the given dir or falls back to empty partitionValues.

NOTE: The given dir defines a partition directory if the streaming query (and hence the write) is partitioned.

newTaskTempFile builds a path (based on the given randomPrefixLength and the dir, or uses the file name directly).

NOTE: FIXME When would the optional dir and the <<randomPrefixLength, randomPrefixLength>> be defined?

newTaskTempFile adds the partition values and the relative path to the <<addedFiles, addedFiles>> internal registry.

In the end, newTaskTempFile returns the absolute path of the (relative) path in the <<path, directory>>.
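The path-building step above can be sketched in plain Scala. This is a simplified illustration under assumptions: the function name, the prefix generator, and the exact layout of prefix, partition directory, and file name are stand-ins, not Delta's implementation.

```scala
// Sketch (assumption): assemble a relative path from an optional
// partition directory and an optional random prefix.
def relativePathSketch(
    fileName: String,
    dir: Option[String],
    randomPrefixLength: Option[Int]): String = {
  val withDir = dir.map(d => s"$d/$fileName").getOrElse(fileName)
  randomPrefixLength match {
    case Some(length) =>
      // hypothetical prefix generator; the real one is internal to Delta
      val prefix = scala.util.Random.alphanumeric.take(length).mkString
      s"$prefix/$withDir"
    case None => withDir
  }
}

relativePathSketch("part-00000.parquet", Some("year=2020"), None)
// year=2020/part-00000.parquet
```

With no partition directory and no random prefix, the file name is used directly.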

== [[commitTask]] Committing Task (After Successful Write) -- commitTask Method

[source, scala]
----
commitTask(
  taskContext: TaskAttemptContext): TaskCommitMessage
----

NOTE: commitTask is part of the FileCommitProtocol contract to commit a task after the writes succeed.

commitTask simply creates a TaskCommitMessage with an AddFile for every <<addedFiles, file added>>, if there were any. Otherwise, the TaskCommitMessage is empty.

NOTE: A file is added (to the <<addedFiles, addedFiles>> internal registry) when DelayedCommitProtocol is requested for a <<newTaskTempFile, new file>>.
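The step can be sketched in plain Scala. The AddFile and TaskCommitMessage case classes below are simplified stand-ins (assumptions) for Delta's AddFile action and Spark's TaskCommitMessage, reduced to the fields this step touches.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins (assumptions) for the real Delta/Spark types.
final case class AddFile(path: String, partitionValues: Map[String, String])
final case class TaskCommitMessage(obj: Any)

// The per-task registry: (partition values, relative path) pairs.
val addedFiles = ArrayBuffer[(Map[String, String], String)](
  (Map("year" -> "2020"), "year=2020/part-00000-uuid.parquet"))

// commitTask wraps one AddFile per registered file in a TaskCommitMessage.
val message =
  if (addedFiles.nonEmpty)
    TaskCommitMessage(addedFiles.map { case (partitionValues, path) =>
      AddFile(path, partitionValues)
    }.toSeq)
  else TaskCommitMessage(Nil)
```

The message is what travels from the executor back to the driver.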

== [[commitJob]] Committing Spark Job (After Successful Write) -- commitJob Method

[source, scala]
----
commitJob(
  jobContext: JobContext,
  taskCommits: Seq[TaskCommitMessage]): Unit
----

NOTE: commitJob is part of the FileCommitProtocol contract to commit a job after the writes succeed.

commitJob simply adds the AddFiles (from the given taskCommits from every <<commitTask, committed write task>>) to the <<addedStatuses, addedStatuses>> internal registry.
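The driver-side flattening can be sketched in plain Scala. As above, AddFile and TaskCommitMessage are simplified stand-ins (assumptions), not the real Delta/Spark types.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins (assumptions) for the real Delta/Spark types.
final case class AddFile(path: String)
final case class TaskCommitMessage(obj: Any)

// The driver-side registry of all files added across all write tasks.
val addedStatuses = ArrayBuffer[AddFile]()

// One commit message per successfully committed write task.
val taskCommits = Seq(
  TaskCommitMessage(Seq(AddFile("part-00000-uuid.parquet"))),
  TaskCommitMessage(Seq(AddFile("part-00001-uuid.parquet"))))

// commitJob unwraps every message and accumulates the AddFiles.
taskCommits.foreach { commit =>
  addedStatuses ++= commit.obj.asInstanceOf[Seq[AddFile]]
}
```

After this step, addedStatuses holds every file the job wrote out.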

== [[parsePartitions]] parsePartitions Method

[source, scala]
----
parsePartitions(
  dir: String): Map[String, String]
----

parsePartitions...FIXME

NOTE: parsePartitions is used exclusively when DelayedCommitProtocol is requested for a <<newTaskTempFile, new file>>.
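What partition-directory parsing boils down to can be sketched in plain Scala. This is a simplified illustration (the function name is hypothetical, and the real parsePartitions relies on Spark's partitioning utilities rather than naive string splitting):

```scala
// Sketch (assumption): split a Hive-style partition directory like
// "year=2020/month=09" into column -> value pairs.
def parsePartitionsSketch(dir: String): Map[String, String] =
  dir.split("/").map { part =>
    val Array(key, value) = part.split("=", 2)
    key -> value
  }.toMap

parsePartitionsSketch("year=2020/month=09")
// Map("year" -> "2020", "month" -> "09")
```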

== [[setupJob]] setupJob Method

[source, scala]
----
setupJob(
  jobContext: JobContext): Unit
----

NOTE: setupJob is part of the FileCommitProtocol contract to set up a Spark job.

setupJob does nothing.

== [[abortJob]] abortJob Method

[source, scala]
----
abortJob(
  jobContext: JobContext): Unit
----

NOTE: abortJob is part of the FileCommitProtocol contract to abort a Spark job.

abortJob does nothing.

== [[getFileName]] getFileName Method

[source, scala]
----
getFileName(
  taskContext: TaskAttemptContext,
  ext: String): String
----

getFileName takes the task ID from the given TaskAttemptContext (for the split part below).

getFileName generates a random UUID (for the uuid part below).

In the end, getFileName returns a file name of the format:

[source]
----
part-[split]%05d-[uuid][ext]
----

NOTE: getFileName is used exclusively when DelayedCommitProtocol is requested for a <<newTaskTempFile, new file>>.
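The name format can be reproduced with Scala's f-interpolator. This is a sketch under assumptions: the function name is hypothetical, and the split argument stands in for the task ID taken from the TaskAttemptContext.

```scala
import java.util.UUID

// Sketch (assumption): mirror the documented part-[split]%05d-[uuid][ext]
// format; %05d zero-pads the task ID to five digits.
def getFileNameSketch(split: Int, ext: String): String = {
  val uuid = UUID.randomUUID().toString
  f"part-$split%05d-$uuid$ext"
}

getFileNameSketch(3, ".snappy.parquet")
// e.g. part-00003-<random uuid>.snappy.parquet
```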

== [[addedFiles]] addedFiles Internal Registry

[source, scala]
----
addedFiles: ArrayBuffer[(Map[String, String], String)]
----

addedFiles tracks the files <<newTaskTempFile, added by a write task>> (that runs on an executor).

addedFiles is initialized (as an empty collection) in <<setupTask, setupTask>>.

NOTE: addedFiles is used when DelayedCommitProtocol is requested to <<commitTask, commit a task>> (on an executor, to create a TaskCommitMessage with the files added while the task was writing out a partition of a streaming query).

== [[addedStatuses]] addedStatuses Internal Registry

[source, scala]
----
addedStatuses = new ArrayBuffer[AddFile]
----

addedStatuses is the registry of files that were added by <<commitTask, write tasks>> (on executors) once they have all finished successfully and the <<commitJob, write job is committed>> (on the driver).

[NOTE]
====
addedStatuses is used when:

* DelayedCommitProtocol is requested to <<commitJob, commit a Spark job>> (on a driver)
* TransactionalWrite is requested to write out the result of a structured query
====


Last update: 2020-09-27