ContextBarrierState

ContextBarrierState represents the state of a global sync of a barrier stage attempt (with a given number of tasks).

ContextBarrierState is used by BarrierCoordinator to handle RequestToSync messages (and to keep track of active barrier stage attempts).

ContextBarrierState is a private class of BarrierCoordinator.

Creating Instance

ContextBarrierState takes a ContextBarrierId (of a barrier stage attempt) and the number of tasks (numTasks) to be created.

ContextBarrierState is created when BarrierCoordinator is requested to handle a RequestToSync message.

Barrier Stage Attempt (ContextBarrierId)

ContextBarrierState is given a ContextBarrierId (of a barrier stage) when created.

The ContextBarrierId uniquely identifies a barrier stage by the stage and stage attempt IDs.
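The identification by stage and stage attempt IDs can be pictured with a small sketch. BarrierId and Registry below are simplified, hypothetical stand-ins (not Spark's own classes); the sketch only assumes that the identifier is a pair of IDs with value equality, so it can key a registry of per-attempt state.

```scala
import scala.collection.mutable

object BarrierIdSketch {
  // Simplified stand-in for ContextBarrierId (illustrative, not Spark's class)
  final case class BarrierId(stageId: Int, stageAttemptId: Int)

  // Hypothetical registry of per-attempt state, keyed by the identifier
  final class Registry {
    private val states = mutable.Map.empty[BarrierId, String]

    def put(id: BarrierId, state: String): Unit = states.update(id, state)

    // Case-class equality: the same stage and attempt IDs resolve to the same entry
    def get(id: BarrierId): Option[String] = states.get(id)

    def size: Int = states.size
  }
}
```

Two attempts of the same stage (e.g., BarrierId(3, 0) and BarrierId(3, 1)) are distinct keys, so a retried stage attempt gets its own state.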

Barrier Epoch

ContextBarrierState initializes barrierEpoch counter to be 0 when created.

Barrier Tasks

requesters: ArrayBuffer[RpcCallContext]

requesters is a registry of RpcCallContexts of the barrier tasks (of a barrier stage attempt) pending a reply.

It is only when the number of RpcCallContexts in the requesters reaches the number of tasks expected (while handling RequestToSync requests) that this ContextBarrierState is considered finished successfully.

ContextBarrierState initializes requesters when created as an empty ArrayBuffer with the initial capacity of the number of tasks.

A new RpcCallContext of a barrier task is added in handleRequest only when the epoch of the barrier task matches the current barrierEpoch.
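The epoch guard can be sketched as follows. This is a minimal sketch, not the actual implementation: Requester is a hypothetical stand-in for RpcCallContext, and register is an illustrative name for the part of handleRequest that decides whether a barrier task is accepted.

```scala
import scala.collection.mutable.ArrayBuffer

object EpochCheckSketch {
  // Hypothetical stand-in for RpcCallContext (only the failure path is modelled)
  trait Requester {
    def sendFailure(e: Throwable): Unit
  }

  final class State(numTasks: Int) {
    var barrierEpoch: Int = 0
    // Empty buffer with room for numTasks requesters
    val requesters = new ArrayBuffer[Requester](numTasks)

    // Registers the requester only when its epoch matches the current one;
    // returns true when the requester was accepted
    def register(requester: Requester, epoch: Int): Boolean =
      if (epoch != barrierEpoch) {
        requester.sendFailure(
          new IllegalStateException(s"Barrier epoch $epoch has already finished."))
        false
      } else {
        requesters += requester
        true
      }
  }
}
```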

TimerTask

timerTask: TimerTask

ContextBarrierState uses a TimerTask (Java) to ensure that a barrier() call can time out.

ContextBarrierState creates a TimerTask (Java) in initTimerTask while handling a RequestToSync message, but only for the first global sync message received (when the requesters is empty). The TimerTask is then immediately scheduled to be executed after spark.barrier.sync.timeout.

spark.barrier.sync.timeout

Since spark.barrier.sync.timeout defaults to 365d (1 year), the TimerTask will run only after one year.

The TimerTask is stopped in cancelTimerTask.
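The schedule-then-cancel life cycle can be sketched with java.util.Timer and java.util.TimerTask alone. The helper name scheduleTimeout and the callback parameter are assumptions for illustration; how BarrierCoordinator wires its own timer is not shown here.

```scala
import java.util.{Timer, TimerTask}

object TimeoutSketch {
  // Schedules onTimeout to run once after delayMs milliseconds.
  // The TimerTask is returned so that the caller can cancel it later.
  def scheduleTimeout(timer: Timer, delayMs: Long)(onTimeout: () => Unit): TimerTask = {
    val task = new TimerTask {
      override def run(): Unit = onTimeout()
    }
    timer.schedule(task, delayMs)
    task
  }
}
```

Calling cancel() on the returned TimerTask before the delay elapses prevents the callback from running, which is the role cancelTimerTask plays once a sync completes.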

Initializing TimerTask

initTimerTask(
  state: ContextBarrierState): Unit

initTimerTask creates a new TimerTask (Java) that, when executed, sends a SparkException with the following message to all the requesters and then executes cleanupBarrierStage for this ContextBarrierId.

The coordinator didn't get all barrier sync requests
for barrier epoch [barrierEpoch] from [barrierId] within [timeoutInSecs] second(s).

The TimerTask is made available as timerTask.


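initTimerTask is used when ContextBarrierState is requested to handle a RequestToSync message (for the first sync message received).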

messages

ContextBarrierState initializes the messages registry when created, with room for one message from each of the numTasks barrier tasks (of a barrier stage attempt).

The messages registry is initially empty.

A new message is registered (added) when handling a RequestToSync request.

Handling RequestToSync Message

handleRequest(
  requester: RpcCallContext,
  request: RequestToSync): Unit

handleRequest makes sure that the RequestMethod (of the given RequestToSync) is consistent across barrier tasks (using requestMethods registry).

handleRequest asserts that the number of tasks of the given RequestToSync equals this numTasks, and so is consistent across barrier tasks. Otherwise, handleRequest reports an IllegalArgumentException:

Number of tasks of [barrierId] is [numTasks] from Task [taskId], previously it was [numTasks].
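A check of this kind is commonly written with Scala's require, which throws an IllegalArgumentException when the condition does not hold. The following is a sketch under that assumption; checkNumTasks and its parameters are illustrative names, and the message is shortened.

```scala
object NumTasksCheckSketch {
  // Fails with IllegalArgumentException when a request disagrees on the number of tasks
  def checkNumTasks(expected: Int, fromRequest: Int, taskId: Long): Unit =
    require(
      expected == fromRequest,
      s"Number of tasks is $fromRequest from Task $taskId, previously it was $expected.")
}
```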

handleRequest prints out the following INFO message to the logs (with the ContextBarrierId and barrierEpoch):

Current barrier epoch for [barrierId] is [barrierEpoch].

For the first sync message received (requesters is empty), handleRequest initializes the TimerTask and schedules it for execution after the timeoutInSecs.

Timeout

Starting the timerTask ensures that a sync may eventually time out (after a configured delay).

handleRequest registers the given requester in the requesters.

handleRequest registers the message of the RequestToSync in the messages for the partitionId.

handleRequest prints out the following INFO message to the logs:

Barrier sync epoch [barrierEpoch] from [barrierId] received update from Task [taskId],
current progress: [requesters]/[numTasks].

Updates from All Barrier Tasks Received

When the barrier sync has received updates from all barrier tasks (i.e., the number of requesters equals numTasks), handleRequest replies to all the requesters with the messages.

handleRequest prints out the following INFO message to the logs:

Barrier sync epoch [barrierEpoch] from [barrierId] received all updates from tasks,
finished successfully.

handleRequest increments the barrierEpoch, clears the requesters and the requestMethods, and then cancels the TimerTask (cancelTimerTask).
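The completion step can be sketched as a small state machine. This is a minimal sketch under simplifying assumptions: Requester is a hypothetical stand-in for RpcCallContext that only records the reply, messages are plain strings indexed by partition, and the timer, logging and request-method tracking are left out.

```scala
import scala.collection.mutable.ArrayBuffer

object SyncCompletionSketch {
  // Hypothetical stand-in for RpcCallContext: it only records the reply it was given
  final class Requester {
    var received: Option[List[String]] = None
    def reply(messages: List[String]): Unit = { received = Some(messages) }
  }

  final class State(val numTasks: Int) {
    private var epoch = 0
    private val requesters = new ArrayBuffer[Requester](numTasks)
    private val messages = Array.fill(numTasks)("")

    def barrierEpoch: Int = synchronized(epoch)
    def pending: Int = synchronized(requesters.size)

    def handle(requester: Requester, partitionId: Int, message: String): Unit = synchronized {
      requesters += requester
      messages(partitionId) = message
      // Only the last expected task triggers the replies
      if (requesters.size == numTasks) {
        val snapshot = messages.toList // copy, so that later epochs cannot alter a reply
        requesters.foreach(_.reply(snapshot))
        epoch += 1
        requesters.clear()
      }
    }
  }
}
```

With numTasks = 2, the first call to handle leaves one requester pending and unanswered; the second call answers both with the same list of messages and moves the state to the next epoch.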


If the epoch of the given RequestToSync differs from this barrierEpoch, handleRequest sends back a failure message (with a SparkException) to the given requester:

The request to sync of [barrierId] with barrier epoch [barrierEpoch] has already finished.
Maybe task [taskId] is not properly killed.

In case of different RequestMethods (in requestMethods registry), handleRequest sends back a failure message to the requesters (incl. the given requester):

Different barrier sync types found for the sync [barrierId]: [requestMethods].
Please use the same barrier sync type within a single sync.

handleRequest then clears this ContextBarrierState (clear).
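The consistency check on sync types can be sketched with a set that should never hold more than one distinct value during a single sync. The SyncType values and MethodTracker below are hypothetical names for illustration and do not reproduce Spark's RequestMethod or requestMethods registry.

```scala
import scala.collection.mutable

object RequestMethodCheckSketch {
  // Hypothetical sync types (stand-ins for the values of RequestMethod)
  sealed trait SyncType
  case object Barrier extends SyncType
  case object AllGather extends SyncType

  final class MethodTracker {
    private val seen = mutable.Set.empty[SyncType]

    // Records the sync type of a request; returns false as soon as
    // more than one distinct type has been seen within the current sync
    def record(method: SyncType): Boolean = {
      seen += method
      seen.size == 1
    }

    // Resets the tracker for the next sync
    def clear(): Unit = seen.clear()
  }
}
```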


handleRequest is used when BarrierCoordinator is requested to handle a RequestToSync message.

Logging

ContextBarrierState is a private class of BarrierCoordinator, so logging is configured using the logger of BarrierCoordinator.