# ManifestFileCommitProtocol

ManifestFileCommitProtocol is a FileCommitProtocol for tracking valid files (per micro-batch) in FileStreamSinkLog.
Learn more on FileCommitProtocol in The Internals of Apache Spark.
## Creating Instance

ManifestFileCommitProtocol takes the following to be created:

- Job ID (unused)
- Path to write the output to

ManifestFileCommitProtocol is created when FileStreamSink is requested to add a batch (which is every micro-batch).
## FileStreamSinkLog

ManifestFileCommitProtocol is given a FileStreamSinkLog when setting up the manifest options for a micro-batch (right after having been created).

The FileStreamSinkLog is used to add the SinkFileStatuses (of a micro-batch) when ManifestFileCommitProtocol is requested to commit a write job.
## Setting Up Manifest Options

```scala
setupManifestOptions(
  fileLog: FileStreamSinkLog,
  batchId: Long): Unit
```

setupManifestOptions assigns the given FileStreamSinkLog and batchId.

setupManifestOptions is used when FileStreamSink is requested to add a batch (right after this ManifestFileCommitProtocol has been created).
## Setting Up Job

```scala
setupJob(
  jobContext: JobContext): Unit
```

setupJob initializes the pendingCommitFiles internal registry to be an empty collection of Hadoop Paths.

setupJob is part of the FileCommitProtocol (Spark SQL) abstraction.
## Setting Up Task

```scala
setupTask(
  taskContext: TaskAttemptContext): Unit
```

setupTask initializes the addedFiles internal registry to be an empty collection of file paths.

setupTask is part of the FileCommitProtocol (Spark SQL) abstraction.
## newTaskTempFile

```scala
newTaskTempFile(
  taskContext: TaskAttemptContext,
  dir: Option[String],
  ext: String): String
```

newTaskTempFile creates a temporary file name of the format part-[split]-[uuid][ext] in the given dir location (or directly under the path when dir is undefined) and adds it to the addedFiles internal registry.

newTaskTempFile is part of the FileCommitProtocol (Spark SQL) abstraction.
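The naming scheme can be sketched in plain Scala (a simplified illustration, not Spark's implementation; the split number is assumed to come from the Hadoop task ID of the task attempt):

```scala
import java.util.UUID

// Simplified sketch of the part-[split]-[uuid][ext] naming scheme.
// split is assumed to be the Hadoop task ID of the task attempt,
// uuid a random UUID.
def tempFileName(split: Int, ext: String): String = {
  val uuid = UUID.randomUUID().toString
  f"part-$split%05d-$uuid$ext"
}

println(tempFileName(3, ".parquet"))
// e.g. part-00003-4f0c...-....parquet
```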
## Task Committed

```scala
onTaskCommit(
  taskCommit: TaskCommitMessage): Unit
```

onTaskCommit adds the SinkFileStatuses from the given taskCommit to the pendingCommitFiles internal registry.

onTaskCommit is part of the FileCommitProtocol (Spark SQL) abstraction.
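The driver-side accumulation can be sketched as follows (a minimal illustration, not Spark's code; the TaskCommitMessage here is a hypothetical stand-in assumed to simply carry the paths of the files a task wrote):

```scala
import java.nio.file.{Path, Paths}
import scala.collection.mutable

// Hypothetical stand-in for the task commit payload: assumed to
// carry the paths of the files the committed task wrote.
final case class TaskCommitMessage(paths: Seq[String])

// Driver-side registry of files awaiting the job commit.
val pendingCommitFiles = mutable.ArrayBuffer.empty[Path]

// Called once per committed task; accumulates its files.
def onTaskCommit(taskCommit: TaskCommitMessage): Unit =
  pendingCommitFiles ++= taskCommit.paths.map(Paths.get(_))
```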
## Committing Task

```scala
commitTask(
  taskContext: TaskAttemptContext): TaskCommitMessage
```

commitTask creates a TaskCommitMessage with SinkFileStatuses for every added file.

commitTask is part of the FileCommitProtocol (Spark SQL) abstraction.
## Aborting Task

```scala
abortTask(
  taskContext: TaskAttemptContext): Unit
```

abortTask deletes the added files.

abortTask is part of the FileCommitProtocol (Spark SQL) abstraction.
## Committing Job

```scala
commitJob(
  jobContext: JobContext,
  taskCommits: Seq[TaskCommitMessage]): Unit
```

commitJob takes the SinkFileStatuses from the given taskCommits.

In the end, commitJob requests the FileStreamSinkLog to add the SinkFileStatuses as the batchId. If successful (true), commitJob prints out the following INFO message to the logs:

```text
Committed batch [batchId]
```

Otherwise, when failed (false), commitJob throws an IllegalStateException:

```text
Race while writing batch [batchId]
```

commitJob is part of the FileCommitProtocol (Spark SQL) abstraction.
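The success/failure handling can be sketched with a simplified file log (an illustration under assumptions, not Spark's implementation; SinkFileStatus and FileLog below are stand-ins, and the log's add is assumed to return false when the batch has already been committed by another writer):

```scala
import scala.collection.mutable

// Simplified stand-ins for SinkFileStatus and FileStreamSinkLog.
final case class SinkFileStatus(path: String)

final class FileLog {
  private val batches = mutable.Map.empty[Long, Seq[SinkFileStatus]]
  // Returns false when the batch was already committed (a race).
  def add(batchId: Long, statuses: Seq[SinkFileStatus]): Boolean =
    if (batches.contains(batchId)) false
    else { batches(batchId) = statuses; true }
}

def commitJob(log: FileLog, batchId: Long, statuses: Seq[SinkFileStatus]): Unit =
  if (log.add(batchId, statuses))
    println(s"Committed batch $batchId")
  else
    throw new IllegalStateException(s"Race while writing batch $batchId")
```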
## Aborting Job

```scala
abortJob(
  jobContext: JobContext): Unit
```

abortJob deletes all pendingCommitFiles (if there are any) and clears the registry.

abortJob is part of the FileCommitProtocol (Spark SQL) abstraction.
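The cleanup can be sketched as a best-effort deletion (a simplified illustration, not Spark's code; Spark works against the Hadoop FileSystem API rather than java.nio used here):

```scala
import java.nio.file.{Files, Path, Paths}
import scala.collection.mutable

// Best-effort deletion of every pending file, then clearing the
// registry. Deletion failures are ignored: the job is aborting anyway.
def abortJob(pendingCommitFiles: mutable.ArrayBuffer[Path]): Unit = {
  pendingCommitFiles.foreach { p =>
    try Files.deleteIfExists(p)
    catch { case _: java.io.IOException => () } // ignore; best effort
  }
  pendingCommitFiles.clear()
}
```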