
OutputCommitCoordinator

From the scaladoc (OutputCommitCoordinator is a private[spark] class, so its scaladoc is not published and can only be read in the source code):

Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins" policy.

OutputCommitCoordinator is instantiated in both the drivers and executors. On executors, it is configured with a reference to the driver's OutputCommitCoordinatorEndpoint, so requests to commit output will be forwarded to the driver's OutputCommitCoordinator.

This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests) for an extensive design discussion.

Creating Instance

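OutputCommitCoordinator takes the following to be created:

  • SparkConf
  • isDriver flag

OutputCommitCoordinator is created when:

  • SparkEnv utility is used to create a SparkEnv (on the driver and on executors)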

OutputCommitCoordinator RPC Endpoint

coordinatorRef: Option[RpcEndpointRef]

An OutputCommitCoordinatorEndpoint RPC endpoint is registered under the name OutputCommitCoordinator in the RPC Environment on the driver (when the SparkEnv utility is used to create the "base" SparkEnv). Executors hold an RpcEndpointRef to this endpoint on the driver.

coordinatorRef is used (by executors) to send an AskPermissionToCommitOutput message to the OutputCommitCoordinator RPC Endpoint (when requested to canCommit).

coordinatorRef is used to stop the OutputCommitCoordinator endpoint on the driver (when requested to stop).
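The coordinatorRef wiring described above can be modeled in a few lines. This is a minimal sketch, not Spark's code: EndpointRefLike, DriverEndpoint, CoordinatorModel and StopCoordinatorMsg are hypothetical stand-ins (Spark's RpcEndpointRef and OutputCommitCoordinatorEndpoint are private[spark]), and only the driver-side stop path is modeled.

```scala
// Hypothetical message type; only the stop path is modeled in this sketch.
sealed trait CoordinatorMessage
case object StopCoordinatorMsg extends CoordinatorMessage

// Hypothetical stand-in for an RpcEndpointRef: forwards messages to an endpoint.
trait EndpointRefLike {
  def send(msg: CoordinatorMessage): Unit
}

// Hypothetical driver-side endpoint that remembers whether it was stopped.
class DriverEndpoint extends EndpointRefLike {
  @volatile var stopped = false
  def send(msg: CoordinatorMessage): Unit = msg match {
    case StopCoordinatorMsg => stopped = true
  }
}

class CoordinatorModel(isDriver: Boolean) {
  // Populated after construction: the driver points at its local endpoint,
  // executors point at the driver's endpoint.
  var coordinatorRef: Option[EndpointRefLike] = None

  def stop(): Unit = synchronized {
    // Only the driver uses coordinatorRef to stop the endpoint.
    if (isDriver) {
      coordinatorRef.foreach(_.send(StopCoordinatorMsg))
      coordinatorRef = None
    }
  }
}

// Wiring: one endpoint on the driver, referenced by both sides.
val endpoint = new DriverEndpoint
val onDriver = new CoordinatorModel(isDriver = true)
val onExecutor = new CoordinatorModel(isDriver = false)
onDriver.coordinatorRef = Some(endpoint)
onExecutor.coordinatorRef = Some(endpoint)
onDriver.stop()
```

Keeping the reference as an Option lets later calls detect that the coordinator is gone instead of messaging a stopped endpoint.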

canCommit

canCommit(
  stage: Int,
  stageAttempt: Int,
  partition: Int,
  attemptNumber: Int): Boolean

canCommit creates an AskPermissionToCommitOutput message and sends it to the OutputCommitCoordinator RPC Endpoint. The request itself is asynchronous (an RPC ask), but canCommit waits for the Boolean reply before returning it.
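A minimal sketch of the ask-and-wait pattern in canCommit, assuming a hypothetical AskableRef trait in place of RpcEndpointRef and a caller-supplied timeout in place of Spark's RPC ask timeout. AskPermissionToCommit and CommitClient are hypothetical names, and returning false when there is no endpoint reference is an assumption of this sketch.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical message mirroring the fields of AskPermissionToCommitOutput.
case class AskPermissionToCommit(stage: Int, stageAttempt: Int, partition: Int, attemptNumber: Int)

// Hypothetical stand-in for an RpcEndpointRef whose ask returns a Future reply.
trait AskableRef {
  def ask(msg: AskPermissionToCommit): Future[Boolean]
}

class CommitClient(var coordinatorRef: Option[AskableRef], askTimeout: FiniteDuration) {
  def canCommit(stage: Int, stageAttempt: Int, partition: Int, attemptNumber: Int): Boolean = {
    val msg = AskPermissionToCommit(stage, stageAttempt, partition, attemptNumber)
    coordinatorRef match {
      // The ask is asynchronous, but the caller blocks until the reply arrives
      // (Await.result throws a TimeoutException if askTimeout elapses first).
      case Some(ref) => Await.result(ref.ask(msg), askTimeout)
      // No endpoint reference (for example after stop): deny the commit.
      case None => false
    }
  }
}

// Example: a fake driver that authorizes only task attempt 0.
val fakeDriver: AskableRef = new AskableRef {
  def ask(msg: AskPermissionToCommit): Future[Boolean] =
    Future.successful(msg.attemptNumber == 0)
}
val client = new CommitClient(Some(fakeDriver), 5.seconds)
```

Blocking is acceptable here because the caller is a task that cannot proceed until it knows whether to commit or abort.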

canCommit is used when:

  • SparkHadoopMapRedUtil is requested to commitTask (with spark.hadoop.outputCommitCoordination.enabled configuration property enabled)
  • DataWritingSparkTask (Spark SQL) utility is used to run
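Both callers follow the same task-side pattern: ask for permission, then either commit or discard the attempt's output. The sketch below shows that pattern with a hypothetical commitIfAuthorized helper and a hypothetical CommitDenied exception. It is loosely modeled and not Spark's code: the real callers use their own committer APIs rather than plain callbacks, and aborting and then failing the attempt is this sketch's choice.

```scala
// Hypothetical exception signalling that the coordinator refused the commit.
final class CommitDenied(msg: String) extends RuntimeException(msg)

// Hypothetical task-side helper: ask for permission, then commit or abort.
def commitIfAuthorized(
    partition: Int,
    attemptNumber: Int,
    canCommit: (Int, Int) => Boolean,
    commit: () => Unit,
    abort: () => Unit): Unit =
  if (canCommit(partition, attemptNumber)) {
    commit() // this attempt was authorized: make its output visible
  } else {
    abort() // discard this attempt's output so it never becomes visible
    throw new CommitDenied(s"partition $partition, attempt $attemptNumber: commit not authorized")
  }

// Example: only attempt 0 of each partition is authorized.
val onlyFirstAttempt: (Int, Int) => Boolean = (_, attempt) => attempt == 0
val events = scala.collection.mutable.ArrayBuffer.empty[String]
commitIfAuthorized(7, 0, onlyFirstAttempt, () => events += "commit 0", () => events += "abort 0")
```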

handleAskPermissionToCommit

handleAskPermissionToCommit(
  stage: Int,
  stageAttempt: Int,
  partition: Int,
  attemptNumber: Int): Boolean

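handleAskPermissionToCommit applies the "first committer wins" policy on the driver: the first task attempt that asks to commit a given partition of a stage is authorized (true), and other attempts asking for the same partition are denied (false) while that authorization stands.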

handleAskPermissionToCommit is used when:

  • OutputCommitCoordinatorEndpoint is requested to handle an AskPermissionToCommitOutput message (sent by canCommit)
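The "first committer wins" policy from the scaladoc can be sketched as a small driver-side arbiter. This is a simplified model, not Spark's handleAskPermissionToCommit: FirstCommitterWins and TaskAttemptId are hypothetical names, authorization is keyed by stage and partition (so a retry in a later stage attempt competes for the same slot), and releasing the slot when the authorized attempt fails is an assumption added so that a failed winner does not block the partition forever.

```scala
import scala.collection.mutable

// Hypothetical identifier of one task attempt within a stage.
final case class TaskAttemptId(stageAttempt: Int, attemptNumber: Int)

// Hypothetical driver-side arbiter implementing "first committer wins".
class FirstCommitterWins {
  // (stage, partition) -> the attempt currently authorized to commit it.
  private val authorized = mutable.Map.empty[(Int, Int), TaskAttemptId]

  def handleAsk(stage: Int, stageAttempt: Int, partition: Int, attemptNumber: Int): Boolean =
    synchronized {
      val key = (stage, partition)
      authorized.get(key) match {
        case None =>
          // Nobody holds the slot yet: the asker wins.
          authorized(key) = TaskAttemptId(stageAttempt, attemptNumber)
          true
        case Some(_) =>
          // The slot is taken: every later ask is denied.
          false
      }
    }

  // Assumption of this sketch: a failed winner releases its slot.
  def attemptFailed(stage: Int, stageAttempt: Int, partition: Int, attemptNumber: Int): Unit =
    synchronized {
      val key = (stage, partition)
      if (authorized.get(key).contains(TaskAttemptId(stageAttempt, attemptNumber)))
        authorized.remove(key)
    }
}

// Example: two attempts race for partition 5 of stage 1.
val arbiter = new FirstCommitterWins
val firstWins = arbiter.handleAsk(1, 0, 5, 0)
val secondDenied = !arbiter.handleAsk(1, 0, 5, 1)
```

Serializing all decisions through one synchronized object on the driver is what makes "first" well defined when speculative attempts ask at nearly the same time.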

Logging

Enable ALL logging level for the org.apache.spark.scheduler.OutputCommitCoordinator logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.OutputCommitCoordinator=ALL

Refer to Logging.