Barrier Coordinator RPC Endpoint
BarrierCoordinator is a ThreadSafeRpcEndpoint that is registered as the barrierSync RPC endpoint when TaskSchedulerImpl is requested to maybeInitBarrierCoordinator.
BarrierCoordinator is responsible for handling RequestToSync messages to coordinate Global Syncs of barrier tasks (using allGather and barrier operators).
In other words, the driver (TaskSchedulerImpl, to be precise) sets up a BarrierCoordinator that BarrierTaskContexts talk to using RequestToSync messages. BarrierCoordinator tracks the number of tasks of a barrier stage it still has to wait for before a response can be sent back to the tasks so that they continue. Meanwhile the tasks are paused, waiting for the response for up to 365 days (!).
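The counting described above can be illustrated with a minimal sketch in plain Scala. It is not Spark code: GlobalSyncModel and its requestToSync method are hypothetical names, replies are modelled as Futures rather than RPC responses, and messages are returned in arrival order, which is a simplifying assumption.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical, simplified model of one global sync (not Spark's implementation).
final class GlobalSyncModel(numTasks: Int) {
  // One pending reply per task that has already asked to sync.
  private val pending = ArrayBuffer.empty[Promise[Array[String]]]
  // Messages collected so far (think of an allGather-like payload).
  private val messages = ArrayBuffer.empty[String]

  // Called once per task. The returned Future completes only after
  // all numTasks tasks have called in.
  def requestToSync(message: String): Future[Array[String]] = synchronized {
    val reply = Promise[Array[String]]()
    pending += reply
    messages += message
    if (pending.size == numTasks) {
      // The last task arrived: release every waiting task at once.
      val all = messages.toArray
      pending.foreach(_.success(all))
      pending.clear()
      messages.clear()
    }
    reply.future
  }
}
```

With numTasks set to 3, the first two calls return Futures that stay incomplete, and the third call completes all three with the collected messages.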
Creating Instance
BarrierCoordinator takes the following to be created:
- Timeout (seconds)
- LiveListenerBus
- RpcEnv
BarrierCoordinator is created when:
- TaskSchedulerImpl is requested to maybeInitBarrierCoordinator
Processing RequestToSync Messages (from Barrier Tasks)
RpcEndpoint
receiveAndReply(
  context: RpcCallContext): PartialFunction[Any, Unit]
receiveAndReply is part of the RpcEndpoint abstraction.
receiveAndReply handles RequestToSync messages.
Unless already registered, receiveAndReply registers a new ContextBarrierId (for the stageId and the stageAttemptId) in the Barrier States registry.
Multiple Tasks and One BarrierCoordinator
receiveAndReply handles RequestToSync messages, one per task in a barrier stage. Of all the properties of a RequestToSync, receiveAndReply itself uses only numTasks, stageId and stageAttemptId.
The very first RequestToSync is used to register the stageId and stageAttemptId (as ContextBarrierId) with numTasks.
receiveAndReply finds the ContextBarrierState for the stage and the stage attempt (in the Barrier States registry) to handle the RequestToSync.
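The register-once-then-look-up flow above could be sketched as follows. This is a simplified model under stated assumptions, not the Spark source: the classes below are stand-ins that reuse the names from this page, RequestToSync is reduced to a few fields, handleRequest merely counts requests, and BarrierStatesModel is a hypothetical name.

```scala
import java.util.concurrent.ConcurrentHashMap

// Simplified stand-ins for Spark's internal types (not the real definitions).
final case class ContextBarrierId(stageId: Int, stageAttemptId: Int)

final class ContextBarrierState(val barrierId: ContextBarrierId, val numTasks: Int) {
  private var requests = 0
  // Assumption: the real state does much more; here it only counts requests.
  def handleRequest(): Int = synchronized { requests += 1; requests }
}

final case class RequestToSync(
  numTasks: Int, stageId: Int, stageAttemptId: Int, taskAttemptId: Long)

final class BarrierStatesModel {
  val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState]()

  def receive(request: RequestToSync): Int = {
    val barrierId = ContextBarrierId(request.stageId, request.stageAttemptId)
    // Only the very first request of a stage attempt creates the state.
    states.computeIfAbsent(
      barrierId,
      (id: ContextBarrierId) => new ContextBarrierState(id, request.numTasks))
    // Every request, including the first, is then handled by that state.
    states.get(barrierId).handleRequest()
  }
}
```

computeIfAbsent keeps the registration atomic, so two tasks of the same stage attempt that arrive at the same time still share a single state.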
Barrier States
states: ConcurrentHashMap[ContextBarrierId, ContextBarrierState]
BarrierCoordinator creates an empty ConcurrentHashMap (Java) when created.
The states registry keeps track of all the active barrier stage attempts and their corresponding internal ContextBarrierStates.
states is used when:
- onStop to clean up
- cleanupBarrierStage to remove a specific stage attempt
- receiveAndReply to handle RequestToSync messages
SparkListener
BarrierCoordinator creates a SparkListener when created.
The SparkListener is used to intercept SparkListenerStageCompleted events.
The SparkListener is registered with the LiveListenerBus (using addToStatusQueue) upon startup and removed at stop.
onStageCompleted
SparkListener
onStageCompleted(
  stageCompleted: SparkListenerStageCompleted): Unit
onStageCompleted is part of the SparkListenerInterface abstraction.
onStageCompleted calls cleanupBarrierStage with the stage ID and the attempt number (of the given SparkListenerStageCompleted).
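The cleanup path can be sketched in plain Scala as follows. All names inside BarrierCleanupSketch are hypothetical stand-ins: the StageCompleted event is flattened to two fields, and the registry stores a plain task count instead of a ContextBarrierState.

```scala
import java.util.concurrent.ConcurrentHashMap

object BarrierCleanupSketch {
  // Hypothetical stand-ins, not Spark's classes.
  final case class BarrierKey(stageId: Int, stageAttemptId: Int)
  final case class StageCompleted(stageId: Int, attemptNumber: Int)

  final class Coordinator {
    // Assumption: the value is just a task count in this model.
    val states = new ConcurrentHashMap[BarrierKey, Integer]()

    def register(stageId: Int, stageAttemptId: Int, numTasks: Int): Unit =
      states.putIfAbsent(BarrierKey(stageId, stageAttemptId), Integer.valueOf(numTasks))

    // Drops the state of one stage attempt, if there is any.
    def cleanupBarrierStage(stageId: Int, stageAttemptId: Int): Unit =
      states.remove(BarrierKey(stageId, stageAttemptId))

    // Listener callback: a completed stage attempt no longer needs its state.
    def onStageCompleted(event: StageCompleted): Unit =
      cleanupBarrierStage(event.stageId, event.attemptNumber)

    // Drops every remaining state when the coordinator stops.
    def onStop(): Unit = states.clear()
  }
}
```

Only the completed stage attempt is removed by onStageCompleted, so other barrier stage attempts that are still running keep their entries until they complete or the coordinator stops.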
Logging
Enable the ALL logging level for the org.apache.spark.BarrierCoordinator logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.BarrierCoordinator.name = org.apache.spark.BarrierCoordinator
logger.BarrierCoordinator.level = all
Refer to Logging.