DelayedCommitProtocol
DelayedCommitProtocol is used to model a distributed write that is orchestrated by the Spark driver with the write itself happening on executors.
DelayedCommitProtocol is a concrete FileCommitProtocol (Spark Core) to write out a result of a structured query to a <
FileCommitProtocol (Apache Spark) allows tracking a write job (with a write task per partition) and informing the driver when all the write tasks have finished successfully (and were <TaskCommitMessage
(Spark Core) makes it possible to "transfer" the files added (written out) on the executors to the driver for the <
DelayedCommitProtocol is <TransactionalWrite is requested for a <
[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.sql.delta.files.DelayedCommitProtocol logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.delta.files.DelayedCommitProtocol=ALL

Refer to Logging.
====
== [[creating-instance]] Creating DelayedCommitProtocol Instance
DelayedCommitProtocol takes the following to be created:

- [[jobId]] Job ID (seems always <>)
- [[path]] Directory (to write files to)
- [[randomPrefixLength]] Optional length of a random prefix (seems always <>)

DelayedCommitProtocol initializes the <
== [[setupTask]] setupTask Method

[source, scala]
setupTask(taskContext: TaskAttemptContext): Unit

NOTE: setupTask is part of the FileCommitProtocol contract to set up a task for a writing job.

setupTask simply initializes the <
== [[newTaskTempFile]] newTaskTempFile Method

[source, scala]
newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String

NOTE: newTaskTempFile is part of the FileCommitProtocol contract to inform the committer to add a new file.
newTaskTempFile <TaskAttemptContext and ext.

newTaskTempFile tries to <dir or falls back to an empty partitionValues.

NOTE: The given dir defines a partition directory if the streaming query (and hence the write) is partitioned.
newTaskTempFile builds a path (based on the given randomPrefixLength and the dir, or uses the file name directly).
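The path-building step can be sketched roughly as follows. This is a minimal illustration, not the Delta Lake source: `RelativePathSketch` and `relativePath` are hypothetical names, the random prefix is passed in as a ready-made string (instead of being generated from randomPrefixLength) so the example stays deterministic, and it assumes that a random prefix, when present, is used in place of the partition directory.

```scala
object RelativePathSketch {
  // Hypothetical helper, not part of DelayedCommitProtocol.
  // Assumption: a random prefix (if any) wins over the partition directory;
  // with neither, the bare file name is the relative path.
  def relativePath(
      randomPrefix: Option[String],
      dir: Option[String],
      fileName: String): String =
    randomPrefix.orElse(dir) match {
      case Some(parent) => s"$parent/$fileName"
      case None         => fileName
    }
}
```

For instance, under these assumptions `RelativePathSketch.relativePath(None, Some("year=2020"), "part-0.parquet")` gives `year=2020/part-0.parquet`.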
NOTE: FIXME When would the optional dir and the <

newTaskTempFile adds the partition values and the relative path to the <

In the end, newTaskTempFile returns the absolute path of the (relative) path in the <
== [[commitTask]] Committing Task (After Successful Write) -- commitTask Method

[source, scala]
commitTask(taskContext: TaskAttemptContext): TaskCommitMessage

NOTE: commitTask is part of the FileCommitProtocol contract to commit a task after the writes succeed.
commitTask simply creates a TaskCommitMessage with an <TaskCommitMessage is empty.

NOTE: A file is added (to <DelayedCommitProtocol is requested for a <
== [[commitJob]] Committing Spark Job (After Successful Write) -- commitJob Method

[source, scala]
commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit

NOTE: commitJob is part of the FileCommitProtocol contract to commit a job after the writes succeed.
commitJob simply adds the <taskCommits from every <
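The overall flow (tasks record the files they write, hand them over in a commit message, and the driver collects them when the job commits) can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyCommitProtocol`, `AddedFile` and `CommitMessage` are hypothetical stand-ins for DelayedCommitProtocol, AddFile and TaskCommitMessage, no Spark or Hadoop types are used, and each task is modelled as its own protocol instance.

```scala
import scala.collection.mutable.ArrayBuffer

object DelayedCommitSketch {
  // Hypothetical stand-in for AddFile.
  final case class AddedFile(partitionValues: Map[String, String], path: String)
  // Hypothetical stand-in for TaskCommitMessage.
  final case class CommitMessage(files: Seq[AddedFile])

  class ToyCommitProtocol {
    // Executor side: files written by the current task.
    private val addedFiles = ArrayBuffer.empty[(Map[String, String], String)]
    // Driver side: files collected from all committed tasks.
    val addedStatuses = ArrayBuffer.empty[AddedFile]

    def setupTask(): Unit = addedFiles.clear()

    def recordFile(partitionValues: Map[String, String], relativePath: String): Unit =
      addedFiles += ((partitionValues, relativePath))

    // Packs the task's files into a message that travels to the driver.
    def commitTask(): CommitMessage =
      CommitMessage(addedFiles.map { case (pv, p) => AddedFile(pv, p) }.toList)

    // Collects the files from every task's commit message.
    def commitJob(taskCommits: Seq[CommitMessage]): Unit =
      taskCommits.foreach(msg => addedStatuses ++= msg.files)
  }

  // Two "tasks" write files; the "driver" gathers them at job commit.
  def demo(): Seq[AddedFile] = {
    val task0 = new ToyCommitProtocol
    task0.setupTask()
    task0.recordFile(Map("year" -> "2020"), "year=2020/part-00000.parquet")

    val task1 = new ToyCommitProtocol
    task1.setupTask()
    task1.recordFile(Map("year" -> "2021"), "year=2021/part-00001.parquet")
    task1.recordFile(Map("year" -> "2022"), "year=2022/part-00001.parquet")

    val driver = new ToyCommitProtocol
    driver.commitJob(Seq(task0.commitTask(), task1.commitTask()))
    driver.addedStatuses.toList
  }
}
```

Nothing is "committed" until the driver has every task's message, which is what lets the driver decide what to do with the complete list of files.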
== [[parsePartitions]] parsePartitions Method

[source, scala]
parsePartitions(dir: String): Map[String, String]

parsePartitions ...FIXME

NOTE: parsePartitions is used exclusively when DelayedCommitProtocol is requested to <
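Given the signature alone, one way to picture turning a partition directory into partition values is the sketch below. It is an assumption-heavy illustration and not the actual implementation: it assumes Hive-style `key=value` path segments separated by `/`, ignores any escaping of special characters, and `ParsePartitionsSketch` is a hypothetical name.

```scala
object ParsePartitionsSketch {
  // Assumption: dir looks like "year=2020/month=01".
  // Segments without a "key=value" shape are skipped.
  def parsePartitions(dir: String): Map[String, String] =
    dir.split("/").iterator
      .filter(_.nonEmpty)
      .flatMap { segment =>
        segment.split("=", 2) match {
          case Array(key, value) if key.nonEmpty => Some(key -> value)
          case _                                 => None
        }
      }
      .toMap
}
```

Under these assumptions, `ParsePartitionsSketch.parsePartitions("year=2020/month=01")` yields a map with `year -> 2020` and `month -> 01`, and an empty string yields an empty map.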
== [[setupJob]] setupJob Method

[source, scala]
setupJob(jobContext: JobContext): Unit

NOTE: setupJob is part of the FileCommitProtocol contract to set up a Spark job.

setupJob does nothing.
== [[abortJob]] abortJob Method

[source, scala]
abortJob(jobContext: JobContext): Unit

NOTE: abortJob is part of the FileCommitProtocol contract to abort a Spark job.

abortJob does nothing.
== [[getFileName]] getFileName Method

[source, scala]
getFileName(taskContext: TaskAttemptContext, ext: String): String

getFileName takes the task ID from the given TaskAttemptContext (for the split part below).

getFileName generates a random UUID (for the uuid part below).

In the end, getFileName returns a file name of the format:

part-[split]%05d-[uuid][ext]
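The format above can be reproduced with Scala's `f` interpolator. The following is a minimal sketch, not the method itself: `FileNameSketch.fileName` is a hypothetical helper that takes the split as a plain `Int` (instead of reading it from a TaskAttemptContext) and accepts the UUID as an optional argument so the result can be checked.

```scala
import java.util.UUID

object FileNameSketch {
  // Hypothetical helper: `split` stands in for the task ID.
  // %05d zero-pads the split to five digits.
  def fileName(
      split: Int,
      ext: String,
      uuid: String = UUID.randomUUID().toString): String =
    f"part-$split%05d-$uuid$ext"
}
```

With a fixed UUID string of `abc`, `FileNameSketch.fileName(3, ".snappy.parquet", "abc")` gives `part-00003-abc.snappy.parquet`.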
NOTE: getFileName is used exclusively when DelayedCommitProtocol is requested to <
== [[addedFiles]] addedFiles Internal Registry

[source, scala]
addedFiles: ArrayBuffer[(Map[String, String], String)]

addedFiles tracks the files <

addedFiles is initialized (as an empty collection) in <
NOTE: addedFiles is used when DelayedCommitProtocol is requested to <TaskCommitMessage with the files added while a task was writing out a partition of a streaming query).
== [[addedStatuses]] addedStatuses Internal Registry

[source, scala]
addedStatuses = new ArrayBuffer[AddFile]

addedStatuses holds the files that were added by <

[NOTE]
addedStatuses is used when:

- DelayedCommitProtocol is requested to <> (on a driver)