OutputCommitCoordinator¶
From the scaladoc (it's a private[spark] class so no way to find it outside the code):
Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins" policy.
OutputCommitCoordinator is instantiated in both the drivers and executors. On executors, it is configured with a reference to the driver's OutputCommitCoordinatorEndpoint, so requests to commit output will be forwarded to the driver's OutputCommitCoordinator.
This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests) for an extensive design discussion.
Creating Instance¶
OutputCommitCoordinator takes the following to be created:
- SparkConf
-
isDriverflag
OutputCommitCoordinator is created when:
SparkEnvutility is used to create a SparkEnv on the driver
OutputCommitCoordinator RPC Endpoint¶
coordinatorRef: Option[RpcEndpointRef]
OutputCommitCoordinator is registered as OutputCommitCoordinator (with OutputCommitCoordinatorEndpoint RPC Endpoint) in the RPC Environment on the driver (when SparkEnv utility is used to create "base" SparkEnv). Executors have an RpcEndpointRef to the endpoint on the driver.
coordinatorRef is used to post an AskPermissionToCommitOutput (by executors) to the OutputCommitCoordinator (when canCommit).
coordinatorRef is used to stop the OutputCommitCoordinator on the driver (when stop).
canCommit¶
canCommit(
stage: Int,
stageAttempt: Int,
partition: Int,
attemptNumber: Int): Boolean
canCommit creates a AskPermissionToCommitOutput message and sends it (asynchronously) to the OutputCommitCoordinator RPC Endpoint.
canCommit is used when:
SparkHadoopMapRedUtilis requested tocommitTask(withspark.hadoop.outputCommitCoordination.enabledconfiguration property enabled)DataWritingSparkTask(Spark SQL) utility is used torun
handleAskPermissionToCommit¶
handleAskPermissionToCommit(
stage: Int,
stageAttempt: Int,
partition: Int,
attemptNumber: Int): Boolean
handleAskPermissionToCommit...FIXME
handleAskPermissionToCommit is used when:
OutputCommitCoordinatorEndpointis requested to handle aAskPermissionToCommitOutputmessage (that happens after it was sent out in canCommit)
Logging¶
Enable ALL logging level for org.apache.spark.scheduler.OutputCommitCoordinator logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.scheduler.OutputCommitCoordinator=ALL
Refer to Logging.