OutputCommitCoordinator

OutputCommitCoordinator is the service (the authority) that coordinates result commits using commit locks, which it tracks in the internal authorizedCommittersByStage registry.

Result commits are the outputs of running tasks (and a running task is described by a task attempt for a partition in a stage).

A partition (of a stage) is unlocked when its entry in the authorizedCommittersByStage internal registry is -1.
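To make the lock semantics concrete, here is a minimal sketch of the registry. It assumes that each registered stage maps to an array with one slot per partition, and that each slot holds the attempt number of the authorized committer, or -1 when the partition is unlocked. Apart from authorizedCommittersByStage, the names (CommitRegistry, NoAuthorizedCommitter, registerStage, isUnlocked) are hypothetical and chosen for illustration only.

```scala
import scala.collection.mutable

// Hypothetical model of the commit-lock registry (not Spark's actual code)
object CommitRegistry {
  // Hypothetical marker for "no authorized committer", i.e. an unlocked partition
  val NoAuthorizedCommitter: Int = -1

  // stage id -> one slot per partition holding the authorized committer's attempt number
  val authorizedCommittersByStage: mutable.Map[Int, Array[Int]] = mutable.Map.empty

  // Hypothetical helper: registers a stage with every partition unlocked
  def registerStage(stage: Int, numPartitions: Int): Unit =
    authorizedCommittersByStage(stage) = Array.fill(numPartitions)(NoAuthorizedCommitter)

  // A partition is unlocked when its slot holds -1; an unregistered stage has no unlocked partitions
  def isUnlocked(stage: Int, partition: Int): Boolean =
    authorizedCommittersByStage
      .get(stage)
      .exists(committers => committers(partition) == NoAuthorizedCommitter)
}
```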

From the scaladoc (OutputCommitCoordinator is a private[spark] class, so its scaladoc is not published and can only be read in the source code):

Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins" policy. OutputCommitCoordinator is instantiated in both the drivers and executors. On executors, it is configured with a reference to the driver’s OutputCommitCoordinatorEndpoint, so requests to commit output will be forwarded to the driver’s OutputCommitCoordinator.

The most interesting piece is in…

This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests) for an extensive design discussion.
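The "first committer wins" policy from the scaladoc can be sketched as a single-JVM model, assuming the same per-stage array of commit locks: the first task attempt that asks to commit a partition takes the lock, and every later request for that partition is denied while the lock is held. FirstCommitterWins, registerStage and requestCommit are hypothetical names, and the sketch leaves out the forwarding of requests from executors to the driver's OutputCommitCoordinatorEndpoint.

```scala
import scala.collection.mutable

// Hypothetical single-JVM model of the "first committer wins" policy
class FirstCommitterWins {
  // Task attempt numbers are never -1, so -1 marks an unlocked partition
  private val NoAuthorizedCommitter = -1
  private val locks = mutable.Map.empty[Int, Array[Int]]

  // Hypothetical: registers a stage with every partition unlocked
  def registerStage(stage: Int, numPartitions: Int): Unit = synchronized {
    locks(stage) = Array.fill(numPartitions)(NoAuthorizedCommitter)
  }

  // Hypothetical: returns true only for the first attempt that asks to commit the partition
  def requestCommit(stage: Int, partition: Int, attemptNumber: Int): Boolean = synchronized {
    locks.get(stage) match {
      case None => false // stage not registered (or already completed): deny
      case Some(committers) =>
        if (committers(partition) == NoAuthorizedCommitter) {
          committers(partition) = attemptNumber // this attempt takes the lock
          true
        } else {
          false // a committer is already authorized for this partition
        }
    }
  }
}
```

The methods are synchronized because, in this model, requests from many task attempts are decided in one place, so two attempts for the same partition can never both see it unlocked.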

Authorized committers are task attempts (per partition and stage) that can…FIXME

Table 1. OutputCommitCoordinator Internal Registries and Counters

Name                          Description
authorizedCommittersByStage   Tracks commit locks for task attempts for a partition in a stage.
                              Used in taskCompleted to authorize task completions to…FIXME

Enable INFO or DEBUG logging level for the org.apache.spark.scheduler.OutputCommitCoordinator logger to see what happens inside OutputCommitCoordinator.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.OutputCommitCoordinator=DEBUG

Refer to Logging.

stop Method

FIXME

stageStart Method

FIXME

taskCompleted Method

taskCompleted(
  stage: StageId,
  partition: PartitionId,
  attemptNumber: TaskAttemptNumber,
  reason: TaskEndReason): Unit

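taskCompleted is how OutputCommitCoordinator learns that a task attempt for a partition (in a stage) has finished. It changes the commit lock of the partition only when attemptNumber is the authorized committer for the partition and the task did not end with Success or TaskCommitDenied; in that case the partition is unlocked so that another attempt can commit.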

Internally, taskCompleted first looks up the authorized committers for the stage in the authorizedCommittersByStage internal registry.

If the stage is not registered in the authorizedCommittersByStage internal registry, you should see the following DEBUG message in the logs and taskCompleted simply exits.

DEBUG OutputCommitCoordinator: Ignoring task completion for completed stage

When reason is Success, taskCompleted does nothing and exits.

When reason is TaskCommitDenied, you should see the following INFO message in the logs and taskCompleted exits.

INFO OutputCommitCoordinator: Task was denied committing, stage: [stage], partition: [partition], attempt: [attemptNumber]

Note that in all three cases (no stage registered, Success, or TaskCommitDenied), taskCompleted leaves the commit locks untouched.

For any other completion reason, taskCompleted unlocks the partition (i.e. sets its entry in authorizedCommittersByStage to -1), but only if attemptNumber is the authorized committer for the partition.

A task attempt number can never be -1, which is why -1 can safely mark a partition as unlocked.

When the lock for the partition is cleared, you should see the following DEBUG message in the logs:

DEBUG OutputCommitCoordinator: Authorized committer (attemptNumber=[attemptNumber], stage=[stage], partition=[partition]) failed; clearing lock

taskCompleted is executed only when DAGScheduler informs OutputCommitCoordinator that a task has completed.
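The taskCompleted walkthrough can be sketched as follows, assuming the registry layout described earlier (-1 marks an unlocked partition). EndReason, Success, TaskCommitDenied and OtherFailure are hypothetical, simplified stand-ins for Spark's TaskEndReason hierarchy, TaskCompletedSketch is a hypothetical name, and the println calls only imitate the DEBUG and INFO log messages.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's task end reasons
sealed trait EndReason
case object Success extends EndReason
case object TaskCommitDenied extends EndReason
final case class OtherFailure(description: String) extends EndReason

// Hypothetical model of taskCompleted's decision logic (not Spark's actual code)
object TaskCompletedSketch {
  val NoAuthorizedCommitter: Int = -1
  val authorizedCommittersByStage: mutable.Map[Int, Array[Int]] = mutable.Map.empty

  def taskCompleted(stage: Int, partition: Int, attemptNumber: Int, reason: EndReason): Unit =
    authorizedCommittersByStage.get(stage) match {
      case None =>
        // Stage not registered: ignore the completion
        println("DEBUG Ignoring task completion for completed stage")
      case Some(authorizedCommitters) =>
        reason match {
          case Success =>
            // Nothing to do: the lock stays as it is
          case TaskCommitDenied =>
            println(s"INFO Task was denied committing, stage: $stage, " +
              s"partition: $partition, attempt: $attemptNumber")
          case OtherFailure(_) =>
            // Only a failure of the authorized committer releases the lock
            if (authorizedCommitters(partition) == attemptNumber) {
              println(s"DEBUG Authorized committer (attemptNumber=$attemptNumber, " +
                s"stage=$stage, partition=$partition) failed; clearing lock")
              authorizedCommitters(partition) = NoAuthorizedCommitter
            }
        }
    }
}
```

A failure of an attempt that never held the lock changes nothing, so only the authorized committer's failure reopens the partition for other attempts.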