FileFormatWriter¶
FileFormatWriter utility is used to write out query result for the following:
- Hive-related InsertIntoHiveDirCommand and InsertIntoHiveTable logical commands (via SaveAsHiveFile.saveAsHiveFile)
- InsertIntoHadoopFsRelationCommand logical command
FileStreamSink(Spark Structured Streaming) is requested to write out a micro-batch
Writing Out Query Result¶
write(
sparkSession: SparkSession,
plan: SparkPlan,
fileFormat: FileFormat,
committer: FileCommitProtocol,
outputSpec: OutputSpec,
hadoopConf: Configuration,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
statsTrackers: Seq[WriteJobStatsTracker],
options: Map[String, String],
numStaticPartitionCols: Int = 0): Set[String]
write creates a Hadoop Job instance (with the given Hadoop Configuration) and uses the following job output classes:
| Job Output Property | Class |
|---|---|
| Key | Void |
| Value | InternalRow |
write sets the output directory (mapreduce.output.fileoutputformat.outputdir property of the map-reduce job) to be the outputPath of the given OutputSpec.
write requests the given FileFormat to prepareWrite.
write creates a WriteJobDescription with the following options (if available):
| Option | Fallback Configuration Property |
|---|---|
maxRecordsPerFile | spark.sql.files.maxRecordsPerFile |
timeZone | spark.sql.session.timeZone |
write sets spark.sql.sources.writeJobUUID configuration in the map-reduce Job instance.
In the end, write executeWrite.
executeWrite¶
executeWrite(
session: SparkSession,
planForWrites: SparkPlan,
writeFilesSpec: WriteFilesSpec,
job: Job): Set[String]
executeWrite(
sparkSession: SparkSession,
plan: SparkPlan,
job: Job,
description: WriteJobDescription,
committer: FileCommitProtocol,
outputSpec: OutputSpec,
requiredOrdering: Seq[Expression],
partitionColumns: Seq[Attribute],
sortColumns: Seq[Attribute],
orderingMatched: Boolean): Set[String]
executeWrite writeAndCommit (with the given Hadoop Job, WriteJobDescription, and FileCommitProtocol) and a function that does the following:
- Prepares an
RDD[WriterCommitMessage](by executing the givenWriteFilesSpecor theSparkPlan) - Runs a Spark job for the
RDD[WriterCommitMessage]that "collects"WriteTaskResults (from executing write tasks)
writeAndCommit¶
writeAndCommit(
job: Job,
description: WriteJobDescription,
committer: FileCommitProtocol)(
f: => Array[WriteTaskResult]): Set[String]
writeAndCommit...FIXME
writeAndCommit prints out the following INFO message to the logs:
Start to commit write Job [uuid].
writeAndCommit requests the given FileCommitProtocol to commitJob.
writeAndCommit prints out the following INFO message to the logs:
Write Job [uuid] committed. Elapsed time: [duration] ms.
writeAndCommit processStats.
writeAndCommit prints out the following INFO message to the logs:
Finished processing stats for write job [uuid].
writeAndCommit returns the updated partitions.
In case of any Throwable, writeAndCommit prints out the following ERROR message to the logs:
Aborting job [uuid].
writeAndCommit requests the given FileCommitProtocol to abortJob.
Usage¶
write is used when:
SaveAsHiveFileis requested to saveAsHiveFile- InsertIntoHadoopFsRelationCommand logical command is executed
FileStreamSink(Spark Structured Streaming) is requested toaddBatch
Review Me¶
write requests the given FileCommitProtocol committer to setupJob.
write executes the given SparkPlan (and generates an RDD). The execution can be directly on the given physical operator if ordering matches the requirements or uses SortExec physical operator (with global flag off).
write runs a Spark job (action) on the RDD with executeTask as the partition function. The result task handler simply requests the given FileCommitProtocol committer to onTaskCommit (with the TaskCommitMessage of a WriteTaskResult) and saves the WriteTaskResult.
write requests the given FileCommitProtocol committer to commitJob (with the Hadoop Job instance and the TaskCommitMessage of all write tasks).
write prints out the following INFO message to the logs:
Write Job [uuid] committed.
write processStats.
write prints out the following INFO message to the logs:
Finished processing stats for write job [uuid].
In the end, write returns all the partition paths that were updated during this write job.
write And Throwables¶
In case of any Throwable, write prints out the following ERROR message to the logs:
Aborting job [uuid].
write requests the given FileCommitProtocol committer to abortJob (with the Hadoop Job instance).
In the end, write throws a SparkException.
Writing Data Out In Single Spark Task¶
executeTask(
description: WriteJobDescription,
jobIdInstant: Long,
sparkStageId: Int,
sparkPartitionId: Int,
sparkAttemptNumber: Int,
committer: FileCommitProtocol,
iterator: Iterator[InternalRow]): WriteTaskResult
executeTask...FIXME
Processing Write Job Statistics¶
processStats(
statsTrackers: Seq[WriteJobStatsTracker],
statsPerTask: Seq[Seq[WriteTaskStats]],
jobCommitDuration: Long): Unit
processStats requests every WriteJobStatsTracker to processStats (for respective WriteTaskStats in the given statsPerTask).
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.datasources.FileFormatWriter logger to see what happens inside.
Add the following line to conf/log4j2.properties:
logger.FileFormatWriter.name = org.apache.spark.sql.execution.datasources.FileFormatWriter
logger.FileFormatWriter.level = all
Refer to Logging.