FileFormatWriter¶
FileFormatWriter utility is used to write out query results for the following:

- Hive-related InsertIntoHiveDirCommand and InsertIntoHiveTable logical commands (via SaveAsHiveFile.saveAsHiveFile)
- InsertIntoHadoopFsRelationCommand logical command
- FileStreamSink (Spark Structured Streaming) when requested to write out a micro-batch
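For instance, a plain file-based write like the following demo snippet (assuming a running SparkSession available as spark; the output path is just an example) is planned as an InsertIntoHadoopFsRelationCommand and therefore ends up in FileFormatWriter.write:

```scala
import spark.implicits._

// A regular file-based write; the planned InsertIntoHadoopFsRelationCommand
// uses FileFormatWriter behind the scenes.
spark.range(5)
  .withColumn("part", $"id" % 2)
  .write
  .partitionBy("part")
  .mode("overwrite")
  .parquet("/tmp/fileformatwriter-demo")
```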
Writing Out Query Result¶
```scala
write(
  sparkSession: SparkSession,
  plan: SparkPlan,
  fileFormat: FileFormat,
  committer: FileCommitProtocol,
  outputSpec: OutputSpec,
  hadoopConf: Configuration,
  partitionColumns: Seq[Attribute],
  bucketSpec: Option[BucketSpec],
  statsTrackers: Seq[WriteJobStatsTracker],
  options: Map[String, String],
  numStaticPartitionCols: Int = 0): Set[String]
```
write creates a Hadoop Job instance (with the given Hadoop Configuration) and uses the following job output classes:

Job Output Property | Class
---|---
Key | Void
Value | InternalRow
write sets the output directory (the mapreduce.output.fileoutputformat.outputdir property of the map-reduce job) to the outputPath of the given OutputSpec.
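A minimal sketch of what that amounts to with the standard Hadoop MapReduce API (the output path is an example, not what write actually uses):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.spark.sql.catalyst.InternalRow

val job = Job.getInstance(new Configuration())
// Job output classes as in the table above
job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[InternalRow])
// Sets the mapreduce.output.fileoutputformat.outputdir property
FileOutputFormat.setOutputPath(job, new Path("/tmp/fileformatwriter-demo"))
```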
write requests the given FileFormat to prepareWrite.
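prepareWrite gives back the OutputWriterFactory that is later used to create per-task OutputWriters. A sketch of the call, reusing write's own parameters (dataSchema here is a stand-in for the data columns, i.e. the output columns without the partition columns):

```scala
import org.apache.spark.sql.execution.datasources.OutputWriterFactory

// The factory is used on executors to open one OutputWriter per output file.
val outputWriterFactory: OutputWriterFactory =
  fileFormat.prepareWrite(sparkSession, job, options, dataSchema)
```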
write creates a WriteJobDescription with the following options (falling back to the configuration properties when an option is not specified):

Option | Fallback Configuration Property
---|---
maxRecordsPerFile | spark.sql.files.maxRecordsPerFile
timeZone | spark.sql.session.timeZone
write sets the spark.sql.sources.writeJobUUID configuration in the map-reduce Job instance.
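A sketch of the option-with-fallback lookup and the write job UUID (variable names are illustrative, not the exact internals):

```scala
import java.util.UUID

// A per-job option wins; otherwise the session-wide SQL configuration applies.
val maxRecordsPerFile = options
  .get("maxRecordsPerFile")
  .map(_.toLong)
  .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)

// A unique identifier of this write job, shared with the committer and the write tasks.
job.getConfiguration.set("spark.sql.sources.writeJobUUID", UUID.randomUUID().toString)
```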
In the end, write executeWrite.
executeWrite¶
```scala
executeWrite(
  session: SparkSession,
  planForWrites: SparkPlan,
  writeFilesSpec: WriteFilesSpec,
  job: Job): Set[String]
executeWrite(
  sparkSession: SparkSession,
  plan: SparkPlan,
  job: Job,
  description: WriteJobDescription,
  committer: FileCommitProtocol,
  outputSpec: OutputSpec,
  requiredOrdering: Seq[Expression],
  partitionColumns: Seq[Attribute],
  sortColumns: Seq[Attribute],
  orderingMatched: Boolean): Set[String]
```
executeWrite writeAndCommit (with the given Hadoop Job, WriteJobDescription and FileCommitProtocol) and a function that does the following:

- Prepares an RDD[WriterCommitMessage] (by executing the given WriteFilesSpec or the SparkPlan)
- Runs a Spark job for the RDD[WriterCommitMessage] that "collects" WriteTaskResults (from executing the write tasks)
writeAndCommit¶
```scala
writeAndCommit(
  job: Job,
  description: WriteJobDescription,
  committer: FileCommitProtocol)(
  f: => Array[WriteTaskResult]): Set[String]
```
writeAndCommit...FIXME

writeAndCommit prints out the following INFO message to the logs:

```text
Start to commit write Job [uuid].
```

writeAndCommit requests the given FileCommitProtocol to commitJob.

writeAndCommit prints out the following INFO message to the logs:

```text
Write Job [uuid] committed. Elapsed time: [duration] ms.
```

writeAndCommit processStats.

writeAndCommit prints out the following INFO message to the logs:

```text
Finished processing stats for write job [uuid].
```

writeAndCommit returns the updated partitions.

In case of any Throwable, writeAndCommit prints out the following ERROR message to the logs:

```text
Aborting job [uuid].
```

writeAndCommit requests the given FileCommitProtocol to abortJob.
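A simplified sketch of that commit-or-abort pattern (the real writeAndCommit also times the commit and processes the write statistics):

```scala
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkException
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

// Run the write tasks (f), then either commit or abort the whole job
// through the FileCommitProtocol committer.
def writeAndCommitSketch(
    job: Job,
    committer: FileCommitProtocol)(
    f: => Seq[TaskCommitMessage]): Unit = {
  try {
    val commitMessages = f
    committer.commitJob(job, commitMessages)
  } catch {
    case cause: Throwable =>
      committer.abortJob(job)
      throw new SparkException("Job aborted.", cause)
  }
}
```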
Usage¶
write is used when:

- SaveAsHiveFile is requested to saveAsHiveFile
- InsertIntoHadoopFsRelationCommand logical command is executed
- FileStreamSink (Spark Structured Streaming) is requested to addBatch
Review Me¶
write requests the given FileCommitProtocol committer to setupJob.
write executes the given SparkPlan (and generates an RDD). The execution happens directly on the given physical operator if its output ordering matches the required ordering, or on a SortExec physical operator (with the global flag off) otherwise.
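A sketch of that decision (illustrative only; orderingMatched, requiredOrdering and plan stand for write's inputs):

```scala
import org.apache.spark.sql.catalyst.expressions.{Ascending, SortOrder}
import org.apache.spark.sql.execution.SortExec

// Execute the plan as-is if its output ordering already satisfies the
// required ordering; otherwise sort every partition first (global = false).
val rddToWrite =
  if (orderingMatched) {
    plan.execute()
  } else {
    val sortOrder = requiredOrdering.map(SortOrder(_, Ascending))
    SortExec(sortOrder, global = false, child = plan).execute()
  }
```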
write runs a Spark job (action) on the RDD with executeTask as the partition function. The result handler simply requests the given FileCommitProtocol committer to onTaskCommit (with the TaskCommitMessage of a WriteTaskResult) and saves the WriteTaskResult.
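An illustrative shape of that Spark job (writeTask, committer and rddToWrite are stand-ins for the real internals; writeTask is assumed to write its partition and return the task's TaskCommitMessage):

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.TaskContext
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.catalyst.InternalRow

// One write task per RDD partition; the driver-side result handler notifies
// the committer of every committed task and keeps the commit messages.
val taskCommits = ArrayBuffer.empty[TaskCommitMessage]
sparkSession.sparkContext.runJob(
  rddToWrite,
  (taskContext: TaskContext, rows: Iterator[InternalRow]) => writeTask(taskContext, rows),
  (_: Int, commitMsg: TaskCommitMessage) => {
    committer.onTaskCommit(commitMsg)
    taskCommits += commitMsg
  })
```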
write requests the given FileCommitProtocol committer to commitJob (with the Hadoop Job instance and the TaskCommitMessages of all the write tasks).
write prints out the following INFO message to the logs:

```text
Write Job [uuid] committed.
```
write processStats.
write prints out the following INFO message to the logs:

```text
Finished processing stats for write job [uuid].
```
In the end, write returns all the partition paths that were updated during this write job.
write And Throwables¶
In case of any Throwable, write prints out the following ERROR message to the logs:

```text
Aborting job [uuid].
```
write requests the given FileCommitProtocol committer to abortJob (with the Hadoop Job instance).
In the end, write throws a SparkException.
Writing Data Out In Single Spark Task¶
```scala
executeTask(
  description: WriteJobDescription,
  jobIdInstant: Long,
  sparkStageId: Int,
  sparkPartitionId: Int,
  sparkAttemptNumber: Int,
  committer: FileCommitProtocol,
  iterator: Iterator[InternalRow]): WriteTaskResult
```
executeTask...FIXME
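A simplified sketch of a single write task (writeRow is a hypothetical stand-in for the OutputWriter doing the actual writing):

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.catalyst.InternalRow

// Set the task up with the committer, write every row, then commit the task;
// abort the task on any failure.
def executeTaskSketch(
    taskAttemptContext: TaskAttemptContext,
    committer: FileCommitProtocol,
    rows: Iterator[InternalRow]): FileCommitProtocol.TaskCommitMessage = {
  committer.setupTask(taskAttemptContext)
  try {
    rows.foreach(row => writeRow(row)) // hypothetical: an OutputWriter writes each row out
    committer.commitTask(taskAttemptContext)
  } catch {
    case cause: Throwable =>
      committer.abortTask(taskAttemptContext)
      throw cause
  }
}
```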
Processing Write Job Statistics¶
```scala
processStats(
  statsTrackers: Seq[WriteJobStatsTracker],
  statsPerTask: Seq[Seq[WriteTaskStats]],
  jobCommitDuration: Long): Unit
```
processStats requests every WriteJobStatsTracker to processStats (with the respective WriteTaskStats from the given statsPerTask).
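A sketch of that fan-out, under the assumption that statsPerTask is organized per task (so it is transposed into a per-tracker view first):

```scala
import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker, WriteTaskStats}

// Each tracker processes the stats produced by its own WriteTaskStatsTrackers.
def processStatsSketch(
    statsTrackers: Seq[WriteJobStatsTracker],
    statsPerTask: Seq[Seq[WriteTaskStats]],
    jobCommitDuration: Long): Unit = {
  val statsPerTracker =
    if (statsPerTask.nonEmpty) statsPerTask.transpose
    else statsTrackers.map(_ => Seq.empty[WriteTaskStats])
  statsTrackers.zip(statsPerTracker).foreach { case (tracker, stats) =>
    tracker.processStats(stats, jobCommitDuration)
  }
}
```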
Logging¶
Enable ALL logging level for the org.apache.spark.sql.execution.datasources.FileFormatWriter logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

```text
logger.FileFormatWriter.name = org.apache.spark.sql.execution.datasources.FileFormatWriter
logger.FileFormatWriter.level = all
```

Refer to Logging.