Skip to content

OutputCommitCoordinator

OutputCommitCoordinator is used to coordinate <> by means of commit locks (using the internal <> registry).

[[result-commits]] Result commits are the outputs of running tasks (and a running task is described by a task attempt for a partition in a stage).

TIP: A partition (of a stage) is unlocked when it is marked as -1 in <authorizedCommittersByStage internal registry>>.

From the scaladoc (it's a private[spark] class so no way to find it https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala[outside the code]):

Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins" policy. OutputCommitCoordinator is instantiated in both the drivers and executors. On executors, it is configured with a reference to the driver's OutputCommitCoordinatorEndpoint, so requests to commit output will be forwarded to the driver's OutputCommitCoordinator.

The most interesting piece is in...

This class was introduced in https://issues.apache.org/jira/browse/SPARK-4879[SPARK-4879]; see that JIRA issue (and the associated pull requests) for an extensive design discussion.

[[authorized-committers]] Authorized committers are task attempts (per partition and stage) that can...FIXME

== [[stop]] stop Method

CAUTION: FIXME

== [[stageStart]] Stage Execution Started Notification

CAUTION: FIXME

== [[taskCompleted]] taskCompleted Method

[source, scala]

taskCompleted( stage: StageId, partition: PartitionId, attemptNumber: TaskAttemptNumber, reason: TaskEndReason): Unit


taskCompleted marks the partition (in the stage) completed (and hence a result committed), but only when the attemptNumber is amongst <> per stage (for the partition).

Internally, taskCompleted first finds <> for the stage.

For task completions with no stage registered in <authorizedCommittersByStage internal registry>>, you should see the following DEBUG message in the logs and taskCompleted simply exits.

DEBUG OutputCommitCoordinator: Ignoring task completion for completed stage

For the reason being Success taskCompleted does nothing and exits.

For the reason being TaskCommitDenied, you should see the following INFO message in the logs and taskCompleted exits.

INFO OutputCommitCoordinator: Task was denied committing, stage: [stage], partition: [partition], attempt: [attemptNumber]

NOTE: For no stage registered or reason being Success or TaskCommitDenied, taskCompleted does nothing (important).

For task completion reasons other than Success or TaskCommitDenied and attemptNumber amongst <>, taskCompleted <partition unlocked>>.

NOTE: A task attempt can never be -1.

When the lock for partition is cleared, You should see the following DEBUG message in the logs:

DEBUG OutputCommitCoordinator: Authorized committer (attemptNumber=[attemptNumber], stage=[stage], partition=[partition]) failed; clearing lock

NOTE: taskCompleted is executed only when scheduler:DAGSchedulerEventProcessLoop.md#handleTaskCompletion[DAGScheduler informs that a task has completed].

== [[logging]] Logging

Enable ALL logging level for org.apache.spark.scheduler.OutputCommitCoordinator logger to see what happens inside.

Add the following line to conf/log4j.properties:

[source]

log4j.logger.org.apache.spark.scheduler.OutputCommitCoordinator=ALL

Refer to ROOT:spark-logging.md[Logging].

== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"] |=== | Name | Description

| [[authorizedCommittersByStage]] authorizedCommittersByStage | Tracks commit locks for task attempts for a partition in a stage.

Used in <> to authorize task completions to...FIXME

|===


Last update: 2020-10-06