SparkHadoopWriter Utility

Writing Key-Value RDD Out (As Hadoop OutputFormat)

write[K, V: ClassTag](
  rdd: RDD[(K, V)],
  config: HadoopWriteConfigUtil[K, V]): Unit

write runs a Spark job to write out the records of every partition of the given key-value RDD, using the given HadoopWriteConfigUtil and a HadoopMapReduceCommitProtocol committer.

The number of writer tasks (parallelism) is the number of partitions of the given key-value RDD.
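
As an illustration (a minimal sketch, not part of SparkHadoopWriter itself), saving a key-value RDD with one of the Pair RDD save operators ends up in write, with one writer task per RDD partition:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

// A 2-partition key-value RDD gives 2 writer tasks when write runs the Spark job.
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("demo"))
val rdd = sc
  .parallelize(Seq(("a", 1), ("b", 2), ("c", 3)), numSlices = 2)
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }

// saveAsNewAPIHadoopFile goes through SparkHadoopWriter.write under the covers.
rdd.saveAsNewAPIHadoopFile[TextOutputFormat[Text, IntWritable]]("/tmp/sparkhadoopwriter-demo")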

Internals

Internally, write uses the id of the given RDD as the commitJobId.

write creates a jobTrackerId with the current date.
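
A minimal sketch of deriving a date-based tracker id (the exact scheme Spark uses is an assumption here; a yyyyMMddHHmmss-style timestamp):

import java.text.SimpleDateFormat
import java.util.{Date, Locale}

// Assumed format: a timestamp-only tracker id derived from the current date.
val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date())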

write requests the given HadoopWriteConfigUtil to create a Hadoop JobContext (for the jobTrackerId and commitJobId).

write requests the given HadoopWriteConfigUtil to initOutputFormat with the Hadoop JobContext.

write requests the given HadoopWriteConfigUtil to assertConf.

write requests the given HadoopWriteConfigUtil to create a HadoopMapReduceCommitProtocol committer for the commitJobId.

write requests the HadoopMapReduceCommitProtocol to setupJob (with the jobContext).

write uses the SparkContext of the given RDD to run a Spark job for the RDD with the executeTask partition function (one writer task per partition), collecting the TaskCommitMessages from the writer tasks.
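
A simplified, self-contained sketch (the runWriteJob name and its body are illustrative, not the actual internals) of running one function per partition with SparkContext.runJob, the way write hands each partition's iterator to executeTask:

import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// One invocation of the partition function per partition; executeTask would
// create a writer here, write every record, and commit the task.
def runWriteJob[K, V](sc: SparkContext, rdd: RDD[(K, V)]): Unit = {
  val results = sc.runJob(rdd, (context: TaskContext, records: Iterator[(K, V)]) =>
    (context.partitionId(), records.size))
  results.foreach { case (partition, count) =>
    println(s"partition $partition wrote $count records")
  }
}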

In the end, write requests the HadoopMapReduceCommitProtocol to commit the job and prints out the following INFO message to the logs:

Job [getJobID] committed.

Throwables

In case of any Throwable, write prints out the following ERROR message to the logs:

Aborting job [getJobID].

write requests the HadoopMapReduceCommitProtocol to abort the job and throws a SparkException:

Job aborted.
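
A simplified sketch of that commit-or-abort control flow (the function and parameter names are illustrative, not the actual write internals):

import org.apache.hadoop.mapreduce.JobContext
import org.apache.spark.SparkException
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

// runWriteTasks stands in for the Spark job that returns one TaskCommitMessage per partition.
def commitOrAbort(
    committer: FileCommitProtocol,
    jobContext: JobContext,
    runWriteTasks: () => Seq[TaskCommitMessage]): Unit = {
  try {
    val taskCommits = runWriteTasks()
    committer.commitJob(jobContext, taskCommits)
    println(s"Job ${jobContext.getJobID} committed.")
  } catch {
    case cause: Throwable =>
      println(s"Aborting job ${jobContext.getJobID}.")
      committer.abortJob(jobContext)
      throw new SparkException("Job aborted.", cause)
  }
}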

Usage

write is used when:

  • PairRDDFunctions.saveAsNewAPIHadoopDataset is used
  • PairRDDFunctions.saveAsHadoopDataset is used

Writing RDD Partition

executeTask[K, V: ClassTag](
  context: TaskContext,
  config: HadoopWriteConfigUtil[K, V],
  jobTrackerId: String,
  commitJobId: Int,
  sparkPartitionId: Int,
  sparkAttemptNumber: Int,
  committer: FileCommitProtocol,
  iterator: Iterator[(K, V)]): TaskCommitMessage


executeTask requests the given HadoopWriteConfigUtil to create a TaskAttemptContext.

executeTask requests the given FileCommitProtocol to set up a task with the TaskAttemptContext.

executeTask requests the given HadoopWriteConfigUtil to initWriter (with the TaskAttemptContext and the given sparkPartitionId).

executeTask initializes the Hadoop output metrics (initHadoopOutputMetrics).

executeTask writes out all records of the RDD partition (from the given Iterator[(K, V)]) by requesting the given HadoopWriteConfigUtil to write every record. In the end, executeTask requests the given HadoopWriteConfigUtil to closeWriter and the given FileCommitProtocol to commit the task.

executeTask updates metrics about writing data to external systems (bytesWritten and recordsWritten) every few records and at the end.
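
A self-contained sketch of that write-and-update-metrics loop (the names and the update interval are assumptions for illustration):

// Write every record, and refresh the metrics every few records and once at the end,
// in the spirit of executeTask's bytesWritten/recordsWritten updates.
def writeAll[K, V](
    records: Iterator[(K, V)],
    writeRecord: ((K, V)) => Unit,
    updateMetrics: Long => Unit,
    updateInterval: Int = 256): Long = {
  var recordsWritten = 0L
  while (records.hasNext) {
    writeRecord(records.next())
    recordsWritten += 1
    if (recordsWritten % updateInterval == 0) updateMetrics(recordsWritten)
  }
  updateMetrics(recordsWritten)
  recordsWritten
}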

In case of any errors, executeTask requests the given HadoopWriteConfigUtil to closeWriter and the given FileCommitProtocol to abort the task. In the end, executeTask prints out the following ERROR message to the logs:

Task [taskAttemptID] aborted.
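
A simplified sketch (assumed names, not the actual executeTask body) of the task-level error handling: close the writer and commit the task on success, close the writer and abort the task on any error:

import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

def writeAndCommitTask(
    committer: FileCommitProtocol,
    taskContext: TaskAttemptContext,
    writePartition: () => Unit,
    closeWriter: () => Unit): TaskCommitMessage = {
  try {
    writePartition()
    closeWriter()
    committer.commitTask(taskContext)
  } catch {
    case cause: Throwable =>
      closeWriter()
      committer.abortTask(taskContext)
      println(s"Task ${taskContext.getTaskAttemptID} aborted.")
      throw cause
  }
}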

executeTask is used when:

  • SparkHadoopWriter utility is used to write

Logging

Enable ALL logging level for org.apache.spark.internal.io.SparkHadoopWriter logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.internal.io.SparkHadoopWriter=ALL

Refer to Logging.