FileCommitProtocol

FileCommitProtocol is an abstraction of committers that can set up, commit or abort a Spark job or task (while writing out a key-value RDD and its partitions).

FileCommitProtocol is used for the RDD.saveAsNewAPIHadoopDataset and RDD.saveAsHadoopDataset actions (that use the SparkHadoopWriter utility to write a key-value RDD out).

A concrete FileCommitProtocol is created using FileCommitProtocol.instantiate utility.
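Application code does not call a committer directly; it kicks in under the covers of the save actions above. The following is a minimal sketch of a key-value RDD write that goes through SparkHadoopWriter and hence a FileCommitProtocol committer (the output directory and application name are made up):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("fcp-demo").getOrCreate()

// A key-value RDD with Hadoop Writables as keys and values
val records = spark.sparkContext
  .parallelize(Seq(("a", 1), ("b", 2)))
  .map { case (k, v) => (new Text(s"$k,$v"), NullWritable.get()) }

// Configure the Hadoop output format (new MapReduce API)
val job = Job.getInstance(spark.sparkContext.hadoopConfiguration)
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[NullWritable])
job.setOutputFormatClass(classOf[TextOutputFormat[Text, NullWritable]])
FileOutputFormat.setOutputPath(job, new Path("/tmp/fcp-demo"))

// Triggers SparkHadoopWriter, which sets up, commits (or aborts) the job and its tasks
// through a FileCommitProtocol committer
records.saveAsNewAPIHadoopDataset(job.getConfiguration)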

Table 1. FileCommitProtocol Contract

abortJob

abortJob(
  jobContext: JobContext): Unit

Aborts a job

Used when:

  • SparkHadoopWriter utility is used to write a key-value RDD

  • (Spark SQL) FileFormatWriter utility is used to write a result of a structured query

abortTask

abortTask(
  taskContext: TaskAttemptContext): Unit

Aborts a Spark task (after its writes have failed)

Used when:

  • SparkHadoopWriter utility is used to write an RDD partition

  • (Spark SQL) FileFormatDataWriter is requested to abort (when FileFormatWriter utility is used to write a result of a structured query)

commitJob

commitJob(
  jobContext: JobContext,
  taskCommits: Seq[TaskCommitMessage]): Unit

Commits a job after all its writes have succeeded (given the task commit messages of the committed tasks)

Used when:

  • SparkHadoopWriter utility is used to write a key-value RDD

  • (Spark SQL) FileFormatWriter utility is used to write a result of a structured query

commitTask

commitTask(
  taskContext: TaskAttemptContext): TaskCommitMessage

Commits a task after its writes have succeeded and returns a TaskCommitMessage (later passed on to commitJob on the driver)

Used when:

  • SparkHadoopWriter utility is used to write an RDD partition

  • (Spark SQL) FileFormatDataWriter is requested to commit (when FileFormatWriter utility is used to write a result of a structured query)

newTaskTempFile

newTaskTempFile(
  taskContext: TaskAttemptContext,
  dir: Option[String],
  ext: String): String

Notifies the committer that a new file is about to be written and returns the full path the task should write to

Used when:

  • (Spark SQL) SingleDirectoryDataWriter and DynamicPartitionDataWriter are requested to write (and in turn newOutputWriter)

newTaskTempFileAbsPath

newTaskTempFileAbsPath(
  taskContext: TaskAttemptContext,
  absoluteDir: String,
  ext: String): String

A variant of newTaskTempFile for files that have to end up under a given absolute output directory

Used when:

  • (Spark SQL) DynamicPartitionDataWriter is requested to write

onTaskCommit

onTaskCommit(
  taskCommit: TaskCommitMessage): Unit = {}

Called on the driver when a task commits (does nothing by default)

Used when:

  • (Spark SQL) FileFormatWriter is requested to write

setupJob

setupJob(
  jobContext: JobContext): Unit

Sets up a Spark job

Used when:

  • SparkHadoopWriter utility is used to write a key-value RDD

  • (Spark SQL) FileFormatWriter utility is used to write a result of a structured query

setupTask

setupTask(
  taskContext: TaskAttemptContext): Unit

Sets up the task with the Hadoop TaskAttemptContext

Used when:

  • SparkHadoopWriter utility is used to write an RDD partition

  • (Spark SQL) FileFormatWriter utility is used to write a partition of a result of a structured query

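The contract is typically exercised through one of the built-in committers below, but it can also be extended. The following is a minimal sketch of a custom committer (the class name and log messages are made up) that piggybacks on HadoopMapReduceCommitProtocol and merely adds logging around a few of the contract methods; note the (String, String, Boolean) constructor that the instantiate utility (described below) looks up:

import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// A made-up committer that only logs the lifecycle of a write job
class LoggingCommitProtocol(
    jobId: String,
    path: String,
    dynamicPartitionOverwrite: Boolean = false)
  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {

  override def setupJob(jobContext: JobContext): Unit = {
    println(s"setupJob: $jobId (output: $path)")
    super.setupJob(jobContext)
  }

  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
    println(s"commitTask: ${taskContext.getTaskAttemptID}")
    super.commitTask(taskContext)
  }

  override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
    println(s"commitJob: $jobId with ${taskCommits.length} task commit message(s)")
    super.commitJob(jobContext, taskCommits)
  }

  override def abortJob(jobContext: JobContext): Unit = {
    println(s"abortJob: $jobId")
    super.abortJob(jobContext)
  }
}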
Table 2. FileCommitProtocols

HadoopMapReduceCommitProtocol

The base FileCommitProtocol backed by a Hadoop MapReduce OutputCommitter

HadoopMapRedCommitProtocol

A HadoopMapReduceCommitProtocol for the old Hadoop MapRed API (mapred)

SQLHadoopMapReduceCommitProtocol

Used for batch queries (Spark SQL)

ManifestFileCommitProtocol

Used for streaming queries (Spark Structured Streaming)

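For batch queries, Spark SQL chooses the FileCommitProtocol class based on the spark.sql.sources.commitProtocolClass internal configuration property (SQLHadoopMapReduceCommitProtocol by default). A minimal sketch of plugging in a custom committer (the class name is made up and is assumed to have a constructor that instantiate accepts, as described in the next section):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("custom-committer")
  // a made-up, fully-qualified committer class name
  .config("spark.sql.sources.commitProtocolClass", "org.example.LoggingCommitProtocol")
  .getOrCreate()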
Instantiating FileCommitProtocol Committer — instantiate Utility

instantiate(
  className: String,
  jobId: String,
  outputPath: String,
  dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol

instantiate prints out the following DEBUG message to the logs:

Creating committer [className]; job [jobId]; output=[outputPath]; dynamic=[dynamicPartitionOverwrite]

instantiate tries to find a constructor method that takes three arguments (two of type String and one Boolean) for the given jobId, outputPath and dynamicPartitionOverwrite flag. If found, instantiate prints out the following DEBUG message to the logs:

Using (String, String, Boolean) constructor

In case of NoSuchMethodException, instantiate prints out the following DEBUG message to the logs:

Falling back to (String, String) constructor

instantiate then tries to find a constructor that takes two arguments (both of type String) for the given jobId and outputPath.

With two String arguments, instantiate requires that the given dynamicPartitionOverwrite flag is disabled (false) or throws an IllegalArgumentException:

requirement failed: Dynamic Partition Overwrite is enabled but the committer [className] does not have the appropriate constructor
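A rough sketch of using instantiate directly (the job ID and output path are made up), with the built-in HadoopMapReduceCommitProtocol that provides the (String, String, Boolean) constructor:

import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}

// Made-up job ID and output path
val committer = FileCommitProtocol.instantiate(
  className = classOf[HadoopMapReduceCommitProtocol].getName,
  jobId = "demo-job-0",
  outputPath = "/tmp/fcp-demo",
  dynamicPartitionOverwrite = false)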

instantiate is used when:

deleteWithJob Method

deleteWithJob(
  fs: FileSystem,
  path: Path,
  recursive: Boolean): Boolean

deleteWithJob simply requests the given Hadoop FileSystem to delete a path (recursively when the recursive flag is enabled).

deleteWithJob is used when InsertIntoHadoopFsRelationCommand logical command (Spark SQL) is executed.
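A minimal sketch of what the default behaviour boils down to, i.e. delegating the delete to the given Hadoop FileSystem (the committer and paths are made up):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}

val outputPath = "/tmp/fcp-demo"
val committer = FileCommitProtocol.instantiate(
  classOf[HadoopMapReduceCommitProtocol].getName,
  jobId = "demo-job-0",
  outputPath = outputPath)
val fs = FileSystem.get(new Configuration())

// Equivalent to fs.delete(path, recursive) with the default implementation
val deleted: Boolean = committer.deleteWithJob(fs, new Path(outputPath), recursive = true)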