= OutputCommitCoordinator
`OutputCommitCoordinator` is used to coordinate <<result-commits, result commits>>.
[[result-commits]] Result commits are the outputs of running tasks (and a running task is described by a task attempt for a partition in a stage).
TIP: A partition (of a stage) is unlocked when it is marked as `-1` in <<authorizedCommittersByStage, authorizedCommittersByStage>>.
From the scaladoc (it is a `private[spark]` class, so there is no way to find it https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala[outside the code]):

____
Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins" policy. OutputCommitCoordinator is instantiated in both the drivers and executors. On executors, it is configured with a reference to the driver's OutputCommitCoordinatorEndpoint, so requests to commit output will be forwarded to the driver's OutputCommitCoordinator.
____
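A minimal, single-JVM sketch of the "first committer wins" policy may help. It is not Spark's code: `FirstCommitterWins`, `registerStage` and `canCommit` are hypothetical names, identifiers are plain ``Int``s, and there is no RPC between executors and the driver. It only shows that the first attempt asking for a partition's commit lock gets it and every other attempt is denied.

[source, scala]
----
import scala.collection.mutable

// Hypothetical, simplified coordinator (not Spark's OutputCommitCoordinator).
// It only illustrates the "first committer wins" policy.
class FirstCommitterWins {
  // Sentinel for "no attempt holds the commit lock for this partition".
  private val NoAuthorizedCommitter = -1

  // stage -> per-partition attempt number that holds the commit lock
  private val committersByStage = mutable.Map[Int, Array[Int]]()

  // Registers a stage with all of its partitions unlocked.
  def registerStage(stage: Int, numPartitions: Int): Unit = synchronized {
    committersByStage(stage) = Array.fill(numPartitions)(NoAuthorizedCommitter)
  }

  // Returns true if `attempt` may commit output for `partition` of `stage`.
  def canCommit(stage: Int, partition: Int, attempt: Int): Boolean = synchronized {
    committersByStage.get(stage) match {
      case None => false // unknown (or already finished) stage: deny
      case Some(committers) =>
        committers(partition) match {
          case NoAuthorizedCommitter =>
            committers(partition) = attempt // the first asker wins the lock
            true
          case holder => holder == attempt // only the lock holder may commit
        }
    }
  }
}
----

Asking again with the attempt that already holds the lock returns `true`; treating repeated requests as idempotent is a choice made in this sketch.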
The most interesting piece is in...
This class was introduced in https://issues.apache.org/jira/browse/SPARK-4879[SPARK-4879]; see that JIRA issue (and the associated pull requests) for an extensive design discussion.
[[authorized-committers]] Authorized committers are task attempts (per partition and stage) that can...FIXME
== [[stop]] `stop` Method
CAUTION: FIXME
== [[stageStart]] Stage Execution Started Notification
CAUTION: FIXME
== [[taskCompleted]] `taskCompleted` Method
[source, scala]
----
taskCompleted(stage: StageId, partition: PartitionId, attemptNumber: TaskAttemptNumber, reason: TaskEndReason): Unit
----
`taskCompleted` marks the `partition` (in the `stage`) completed (and hence a result committed), but only when the `attemptNumber` is amongst the <<authorized-committers, authorized committers>> for the `partition` (in the `stage`).
Internally, `taskCompleted` first finds the <<authorizedCommittersByStage, authorized committers>> for the `stage`.
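For task completions with no `stage` registered in <<authorizedCommittersByStage, authorizedCommittersByStage>>, you should see the following DEBUG message in the logs and `taskCompleted` simply exits.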
DEBUG OutputCommitCoordinator: Ignoring task completion for completed stage
For the `reason` being `Success`, `taskCompleted` does nothing and exits.
For the `reason` being `TaskCommitDenied`, you should see the following INFO message in the logs and `taskCompleted` exits.
INFO OutputCommitCoordinator: Task was denied committing, stage: [stage], partition: [partition], attempt: [attemptNumber]
NOTE: When no `stage` is registered or the `reason` is `Success` or `TaskCommitDenied`, `taskCompleted` does nothing, i.e. it leaves the commit lock of the `partition` untouched.
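For task completion reasons other than `Success` or `TaskCommitDenied` and `attemptNumber` amongst the <<authorized-committers, authorized committers>>, `taskCompleted` unlocks the `partition`, i.e. marks it as `-1` in <<authorizedCommittersByStage, authorizedCommittersByStage>>.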
NOTE: A task attempt number can never be `-1`.
When the lock for the `partition` is cleared, you should see the following DEBUG message in the logs:
DEBUG OutputCommitCoordinator: Authorized committer (attemptNumber=[attemptNumber], stage=[stage], partition=[partition]) failed; clearing lock
NOTE: `taskCompleted` is executed only when scheduler:DAGSchedulerEventProcessLoop.md#handleTaskCompletion[DAGScheduler informs that a task has completed].
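The `taskCompleted` logic described above can be sketched in Scala. This is an illustration under assumptions, not Spark's code: `CommitLocks` is a hypothetical class, Spark's `TaskEndReason` hierarchy is replaced with hypothetical stand-ins (`Success`, `TaskCommitDenied`, `Failed`), `println` stands in for Spark's logging, and stage, partition and attempt identifiers are plain ``Int``s.

[source, scala]
----
import scala.collection.mutable

// Hypothetical stand-ins for Spark's TaskEndReason hierarchy (illustration only).
sealed trait TaskEndReason
case object Success extends TaskEndReason
case object TaskCommitDenied extends TaskEndReason
final case class Failed(message: String) extends TaskEndReason

// Hypothetical sketch of the taskCompleted bookkeeping (not Spark's class).
class CommitLocks {
  // Marks an unlocked partition; a real attempt number is never -1.
  val NoAuthorizedCommitter: Int = -1

  // stage -> per-partition attempt number holding the commit lock
  val authorizedCommittersByStage = mutable.Map[Int, Array[Int]]()

  def taskCompleted(stage: Int, partition: Int, attemptNumber: Int,
                    reason: TaskEndReason): Unit = synchronized {
    authorizedCommittersByStage.get(stage) match {
      case None =>
        // No stage registered: log and exit (println stands in for logDebug).
        println("DEBUG Ignoring task completion for completed stage")
      case Some(authorizedCommitters) =>
        reason match {
          case Success => // output committed: keep the lock as is
          case TaskCommitDenied =>
            println(s"INFO Task was denied committing, stage: $stage, " +
              s"partition: $partition, attempt: $attemptNumber")
          case _ =>
            // Any other reason: release the lock only if this attempt holds it.
            if (authorizedCommitters(partition) == attemptNumber) {
              println(s"DEBUG Authorized committer (attemptNumber=$attemptNumber, " +
                s"stage=$stage, partition=$partition) failed; clearing lock")
              authorizedCommitters(partition) = NoAuthorizedCommitter
            }
        }
    }
  }
}
----

Only a failure of the lock-holding attempt releases the lock, so another attempt for the same partition can then become the committer.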
== [[logging]] Logging
Enable `ALL` logging level for `org.apache.spark.scheduler.OutputCommitCoordinator` logger to see what happens inside.

Add the following line to `conf/log4j.properties`:
[source]
----
log4j.logger.org.apache.spark.scheduler.OutputCommitCoordinator=ALL
----
Refer to spark-logging.md[Logging].
== [[internal-properties]] Internal Properties
[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| [[authorizedCommittersByStage]] authorizedCommittersByStage
| Tracks commit locks for task attempts for a partition in a stage.

Used in <<taskCompleted, taskCompleted>>.
|===
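The `authorizedCommittersByStage` bookkeeping can be modelled as a map from a stage to an array of attempt numbers indexed by partition, with `-1` marking an unlocked partition. A minimal sketch, assuming this representation (a model of the description above, not a copy of Spark's source): `CommitterRegistry`, `NoAuthorizedCommitter`, `registerStage` and `lockHolder` are hypothetical names, while the `StageId`, `PartitionId` and `TaskAttemptNumber` aliases mirror the types in the `taskCompleted` signature.

[source, scala]
----
import scala.collection.mutable

// Hypothetical model of the authorizedCommittersByStage internal property.
object CommitterRegistry {
  type StageId = Int
  type PartitionId = Int
  type TaskAttemptNumber = Int

  // Marks a partition as unlocked; a real task attempt number is never -1.
  val NoAuthorizedCommitter: TaskAttemptNumber = -1

  // Commit lock per partition (array index) for every registered stage.
  val authorizedCommittersByStage = mutable.Map[StageId, Array[TaskAttemptNumber]]()

  // Assumed to happen when a stage starts: all partitions begin unlocked.
  def registerStage(stage: StageId, maxPartitionId: PartitionId): Unit = {
    authorizedCommittersByStage(stage) =
      Array.fill(maxPartitionId + 1)(NoAuthorizedCommitter)
  }

  // The attempt holding the commit lock, if the stage is registered and the partition is locked.
  def lockHolder(stage: StageId, partition: PartitionId): Option[TaskAttemptNumber] =
    authorizedCommittersByStage.get(stage)
      .map(_(partition))
      .filter(_ != NoAuthorizedCommitter)
}
----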