
= SparkHadoopWriter

SparkHadoopWriter utility is used to <<write, write a key-value RDD out (as a Hadoop OutputFormat)>>.

SparkHadoopWriter utility is used by the rdd:PairRDDFunctions.md#saveAsNewAPIHadoopDataset[saveAsNewAPIHadoopDataset] and rdd:PairRDDFunctions.md#saveAsHadoopDataset[saveAsHadoopDataset] actions.
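
For example (a spark-shell sketch, assuming an active SparkContext available as sc and a writable /tmp/counts directory), saving a pair RDD with the new Hadoop API eventually hands the data over to SparkHadoopWriter:

[source, scala]
----
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}

// A key-value RDD brings in PairRDDFunctions implicitly
val counts = sc.parallelize(Seq(("spark", 1), ("hadoop", 2)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }

// New Hadoop API (mapreduce) output configuration
val job = Job.getInstance(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[IntWritable])
job.setOutputFormatClass(classOf[TextOutputFormat[Text, IntWritable]])
FileOutputFormat.setOutputPath(job, new Path("/tmp/counts"))

// Runs a Spark job through SparkHadoopWriter.write under the covers
counts.saveAsNewAPIHadoopDataset(job.getConfiguration)
----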

[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.internal.io.SparkHadoopWriter logger to see what happens inside.

Add the following line to conf/log4j.properties:

----
log4j.logger.org.apache.spark.internal.io.SparkHadoopWriter=ALL
----

Refer to Logging.
====

== [[write]] Writing Key-Value RDD Out (As Hadoop OutputFormat) -- write Utility

[source, scala]
----
write[K, V: ClassTag](
  rdd: RDD[(K, V)],
  config: HadoopWriteConfigUtil[K, V]): Unit
----

[[write-commitJobId]] write uses the id of the given RDD as the commitJobId.

[[write-jobTrackerId]] write creates a jobTrackerId with the current date.
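
In code form, the two identifiers boil down to the following (a simplified sketch, assuming the yyyyMMddHHmmss timestamp format of SparkHadoopWriterUtils.createJobTrackerID and an rdd value for the RDD given to write):

[source, scala]
----
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

// commitJobId: the unique id of the RDD being written out
val commitJobId = rdd.id

// jobTrackerId: the current date, formatted as a Hadoop-style job tracker id
val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date())
----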

[[write-jobContext]] write requests the given HadoopWriteConfigUtil to create a Hadoop https://hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/mapreduce/JobContext.html[JobContext] (for the <<write-jobTrackerId, jobTrackerId>> and <<write-commitJobId, commitJobId>>).

write requests the given HadoopWriteConfigUtil to initOutputFormat with the Hadoop JobContext.

write requests the given HadoopWriteConfigUtil to assertConf (with the JobContext and the SparkConf of the given RDD).

write requests the given HadoopWriteConfigUtil to create a HadoopMapReduceCommitProtocol committer for the <<write-commitJobId, commitJobId>>.

write requests the HadoopMapReduceCommitProtocol to set up the job (with the <<write-jobContext, JobContext>>).

[[write-runJob]][[write-executeTask]] write uses the SparkContext (of the given RDD) to ROOT:SparkContext.md#runJob[run a Spark job] for the given RDD with the <<executeTask, executeTask>> partition function.

[[write-commitJob]] In the end, write requests the HadoopMapReduceCommitProtocol to commit the job and prints out the following INFO message to the logs:

Job [getJobID] committed.

NOTE: write is used when PairRDDFunctions is requested to rdd:PairRDDFunctions.md#saveAsNewAPIHadoopDataset[saveAsNewAPIHadoopDataset] and rdd:PairRDDFunctions.md#saveAsHadoopDataset[saveAsHadoopDataset].

=== [[write-Throwable]] write Utility And Throwables

In case of any Throwable, write prints out the following ERROR message to the logs:

Aborting job [getJobID].

[[write-abortJob]] write requests the HadoopMapReduceCommitProtocol to abort the job and throws a SparkException:

Job aborted.
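
Put together, write boils down to the following (a simplified sketch of the flow described above, not the actual implementation; it mirrors package-private Spark internals, with logInfo and logError standing for the Logging methods):

[source, scala]
----
def write[K, V: ClassTag](
    rdd: RDD[(K, V)],
    config: HadoopWriteConfigUtil[K, V]): Unit = {
  val commitJobId = rdd.id
  val jobTrackerId = createJobTrackerID(new Date())

  // Job-level setup
  val jobContext = config.createJobContext(jobTrackerId, commitJobId)
  config.initOutputFormat(jobContext)
  config.assertConf(jobContext, rdd.conf)

  val committer = config.createCommitter(commitJobId)
  committer.setupJob(jobContext)

  try {
    // One executeTask per partition, run as a Spark job
    val ret = rdd.context.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) =>
      executeTask(context, config, jobTrackerId, commitJobId,
        context.partitionId, context.attemptNumber, committer, iter))

    committer.commitJob(jobContext, ret)
    logInfo(s"Job ${jobContext.getJobID} committed.")
  } catch {
    case cause: Throwable =>
      logError(s"Aborting job ${jobContext.getJobID}.", cause)
      committer.abortJob(jobContext)
      throw new SparkException("Job aborted.", cause)
  }
}
----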

== [[executeTask]] Writing RDD Partition -- executeTask Utility

[source, scala]
----
executeTask[K, V: ClassTag](
  context: TaskContext,
  config: HadoopWriteConfigUtil[K, V],
  jobTrackerId: String,
  commitJobId: Int,
  sparkPartitionId: Int,
  sparkAttemptNumber: Int,
  committer: FileCommitProtocol,
  iterator: Iterator[(K, V)]): TaskCommitMessage
----

executeTask requests the given HadoopWriteConfigUtil to create a TaskAttemptContext.

executeTask requests the given FileCommitProtocol to set up a task with the TaskAttemptContext.

executeTask requests the given HadoopWriteConfigUtil to initWriter (with the TaskAttemptContext and the given sparkPartitionId).

executeTask initializes the Hadoop output metrics (<<initHadoopOutputMetrics, initHadoopOutputMetrics>>).

executeTask writes out all records of the RDD partition (from the given Iterator[(K, V)]), requesting the given HadoopWriteConfigUtil to write every key-value pair. In the end, executeTask requests the given HadoopWriteConfigUtil to closeWriter and the given FileCommitProtocol to commit the task.

executeTask updates metrics about writing data to external systems (bytesWritten and recordsWritten) every few records and at the end.

In case of any errors, executeTask requests the given HadoopWriteConfigUtil to closeWriter and the given FileCommitProtocol to abort the task. In the end, executeTask prints out the following ERROR message to the logs:

Task [taskAttemptID] aborted.
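
The whole executeTask flow, in a simplified form (a sketch only; it mirrors Spark-internal, package-private APIs, and the actual implementation uses Utils.tryWithSafeFinallyAndFailureCallbacks and an internal constant for the metrics-update cadence):

[source, scala]
----
def executeTask[K, V: ClassTag](
    context: TaskContext,
    config: HadoopWriteConfigUtil[K, V],
    jobTrackerId: String,
    commitJobId: Int,
    sparkPartitionId: Int,
    sparkAttemptNumber: Int,
    committer: FileCommitProtocol,
    iterator: Iterator[(K, V)]): TaskCommitMessage = {
  // Task-level setup
  val taskContext = config.createTaskAttemptContext(
    jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)
  committer.setupTask(taskContext)
  val (outputMetrics, callback) = initHadoopOutputMetrics(context)
  config.initWriter(taskContext, sparkPartitionId)

  var recordsWritten = 0L
  try {
    // Write out every record of the partition
    while (iterator.hasNext) {
      config.write(iterator.next())
      recordsWritten += 1
      // Periodically refresh the output metrics (the cadence here is illustrative)
      if (recordsWritten % 256 == 0) {
        outputMetrics.setBytesWritten(callback())
        outputMetrics.setRecordsWritten(recordsWritten)
      }
    }
    config.closeWriter(taskContext)
    val message = committer.commitTask(taskContext)

    // Final metrics update
    outputMetrics.setBytesWritten(callback())
    outputMetrics.setRecordsWritten(recordsWritten)
    message
  } catch {
    case cause: Throwable =>
      // Release the writer and abort the task before re-throwing
      config.closeWriter(taskContext)
      committer.abortTask(taskContext)
      logError(s"Task ${taskContext.getTaskAttemptID} aborted.")
      throw cause
  }
}
----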

NOTE: executeTask is used when SparkHadoopWriter utility is used to <<write, write>> a key-value RDD out.

== [[initHadoopOutputMetrics]] initHadoopOutputMetrics Utility

[source, scala]
----
initHadoopOutputMetrics(
  context: TaskContext): (OutputMetrics, () => Long)
----

initHadoopOutputMetrics requests SparkHadoopUtil for a callback to calculate the number of bytes written (to a Hadoop file system on the current thread) and returns it together with the OutputMetrics of the TaskMetrics of the given TaskContext.
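
A minimal sketch of the idea (it mirrors package-private Spark internals, e.g. SparkHadoopUtil, so it only compiles inside the org.apache.spark namespace and is meant as an illustration, not user code):

[source, scala]
----
import org.apache.spark.TaskContext
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.OutputMetrics

// Pair the task's OutputMetrics with a callback that reports the number of
// bytes written to a Hadoop FileSystem on the current thread.
def initHadoopOutputMetrics(
    context: TaskContext): (OutputMetrics, () => Long) = {
  val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
  (context.taskMetrics().outputMetrics, bytesWrittenCallback)
}
----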

NOTE: initHadoopOutputMetrics is used when SparkHadoopWriter utility is used to <<executeTask, write an RDD partition>>.

