ContextBarrierState
ContextBarrierState represents the state of a global sync of a barrier stage (with the number of tasks).
ContextBarrierState is used by BarrierCoordinator to handle RequestToSync messages (and to keep track of active barrier stage attempts).
ContextBarrierState is a private class of BarrierCoordinator.
Creating Instance
ContextBarrierState takes the following to be created:
- ContextBarrierId
- Number of Tasks (of a barrier stage)
ContextBarrierState is created when:
- BarrierCoordinator is requested to handle a RequestToSync message for a new pair of stage and stage attempt IDs
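The "one state per new stage attempt" rule above can be sketched as a get-or-create lookup keyed by the barrier id. This is a minimal sketch under stated assumptions: `ContextBarrierId` and `ContextBarrierState` below are simplified stand-ins, and `states` and `stateFor` are hypothetical names, not BarrierCoordinator's actual members.

```scala
import java.util.concurrent.ConcurrentHashMap

// Simplified stand-in for ContextBarrierId: a barrier stage attempt is keyed by both IDs
final case class ContextBarrierId(stageId: Int, stageAttemptId: Int)

// Simplified stand-in for ContextBarrierState holding only its two constructor arguments
final class ContextBarrierState(val barrierId: ContextBarrierId, val numTasks: Int)

// Hypothetical coordinator-side registry: at most one state per barrier stage attempt
val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState]()

// Returns the existing state for the stage attempt, or creates it on the first request
def stateFor(stageId: Int, stageAttemptId: Int, numTasks: Int): ContextBarrierState = {
  val barrierId = ContextBarrierId(stageId, stageAttemptId)
  states.computeIfAbsent(barrierId, (id: ContextBarrierId) => new ContextBarrierState(id, numTasks))
}
```

Because `ContextBarrierId` is a case class, two requests with the same stage and stage attempt IDs resolve to the same key, so `computeIfAbsent` creates the state only once per attempt.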
Barrier Stage Attempt (ContextBarrierId)
ContextBarrierState is given a ContextBarrierId (of a barrier stage) when created. The ContextBarrierId uniquely identifies a barrier stage attempt by its stage and stage attempt IDs.
Barrier Epoch
ContextBarrierState initializes the barrierEpoch counter to 0 when created.
Barrier Tasks
requesters: ArrayBuffer[RpcCallContext]
requesters is a registry of the RpcCallContexts of the barrier tasks (of a barrier stage attempt) pending a reply.
Only when the number of RpcCallContexts in the requesters reaches the expected number of tasks (while handling RequestToSync requests) is the current sync of this ContextBarrierState considered finished successfully.
ContextBarrierState initializes requesters when created as an empty buffer with an initial capacity of numTasks (the number of tasks).
A new RpcCallContext of a barrier task is added in handleRequest only when the epoch of the barrier task matches the current barrierEpoch.
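The epoch-gated registration described above can be sketched as follows. This is a minimal sketch, assuming a hypothetical `PendingCall` stand-in for RpcCallContext (it only records a failure) and a hypothetical `Requesters` class; the real code fails the caller with a SparkException, while this sketch uses `IllegalStateException`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for RpcCallContext: remembers a failure instead of sending an RPC
final class PendingCall(val taskId: Long) {
  var failure: Option[Throwable] = None
  def sendFailure(e: Throwable): Unit = { failure = Some(e) }
}

// Simplified sketch of the epoch check guarding the requesters registry
final class Requesters(barrierId: String, numTasks: Int) {
  var barrierEpoch: Int = 0                                // starts at 0 when created
  val requesters = new ArrayBuffer[PendingCall](numTasks)  // capacity numTasks, size 0

  /** Registers the call if its epoch is current; returns true once all numTasks calls are pending. */
  def add(call: PendingCall, epoch: Int): Boolean =
    if (epoch != barrierEpoch) {
      // Stale epoch: reply with a failure and do not register the call
      call.sendFailure(new IllegalStateException(
        s"The request to sync of $barrierId with barrier epoch $barrierEpoch has already finished. " +
          s"Maybe task ${call.taskId} is not properly killed."))
      false
    } else {
      requesters += call
      requesters.size == numTasks
    }
}
```

Returning a Boolean keeps the "all tasks arrived" decision with the caller, which can then reply to everyone and move to the next epoch.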
TimerTask
timerTask: TimerTask
ContextBarrierState uses a TimerTask (Java) to ensure that a barrier() call can time out.
ContextBarrierState creates the TimerTask (Java) using initTimerTask while handling the first RequestToSync message of a global sync (i.e., when the requesters is empty). The TimerTask is then immediately scheduled for execution after spark.barrier.sync.timeout.
Since spark.barrier.sync.timeout defaults to 365d (1 year), by default the TimerTask fires only after one year.
The TimerTask is cancelled in cancelTimerTask.
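Scheduling and cancelling a one-shot timeout with `java.util.Timer` can be sketched as below. This is a minimal sketch; `timer`, `scheduleTimeout` and `cancelTimeout` are hypothetical names, and the shared daemon timer is an assumption for illustration. Note that a 365d timeout is 31,536,000 seconds, which still fits comfortably in a `Long` number of milliseconds.

```scala
import java.util.{Timer, TimerTask}

// Hypothetical shared daemon timer (daemon, so it never keeps the JVM alive)
val timer = new Timer("barrier-sync-timeout-sketch", true)

// Schedules `onTimeout` to run once after `timeoutInSecs`; returns the task so it can be cancelled
def scheduleTimeout(timeoutInSecs: Long)(onTimeout: => Unit): TimerTask = {
  val task = new TimerTask {
    override def run(): Unit = onTimeout
  }
  timer.schedule(task, timeoutInSecs * 1000) // Timer.schedule(task, delayInMillis)
  task
}

// Cancels a pending task and purges cancelled tasks from the timer queue.
// Returns true only if the task had not run (or been cancelled) yet.
def cancelTimeout(task: TimerTask): Boolean =
  if (task == null) false
  else {
    val prevented = task.cancel()
    timer.purge()
    prevented
  }
```

A successful sync cancels the pending task, so in the common case the timeout body never runs.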
Initializing TimerTask
initTimerTask(
state: ContextBarrierState): Unit
initTimerTask creates a new TimerTask (Java) that, when executed, sends a SparkException with the following message to all the requesters and then calls cleanupBarrierStage for this ContextBarrierId.
The coordinator didn't get all barrier sync requests
for barrier epoch [barrierEpoch] from [barrierId] within [timeoutInSecs] second(s).
The TimerTask is made available as timerTask.
initTimerTask is used when:
- ContextBarrierState is requested to handle a RequestToSync message (for the first global sync message received, when the requesters is empty)
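The body of the timeout task can be sketched as follows. This is a minimal sketch under assumptions: `Requester`, `TimeoutState` and the `cleanup` callback are hypothetical stand-ins (the callback plays the role of cleanupBarrierStage), the barrier id is a plain string, and `RuntimeException` replaces SparkException.

```scala
import java.util.TimerTask
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for RpcCallContext that only remembers a failure
final class Requester {
  var failure: Option[Throwable] = None
  def sendFailure(e: Throwable): Unit = { failure = Some(e) }
}

// Simplified state; `cleanup` is a hypothetical callback standing in for cleanupBarrierStage
final class TimeoutState(val barrierId: String, val timeoutInSecs: Long, cleanup: String => Unit) {
  var barrierEpoch: Int = 0
  val requesters = ArrayBuffer.empty[Requester]
  var timerTask: TimerTask = null

  def initTimerTask(): Unit = {
    timerTask = new TimerTask {
      // Lock the state so a timeout cannot interleave with request handling
      override def run(): Unit = TimeoutState.this.synchronized {
        val msg = "The coordinator didn't get all barrier sync requests for barrier epoch " +
          s"$barrierEpoch from $barrierId within $timeoutInSecs second(s)."
        requesters.foreach(_.sendFailure(new RuntimeException(msg)))
        cleanup(barrierId)
      }
    }
  }
}
```

Calling `run()` directly (as a test does) executes the timeout path without waiting for a timer.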
messages
ContextBarrierState initializes messages, a registry of the messages from all numTasks barrier tasks (of a barrier stage attempt), when created. Initially, the messages registry holds no messages.
A new message is registered (added) when handling a RequestToSync request.
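A registry with one slot per barrier task, indexed by partition ID (as handleRequest does below), can be sketched like this. It is a minimal sketch; the `Messages` class and its methods are hypothetical, and handing out a copy in `snapshot` is a design choice of this sketch.

```scala
// Hypothetical registry: one message slot per barrier task, indexed by partition ID
final class Messages(numTasks: Int) {
  private val slots: Array[String] = Array.ofDim[String](numTasks) // every slot starts as null

  def register(partitionId: Int, message: String): Unit = slots(partitionId) = message

  // Returns a copy so later writes to the registry cannot change what was already handed out
  def snapshot(): Array[String] = slots.clone()
}
```

Indexing by partition ID means every task sees the messages in the same (partition) order regardless of arrival order.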
Handling RequestToSync Message
handleRequest(
requester: RpcCallContext,
request: RequestToSync): Unit
handleRequest makes sure that the RequestMethod (of the given RequestToSync) is consistent across barrier tasks (using the requestMethods registry).
handleRequest asserts that the number of tasks (of the given RequestToSync) equals this numTasks, i.e., that it is consistent across barrier tasks. Otherwise, handleRequest reports an IllegalArgumentException:
Number of tasks of [barrierId] is [numTasks] from Task [taskId], previously it was [numTasks].
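The consistency check can be sketched with Scala's `Predef.require`, which throws `IllegalArgumentException` (prefixing the message with "requirement failed: "). This is a minimal sketch; `SyncRequest` and `checkNumTasks` are hypothetical names, with `SyncRequest` keeping only the two fields this check needs.

```scala
// Hypothetical simplified request: only the fields this check needs
final case class SyncRequest(numTasks: Int, taskAttemptId: Long)

// Throws IllegalArgumentException (via require) when a task disagrees on the number of tasks
def checkNumTasks(barrierId: String, numTasks: Int, request: SyncRequest): Unit =
  require(request.numTasks == numTasks,
    s"Number of tasks of $barrierId is ${request.numTasks} from Task ${request.taskAttemptId}, " +
      s"previously it was $numTasks.")
```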
handleRequest prints out the following INFO message to the logs (with the ContextBarrierId and barrierEpoch):
Current barrier epoch for [barrierId] is [barrierEpoch].
For the first sync message received (when the requesters is empty), handleRequest initializes the TimerTask and schedules it for execution after timeoutInSecs.
Starting the timerTask ensures that a sync eventually times out (after the configured delay) if not all barrier tasks report in.
handleRequest registers the given requester in the requesters.
handleRequest registers the message of the RequestToSync in the messages for the partitionId.
handleRequest prints out the following INFO message to the logs:
Barrier sync epoch [barrierEpoch] from [barrierId] received update from Task [taskId],
current progress: [requesters]/[numTasks].
Updates from All Barrier Tasks Received
When the barrier sync has received updates from all barrier tasks (i.e., the number of requesters equals numTasks), handleRequest replies back to all the requesters with the messages.
handleRequest prints out the following INFO message to the logs:
Barrier sync epoch [barrierEpoch] from [barrierId] received all updates from tasks,
finished successfully.
handleRequest increments the barrierEpoch, clears the requesters and the requestMethods, and then cancels the TimerTask (cancelTimerTask).
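The completion step can be sketched as below. This is a minimal sketch under assumptions: `Waiting` is a hypothetical stand-in for RpcCallContext (it only records the reply), `Completion` and `finish` are hypothetical names, and the `timerCancelled` flag merely stands in for cancelTimerTask. Giving each task its own copy of the messages is a design choice of the sketch.

```scala
import scala.collection.mutable

// Hypothetical stand-in for RpcCallContext that records the reply it receives
final class Waiting {
  var received: Option[Any] = None
  def reply(response: Any): Unit = { received = Some(response) }
}

final class Completion(numTasks: Int) {
  var barrierEpoch: Int = 0
  val requesters = mutable.ArrayBuffer.empty[Waiting]
  val requestMethods = mutable.Set.empty[String]
  val messages: Array[String] = Array.ofDim[String](numTasks)
  var timerCancelled = false // stands in for cancelTimerTask()

  /** Call once requesters.size == numTasks: release every task, then reset for the next epoch. */
  def finish(): Unit = {
    requesters.foreach(_.reply(messages.clone())) // each task gets its own copy of the messages
    barrierEpoch += 1                              // the next barrier() call uses a new epoch
    requesters.clear()
    requestMethods.clear()
    timerCancelled = true
  }
}
```

Incrementing the epoch is what makes any late request carrying the old epoch fail with the "has already finished" message below.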
In case the epoch of the given RequestToSync differs from this barrierEpoch, handleRequest sends back a failure message (with a SparkException) to the given requester:
The request to sync of [barrierId] with barrier epoch [barrierEpoch] has already finished.
Maybe task [taskId] is not properly killed.
In case of different RequestMethods (in the requestMethods registry), handleRequest sends back a failure message to the requesters (incl. the given requester):
Different barrier sync types found for the sync [barrierId]: [requestMethods].
Please use the same barrier sync type within a single sync.
handleRequest then clears this ContextBarrierState (clear).
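The sync-type consistency check can be sketched as follows. This is a minimal sketch under assumptions: `SyncType` is a hypothetical enumeration mirroring the two sync types (barrier() and allGather()), `Caller` is a hypothetical stand-in for RpcCallContext, and `clear()` here only empties the two registries (what the real clear resets beyond that is not shown). `RuntimeException` replaces SparkException.

```scala
import scala.collection.mutable

// Hypothetical mirror of the two barrier sync types
object SyncType extends Enumeration {
  val Barrier, AllGather = Value
}

// Hypothetical stand-in for RpcCallContext that only remembers a failure
final class Caller {
  var failure: Option[Throwable] = None
  def sendFailure(e: Throwable): Unit = { failure = Some(e) }
}

final class SyncTypeCheck(barrierId: String) {
  val requesters = mutable.ArrayBuffer.empty[Caller]
  val requestMethods = mutable.LinkedHashSet.empty[SyncType.Value]

  /** Returns false (after failing everyone, incl. the given requester) when sync types are mixed. */
  def checkMethod(requester: Caller, method: SyncType.Value): Boolean = {
    requestMethods += method
    if (requestMethods.size > 1) {
      val error = new RuntimeException(
        s"Different barrier sync types found for the sync $barrierId: " +
          s"${requestMethods.mkString(", ")}. Please use the same barrier sync type within a single sync.")
      (requesters :+ requester).foreach(_.sendFailure(error))
      clear()
      false
    } else true
  }

  // Simplified clear: forget pending callers and recorded sync types
  def clear(): Unit = {
    requesters.clear()
    requestMethods.clear()
  }
}
```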
handleRequest is used when:
- BarrierCoordinator is requested to handle a RequestToSync message
Logging
ContextBarrierState is a private class of BarrierCoordinator, so logging is configured using the logger of BarrierCoordinator.