Skip to content

ManifestFileCommitProtocol

ManifestFileCommitProtocol is a FileCommitProtocol for tracking valid files (per micro-batch) in FileStreamSinkLog.

The Internals of Apache Spark

Learn more on FileCommitProtocol in The Internals of Apache Spark.

Creating Instance

ManifestFileCommitProtocol takes the following to be created:

  • Job ID (unused)
  • Path to write the output to

ManifestFileCommitProtocol is created when FileStreamSink is requested to add a batch (which is every micro-batch).

FileStreamSinkLog

ManifestFileCommitProtocol is given a FileStreamSinkLog when setting up the manifest options for a micro-batch (right after having been created).

FileStreamSinkLog is used to add the SinkFileStatuses (in a micro-batch) when ManifestFileCommitProtocol is requested to commit a write job.

Setting Up Manifest Options

setupManifestOptions(
  fileLog: FileStreamSinkLog,
  batchId: Long): Unit

setupManifestOptions assigns the FileStreamSinkLog and batchId.

setupManifestOptions is used when FileStreamSink is requested to add a batch (right after having been created).

Setting Up Job

setupJob(
  jobContext: JobContext): Unit

setupJob initializes pendingCommitFiles to be an empty collection of Hadoop Paths.

setupJob is part of the FileCommitProtocol (Spark SQL) abstraction.

Setting Up Task

setupTask(
  taskContext: TaskAttemptContext): Unit

setupTask initializes addedFiles to be an empty collection of file locations (?)

setupTask is part of the FileCommitProtocol (Spark SQL) abstraction.

newTaskTempFile

newTaskTempFile(
  taskContext: TaskAttemptContext,
  dir: Option[String],
  ext: String): String

newTaskTempFile creates a temporary file part-[split]-[uuid][ext] in the optional dir location or the path and adds it to addedFiles internal registry.

newTaskTempFile is part of the FileCommitProtocol (Spark SQL) abstraction.

Task Committed

onTaskCommit(
  taskCommit: TaskCommitMessage): Unit

onTaskCommit adds the SinkFileStatuss from the given taskCommits to pendingCommitFiles internal registry.

onTaskCommit is part of the FileCommitProtocol (Spark SQL) abstraction.

Committing Task

commitTask(
  taskContext: TaskAttemptContext): TaskCommitMessage

commitTask creates a TaskCommitMessage with SinkFileStatuses for every added file.

commitTask is part of the FileCommitProtocol (Spark SQL) abstraction.

Aborting Task

abortTask(
  taskContext: TaskAttemptContext): Unit

abortTask deletes added files.

abortTask is part of the FileCommitProtocol (Spark SQL) abstraction.

Committing Job

commitJob(
  jobContext: JobContext,
  taskCommits: Seq[TaskCommitMessage]): Unit

commitJob takes SinkFileStatuss from the given taskCommits.

In the end, commitJob requests the FileStreamSinkLog to add the SinkFileStatuss as the batchId. If successful (true), commitJob prints out the following INFO message to the logs:

Committed batch [batchId]

Otherwise, when failed (false), commitJob throws an IllegalStateException:

Race while writing batch [batchId]

commitJob is part of the FileCommitProtocol (Spark SQL) abstraction.

Aborting Job

abortJob(
  jobContext: JobContext): Unit

abortJob simply tries to remove all pendingCommitFiles if there are any and clear it up.

abortJob is part of the FileCommitProtocol (Spark SQL) abstraction.