# SparkHadoopWriter Utility
## Writing Key-Value RDD Out (As Hadoop OutputFormat)
```scala
write[K, V: ClassTag](
  rdd: RDD[(K, V)],
  config: HadoopWriteConfigUtil[K, V]): Unit
```
`write` runs a Spark job to write out the partition records (of all the partitions of the given key-value `RDD`) with the given `HadoopWriteConfigUtil` and a `HadoopMapReduceCommitProtocol` committer.

The number of writer tasks (parallelism) is the number of partitions in the given key-value `RDD`.
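As a quick illustration, the following sketch writes a two-partition pair RDD out as a Hadoop `TextOutputFormat` (the local master and the `/tmp` output path are made up for this demo); `saveAsNewAPIHadoopFile` delegates to `PairRDDFunctions.saveAsNewAPIHadoopDataset`, which relies on `SparkHadoopWriter.write`.

```scala
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

object SparkHadoopWriterDemo extends App {
  // Local master and output path are made up for this demo.
  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("SparkHadoopWriterDemo"))

  // Two partitions => two writer tasks (parallelism = number of partitions).
  val kvs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)), numSlices = 2)

  // saveAsNewAPIHadoopFile delegates to PairRDDFunctions.saveAsNewAPIHadoopDataset,
  // which relies on SparkHadoopWriter.write to run the write job.
  kvs.saveAsNewAPIHadoopFile[TextOutputFormat[String, Int]]("/tmp/spark-hadoop-writer-demo")

  sc.stop()
}
```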
### Internals
Internally, `write` uses the id of the given `RDD` as the `commitJobId`.
`write` creates a `jobTrackerId` with the current date.
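A sketch of what such a date-based identifier might look like (the `yyyyMMddHHmmss` pattern is an assumption for illustration, not necessarily the exact format Spark uses):

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

// Date-based job tracker identifier, e.g. 20240101123045
// (the exact format is an assumption for illustration).
val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date())
```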
`write` requests the given `HadoopWriteConfigUtil` to create a Hadoop `JobContext` (for the `jobTrackerId` and `commitJobId`).
`write` requests the given `HadoopWriteConfigUtil` to `initOutputFormat` with the Hadoop `JobContext`.
`write` requests the given `HadoopWriteConfigUtil` to `assertConf`.
`write` requests the given `HadoopWriteConfigUtil` to create a `HadoopMapReduceCommitProtocol` committer for the `commitJobId`.
`write` requests the `HadoopMapReduceCommitProtocol` to `setupJob` (with the `JobContext`).
`write` uses the `SparkContext` (of the given `RDD`) to run a Spark job asynchronously for the given `RDD` with the executeTask partition function.
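Running the job boils down to `SparkContext.runJob` with a per-partition function. The following standalone sketch uses a stand-in function (not Spark's actual `executeTask`) to show the pattern: one invocation per partition, one result per writer task.

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object RunJobPattern extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("RunJobPattern"))

  val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)), numSlices = 2)

  // Stand-in for executeTask: writes nothing, just reports what it would have written.
  val results: Array[String] = sc.runJob(rdd, (context: TaskContext, it: Iterator[(String, Int)]) =>
    s"partition=${context.partitionId()} attempt=${context.attemptNumber()} records=${it.size}")

  results.foreach(println)
  sc.stop()
}
```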
In the end, `write` requests the `HadoopMapReduceCommitProtocol` to commit the job and prints out the following INFO message to the logs:

```text
Job [getJobID] committed.
```
### Throwables
In case of any `Throwable`, `write` prints out the following ERROR message to the logs:

```text
Aborting job [getJobID].
```
`write` requests the `HadoopMapReduceCommitProtocol` to abort the job and throws a `SparkException`:

```text
Job aborted.
```
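The commit-or-abort behaviour described above follows a common try/catch pattern. Below is a minimal sketch of it; `commitJob` and `abortJob` are hypothetical stand-ins (not the committer's actual API) and plain `println` stands in for the INFO/ERROR log messages.

```scala
import org.apache.spark.SparkException

// Sketch: commit the job, or abort and rethrow as SparkException on any Throwable.
// commitJob/abortJob are hypothetical stand-ins; println stands in for logging.
def commitOrAbort(jobId: String)(commitJob: => Unit)(abortJob: => Unit): Unit =
  try {
    commitJob
    println(s"Job $jobId committed.")
  } catch {
    case cause: Throwable =>
      println(s"Aborting job $jobId.")
      abortJob
      throw new SparkException("Job aborted.", cause)
  }
```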
### Usage
`write` is used when:

* `PairRDDFunctions.saveAsNewAPIHadoopDataset` operator is used
* `PairRDDFunctions.saveAsHadoopDataset` operator is used
## Writing RDD Partition
```scala
executeTask[K, V: ClassTag](
  context: TaskContext,
  config: HadoopWriteConfigUtil[K, V],
  jobTrackerId: String,
  commitJobId: Int,
  sparkPartitionId: Int,
  sparkAttemptNumber: Int,
  committer: FileCommitProtocol,
  iterator: Iterator[(K, V)]): TaskCommitMessage
```
Fixme
Review Me
`executeTask` requests the given `HadoopWriteConfigUtil` to create a `TaskAttemptContext`.
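With the new MapReduce API this amounts to building a Hadoop `TaskAttemptID` and wrapping it in a `TaskAttemptContextImpl`, roughly as in the sketch below (all identifier values and the `TaskType` choice are made up for illustration).

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

// Sketch: a task attempt context from a jobTrackerId, job id, partition id and attempt number
// (all values below are made up for illustration).
val attemptId = new TaskAttemptID("20240101123045", 0, TaskType.REDUCE, /* partition */ 0, /* attempt */ 0)
val taskAttemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
```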
`executeTask` requests the given `FileCommitProtocol` to set up a task with the `TaskAttemptContext`.
`executeTask` requests the given `HadoopWriteConfigUtil` to `initWriter` (with the `TaskAttemptContext` and the given `sparkPartitionId`).
`executeTask` initializes Hadoop output metrics (`initHadoopOutputMetrics`).
`executeTask` writes out all the records of the RDD partition (from the given `Iterator[(K, V)]`) by requesting the given `HadoopWriteConfigUtil` to write every record. In the end, `executeTask` requests the given `HadoopWriteConfigUtil` to `closeWriter` and the given `FileCommitProtocol` to commit the task.
`executeTask` updates metrics about writing data to external systems (`bytesWritten` and `recordsWritten`) every few records and at the end.
In case of any errors, `executeTask` requests the given `HadoopWriteConfigUtil` to `closeWriter` and the given `FileCommitProtocol` to abort the task. In the end, `executeTask` prints out the following ERROR message to the logs:

```text
Task [taskAttemptID] aborted.
```
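Put together, the per-partition flow looks roughly like the sketch below. The `PartitionWriter` trait, the callbacks, and the 256-record metrics interval are simplifications made up for illustration; the real `executeTask` works with `HadoopWriteConfigUtil` and `FileCommitProtocol` directly.

```scala
// Simplified stand-in for the per-partition write flow: write every record,
// update metrics every few records, commit on success, close and abort on error.
trait PartitionWriter[K, V] {
  def write(key: K, value: V): Unit
  def close(): Unit
}

def writePartition[K, V](
    records: Iterator[(K, V)],
    writer: PartitionWriter[K, V],
    updateRecordsWritten: Long => Unit,
    commitTask: () => Unit,
    abortTask: () => Unit): Unit = {
  var recordsWritten = 0L
  try {
    records.foreach { case (k, v) =>
      writer.write(k, v)
      recordsWritten += 1
      if (recordsWritten % 256 == 0) updateRecordsWritten(recordsWritten) // assumed interval
    }
    writer.close()
    commitTask()
    updateRecordsWritten(recordsWritten) // final metrics update
  } catch {
    case error: Throwable =>
      writer.close()
      abortTask()
      println("Task aborted.") // stands in for the ERROR log message
      throw error
  }
}
```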
`executeTask` is used when:

* `SparkHadoopWriter` utility is used to write
## Logging
Enable `ALL` logging level for `org.apache.spark.internal.io.SparkHadoopWriter` logger to see what happens inside.

Add the following line to `conf/log4j.properties`:

```text
log4j.logger.org.apache.spark.internal.io.SparkHadoopWriter=ALL
```

Refer to Logging.