FileCommitProtocol
FileCommitProtocol is an abstraction of file committers that can set up, commit or abort a Spark job or task (while writing out a key-value RDD and its partitions).
FileCommitProtocol is used for the RDD.saveAsNewAPIHadoopDataset and RDD.saveAsHadoopDataset actions (which use the SparkHadoopWriter utility to write a key-value RDD out).
FileCommitProtocol is created using the FileCommitProtocol.instantiate utility.
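The contract methods below are called in a fixed order during a write: the driver sets up the job, every task sets up, writes and commits (or aborts) its own output, and the driver finally commits (or aborts) the whole job. The following is a minimal sketch of that call sequence, assuming a committer, a Hadoop JobContext and per-task TaskAttemptContexts already exist; it only illustrates the protocol and is not the actual SparkHadoopWriter or FileFormatWriter code.
```scala
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.spark.internal.io.FileCommitProtocol

// A minimal sketch of the commit-protocol call sequence (error handling simplified).
// In a real job the task-side part runs on executors, not in a local loop.
def writeJob(
    committer: FileCommitProtocol,
    jobContext: JobContext,
    taskContexts: Seq[TaskAttemptContext]): Unit = {
  committer.setupJob(jobContext)                        // driver side, before any task runs
  try {
    val taskCommits = taskContexts.map { taskContext => // task side (one per write task)
      committer.setupTask(taskContext)
      try {
        val path = committer.newTaskTempFile(taskContext, dir = None, ext = ".parquet")
        // ... write the partition's records to `path` ...
        committer.commitTask(taskContext)               // returns a TaskCommitMessage
      } catch {
        case e: Throwable =>
          committer.abortTask(taskContext)
          throw e
      }
    }
    committer.commitJob(jobContext, taskCommits)        // driver side, after all tasks succeed
  } catch {
    case e: Throwable =>
      committer.abortJob(jobContext)
      throw e
  }
}
```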
Contract
Aborting Job
abortJob(
  jobContext: JobContext): Unit
Aborts a job
Used when:
- SparkHadoopWriter utility is used to write a key-value RDD (and writing fails)
- (Spark SQL) FileFormatWriter utility is used to write a result of a structured query (and writing fails)
- (Spark SQL) FileBatchWrite is requested to abort
Aborting Task
abortTask(
  taskContext: TaskAttemptContext): Unit
Aborts a task
Used when:
- SparkHadoopWriter utility is used to write an RDD partition
- (Spark SQL) FileFormatDataWriter is requested to abort
Committing Job
commitJob(
  jobContext: JobContext,
  taskCommits: Seq[TaskCommitMessage]): Unit
Commits a job after the writes succeed
Used when:
- SparkHadoopWriter utility is used to write a key-value RDD
- (Spark SQL) FileFormatWriter utility is used to write a result of a structured query
- (Spark SQL) FileBatchWrite is requested to commit
Committing Task
commitTask(
  taskContext: TaskAttemptContext): TaskCommitMessage
Commits a task (and returns a TaskCommitMessage for the driver)
Used when:
- SparkHadoopWriter utility is used to write an RDD partition
- (Spark SQL) FileFormatDataWriter is requested to commit
Deleting Path with Job
deleteWithJob(
  fs: FileSystem,
  path: Path,
  recursive: Boolean): Boolean
deleteWithJob requests the given Hadoop FileSystem to delete the given path (recursively, if requested).
Used when InsertIntoHadoopFsRelationCommand logical command (Spark SQL) is executed
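A hedged caller-side sketch (the helper name is made up): when overwriting an output location, the deletion goes through the committer rather than straight through the file system, so that a committer implementation can customize or postpone it.
```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.io.FileCommitProtocol

// Hypothetical helper: delete an existing output directory via the committer
// (recursively), e.g. before an overwrite.
def clearOutputPath(
    committer: FileCommitProtocol,
    fs: FileSystem,
    outputPath: Path): Boolean =
  committer.deleteWithJob(fs, outputPath, recursive = true)
```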
New Task Temp File
newTaskTempFile(
  taskContext: TaskAttemptContext,
  dir: Option[String],
  spec: FileNameSpec): String
newTaskTempFile(
  taskContext: TaskAttemptContext,
  dir: Option[String],
  ext: String): String // @deprecated
Builds a path of a temporary file (for a task to write data to)
See:
- HadoopMapReduceCommitProtocol
- DelayedCommitProtocol (Delta Lake)
Used when:
- (Spark SQL) SingleDirectoryDataWriter is requested to write a record out
- (Spark SQL) BaseDynamicPartitionDataWriter is requested to renewCurrentWriter
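A rough task-side usage sketch (not the actual data writer code): before opening an output stream, a writer asks the committer for the path to write to. dir carries the dynamic-partition subdirectory, if any, and FileNameSpec carries the prefix and suffix of the file name (treating FileNameSpec(prefix, suffix) as its shape is an assumption here).
```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}

// Hypothetical helper: ask the committer where the current partition's file should be written.
// The committer (not the writer) decides whether this is a staging path or the final path.
def newOutputPath(
    committer: FileCommitProtocol,
    taskContext: TaskAttemptContext,
    partitionDir: Option[String]): String =
  committer.newTaskTempFile(
    taskContext,
    partitionDir,                  // e.g. Some("year=2024/month=01") for dynamic partitioning
    FileNameSpec("", ".parquet"))  // file name prefix and suffix (assumed field order)
```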
newTaskTempFileAbsPath
newTaskTempFileAbsPath(
  taskContext: TaskAttemptContext,
  absoluteDir: String,
  ext: String): String
Builds a path of a temporary file (for a task to write data to) under the given absolute directory
Used when:
- (Spark SQL) DynamicPartitionDataWriter is requested to write
On Task Committed
onTaskCommit(
  taskCommit: TaskCommitMessage): Unit
Used when:
- (Spark SQL) FileFormatWriter is requested to write
Setting Up Job
setupJob(
  jobContext: JobContext): Unit
Used when:
- SparkHadoopWriter utility is used to write a key-value RDD
- (Spark SQL) FileFormatWriter utility is used to write a result of a structured query
- (Spark SQL) FileWriteBuilder is requested to buildForBatch
Setting Up Task
setupTask(
  taskContext: TaskAttemptContext): Unit
Sets up the task with the Hadoop TaskAttemptContext
Used when:
- SparkHadoopWriter utility is used to write an RDD partition (while writing out a key-value RDD)
- (Spark SQL) FileFormatWriter utility is used to write out an RDD partition (while writing out a result of a structured query)
- (Spark SQL) FileWriterFactory is requested to createWriter
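As a hedged sketch of what a task-side caller prepares, the Hadoop TaskAttemptContext handed to setupTask can be built from the Hadoop configuration and a task attempt ID (SparkHadoopWriter derives these IDs from the running Spark task; the literal values below are placeholders).
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.internal.io.FileCommitProtocol

// Hypothetical helper: build a TaskAttemptContext and hand it to the committer.
// The job tracker ID and stage/partition/attempt numbers stand in for the real task's IDs.
def prepareTask(committer: FileCommitProtocol, hadoopConf: Configuration): TaskAttemptContext = {
  val attemptId = new TaskAttemptID("20240101000000", 0, TaskType.REDUCE, 0, 0)
  val taskContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
  committer.setupTask(taskContext)
  taskContext
}
```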
Implementations
- HadoopMapReduceCommitProtocol
- ManifestFileCommitProtocol (Spark Structured Streaming)
Instantiating FileCommitProtocol Committer
instantiate(
  className: String,
  jobId: String,
  outputPath: String,
  dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol
instantiate prints out the following DEBUG message to the logs:
Creating committer [className]; job [jobId]; output=[outputPath]; dynamic=[dynamicPartitionOverwrite]
instantiate tries to find a constructor method that takes three arguments (two of type String and one Boolean) for the given jobId, outputPath and dynamicPartitionOverwrite flag. If found, instantiate prints out the following DEBUG message to the logs:
Using (String, String, Boolean) constructor
In case of NoSuchMethodException, instantiate prints out the following DEBUG message to the logs:
Falling back to (String, String) constructor
instantiate tries to find a constructor method that takes two arguments (two of type String) for the given jobId and outputPath.
With two String arguments, instantiate requires that the given dynamicPartitionOverwrite flag is disabled (false) or throws an IllegalArgumentException:
requirement failed: Dynamic Partition Overwrite is enabled but the committer [className] does not have the appropriate constructor
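As a usage sketch (with a placeholder job ID and output path), instantiating the default HadoopMapReduceCommitProtocol committer looks as follows. HadoopMapReduceCommitProtocol declares a (String, String, Boolean) constructor, so the first lookup succeeds and dynamic partition overwrite may be requested; a committer with only a (String, String) constructor can still be instantiated, but only with dynamicPartitionOverwrite disabled.
```scala
import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}

// A usage sketch with placeholder arguments.
val committer: FileCommitProtocol = FileCommitProtocol.instantiate(
  className = classOf[HadoopMapReduceCommitProtocol].getName,
  jobId = "0",
  outputPath = "/tmp/output",
  dynamicPartitionOverwrite = false)
```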
instantiate is used when:
- HadoopMapRedWriteConfigUtil and HadoopMapReduceWriteConfigUtil are requested to create a HadoopMapReduceCommitProtocol committer
- (Spark SQL) InsertIntoHadoopFsRelationCommand, InsertIntoHiveDirCommand, and InsertIntoHiveTable logical commands are executed
- (Spark Structured Streaming) FileStreamSink is requested to write out micro-batch data
Logging
Enable ALL logging level for org.apache.spark.internal.io.FileCommitProtocol logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.internal.io.FileCommitProtocol=ALL
Refer to Logging.