ManifestFileCommitProtocol¶
ManifestFileCommitProtocol is a FileCommitProtocol for tracking valid files (per micro-batch) in FileStreamSinkLog.
Learn more on FileCommitProtocol in The Internals of Apache Spark.
Creating Instance¶
ManifestFileCommitProtocol takes the following to be created:
- Job ID (unused)
- Path to write the output to
ManifestFileCommitProtocol is created when FileStreamSink is requested to add a batch (which is every micro-batch).
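The following is a minimal sketch of creating a ManifestFileCommitProtocol directly (the job ID and the output path are made-up values; in a streaming query FileStreamSink does this itself):

```scala
import org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol

// Made-up values: the job ID is not used internally,
// while the path is the output directory of the streaming query.
val committer = new ManifestFileCommitProtocol(
  "manifest-demo",          // Job ID (unused)
  "/tmp/file-sink-output")  // Path to write the output to
```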
FileStreamSinkLog¶
ManifestFileCommitProtocol is given a FileStreamSinkLog when setting up the manifest options for a micro-batch (right after having been created).
FileStreamSinkLog is used to add the SinkFileStatuses (in a micro-batch) when ManifestFileCommitProtocol is requested to commit a write job.
Setting Up Manifest Options¶
setupManifestOptions(
fileLog: FileStreamSinkLog,
batchId: Long): Unit
setupManifestOptions saves the given FileStreamSinkLog and batchId internally (to be used when committing a write job).
setupManifestOptions is used when FileStreamSink is requested to add a batch (right after a ManifestFileCommitProtocol has been created).
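The following is a rough sketch of that wiring (the output path, the metadata directory and the batch ID are made-up values for illustration): the committer is created first and is then given the sink's FileStreamSinkLog and the ID of the current micro-batch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.{FileStreamSinkLog, ManifestFileCommitProtocol}

val spark = SparkSession.builder()
  .appName("manifest-demo")
  .master("local[*]")
  .getOrCreate()

val outputPath = "/tmp/file-sink-output"

// The sink's metadata log (FileStreamSink keeps it under the _spark_metadata directory)
val fileLog = new FileStreamSinkLog(
  FileStreamSinkLog.VERSION, spark, s"$outputPath/_spark_metadata")

val committer = new ManifestFileCommitProtocol("manifest-demo", outputPath)
committer.setupManifestOptions(fileLog, batchId = 0)
```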
Setting Up Job¶
setupJob(
jobContext: JobContext): Unit
setupJob initializes pendingCommitFiles to be an empty collection of Hadoop Paths.
setupJob is part of the FileCommitProtocol (Spark SQL) abstraction.
Setting Up Task¶
setupTask(
taskContext: TaskAttemptContext): Unit
setupTask initializes the addedFiles internal registry to be an empty collection of file paths (the files added by the write task).
setupTask is part of the FileCommitProtocol (Spark SQL) abstraction.
newTaskTempFile¶
newTaskTempFile(
taskContext: TaskAttemptContext,
dir: Option[String],
ext: String): String
newTaskTempFile creates the path of a temporary file with the name part-[split]-[uuid][ext] (with split being the task ID of the given TaskAttemptContext) in the given dir (if defined) or directly under the path, and adds it to the addedFiles internal registry.
newTaskTempFile is part of the FileCommitProtocol (Spark SQL) abstraction.
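The naming scheme can be illustrated with a small standalone sketch (the zero-padding of the split and the file extension are assumptions for illustration only):

```scala
import java.util.UUID

val split = 3                // e.g. the task ID of the given TaskAttemptContext
val uuid = UUID.randomUUID().toString
val ext = ".snappy.parquet"  // extension as passed in by the file format writer

val fileName = f"part-$split%05d-$uuid$ext"
// e.g. part-00003-<random uuid>.snappy.parquet
```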
Task Committed¶
onTaskCommit(
taskCommit: TaskCommitMessage): Unit
onTaskCommit adds the paths of the SinkFileStatuses from the given TaskCommitMessage to the pendingCommitFiles internal registry.
onTaskCommit is part of the FileCommitProtocol (Spark SQL) abstraction.
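A conceptual sketch of that step follows (not the actual Spark source; the helper is hypothetical): the commit message carries the task's SinkFileStatuses, and their paths are queued so they can be cleaned up if the write job is aborted later.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.execution.streaming.SinkFileStatus

// Hypothetical helper: extract the paths carried by a task commit message.
def pendingPathsOf(taskCommit: TaskCommitMessage): Seq[Path] =
  taskCommit.obj.asInstanceOf[Seq[SinkFileStatus]]
    .map(status => status.toFileStatus.getPath)
```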
Committing Task¶
commitTask(
taskContext: TaskAttemptContext): TaskCommitMessage
commitTask creates a TaskCommitMessage with a SinkFileStatus for every file in the addedFiles internal registry.
commitTask is part of the FileCommitProtocol (Spark SQL) abstraction.
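A conceptual sketch of the task-commit step follows (not the actual Spark source; the helper is hypothetical): every added file is looked up on its file system and turned into a SinkFileStatus before being shipped back to the driver in a TaskCommitMessage.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.execution.streaming.SinkFileStatus

// Hypothetical helper: build a TaskCommitMessage from the files a task added.
def commitAddedFiles(
    taskContext: TaskAttemptContext,
    addedFiles: Seq[String]): TaskCommitMessage = {
  val statuses = addedFiles.map { file =>
    val path = new Path(file)
    val fs = path.getFileSystem(taskContext.getConfiguration)
    SinkFileStatus(fs.getFileStatus(path))
  }
  new TaskCommitMessage(statuses)
}
```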
Aborting Task¶
abortTask(
taskContext: TaskAttemptContext): Unit
abortTask deletes the files in the addedFiles internal registry (a best-effort cleanup of incomplete files).
abortTask is part of the FileCommitProtocol (Spark SQL) abstraction.
Committing Job¶
commitJob(
jobContext: JobContext,
taskCommits: Seq[TaskCommitMessage]): Unit
commitJob takes the SinkFileStatuses from the given taskCommits.
In the end, commitJob requests the FileStreamSinkLog to add the SinkFileStatuses for the batchId. If successful (true), commitJob prints out the following INFO message to the logs:
Committed batch [batchId]
Otherwise, when failed (false), commitJob throws an IllegalStateException:
Race while writing batch [batchId]
commitJob is part of the FileCommitProtocol (Spark SQL) abstraction.
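A conceptual sketch of the commit-or-fail step follows (not the actual Spark source; the helper is hypothetical):

```scala
import org.apache.spark.sql.execution.streaming.{FileStreamSinkLog, SinkFileStatus}

// Hypothetical helper: record the files of a batch in the sink log,
// or fail if another writer has already committed the batch.
def commitBatch(
    fileLog: FileStreamSinkLog,
    batchId: Long,
    fileStatuses: Array[SinkFileStatus]): Unit = {
  if (fileLog.add(batchId, fileStatuses)) {
    println(s"Committed batch $batchId")  // logged at INFO level in the real code
  } else {
    throw new IllegalStateException(s"Race while writing batch $batchId")
  }
}
```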
Aborting Job¶
abortJob(
jobContext: JobContext): Unit
abortJob simply deletes the files in the pendingCommitFiles internal registry (if there are any) and then clears the registry.
abortJob is part of the FileCommitProtocol (Spark SQL) abstraction.