DelayedCommitProtocol¶
DelayedCommitProtocol is a FileCommitProtocol (Apache Spark) that writes data out to a directory and returns the files added.
DelayedCommitProtocol is used to model a distributed write that is orchestrated by the Spark driver with the write itself happening on executors.
Note
FileCommitProtocol allows tracking a write job (with a write task per partition) and informing the driver when all the write tasks have finished successfully (and were committed) so the write job can be considered completed.
TaskCommitMessage (Spark Core) allows "transferring" the file names added (written out) on the executors to the driver for the optimistic transactional writer.
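The choreography can be sketched as follows (an illustrative outline of the FileCommitProtocol lifecycle only; Spark's write path drives these calls, and jobContext, taskContext and the single write task are assumptions of the sketch):

```scala
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

val committer: FileCommitProtocol = ???
val jobContext: JobContext = ???
val taskContext: TaskAttemptContext = ???

committer.setupJob(jobContext)                 // on the driver

// for every write task, on an executor
committer.setupTask(taskContext)
val file = committer.newTaskTempFile(taskContext, None, ".snappy.parquet")
// ...write the partition's rows out to `file`...
val msg: TaskCommitMessage = committer.commitTask(taskContext)

// back on the driver, once all write tasks have committed
committer.commitJob(jobContext, Seq(msg))
```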
Creating Instance¶
DelayedCommitProtocol takes the following to be created:

- Job ID
- Data Path
- Length of Random Prefix
DelayedCommitProtocol is created when:
TransactionalWrite is requested for a committer (to write data out to the directory)
Job ID¶
DelayedCommitProtocol is given a job ID that is always delta.
Unused
The Job ID does not seem to be used.
Data Path¶
DelayedCommitProtocol is given a path when created.
The path is the data directory of the delta table that this DelayedCommitProtocol coordinates a write to.
Length of Random Prefix¶
DelayedCommitProtocol can be given a randomPrefixLength when created.
The randomPrefixLength is always undefined (None).
Change Data Feed Partition Handling¶
DelayedCommitProtocol defines 3 values to support Change Data Feed:
- __is_cdc=false
- __is_cdc=true
- A Regex to match on __is_cdc=true text

DelayedCommitProtocol uses them for newTaskTempFile (to create temporary files in the _change_data directory instead, based on the regular expression).
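For illustration, the three values can be sketched as follows (using the names from the example under New Task Temp File; the exact field names in Delta's code may differ):

```scala
import scala.util.matching.Regex

val cdcPartitionFalse = "__is_cdc=false"
val cdcPartitionTrue = "__is_cdc=true"
val cdcPartitionTrueRegex: Regex = cdcPartitionTrue.r
```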
Added Files (on Executors)¶
addedFiles: ArrayBuffer[(Map[String, String], String)]
DelayedCommitProtocol uses the addedFiles internal registry to track the partition values (when writing to a partitioned table) and the relative paths of the files that were added by a write task.
addedFiles is used on executors only.
addedFiles is initialized (as an empty collection) when setting up a task.
addedFiles is used when:
DelayedCommitProtocol is requested to commit a task (on an executor, to create a TaskCommitMessage with the files added while the task was writing data out)
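For illustration, an entry for a write task on a year-partitioned table could look as follows (the file name is a made-up placeholder):

```scala
import scala.collection.mutable.ArrayBuffer

val addedFiles = new ArrayBuffer[(Map[String, String], String)]()
// partition values -> relative path of the file the task wrote out
addedFiles += (Map("year" -> "2024") -> "year=2024/part-00000-<uuid>.c000.snappy.parquet")
```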
Added Statuses (on Driver)¶
addedStatuses: ArrayBuffer[AddFile]
DelayedCommitProtocol uses the addedStatuses internal registry to track the AddFile files added by write tasks (on executors) once they all finish successfully and the write job is committed (on the driver).
addedStatuses is used on the driver only.
addedStatuses is used when:
- DelayedCommitProtocol is requested to commit a job (on the driver)
- TransactionalWrite is requested to write out a structured query
AddCDCFiles¶
changeFiles: ArrayBuffer[AddCDCFile]
DelayedCommitProtocol uses the changeFiles internal registry to track the AddCDCFile files added by write tasks (on executors) once they all finish successfully and the write job is committed (on the driver).
changeFiles is used on the driver only.
changeFiles is used when:
- DelayedCommitProtocol is requested to commit a job (on the driver)
- TransactionalWrite is requested to write out a structured query
Committing Job¶
Signature
commitJob(
jobContext: JobContext,
taskCommits: Seq[TaskCommitMessage]): Unit
commitJob is part of the FileCommitProtocol (Apache Spark) abstraction.
commitJob partitions the file actions carried by the given TaskCommitMessages into AddFiles and AddCDCFiles.
In the end, commitJob adds the AddFiles to the addedStatuses registry and the AddCDCFiles to the changeFiles registry.
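A minimal sketch of that step (assuming every TaskCommitMessage carries the Seq of FileActions produced by commitTask; not Delta's exact code):

```scala
import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile, FileAction}

// split the per-task file actions into main-data and CDF files
val fileActions = taskCommits.flatMap(_.obj.asInstanceOf[Seq[FileAction]])
val (adds, cdcs) = fileActions.partition(_.isInstanceOf[AddFile])
addedStatuses ++= adds.map(_.asInstanceOf[AddFile])
changeFiles ++= cdcs.map(_.asInstanceOf[AddCDCFile])
```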
Setting Up Task¶
FileCommitProtocol
setupTask(
taskContext: TaskAttemptContext): Unit
setupTask is part of the FileCommitProtocol (Apache Spark) abstraction.
setupTask initializes the addedFiles internal registry to be empty.
New Task Temp File¶
FileCommitProtocol
newTaskTempFile(
taskContext: TaskAttemptContext,
dir: Option[String],
ext: String): String
newTaskTempFile is part of the FileCommitProtocol (Apache Spark) abstraction.
Note
The given dir defines a partition directory if a query writes out to a partitioned table.
newTaskTempFile parses the partition values in the given dir, if available, or assumes no partition values (partitionValues).
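A partition directory encodes the partition values as [name]=[value] path segments, so the parsing boils down to the following (a hypothetical helper, not Delta's exact code):

```scala
// "year=2024/month=12" => Map("year" -> "2024", "month" -> "12")
def parsePartitionValues(dir: String): Map[String, String] =
  dir.split("/").map { segment =>
    val Array(name, value) = segment.split("=", 2)
    name -> value
  }.toMap

assert(parsePartitionValues("year=2024/month=12") == Map("year" -> "2024", "month" -> "12"))
```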
newTaskTempFile creates a file name to write data out (for the given TaskAttemptContext, extension and the partition values).
Change Data Feed
While creating a file name, DelayedCommitProtocol uses the __is_cdc virtual column name to separate CDF data from the main table data. If the __is_cdc column contains true, the file name uses the cdc- prefix (instead of part-).
newTaskTempFile builds a relative directory path (using the randomPrefixLength or the optional dir if either is defined).
randomPrefixLength always undefined
randomPrefixLength is always undefined (None) so we can safely skip this branch.
- When the directory is exactly __is_cdc=false, newTaskTempFile returns the file name (with no further changes).
- When the directory has the __is_cdc=true path prefix, newTaskTempFile replaces the prefix with _change_data and uses the changed directory as the parent of the file name.

```scala
val subDir = "__is_cdc=true/a/b/c"
val cdcPartitionTrue = "__is_cdc=true"
val cdcPartitionTrueRegex = cdcPartitionTrue.r
val path = cdcPartitionTrueRegex.replaceFirstIn(subDir, "_change_data")
assert(path == "_change_data/a/b/c")
```

Change Data Feed

This is when DelayedCommitProtocol "redirects" writing out the __is_cdc=true-partitioned CDF data files to the _change_data directory.

- When the directory has the __is_cdc=false path prefix, newTaskTempFile removes the prefix and uses the changed directory as the parent of the file name.
- For other cases, newTaskTempFile uses the directory as the parent of the file name.
When neither the randomPrefixLength nor the partition directory (dir) is defined, newTaskTempFile uses the file name (with no further changes).
newTaskTempFile adds the partition values and the relative path to the addedFiles internal registry.
In the end, newTaskTempFile returns the absolute path of the (relative) path in the directory.
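The last step boils down to resolving the relative path against the data path, along these lines (a sketch with made-up values):

```scala
import org.apache.hadoop.fs.Path

val path = "/delta/my_table"  // the data path
val relativePath = "year=2024/part-00000-<uuid>.c000.snappy.parquet"
new Path(path, relativePath).toString
// /delta/my_table/year=2024/part-00000-<uuid>.c000.snappy.parquet
```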
File Name¶
getFileName(
taskContext: TaskAttemptContext,
ext: String,
partitionValues: Map[String, String]): String
getFileName returns a file name of the format:
[prefix]-[split]-[uuid][ext]
The file name is created as follows:
- The prefix part is one of the following:
  - cdc for the given partitionValues with the __is_cdc partition column with a true value
  - part otherwise
- The split part is the task ID from the given TaskAttemptContext (Apache Hadoop)
- The uuid part is a random UUID
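Put together, getFileName can be sketched as follows (the zero-padding of the split part is an assumption of the sketch):

```scala
import java.util.UUID
import org.apache.hadoop.mapreduce.TaskAttemptContext

def getFileName(
    taskContext: TaskAttemptContext,
    ext: String,
    partitionValues: Map[String, String]): String = {
  // cdc- prefix for CDF data, part- otherwise
  val prefix = if (partitionValues.get("__is_cdc").contains("true")) "cdc" else "part"
  val split = taskContext.getTaskAttemptID.getTaskID.getId
  val uuid = UUID.randomUUID().toString
  f"$prefix-$split%05d-$uuid$ext"
}
```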
Committing Task¶
FileCommitProtocol
commitTask(
taskContext: TaskAttemptContext): TaskCommitMessage
commitTask is part of the FileCommitProtocol (Apache Spark) abstraction.
commitTask creates a TaskCommitMessage with a FileAction (an AddCDCFile or an AddFile, based on the __is_cdc virtual partition column) for every file added (if any were added successfully). Otherwise, commitTask creates an empty TaskCommitMessage.
Note
A file is added (to the addedFiles internal registry) when DelayedCommitProtocol is requested for a new file (path).
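A minimal sketch of the flow (assuming a Hadoop FileSystem for the data path, and the path, addedFiles and buildActionFromAddedFile from the surrounding sections; not Delta's exact code):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

if (addedFiles.nonEmpty) {
  val fs = new Path(path).getFileSystem(taskContext.getConfiguration)
  val actions = addedFiles.map { f =>
    val stat = fs.getFileStatus(new Path(path, f._2))
    buildActionFromAddedFile(f, stat, taskContext)
  }
  new TaskCommitMessage(actions.toSeq)
} else {
  new TaskCommitMessage(Nil)
}
```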
buildActionFromAddedFile¶
buildActionFromAddedFile(
f: (Map[String, String], String),
stat: FileStatus,
taskContext: TaskAttemptContext): FileAction
Note
The f argument is a pair of the partition values and the (relative) path of one of the files added.
buildActionFromAddedFile removes the __is_cdc virtual partition column and creates a FileAction:
- An AddCDCFile for __is_cdc=true partition files
- An AddFile otherwise
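A sketch of that dispatch (assuming Delta's AddFile and AddCDCFile actions; simplified, with no stats or tags):

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile, FileAction}

def buildActionFromAddedFile(
    f: (Map[String, String], String),
    stat: FileStatus,
    taskContext: TaskAttemptContext): FileAction = {
  // drop the __is_cdc virtual partition column from the partition values
  val partitionValues = f._1.filter { case (k, _) => k != "__is_cdc" }
  f._1.get("__is_cdc") match {
    case Some("true") =>
      AddCDCFile(f._2, partitionValues, stat.getLen)
    case _ =>
      AddFile(f._2, partitionValues, stat.getLen, stat.getModificationTime, dataChange = true)
  }
}
```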
Aborting Job¶
FileCommitProtocol
abortJob(
jobContext: JobContext): Unit
abortJob is part of the FileCommitProtocol (Apache Spark) abstraction.
abortJob does nothing (is a noop).
Aborting Task¶
FileCommitProtocol
abortTask(
taskContext: TaskAttemptContext): Unit
abortTask is part of the FileCommitProtocol (Apache Spark) abstraction.
abortTask does nothing (is a noop).
Setting Up Job¶
FileCommitProtocol
setupJob(
jobContext: JobContext): Unit
setupJob is part of the FileCommitProtocol (Apache Spark) abstraction.
setupJob does nothing (is a noop).
New Temp File (Absolute Path)¶
FileCommitProtocol
newTaskTempFileAbsPath(
taskContext: TaskAttemptContext,
absoluteDir: String,
ext: String): String
newTaskTempFileAbsPath is part of the FileCommitProtocol (Apache Spark) abstraction.
newTaskTempFileAbsPath throws an UnsupportedOperationException:
[this] does not support adding files with an absolute path
Logging¶
Enable ALL logging level for org.apache.spark.sql.delta.files.DelayedCommitProtocol logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.delta.files.DelayedCommitProtocol=ALL
Refer to Logging.