DelayedCommitProtocol¶
DelayedCommitProtocol is a FileCommitProtocol (Apache Spark) to write data out to a directory and return the files added.
DelayedCommitProtocol is used to model a distributed write that is orchestrated by the Spark driver with the write itself happening on executors.
Note
FileCommitProtocol allows tracking a write job (with a write task per partition) and informing the driver when all the write tasks have finished successfully (and were committed) so the write job can be considered completed.
TaskCommitMessage (Spark Core) allows "transferring" the file names added (written out) on the executors to the driver for the optimistic transactional writer.
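As a rough, minimal sketch of that handover (using Spark's TaskCommitMessage with a placeholder payload, not Delta's actual payload shape):

```scala
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

// On an executor: wrap the per-task result (a placeholder payload here) so Spark
// can ship it back to the driver once the write task finishes successfully.
val message = new TaskCommitMessage(Seq("year=2024/part-00000-aaa.parquet"))

// On the driver: commitJob receives all the TaskCommitMessages and reads their payloads.
val filesAdded = message.obj.asInstanceOf[Seq[String]]
assert(filesAdded == Seq("year=2024/part-00000-aaa.parquet"))
```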
Creating Instance¶
DelayedCommitProtocol takes the following to be created:

- Job ID
- Data Path
- Length of Random Prefix

DelayedCommitProtocol is created when:

- TransactionalWrite is requested for a committer (to write data out to the directory)
Job ID¶
DelayedCommitProtocol is given a job ID that is always delta.
Unused
The Job ID does not seem to be used.
Data Path¶
DelayedCommitProtocol is given a path when created.
The path is the data directory of a delta table (the table this DelayedCommitProtocol coordinates a write process to).
Length of Random Prefix¶
DelayedCommitProtocol can be given a randomPrefixLength when created.
The randomPrefixLength is always undefined (None).
Change Data Feed Partition Handling¶
DelayedCommitProtocol defines 3 values to support Change Data Feed:

- __is_cdc=false
- __is_cdc=true
- A Regex to match on __is_cdc=true text

DelayedCommitProtocol uses them in newTaskTempFile (to create temporary files in the _change_data directory instead, based on the regular expression).
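A minimal sketch of those three values (the names below are assumptions matching the example used later on this page, not necessarily the actual member names):

```scala
import scala.util.matching.Regex

// Assumed names: the two directory prefixes of the __is_cdc virtual partition column
// and a regular expression that matches the __is_cdc=true prefix.
val cdcPartitionFalse: String = "__is_cdc=false"
val cdcPartitionTrue: String = "__is_cdc=true"
val cdcPartitionTrueRegex: Regex = cdcPartitionTrue.r
```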
Added Files (on Executors)¶
addedFiles: ArrayBuffer[(Map[String, String], String)]
DelayedCommitProtocol uses the addedFiles internal registry to track the partition values (if writing happened to a partitioned table) and the relative paths of the files that were added by a write task.
addedFiles is used on executors only.
addedFiles is initialized (as an empty collection) when setting up a task.
addedFiles is used when:

- DelayedCommitProtocol is requested to commit a task (on an executor, to create a TaskCommitMessage with the files added while the task was writing data out)
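As an illustration of the shape of the registry (with made-up partition values and file names):

```scala
import scala.collection.mutable.ArrayBuffer

// Each entry pairs the partition values of a written file with the file's path
// relative to the table's data directory.
val addedFiles = new ArrayBuffer[(Map[String, String], String)]()

// Hypothetical entry for a table partitioned by year and month:
addedFiles.append(
  (Map("year" -> "2024", "month" -> "01"),
    "year=2024/month=01/part-00000-1234abcd.c000.snappy.parquet"))

// A non-partitioned table would use empty partition values.
addedFiles.append((Map.empty[String, String], "part-00001-5678efgh.c000.snappy.parquet"))
```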
Added Statuses (on Driver)¶
addedStatuses: ArrayBuffer[AddFile]
DelayedCommitProtocol uses the addedStatuses internal registry to track the AddFile files added by write tasks (on executors) once they all finish successfully and the write job is committed (on the driver).
addedStatuses is used on the driver only.
addedStatuses is used when:

- DelayedCommitProtocol is requested to commit a job (on the driver)
- TransactionalWrite is requested to write out a structured query
AddCDCFiles¶
changeFiles: ArrayBuffer[AddCDCFile]
DelayedCommitProtocol uses the changeFiles internal registry to track the AddCDCFile files added by write tasks (on executors) once they all finish successfully and the write job is committed (on the driver).
changeFiles is used on the driver only.
changeFiles is used when:

- DelayedCommitProtocol is requested to commit a job (on the driver)
- TransactionalWrite is requested to write out a structured query
Committing Job¶
Signature
commitJob(
jobContext: JobContext,
taskCommits: Seq[TaskCommitMessage]): Unit
commitJob is part of the FileCommitProtocol (Apache Spark) abstraction.
commitJob partitions the given TaskCommitMessages into a collection of AddFiles and a collection of AddCDCFiles.
In the end, commitJob adds the AddFiles to the addedStatuses registry and the AddCDCFiles to the changeFiles registry.
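The following is a standalone sketch (with simplified stand-ins for Delta's AddFile and AddCDCFile actions, not the actual source) of how commitJob splits what the write tasks reported:

```scala
// Simplified stand-ins for Delta's actions (the real ones live in org.apache.spark.sql.delta.actions).
sealed trait FileAction
case class AddFile(path: String) extends FileAction
case class AddCDCFile(path: String) extends FileAction

// A hypothetical payload carried back from the executors in TaskCommitMessages.
val reported: Seq[FileAction] = Seq(
  AddFile("year=2024/part-00000-aaa.parquet"),
  AddCDCFile("_change_data/cdc-00000-bbb.parquet"))

// commitJob-style split: AddCDCFiles end up in changeFiles, AddFiles in addedStatuses.
val (changes, adds) = reported.partition {
  case _: AddCDCFile => true
  case _             => false
}
assert(adds == Seq(AddFile("year=2024/part-00000-aaa.parquet")))
assert(changes == Seq(AddCDCFile("_change_data/cdc-00000-bbb.parquet")))
```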
Setting Up Task¶
FileCommitProtocol
setupTask(
taskContext: TaskAttemptContext): Unit
setupTask is part of the FileCommitProtocol (Apache Spark) abstraction.
setupTask initializes the addedFiles internal registry to be empty.
New Task Temp File¶
FileCommitProtocol
newTaskTempFile(
taskContext: TaskAttemptContext,
dir: Option[String],
ext: String): String
newTaskTempFile is part of the FileCommitProtocol (Apache Spark) abstraction.
Note
The given dir defines a partition directory if a query writes out to a partitioned table.
newTaskTempFile parses the partition values in the given dir, if available, or assumes no partition values (partitionValues).
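A hypothetical helper (not the actual implementation) illustrating how a partition directory such as year=2024/month=01 maps to partition values:

```scala
// Hypothetical helper: turn a partition directory into partition values.
def parsePartitionValues(dir: String): Map[String, String] =
  dir.split("/").map { fragment =>
    val Array(column, value) = fragment.split("=", 2)
    column -> value
  }.toMap

assert(parsePartitionValues("year=2024/month=01") == Map("year" -> "2024", "month" -> "01"))
```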
newTaskTempFile creates a file name to write data out (for the given TaskAttemptContext, extension and the partition values).
Change Data Feed
While creating a file name, DelayedCommitProtocol uses the __is_cdc virtual column name to split (divide) CDF data from the main table data. If the __is_cdc column contains true, the file name uses the cdc- prefix (not part-).
newTaskTempFile builds a relative directory path (using the randomPrefixLength or the optional dir, if either is defined).
randomPrefixLength always undefined
randomPrefixLength is always undefined (None) so we can safely skip this branch.
- For a directory that is exactly __is_cdc=false, newTaskTempFile returns the file name (with no further changes).
- For a directory with the __is_cdc=true path prefix, newTaskTempFile replaces the prefix with _change_data and uses the changed directory as the parent of the file name.

  val subDir = "__is_cdc=true/a/b/c"
  val cdcPartitionTrue = "__is_cdc=true"
  val cdcPartitionTrueRegex = cdcPartitionTrue.r
  val path = cdcPartitionTrueRegex.replaceFirstIn(subDir, "_change_data")
  assert(path == "_change_data/a/b/c")

  Change Data Feed
  This is when DelayedCommitProtocol "redirects" writing out the __is_cdc=true-partitioned CDF data files to the _change_data directory.
- For a directory with the __is_cdc=false path prefix, newTaskTempFile removes the prefix and uses the changed directory as the parent of the file name.
- For other cases, newTaskTempFile uses the directory as the parent of the file name.
When neither the randomPrefixLength nor the partition directory (dir) is defined, newTaskTempFile uses the file name (with no further changes).
newTaskTempFile adds the partition values and the relative path to the addedFiles internal registry.
In the end, newTaskTempFile returns the absolute path of the (relative) path in the directory.
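A small sketch of that final step (with assumed paths, using Hadoop's Path) of resolving the relative file path against the table's data path:

```scala
import org.apache.hadoop.fs.Path

// Hypothetical values: the table's data directory and a relative path built above.
val dataPath = new Path("/delta/events")
val relativePath = "year=2024/month=01/part-00000-1234abcd.c000.snappy.parquet"

// The returned temp file is the relative path resolved against the data path.
val absolutePath = new Path(dataPath, relativePath).toString
assert(absolutePath == "/delta/events/year=2024/month=01/part-00000-1234abcd.c000.snappy.parquet")
```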
File Name¶
getFileName(
taskContext: TaskAttemptContext,
ext: String,
partitionValues: Map[String, String]): String
getFileName returns a file name of the format:

[prefix]-[split]-[uuid][ext]

The file name is created as follows:

- The prefix part is one of the following:
  - cdc when the given partitionValues contain the __is_cdc partition column with the value true
  - part otherwise
- The split part is the task ID from the given TaskAttemptContext (Apache Hadoop)
- The uuid part is a random UUID
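Below is a sketch (not the actual source) of that format; split stands in for the task ID read from the TaskAttemptContext and isCdcPartition for the __is_cdc check on partitionValues:

```scala
import java.util.UUID

// Sketch of the [prefix]-[split]-[uuid][ext] file-name format.
// Zero-padding the split is an assumption of this sketch.
def fileName(isCdcPartition: Boolean, split: Int, ext: String): String = {
  val prefix = if (isCdcPartition) "cdc" else "part"
  val uuid = UUID.randomUUID().toString
  f"$prefix-$split%05d-$uuid$ext"
}

// e.g. part-00003-<random uuid>.c000.snappy.parquet
println(fileName(isCdcPartition = false, split = 3, ext = ".c000.snappy.parquet"))
```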
Committing Task¶
FileCommitProtocol
commitTask(
taskContext: TaskAttemptContext): TaskCommitMessage
commitTask is part of the FileCommitProtocol (Apache Spark) abstraction.
commitTask creates a TaskCommitMessage with a FileAction (an AddCDCFile or an AddFile, based on the __is_cdc virtual partition column) for every file added (if any were added successfully). Otherwise, commitTask creates an empty TaskCommitMessage.
Note
A file is added (to the addedFiles internal registry) when DelayedCommitProtocol is requested for a new file (path).
buildActionFromAddedFile¶
buildActionFromAddedFile(
f: (Map[String, String], String),
stat: FileStatus,
taskContext: TaskAttemptContext): FileAction
Note
The f argument is a pair of the partition values and one of the files added.
buildActionFromAddedFile removes the __is_cdc virtual partition column and creates a FileAction:

- An AddCDCFile for __is_cdc=true partition files
- An AddFile otherwise
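A standalone sketch (with simplified stand-in actions that only carry a path and partition values, not the actual source) of dropping the virtual column and choosing the action type:

```scala
// Simplified stand-ins for the Delta actions (partition values added for this example).
sealed trait FileAction
case class AddFile(path: String, partitionValues: Map[String, String]) extends FileAction
case class AddCDCFile(path: String, partitionValues: Map[String, String]) extends FileAction

// Hypothetical partition values and relative path recorded for a written file.
val partitionValues = Map("__is_cdc" -> "true", "year" -> "2024")
val path = "_change_data/year=2024/cdc-00000-ccc.parquet"

// The virtual __is_cdc column never ends up in the committed action.
val cleanedValues = partitionValues - "__is_cdc"
val action: FileAction =
  if (partitionValues.get("__is_cdc").contains("true")) AddCDCFile(path, cleanedValues)
  else AddFile(path, cleanedValues)

assert(action == AddCDCFile(path, Map("year" -> "2024")))
```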
Aborting Job¶
FileCommitProtocol
abortJob(
jobContext: JobContext): Unit
abortJob is part of the FileCommitProtocol (Apache Spark) abstraction.
abortJob does nothing (it is a noop).
Aborting Task¶
FileCommitProtocol
abortTask(
taskContext: TaskAttemptContext): Unit
abortTask is part of the FileCommitProtocol (Apache Spark) abstraction.
abortTask does nothing (it is a noop).
Setting Up Job¶
FileCommitProtocol
setupJob(
jobContext: JobContext): Unit
setupJob is part of the FileCommitProtocol (Apache Spark) abstraction.
setupJob does nothing (it is a noop).
New Temp File (Absolute Path)¶
FileCommitProtocol
newTaskTempFileAbsPath(
taskContext: TaskAttemptContext,
absoluteDir: String,
ext: String): String
newTaskTempFileAbsPath is part of the FileCommitProtocol (Apache Spark) abstraction.
newTaskTempFileAbsPath throws an UnsupportedOperationException:

[this] does not support adding files with an absolute path
Logging¶
Enable ALL logging level for the org.apache.spark.sql.delta.files.DelayedCommitProtocol logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.delta.files.DelayedCommitProtocol=ALL
Refer to Logging.