FileCommitProtocol

FileCommitProtocol is an abstraction of file committers that can set up, commit or abort a Spark job or task (while writing out a pair RDD and its partitions).

FileCommitProtocol is used for RDD.saveAsNewAPIHadoopDataset and RDD.saveAsHadoopDataset actions (that use SparkHadoopWriter utility to write a key-value RDD out).

FileCommitProtocol is created using FileCommitProtocol.instantiate utility.

Contract

Aborting Job

abortJob(
  jobContext: JobContext): Unit

Aborts a job

Used when:

  • SparkHadoopWriter utility is used to write a key-value RDD (and writing fails)
  • (Spark SQL) FileFormatWriter utility is used to write a result of a structured query (and writing fails)
  • (Spark SQL) FileBatchWrite is requested to abort

Aborting Task

abortTask(
  taskContext: TaskAttemptContext): Unit

Aborts a task

Used when:

  • SparkHadoopWriter utility is used to write an RDD partition
  • (Spark SQL) FileFormatDataWriter is requested to abort

Committing Job

commitJob(
  jobContext: JobContext,
  taskCommits: Seq[TaskCommitMessage]): Unit

Commits a job after the writes succeed

Used when:

  • SparkHadoopWriter utility is used to write a key-value RDD
  • (Spark SQL) FileFormatWriter utility is used to write a result of a structured query
  • (Spark SQL) FileBatchWrite is requested to commit

Committing Task

commitTask(
  taskContext: TaskAttemptContext): TaskCommitMessage

Used when:

  • SparkHadoopWriter utility is used to write an RDD partition
  • (Spark SQL) FileFormatDataWriter is requested to commit

Deleting Path with Job

deleteWithJob(
  fs: FileSystem,
  path: Path,
  recursive: Boolean): Boolean

deleteWithJob requests the given Hadoop FileSystem to delete the given path (recursively when the recursive flag is enabled).

Used when:

  • (Spark SQL) InsertIntoHadoopFsRelationCommand logical command is executed

New Task Temp File

newTaskTempFile(
  taskContext: TaskAttemptContext,
  dir: Option[String],
  spec: FileNameSpec): String
newTaskTempFile(
  taskContext: TaskAttemptContext,
  dir: Option[String],
  ext: String): String // @deprecated

Builds a path of a temporary file (for a task to write data to)

See:

Used when:

  • (Spark SQL) SingleDirectoryDataWriter is requested to write a record out
  • (Spark SQL) BaseDynamicPartitionDataWriter is requested to renewCurrentWriter

newTaskTempFileAbsPath

newTaskTempFileAbsPath(
  taskContext: TaskAttemptContext,
  absoluteDir: String,
  ext: String): String

Used when:

  • (Spark SQL) DynamicPartitionDataWriter is requested to write

On Task Committed

onTaskCommit(
  taskCommit: TaskCommitMessage): Unit

Used when:

  • (Spark SQL) FileFormatWriter is requested to write

Setting Up Job

setupJob(
  jobContext: JobContext): Unit

Used when:

Setting Up Task

setupTask(
  taskContext: TaskAttemptContext): Unit

Sets up the task with the Hadoop TaskAttemptContext

Used when:

  • SparkHadoopWriter is requested to write an RDD partition (while writing out a key-value RDD)
  • (Spark SQL) FileFormatWriter utility is used to write out an RDD partition (while writing out a result of a structured query)
  • (Spark SQL) FileWriterFactory is requested to createWriter
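
The order in which a writer calls the contract methods can be sketched as follows. This is a minimal model only, not Spark code: MiniCommitProtocol, MiniTaskCommit, RecordingProtocol and LifecycleSketch are hypothetical stand-ins that replace Hadoop's JobContext and TaskAttemptContext with plain identifiers, and the tasks run sequentially in one JVM rather than on executors.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical stand-in for TaskCommitMessage (not the Spark class).
final case class MiniTaskCommit(obj: Any)

// Hypothetical, simplified mirror of the contract: plain IDs instead of
// Hadoop's JobContext and TaskAttemptContext.
trait MiniCommitProtocol {
  def setupJob(jobId: String): Unit
  def setupTask(taskId: Int): Unit
  def commitTask(taskId: Int): MiniTaskCommit
  def abortTask(taskId: Int): Unit
  def commitJob(jobId: String, taskCommits: Seq[MiniTaskCommit]): Unit
  def abortJob(jobId: String): Unit
}

// Records every call so the order can be inspected.
class RecordingProtocol extends MiniCommitProtocol {
  val calls = ArrayBuffer.empty[String]
  def setupJob(jobId: String): Unit = calls += "setupJob"
  def setupTask(taskId: Int): Unit = calls += s"setupTask $taskId"
  def commitTask(taskId: Int): MiniTaskCommit = {
    calls += s"commitTask $taskId"
    MiniTaskCommit(taskId)
  }
  def abortTask(taskId: Int): Unit = calls += s"abortTask $taskId"
  def commitJob(jobId: String, taskCommits: Seq[MiniTaskCommit]): Unit =
    calls += s"commitJob ${taskCommits.size}"
  def abortJob(jobId: String): Unit = calls += "abortJob"
}

object LifecycleSketch {
  // Returns true when the job was committed and false when it was aborted.
  def runJob(committer: MiniCommitProtocol, jobId: String, numTasks: Int)(
      writeTask: Int => Unit): Boolean = {
    committer.setupJob(jobId)
    try {
      val commits = (0 until numTasks).map { taskId =>
        committer.setupTask(taskId)
        try {
          writeTask(taskId)
          committer.commitTask(taskId) // task-level commit message
        } catch {
          case NonFatal(e) =>
            committer.abortTask(taskId) // clean up the failed task
            throw e
        }
      }
      committer.commitJob(jobId, commits) // only after all writes succeed
      true
    } catch {
      case NonFatal(_) =>
        committer.abortJob(jobId) // writing failed
        false
    }
  }
}
```

In this sketch, a failing task triggers abortTask for that task and then abortJob for the whole job, while commitJob receives the commit messages of all tasks only when every write succeeds.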

Implementations

Instantiating FileCommitProtocol Committer

instantiate(
  className: String,
  jobId: String,
  outputPath: String,
  dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol

instantiate prints out the following DEBUG message to the logs:

Creating committer [className]; job [jobId]; output=[outputPath]; dynamic=[dynamicPartitionOverwrite]

instantiate tries to find a constructor that takes three arguments (two of type String and one Boolean) for the given jobId, outputPath and dynamicPartitionOverwrite flag. If found, instantiate prints out the following DEBUG message to the logs:

Using (String, String, Boolean) constructor

In case of NoSuchMethodException, instantiate prints out the following DEBUG message to the logs:

Falling back to (String, String) constructor

instantiate then tries to find a constructor that takes two String arguments (for the given jobId and outputPath).

Before using the two-argument constructor, instantiate requires that the given dynamicPartitionOverwrite flag is disabled (false). Otherwise, instantiate throws an IllegalArgumentException:

requirement failed: Dynamic Partition Overwrite is enabled but the committer [className] does not have the appropriate constructor
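
The constructor lookup described above can be approximated with plain Java reflection. The following is a minimal sketch rather than Spark's own code: ThreeArgCommitter, TwoArgCommitter and InstantiateSketch are hypothetical names, the result is typed as AnyRef instead of FileCommitProtocol, and the DEBUG messages are shown as comments. It assumes the classes are compiled as regular top-level classes.

```scala
// Hypothetical committers used only to exercise the lookup below.
class ThreeArgCommitter(val jobId: String, val path: String, val dynamic: Boolean)
class TwoArgCommitter(val jobId: String, val path: String)

object InstantiateSketch {
  def instantiate(
      className: String,
      jobId: String,
      outputPath: String,
      dynamicPartitionOverwrite: Boolean = false): AnyRef = {
    val clazz = Class.forName(className)
    try {
      // Preferred: the (String, String, Boolean) constructor
      val ctor = clazz.getDeclaredConstructor(
        classOf[String], classOf[String], classOf[Boolean])
      // DEBUG: Using (String, String, Boolean) constructor
      ctor
        .newInstance(jobId, outputPath, Boolean.box(dynamicPartitionOverwrite))
        .asInstanceOf[AnyRef]
    } catch {
      case _: NoSuchMethodException =>
        // DEBUG: Falling back to (String, String) constructor
        // require prefixes the message with "requirement failed: "
        require(
          !dynamicPartitionOverwrite,
          "Dynamic Partition Overwrite is enabled but the committer " +
            s"$className does not have the appropriate constructor")
        val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
        ctor.newInstance(jobId, outputPath).asInstanceOf[AnyRef]
    }
  }
}
```

With these hypothetical classes, requesting TwoArgCommitter with dynamicPartitionOverwrite enabled fails with an IllegalArgumentException, while ThreeArgCommitter accepts either value of the flag.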

instantiate is used when:

Logging

Enable ALL logging level for org.apache.spark.internal.io.FileCommitProtocol logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.internal.io.FileCommitProtocol=ALL

Refer to Logging.