ContextBarrierState
ContextBarrierState represents the state of a global sync of a barrier stage attempt (with a given number of tasks).
ContextBarrierState is used by BarrierCoordinator to handle RequestToSync messages (and to keep track of active barrier stage attempts).
ContextBarrierState is a private class of BarrierCoordinator.
Creating Instance
ContextBarrierState takes the following to be created:
- ContextBarrierId
- Number of Tasks (of a barrier stage)
ContextBarrierState is created when:
- BarrierCoordinator is requested to handle a RequestToSync message for a new pair of stage and stage attempt IDs
Barrier Stage Attempt (ContextBarrierId)
ContextBarrierState is given a ContextBarrierId (of a barrier stage) when created.
The ContextBarrierId uniquely identifies a barrier stage attempt by the stage and stage attempt IDs.
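Because the identifier is just a pair of IDs, a case class works naturally as a registry key. The following is a minimal sketch, not Spark's code: `BarrierId`, `BarrierState`, `states` and `stateFor` are hypothetical names, and the `toString` format is this sketch's own choice.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ContextBarrierId: a barrier stage attempt is
// identified by its stage ID and stage attempt ID. The toString format is
// this sketch's choice.
final case class BarrierId(stageId: Int, stageAttemptId: Int) {
  override def toString: String = s"Stage $stageId (Stage Attempt $stageAttemptId)"
}

// Hypothetical per-attempt state that only keeps the number of tasks.
final class BarrierState(val barrierId: BarrierId, val numTasks: Int)

// Hypothetical registry of active barrier stage attempts keyed by BarrierId.
val states = mutable.Map.empty[BarrierId, BarrierState]

// Returns the state of a stage attempt, creating it on the first request.
def stateFor(stageId: Int, stageAttemptId: Int, numTasks: Int): BarrierState = {
  val id = BarrierId(stageId, stageAttemptId)
  states.getOrElseUpdate(id, new BarrierState(id, numTasks))
}
```

Case classes get structural `equals` and `hashCode`, so requests with the same stage and stage attempt IDs resolve to the same state, while a retried stage attempt gets a fresh one. The sketch uses a plain `mutable.Map` for brevity; a coordinator serving concurrent RPCs would need a thread-safe map.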
Barrier Epoch
ContextBarrierState initializes the barrierEpoch counter to 0 when created.
Barrier Tasks
requesters: ArrayBuffer[RpcCallContext]
requesters is a registry of RpcCallContexts of the barrier tasks (of a barrier stage attempt) pending a reply.
Only when the number of RpcCallContexts in requesters reaches the expected number of tasks (while handling RequestToSync requests) is the current global sync (barrier epoch) considered finished successfully.
ContextBarrierState initializes requesters with an initial capacity of numTasks when created (the registry itself starts empty).
A new RpcCallContext of a barrier task is added in handleRequest only when the epoch of the barrier task matches the current barrierEpoch.
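The epoch gating and the completion condition can be sketched as follows. This is an illustration under stated assumptions, not Spark's implementation: `Requester` is a hypothetical stand-in for RpcCallContext, `PendingRequesters` and `register` are hypothetical names, and `IllegalStateException` with a shortened message stands in for the SparkException Spark sends.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for RpcCallContext (only the methods used here).
trait Requester {
  def reply(response: Any): Unit
  def sendFailure(e: Throwable): Unit
}

// Sketch of the requesters registry with epoch gating.
final class PendingRequesters(numTasks: Int) {
  var barrierEpoch: Int = 0
  // numTasks is only the initial capacity; the buffer starts empty.
  val requesters = new ArrayBuffer[Requester](numTasks)

  // Registers a requester whose epoch matches and returns true once all
  // numTasks barrier tasks are pending a reply. A stale requester is failed
  // and not registered.
  def register(requester: Requester, epoch: Int): Boolean =
    if (epoch != barrierEpoch) {
      requester.sendFailure(new IllegalStateException(
        s"The request to sync with barrier epoch $barrierEpoch has already finished."))
      false
    } else {
      requesters += requester
      requesters.size == numTasks
    }
}
```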
TimerTask
timerTask: TimerTask
ContextBarrierState uses a TimerTask (Java) to ensure that a barrier() call can time out.
ContextBarrierState creates the TimerTask (Java) in initTimerTask, which handleRequest calls for the first global sync message of a barrier epoch (i.e., when requesters is empty). The TimerTask is then immediately scheduled for execution after spark.barrier.sync.timeout.
Since spark.barrier.sync.timeout defaults to 365d (1 year), by default the TimerTask runs only after one year (unless it is cancelled earlier).
The TimerTask is cancelled (and the timer purged) in cancelTimerTask.
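The schedule-then-cancel lifecycle maps onto the standard `java.util.Timer` API. Below is a minimal sketch assuming a daemon timer (a choice made here so a pending task does not keep the JVM alive); `newTimeoutTask`, `cancelTask` and `defaultTimeoutMs` are hypothetical names, and a `CountDownLatch` merely records whether the task fired.

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.{CountDownLatch, TimeUnit}

// A daemon timer so a pending task does not keep the JVM alive (sketch choice).
val timer = new Timer("barrier-timeout-sketch", true)

// The 365d default expressed in milliseconds does not fit in an Int.
val defaultTimeoutMs: Long = 365L * 24 * 60 * 60 * 1000

// Creates a task that counts down the latch when (and if) it fires.
def newTimeoutTask(fired: CountDownLatch): TimerTask = new TimerTask {
  override def run(): Unit = fired.countDown()
}

// Cancels the task and drops cancelled tasks from the timer's queue.
def cancelTask(task: TimerTask): Unit = {
  task.cancel()
  timer.purge()
}
```

`Timer.schedule(task, delay)` takes the delay in milliseconds, so a timeout in seconds has to be multiplied by 1000; for the 365d default that is 31,536,000,000 ms, which only fits in a `Long`.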
Initializing TimerTask
initTimerTask(
state: ContextBarrierState): Unit
initTimerTask creates a new TimerTask (Java) that, when executed, fails all the requesters with a SparkException with the following message and then cleans up the barrier stage (cleanupBarrierStage) for this ContextBarrierId.
The coordinator didn't get all barrier sync requests
for barrier epoch [barrierEpoch] from [barrierId] within [timeoutInSecs] second(s).
The TimerTask is made available as timerTask.
initTimerTask is used when:
- ContextBarrierState is requested to handle a RequestToSync message (for the first global sync message received, i.e., when requesters is empty)
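The body of the TimerTask can be sketched as follows, assuming hypothetical names throughout: `Requester` stands in for the failure side of RpcCallContext, `RuntimeException` stands in for SparkException, `lock` plays the role of the ContextBarrierState passed to initTimerTask (which the timer thread synchronizes on), and `cleanup` is a hypothetical hook in place of cleanupBarrierStage.

```scala
import java.util.TimerTask
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the failure side of RpcCallContext.
trait Requester {
  def sendFailure(e: Throwable): Unit
}

// Builds the timeout message in the shape quoted above.
def timeoutMessage(epoch: Int, barrierId: String, timeoutInSecs: Long): String =
  "The coordinator didn't get all barrier sync requests " +
    s"for barrier epoch $epoch from $barrierId within $timeoutInSecs second(s)."

// Sketch of initTimerTask. `lock` plays the role of the state the timer thread
// synchronizes on, `epoch` is read when the task fires, and `cleanup` is a
// hypothetical hook standing in for cleanupBarrierStage.
def newTimeoutTask(
    lock: AnyRef,
    requesters: ArrayBuffer[Requester],
    epoch: () => Int,
    barrierId: String,
    timeoutInSecs: Long,
    cleanup: () => Unit): TimerTask = new TimerTask {
  override def run(): Unit = lock.synchronized {
    // Fail every pending barrier task, then drop the stage attempt's state.
    requesters.foreach(_.sendFailure(
      new RuntimeException(timeoutMessage(epoch(), barrierId, timeoutInSecs))))
    cleanup()
  }
}
```

Synchronizing on the state matters because the task runs on the timer thread while handleRequest may be modifying requesters concurrently.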
messages
ContextBarrierState initializes the messages registry when created, with one slot for each of the numTasks barrier tasks (of a barrier stage attempt).
Initially, every slot of messages is an empty string.
A message is registered (at the partition ID of the sending barrier task) when handling a RequestToSync request.
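A fixed-size array indexed by partition ID is enough to model this registry; it is what lets every barrier task receive all the messages in partition order (e.g., for BarrierTaskContext.allGather). The sketch below is illustrative only: `Messages`, `record` and `snapshot` are hypothetical names.

```scala
// Sketch of the messages registry: one slot per barrier task, indexed by
// partition ID.
final class Messages(numTasks: Int) {
  // Every slot starts as an empty string.
  private val slots: Array[String] = Array.fill(numTasks)("")

  // Records the message of the barrier task running the given partition.
  def record(partitionId: Int, message: String): Unit = slots(partitionId) = message

  // Returns a copy so that callers cannot mutate the registry.
  def snapshot: Array[String] = slots.clone()
}
```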
Handling RequestToSync Message
handleRequest(
requester: RpcCallContext,
request: RequestToSync): Unit
handleRequest makes sure that the RequestMethod (of the given RequestToSync) is consistent across barrier tasks (using the requestMethods registry).
handleRequest requires that the number of tasks (of the given RequestToSync) is this numTasks, i.e., consistent across barrier tasks. Otherwise, handleRequest throws an IllegalArgumentException:
Number of tasks of [barrierId] is [request.numTasks] from Task [taskId], previously it was [numTasks].
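Scala's `require` is what produces an `IllegalArgumentException` here, and it prefixes the message with `requirement failed: `. A minimal sketch of the check, where `checkNumTasks` is a hypothetical helper and the barrier ID is passed as a plain string:

```scala
// Sketch of the numTasks consistency check. Predef.require throws an
// IllegalArgumentException whose message starts with "requirement failed: ".
def checkNumTasks(barrierId: String, expected: Int, requested: Int, taskId: Long): Unit =
  require(requested == expected,
    s"Number of tasks of $barrierId is $requested from Task $taskId, previously it was $expected.")
```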
handleRequest prints out the following INFO message to the logs (with the ContextBarrierId and barrierEpoch):
Current barrier epoch for [barrierId] is [barrierEpoch].
For the first sync message received (i.e., when requesters is empty), handleRequest initializes the TimerTask (initTimerTask) and schedules it for execution after timeoutInSecs seconds.
Starting the timerTask ensures that a sync may eventually time out (after the configured spark.barrier.sync.timeout).
handleRequest registers the given requester in the requesters.
handleRequest registers the message of the RequestToSync in messages at the index of the partitionId (of the RequestToSync).
handleRequest prints out the following INFO message to the logs:
Barrier sync epoch [barrierEpoch] from [barrierId] received update from Task [taskId],
current progress: [requesters.size]/[numTasks].
Updates from All Barrier Tasks Received
When the barrier sync has received updates from all barrier tasks (i.e., the number of requesters equals numTasks), handleRequest replies to all the requesters with (a copy of) the messages.
handleRequest prints out the following INFO message to the logs:
Barrier sync epoch [barrierEpoch] from [barrierId] received all updates from tasks,
finished successfully.
handleRequest increments the barrierEpoch, clears the requesters and the requestMethods, and then cancels the TimerTask (cancelTimerTask).
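The completion step can be sketched on its own. This is a hypothetical model rather than Spark's class: `SyncRound` and `tryFinish` are made-up names, request methods are plain strings, and a boolean flag stands in for cancelTimerTask.

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the reply side of RpcCallContext.
trait Requester {
  def reply(response: Any): Unit
}

// Sketch of the state touched when a barrier epoch completes.
final class SyncRound(numTasks: Int) {
  var barrierEpoch: Int = 0
  val requesters = new ArrayBuffer[Requester](numTasks)
  val messages: Array[String] = Array.fill(numTasks)("")
  val requestMethods = mutable.Set.empty[String]
  // Stands in for cancelTimerTask() in this sketch.
  var timerCancelled: Boolean = false

  // Completes the epoch once every barrier task has sent its update.
  def tryFinish(): Boolean =
    if (requesters.size == numTasks) {
      // Each task gets its own copy of all the messages.
      requesters.foreach(_.reply(messages.clone()))
      barrierEpoch += 1
      requesters.clear()
      requestMethods.clear()
      timerCancelled = true
      true
    } else false
}
```

Incrementing the epoch before clearing means any late request still carrying the old epoch is rejected by the epoch check rather than counted toward the next sync.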
If the epoch of the given RequestToSync differs from the current barrierEpoch, handleRequest sends back a failure (with a SparkException) to the given requester only, without registering it:
The request to sync of [barrierId] with barrier epoch [barrierEpoch] has already finished.
Maybe task [taskId] is not properly killed.
If the requestMethods registry contains more than one RequestMethod (i.e., barrier tasks use different barrier sync types), handleRequest sends back a failure (with a SparkException) to all the requesters (incl. the given requester):
Different barrier sync types found for the sync [barrierId]: [requestMethods].
Please use the same barrier sync type within a single sync.
handleRequest then clears this ContextBarrierState (clear) and returns.
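Detecting mixed sync types only requires a set of the methods seen so far. In the sketch below, `SyncMethod` is a hypothetical enumeration assumed to mirror the two barrier sync types (BarrierTaskContext.barrier() and allGather()); `MethodCheck` and `record` are also hypothetical, and the sketch covers only detection, not failing the requesters or clearing the state.

```scala
import scala.collection.mutable

// Hypothetical mirror of the two barrier sync types (barrier() and allGather()).
object SyncMethod extends Enumeration {
  val BARRIER: Value = Value("BARRIER")
  val ALL_GATHER: Value = Value("ALL_GATHER")
}

// Sketch of the requestMethods consistency check for one sync.
final class MethodCheck(barrierId: String) {
  val requestMethods = mutable.Set.empty[SyncMethod.Value]

  // Records the method of an incoming request; returns an error message once
  // two different methods have been seen within the same sync.
  def record(method: SyncMethod.Value): Option[String] = {
    requestMethods += method
    if (requestMethods.size > 1)
      Some(s"Different barrier sync types found for the sync $barrierId: " +
        s"${requestMethods.mkString(", ")}. Please use the same barrier sync type within a single sync.")
    else None
  }
}
```

Since the set is unordered, the order of the listed sync types in the message is not guaranteed.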
handleRequest is used when:
- BarrierCoordinator is requested to handle a RequestToSync message
Logging
ContextBarrierState is a private class of BarrierCoordinator and logging is configured using the logger of BarrierCoordinator.