FileFormatWriter Utility
`FileFormatWriter` utility is used to write out query results for the following:

- Hive-related `InsertIntoHiveDirCommand` and `InsertIntoHiveTable` logical commands (via `SaveAsHiveFile.saveAsHiveFile`)
- `InsertIntoHadoopFsRelationCommand` logical command (as in the spark-shell sketch after this list)
- `FileStreamSink` (Spark Structured Streaming) when requested to write out a micro-batch
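For example, a plain partitioned parquet save from spark-shell goes through `InsertIntoHadoopFsRelationCommand` and eventually `FileFormatWriter.write` (a minimal sketch; the output path and column names are arbitrary):

```scala
// Saving a partitioned DataFrame to a file-based data source ends up in
// FileFormatWriter.write (spark and the $-string interpolator are available
// in spark-shell; import spark.implicits._ otherwise).
spark.range(10)
  .withColumn("bucket", $"id" % 2)
  .write
  .partitionBy("bucket")
  .parquet("/tmp/fileformatwriter-intro-demo")
```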
Writing Out Query Result
```scala
write(
  sparkSession: SparkSession,
  plan: SparkPlan,
  fileFormat: FileFormat,
  committer: FileCommitProtocol,
  outputSpec: OutputSpec,
  hadoopConf: Configuration,
  partitionColumns: Seq[Attribute],
  bucketSpec: Option[BucketSpec],
  statsTrackers: Seq[WriteJobStatsTracker],
  options: Map[String, String]): Set[String]
```
`write` creates a Hadoop `Job` instance (with the given Hadoop `Configuration`) and uses the following job output classes:

- `Void` for keys
- `InternalRow` for values
`write` sets the output directory (of the map-reduce job) to the `outputPath` of the given `OutputSpec`.
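A minimal sketch of this Hadoop `Job` setup, covering both the output classes and the output directory (`spark` is assumed to be an active `SparkSession`, e.g. in spark-shell; the output path is arbitrary):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.spark.sql.catalyst.InternalRow

// Create a Hadoop Job with the session's Hadoop configuration, set the
// output key/value classes and the output directory of the map-reduce job.
val job = Job.getInstance(spark.sparkContext.hadoopConfiguration)
job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[InternalRow])
FileOutputFormat.setOutputPath(job, new Path("/tmp/fileformatwriter-demo"))
```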
`write` requests the given `FileFormat` to `prepareWrite`.
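A sketch of the `prepareWrite` contract using the built-in `ParquetFileFormat` (an illustration only; the schema is arbitrary and `spark` is assumed to be an active `SparkSession`). `prepareWrite` gets a chance to configure the Hadoop `Job` and returns the `OutputWriterFactory` that is later used on executors to create per-file `OutputWriter`s:

```scala
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType

// Ask a concrete FileFormat to prepare a write job and hand back the
// OutputWriterFactory for the given data schema.
val job = Job.getInstance(spark.sparkContext.hadoopConfiguration)
val writerFactory = new ParquetFileFormat()
  .prepareWrite(spark, job, Map.empty, StructType.fromDDL("id LONG, name STRING"))
```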
`write` creates a `WriteJobDescription` with the following (see the sketch after this list):

- `maxRecordsPerFile` based on the `maxRecordsPerFile` option (from the given options) if available, or `spark.sql.files.maxRecordsPerFile` otherwise
- `timeZoneId` based on the `timeZone` option (from the given options) if available, or `spark.sql.session.timeZone` otherwise
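A sketch of where those two options come from at the public API level (the path and values are arbitrary; `spark` is assumed to be an active `SparkSession`, e.g. in spark-shell):

```scala
// The per-write option takes precedence over the session-wide SQL configuration.
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000000L)

spark.range(5000000L).write
  .option("maxRecordsPerFile", "500000") // overrides spark.sql.files.maxRecordsPerFile
  .option("timeZone", "UTC")             // becomes timeZoneId; defaults to spark.sql.session.timeZone
  .parquet("/tmp/fileformatwriter-options-demo")
```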
`write` requests the given `FileCommitProtocol` committer to `setupJob`.
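A sketch of obtaining a committer and setting up the write job (assumed values; in the SQL code path the committer class name comes from the `spark.sql.sources.commitProtocolClass` configuration property, and `spark` is an active `SparkSession`):

```scala
import java.util.UUID
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.spark.internal.io.FileCommitProtocol

// Instantiate a FileCommitProtocol (here the generic Hadoop MapReduce-based one)
// and request it to set up the write job.
val outputPath = "/tmp/fileformatwriter-committer-demo"
val committer = FileCommitProtocol.instantiate(
  "org.apache.spark.internal.io.HadoopMapReduceCommitProtocol", // committer class name
  UUID.randomUUID().toString,                                   // job ID
  outputPath)
val job = Job.getInstance(spark.sparkContext.hadoopConfiguration)
FileOutputFormat.setOutputPath(job, new Path(outputPath))
committer.setupJob(job)
```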
`write` executes the given `SparkPlan` (and generates an RDD). The execution happens directly on the given physical operator when its output ordering already satisfies the required write ordering; otherwise `write` executes a `SortExec` physical operator (with the `global` flag off) on top of it.
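A conceptual sketch of that decision, under the simplifying assumption that orderings are compared as plain column-name sequences (Spark compares the actual sort expressions); `needsExtraSort` is a hypothetical helper, not Spark code:

```scala
// The required write ordering is the partition columns, then the bucket id
// expression (if any), then the bucket sort columns. An extra per-partition sort
// (SortExec with global = false) is only needed when the plan's output ordering
// does not already start with that required prefix; global stays false because
// every task only has to sort its own rows before writing them out.
def needsExtraSort(outputOrdering: Seq[String], requiredOrdering: Seq[String]): Boolean =
  requiredOrdering.nonEmpty &&
    outputOrdering.take(requiredOrdering.length) != requiredOrdering
```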
`write` runs a Spark job (action) on the RDD with `executeTask` as the partition function. The result handler simply requests the given `FileCommitProtocol` committer to `onTaskCommit` (with the `TaskCommitMessage` of a `WriteTaskResult`) and saves the `WriteTaskResult`.
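A generic, self-contained illustration of that pattern (not FileFormatWriter's actual code): `SparkContext.runJob` with a per-partition function standing in for `executeTask` and a result handler invoked on the driver with every task's result:

```scala
import org.apache.spark.TaskContext
import scala.collection.mutable.ArrayBuffer

// The per-partition function plays the role of executeTask; the result handler
// plays the role of committer.onTaskCommit plus saving the WriteTaskResults.
// spark is assumed to be an active SparkSession (e.g. in spark-shell).
val rdd = spark.sparkContext.parallelize(1 to 10, numSlices = 2)
val results = ArrayBuffer.empty[Int]
spark.sparkContext.runJob(
  rdd,
  (_: TaskContext, rows: Iterator[Int]) => rows.sum,          // the per-partition "task"
  rdd.partitions.indices,                                     // all partitions of the RDD
  (taskIndex: Int, taskResult: Int) => results += taskResult) // the result handler
```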
`write` requests the given `FileCommitProtocol` committer to `commitJob` (with the Hadoop `Job` instance and the `TaskCommitMessage`s of all the write tasks).
`write` prints out the following INFO message to the logs:

Write Job [uuid] committed.
`write` processStats.

`write` prints out the following INFO message to the logs:

Finished processing stats for write job [uuid].
In the end, `write` returns all the partition paths that were updated during this write job.
`write` is used when:

- `InsertIntoHadoopFsRelationCommand` logical command is executed
- `SaveAsHiveFile` is requested to `saveAsHiveFile` (when `InsertIntoHiveDirCommand` and `InsertIntoHiveTable` logical commands are executed)
- `FileStreamSink` (Spark Structured Streaming) is requested to write out a micro-batch
write And Throwables
In case of any `Throwable`, `write` prints out the following ERROR message to the logs:

Aborting job [uuid].
`write` requests the given `FileCommitProtocol` committer to `abortJob` (with the Hadoop `Job` instance).
In the end, `write` throws a `SparkException`.
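A conceptual sketch of the overall commit-or-abort control flow (an assumed simplification; `writeAndCommit` is a hypothetical helper, not Spark code):

```scala
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkException
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

// Set up the job, run the write tasks, commit on success,
// abort and rethrow as a SparkException on any Throwable.
def writeAndCommit(
    committer: FileCommitProtocol,
    job: Job,
    runTasks: () => Seq[TaskCommitMessage]): Unit = {
  try {
    committer.setupJob(job)
    committer.commitJob(job, runTasks())
  } catch {
    case cause: Throwable =>
      // Mirrors the abort-and-rethrow behaviour described above.
      committer.abortJob(job)
      throw new SparkException("Job aborted.", cause)
  }
}
```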
Writing Data Out In Single Spark Task
```scala
executeTask(
  description: WriteJobDescription,
  jobIdInstant: Long,
  sparkStageId: Int,
  sparkPartitionId: Int,
  sparkAttemptNumber: Int,
  committer: FileCommitProtocol,
  iterator: Iterator[InternalRow]): WriteTaskResult
```
`executeTask`...FIXME
Processing Write Job Statistics
```scala
processStats(
  statsTrackers: Seq[WriteJobStatsTracker],
  statsPerTask: Seq[Seq[WriteTaskStats]],
  jobCommitDuration: Long): Unit
```
`processStats` requests every `WriteJobStatsTracker` to `processStats` (for the respective `WriteTaskStats` in the given `statsPerTask`).
Logging
Enable `ALL` logging level for the `org.apache.spark.sql.execution.datasources.FileFormatWriter` logger to see what happens inside.

Add the following lines to `conf/log4j2.properties`:

```text
logger.FileFormatWriter.name = org.apache.spark.sql.execution.datasources.FileFormatWriter
logger.FileFormatWriter.level = all
```

Refer to Logging.