SparkHadoopWriter

SparkHadoopWriter utility is used to write a key-value RDD (as a Hadoop OutputFormat).

SparkHadoopWriter utility is used by the PairRDDFunctions.saveAsNewAPIHadoopDataset and PairRDDFunctions.saveAsHadoopDataset actions.
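
For example, saving a key-value RDD with PairRDDFunctions.saveAsNewAPIHadoopFile ends up in saveAsNewAPIHadoopDataset and hence SparkHadoopWriter. A minimal sketch (assuming a SparkContext sc, e.g. in spark-shell; the output path is just an example):

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// A key-value RDD (PairRDDFunctions methods are available via an implicit conversion)
val pairs = sc.parallelize(Seq(("one", 1), ("two", 2)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }

// saveAsNewAPIHadoopFile delegates to saveAsNewAPIHadoopDataset
// that in turn uses SparkHadoopWriter to write the RDD out
pairs.saveAsNewAPIHadoopFile(
  "/tmp/sparkhadoopwriter-demo",  // example output path
  classOf[Text],
  classOf[IntWritable],
  classOf[TextOutputFormat[Text, IntWritable]])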

Enable ALL logging level for org.apache.spark.internal.io.SparkHadoopWriter logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.internal.io.SparkHadoopWriter=ALL

Refer to Logging.

Writing Key-Value RDD Out (As Hadoop OutputFormat) — write Utility

write[K, V: ClassTag](
  rdd: RDD[(K, V)],
  config: HadoopWriteConfigUtil[K, V]): Unit

write uses the id of the given RDD as the commitJobId.

write creates a jobTrackerId with the current date.

write requests the given HadoopWriteConfigUtil to create a Hadoop JobContext (for the jobTrackerId and commitJobId).

write requests the given HadoopWriteConfigUtil to initOutputFormat with the Hadoop JobContext.

write requests the given HadoopWriteConfigUtil to assertConf.

write requests the given HadoopWriteConfigUtil to create a HadoopMapReduceCommitProtocol committer for the commitJobId.

write requests the HadoopMapReduceCommitProtocol to setupJob (with the jobContext).

write uses the SparkContext (of the given RDD) to run a Spark job for the given RDD with the executeTask partition function (one task per partition).

In the end, write requests the HadoopMapReduceCommitProtocol to commit the job and prints out the following INFO message to the logs:

Job [getJobID] committed.

write is used when PairRDDFunctions is requested to saveAsNewAPIHadoopDataset and saveAsHadoopDataset.
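
The steps above roughly correspond to the following simplified sketch of write (names are paraphrased from the Spark source; details such as the exact attempt number calculation are omitted, and error handling is covered in the next section):

def write[K, V: ClassTag](
    rdd: RDD[(K, V)],
    config: HadoopWriteConfigUtil[K, V]): Unit = {
  val commitJobId = rdd.id
  val jobTrackerId = createJobTrackerID(new Date())  // current date

  val jobContext = config.createJobContext(jobTrackerId, commitJobId)
  config.initOutputFormat(jobContext)
  config.assertConf(jobContext, rdd.conf)

  val committer = config.createCommitter(commitJobId)
  committer.setupJob(jobContext)

  // Run one executeTask per RDD partition and collect the task commit messages
  val ret = rdd.context.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) =>
    executeTask(context, config, jobTrackerId, commitJobId,
      context.partitionId, context.attemptNumber, committer, iter))

  committer.commitJob(jobContext, ret)
  logInfo(s"Job ${jobContext.getJobID} committed.")
}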

write Utility And Throwables

In case of any Throwable, write prints out the following ERROR message to the logs:

Aborting job [getJobID].

write requests the HadoopMapReduceCommitProtocol to abort the job and throws a SparkException:

Job aborted.
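
In code terms, the error handling wraps the job run and commit in a try/catch along these lines (a sketch; runJobAndCommit is a hypothetical stand-in for the job-run and commit steps described above):

try {
  // run the Spark job and commit the job (see the write sketch above)
  runJobAndCommit()
} catch {
  case cause: Throwable =>
    logError(s"Aborting job ${jobContext.getJobID}.", cause)
    committer.abortJob(jobContext)
    throw new SparkException("Job aborted.", cause)
}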

Writing RDD Partition — executeTask Internal Utility

executeTask[K, V: ClassTag](
  context: TaskContext,
  config: HadoopWriteConfigUtil[K, V],
  jobTrackerId: String,
  commitJobId: Int,
  sparkPartitionId: Int,
  sparkAttemptNumber: Int,
  committer: FileCommitProtocol,
  iterator: Iterator[(K, V)]): TaskCommitMessage

executeTask requests the given HadoopWriteConfigUtil to create a TaskAttemptContext.

executeTask requests the given FileCommitProtocol to set up a task with the TaskAttemptContext.

executeTask requests the given HadoopWriteConfigUtil to initWriter (with the TaskAttemptContext and the given sparkPartitionId).

executeTask writes out all records of the RDD partition (from the given Iterator[(K, V)]) by requesting the given HadoopWriteConfigUtil to write every key-value pair. In the end, executeTask requests the given HadoopWriteConfigUtil to closeWriter and the given FileCommitProtocol to commit the task.

executeTask updates metrics about writing data to external systems (bytesWritten and recordsWritten) every few records and at the end.

In case of any errors, executeTask requests the given HadoopWriteConfigUtil to closeWriter and the given FileCommitProtocol to abort the task. In the end, executeTask prints out the following ERROR message to the logs:

Task [taskAttemptID] aborted.

executeTask is used when SparkHadoopWriter utility is used to write.
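
A simplified sketch of executeTask follows (paraphrased from the Spark source; the periodic metric updates and the exact attempt-ID handling are left out):

def executeTask[K, V: ClassTag](
    context: TaskContext,
    config: HadoopWriteConfigUtil[K, V],
    jobTrackerId: String,
    commitJobId: Int,
    sparkPartitionId: Int,
    sparkAttemptNumber: Int,
    committer: FileCommitProtocol,
    iterator: Iterator[(K, V)]): TaskCommitMessage = {
  val taskContext = config.createTaskAttemptContext(
    jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)
  committer.setupTask(taskContext)
  config.initWriter(taskContext, sparkPartitionId)

  try {
    while (iterator.hasNext) {
      config.write(iterator.next())  // write one key-value record
      // bytesWritten and recordsWritten metrics are updated every few records
    }
    config.closeWriter(taskContext)
    committer.commitTask(taskContext)
  } catch {
    case t: Throwable =>
      config.closeWriter(taskContext)
      committer.abortTask(taskContext)
      logError(s"Task ${taskContext.getTaskAttemptID} aborted.")
      throw t
  }
}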

initHadoopOutputMetrics Utility

initHadoopOutputMetrics(
  context: TaskContext): (OutputMetrics, () => Long)

initHadoopOutputMetrics...FIXME

initHadoopOutputMetrics is used when SparkHadoopWriter utility is used to executeTask.