FileFormatWriter

FileFormatWriter is a utility to write out the result of a structured query.
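
A minimal sketch to see FileFormatWriter in action (assuming a spark-shell session, so a SparkSession is available as spark; the output path is illustrative): a file-based data source write, e.g. parquet, typically goes through FileFormatWriter on the non-DataSource-V2 write path.

// Runnable in spark-shell; the output path is illustrative
import spark.implicits._

Seq((1, "a"), (2, "b")).toDF("id", "name")
  .write
  .mode("overwrite")
  .parquet("/tmp/fileformatwriter-demo")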

Writing Out Query Result

write(
  sparkSession: SparkSession,
  plan: SparkPlan,
  fileFormat: FileFormat,
  committer: FileCommitProtocol,
  outputSpec: OutputSpec,
  hadoopConf: Configuration,
  partitionColumns: Seq[Attribute],
  bucketSpec: Option[BucketSpec],
  statsTrackers: Seq[WriteJobStatsTracker],
  options: Map[String, String],
  numStaticPartitionCols: Int = 0): Set[String]

write creates a Hadoop Job instance (with the given Hadoop Configuration) and uses the following job output classes:

Job Output Property   Class
Key                   Void
Value                 InternalRow

write sets the output directory (mapreduce.output.fileoutputformat.outputdir property of the map-reduce job) to be the outputPath of the given OutputSpec.
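
A minimal sketch of this Hadoop Job setup (hadoopConf and outputPath stand in for the values write receives through its parameters; the path is illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.spark.sql.catalyst.InternalRow

val hadoopConf = new Configuration()   // stands in for the given Hadoop Configuration
val outputPath = "/tmp/query-output"   // stands in for the outputPath of the given OutputSpec

val job = Job.getInstance(hadoopConf)
job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[InternalRow])
// sets mapreduce.output.fileoutputformat.outputdir
FileOutputFormat.setOutputPath(job, new Path(outputPath))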

write requests the given FileFormat to prepareWrite.

write creates a WriteJobDescription with the following options (if available):

Option              Fallback Configuration Property
maxRecordsPerFile   spark.sql.files.maxRecordsPerFile
timeZone            spark.sql.session.timeZone
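
A sketch of how the two options can be influenced from user code (per-write options take precedence over the session configuration; the snippet assumes a spark-shell session and the output path is illustrative):

// Session-level fallbacks
spark.conf.set("spark.sql.files.maxRecordsPerFile", "500000")
spark.conf.set("spark.sql.session.timeZone", "UTC")

// Per-write options override the session defaults
import spark.implicits._
Seq(("2024-01-01 00:00:00", 1)).toDF("ts", "id")
  .write
  .option("maxRecordsPerFile", "1000")
  .option("timeZone", "UTC")
  .mode("overwrite")
  .csv("/tmp/fileformatwriter-options")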

write sets spark.sql.sources.writeJobUUID configuration in the map-reduce Job instance.
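
Continuing the Job sketch above, setting such a configuration property boils down to the following (the UUID value here is illustrative; write uses the UUID of the WriteJobDescription):

job.getConfiguration.set(
  "spark.sql.sources.writeJobUUID", java.util.UUID.randomUUID().toString)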

In the end, write executeWrite.

executeWrite

executeWrite(
  session: SparkSession,
  planForWrites: SparkPlan,
  writeFilesSpec: WriteFilesSpec,
  job: Job): Set[String]

executeWrite(
  sparkSession: SparkSession,
  plan: SparkPlan,
  job: Job,
  description: WriteJobDescription,
  committer: FileCommitProtocol,
  outputSpec: OutputSpec,
  requiredOrdering: Seq[Expression],
  partitionColumns: Seq[Attribute],
  sortColumns: Seq[Attribute],
  orderingMatched: Boolean): Set[String]

executeWrite writeAndCommit with the given Hadoop Job, WriteJobDescription and FileCommitProtocol, and a function that does the following:

  1. Prepares an RDD[WriterCommitMessage] (by executing the given WriteFilesSpec or the SparkPlan)
  2. Runs a Spark job for the RDD[WriterCommitMessage] that "collects" WriteTaskResults (from executing write tasks)
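
The following is a simplified sketch of step 2 (not the exact internals): a Spark job runs a per-partition write function over the row RDD, and the driver-side result handler collects the commit messages (in the real code it also notifies the committer via onTaskCommit). rddOfRows and writeTask are stand-ins.

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.WriterCommitMessage

// Sketch only: run a Spark job and collect the per-task commit messages
def runWriteJob(
    spark: SparkSession,
    rddOfRows: RDD[InternalRow],
    writeTask: (TaskContext, Iterator[InternalRow]) => WriterCommitMessage): Seq[WriterCommitMessage] = {
  val results = new ArrayBuffer[WriterCommitMessage]
  spark.sparkContext.runJob(
    rddOfRows,
    (context: TaskContext, rows: Iterator[InternalRow]) => writeTask(context, rows),
    rddOfRows.partitions.indices,
    (_: Int, result: WriterCommitMessage) => results += result)  // driver-side result handler
  results.toSeq
}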

writeAndCommit

writeAndCommit(
  job: Job,
  description: WriteJobDescription,
  committer: FileCommitProtocol)(
  f: => Array[WriteTaskResult]): Set[String]

writeAndCommit...FIXME

writeAndCommit prints out the following INFO message to the logs:

Start to commit write Job [uuid].

writeAndCommit requests the given FileCommitProtocol to commitJob.

writeAndCommit prints out the following INFO message to the logs:

Write Job [uuid] committed. Elapsed time: [duration] ms.

writeAndCommit processStats.

writeAndCommit prints out the following INFO message to the logs:

Finished processing stats for write job [uuid].

writeAndCommit returns the updated partitions.


In case of any Throwable, writeAndCommit prints out the following ERROR message to the logs:

Aborting job [uuid].

writeAndCommit requests the given FileCommitProtocol to abortJob.
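
A hedged sketch of this commit-or-abort pattern (logging and stats processing omitted; f stands for the write phase and yields the TaskCommitMessages extracted from the WriteTaskResults):

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

// Sketch only: commit the write job or abort it on any error
def commitOrAbort(
    job: Job,
    committer: FileCommitProtocol)(
    f: => Seq[TaskCommitMessage]): Unit = {
  try {
    val commitMessages = f
    committer.commitJob(job, commitMessages)
  } catch {
    case cause: Throwable =>
      committer.abortJob(job)
      throw cause
  }
}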

Usage

write is used when:

Review Me

write requests the given FileCommitProtocol committer to setupJob.

write executes the given SparkPlan (and generates an RDD). The execution happens directly on the given physical operator when its output ordering matches the required ordering, or on a SortExec physical operator (with the global flag off) otherwise.
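
A sketch of that decision, assuming the parameter names of the executeWrite signature above (rddToWrite is a hypothetical helper):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, SortOrder}
import org.apache.spark.sql.execution.{SortExec, SparkPlan}

// Sketch only: execute the plan directly or behind a non-global SortExec
def rddToWrite(
    plan: SparkPlan,
    requiredOrdering: Seq[Expression],
    orderingMatched: Boolean): RDD[InternalRow] =
  if (requiredOrdering.isEmpty || orderingMatched) {
    plan.execute()
  } else {
    val sortOrder = requiredOrdering.map(SortOrder(_, Ascending))
    SortExec(sortOrder, global = false, child = plan).execute()
  }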

write runs a Spark job (action) on the RDD with executeTask as the partition function. The result task handler simply requests the given FileCommitProtocol committer to onTaskCommit (with the TaskCommitMessage of a WriteTaskResult) and saves the WriteTaskResult.

write requests the given FileCommitProtocol committer to commitJob (with the Hadoop Job instance and the TaskCommitMessage of all write tasks).

write prints out the following INFO message to the logs:

Write Job [uuid] committed.

write processStats.

write prints out the following INFO message to the logs:

Finished processing stats for write job [uuid].

In the end, write returns all the partition paths that were updated during this write job.

write and Throwables

In case of any Throwable, write prints out the following ERROR message to the logs:

Aborting job [uuid].

write requests the given FileCommitProtocol committer to abortJob (with the Hadoop Job instance).

In the end, write throws a SparkException.

Writing Data Out in a Single Spark Task

executeTask(
  description: WriteJobDescription,
  jobIdInstant: Long,
  sparkStageId: Int,
  sparkPartitionId: Int,
  sparkAttemptNumber: Int,
  committer: FileCommitProtocol,
  iterator: Iterator[InternalRow]): WriteTaskResult

executeTask...FIXME
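
Although the internals are not described here, the following is a hedged sketch of the Hadoop task-attempt bookkeeping that such a write task typically performs before writing any rows (the job-id formatting and the helper name are assumptions, not the actual implementation):

import java.util.Date

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.internal.io.FileCommitProtocol

// Sketch only: build a Hadoop TaskAttemptContext and set up the task with the committer
def setUpTaskAttempt(
    hadoopConf: Configuration,
    committer: FileCommitProtocol,
    jobIdInstant: Long,
    sparkStageId: Int,
    sparkPartitionId: Int,
    sparkAttemptNumber: Int): TaskAttemptContextImpl = {
  val jobId = new JobID(new Date(jobIdInstant).toString, sparkStageId)  // job-id format is illustrative
  val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
  val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
  val taskAttemptContext = new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
  committer.setupTask(taskAttemptContext)
  taskAttemptContext
}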

Processing Write Job Statistics

processStats(
  statsTrackers: Seq[WriteJobStatsTracker],
  statsPerTask: Seq[Seq[WriteTaskStats]],
  jobCommitDuration: Long): Unit

processStats requests every WriteJobStatsTracker to processStats (for respective WriteTaskStats in the given statsPerTask).
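
A sketch of that fan-out, assuming statsPerTask is indexed by task first and tracker second (so it gets transposed before every tracker receives only its own WriteTaskStats); fanOutStats is a hypothetical helper:

import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker, WriteTaskStats}

// Sketch only: give every tracker its own per-task stats plus the job commit duration
def fanOutStats(
    statsTrackers: Seq[WriteJobStatsTracker],
    statsPerTask: Seq[Seq[WriteTaskStats]],
    jobCommitDuration: Long): Unit = {
  val statsPerTracker =
    if (statsPerTask.nonEmpty) statsPerTask.transpose
    else statsTrackers.map(_ => Seq.empty[WriteTaskStats])
  statsTrackers.zip(statsPerTracker).foreach { case (tracker, stats) =>
    tracker.processStats(stats, jobCommitDuration)
  }
}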

Logging

Enable ALL logging level for org.apache.spark.sql.execution.datasources.FileFormatWriter logger to see what happens inside.

Add the following line to conf/log4j2.properties:

logger.FileFormatWriter.name = org.apache.spark.sql.execution.datasources.FileFormatWriter
logger.FileFormatWriter.level = all

Refer to Logging.