Barrier Coordinator RPC Endpoint
BarrierCoordinator is a ThreadSafeRpcEndpoint that is registered as the barrierSync RPC endpoint when TaskSchedulerImpl is requested to maybeInitBarrierCoordinator.
BarrierCoordinator is responsible for handling RequestToSync messages to coordinate global syncs of barrier tasks (using the allGather and barrier operators).
In other words, the driver (TaskSchedulerImpl, precisely) sets up a BarrierCoordinator that BarrierTaskContexts talk to using RequestToSync messages. BarrierCoordinator tracks the number of tasks to wait for until a barrier stage is complete and a response can be sent back to the tasks so they can continue (the tasks wait for this response for up to 365 days (!)).
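The coordination itself can be pictured without any Spark machinery. The following is a minimal sketch, not Spark's implementation: ToyGlobalSync and requestToSync are hypothetical names, and the sketch assumes a single stage attempt, one request per partition and no timeouts. Every task gets back a Future that completes only once all numTasks tasks have checked in, and all tasks receive the same array of messages ordered by partition id (which is how this sketch, not necessarily Spark, models allGather).

```scala
import scala.concurrent.{Future, Promise}
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of one global sync for a single stage attempt.
// Not Spark's ContextBarrierState: no timeouts, no duplicate-request checks.
final class ToyGlobalSync(numTasks: Int) {
  // One message slot per partition, so the gathered reply is ordered by partition id.
  private val messages = new Array[String](numTasks)
  // Pending replies; each task waits on its own Future until the last task arrives.
  private val waiting = ArrayBuffer.empty[Promise[Array[String]]]

  def requestToSync(partitionId: Int, message: String): Future[Array[String]] =
    synchronized {
      require(partitionId >= 0 && partitionId < numTasks, s"Bad partition id: $partitionId")
      val reply = Promise[Array[String]]()
      messages(partitionId) = message
      waiting += reply
      if (waiting.size == numTasks) {
        // The last task has arrived: release every waiting task with the same messages.
        val gathered = messages.clone()
        waiting.foreach(_.success(gathered))
        waiting.clear()
      }
      reply.future
    }
}
```

In this model, a barrier-style call would simply ignore the returned messages, while an allGather-style call would use them.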
Creating Instance
BarrierCoordinator takes the following to be created:
- Timeout (seconds)
- LiveListenerBus
- RpcEnv
BarrierCoordinator is created when:
- TaskSchedulerImpl is requested to maybeInitBarrierCoordinator
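The "maybe" in maybeInitBarrierCoordinator suggests lazy, one-time creation. Below is a minimal sketch of that pattern, assuming the coordinator is created on the first call only and registered under the barrierSync name; ToyRpcEnv, ToyBarrierCoordinator and ToyTaskScheduler are hypothetical stand-ins (not Spark classes), and only the timeout argument is modelled.

```scala
import scala.collection.mutable

// Hypothetical stand-in for RpcEnv: a registry of endpoints by name.
final class ToyRpcEnv {
  private val endpoints = mutable.Map.empty[String, AnyRef]

  def setupEndpoint(name: String, endpoint: AnyRef): Unit = synchronized {
    require(!endpoints.contains(name), s"Endpoint $name already registered")
    endpoints(name) = endpoint
  }

  def lookup(name: String): Option[AnyRef] = synchronized(endpoints.get(name))
}

// Hypothetical stand-in for BarrierCoordinator (only the timeout is modelled).
final class ToyBarrierCoordinator(val timeoutInSecs: Long)

// Hypothetical stand-in for the TaskSchedulerImpl side.
final class ToyTaskScheduler(rpcEnv: ToyRpcEnv, barrierSyncTimeoutSecs: Long) {
  private var barrierCoordinator: ToyBarrierCoordinator = null

  // Idempotent: creates and registers the coordinator on the first call only.
  def maybeInitBarrierCoordinator(): Unit = synchronized {
    if (barrierCoordinator == null) {
      barrierCoordinator = new ToyBarrierCoordinator(barrierSyncTimeoutSecs)
      rpcEnv.setupEndpoint("barrierSync", barrierCoordinator)
    }
  }
}
```

Calling maybeInitBarrierCoordinator repeatedly is safe in this sketch; without the null check, the second call would fail on the duplicate endpoint name.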
Processing RequestToSync Messages (from Barrier Tasks)
```scala
receiveAndReply(
  context: RpcCallContext): PartialFunction[Any, Unit]
```
receiveAndReply is part of the RpcEndpoint abstraction.
receiveAndReply handles RequestToSync messages.
Unless already registered, receiveAndReply registers a new ContextBarrierState (with the numTasks of the request) under a ContextBarrierId for the stageId and the stageAttemptId in the Barrier States registry.
Multiple Tasks and One BarrierCoordinator
receiveAndReply handles RequestToSync messages, one per task in a barrier stage. Of all the properties of a RequestToSync, receiveAndReply itself uses only numTasks, stageId and stageAttemptId.
The very first RequestToSync is used to register the stageId and stageAttemptId (as a ContextBarrierId) with numTasks.
receiveAndReply finds the ContextBarrierState for the stage and the stage attempt (in the Barrier States registry) to handle the RequestToSync.
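The get-or-register step maps naturally onto ConcurrentHashMap.computeIfAbsent. The sketch below assumes that approach and is a simplification, not Spark's code: all the Toy* names are hypothetical, and the state only counts requests instead of replying to them.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical mirrors of the names used above; not Spark's own classes.
final case class ToyRequestToSync(numTasks: Int, stageId: Int, stageAttemptId: Int,
    partitionId: Int, message: String)
final case class ToyContextBarrierId(stageId: Int, stageAttemptId: Int)

final class ToyContextBarrierState(val barrierId: ToyContextBarrierId, val numTasks: Int) {
  private var received = 0
  // Only counts the requests seen so far and returns the new count.
  def handleRequest(request: ToyRequestToSync): Int = synchronized {
    received += 1
    received
  }
}

final class ToySyncEndpoint {
  val states = new ConcurrentHashMap[ToyContextBarrierId, ToyContextBarrierState]()

  def receiveAndReply(request: ToyRequestToSync): Int = {
    // numTasks, stageId and stageAttemptId are enough to find (or create) the state.
    val barrierId = ToyContextBarrierId(request.stageId, request.stageAttemptId)
    // The first request of a stage attempt registers the state; later ones reuse it.
    val state = states.computeIfAbsent(barrierId,
      (id: ToyContextBarrierId) => new ToyContextBarrierState(id, request.numTasks))
    // The whole request is handed over to the per-attempt state.
    state.handleRequest(request)
  }
}
```

Requests from two different attempts of the same stage end up in two separate states, since the attempt id is part of the key.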
Barrier States
```scala
states: ConcurrentHashMap[ContextBarrierId, ContextBarrierState]
```
BarrierCoordinator creates an empty ConcurrentHashMap (Java) when created.
The states registry is used to keep track of all the active barrier stage attempts and the corresponding internal ContextBarrierState.
states is used when:
- onStop to clean up
- cleanupBarrierStage to remove a specific stage attempt
- receiveAndReply to handle RequestToSync messages
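Cleaning up the registry could look as follows. This is a sketch under assumptions: the class names are hypothetical, and clearing each removed state (clear()) stands in for whatever a real ContextBarrierState has to release. cleanupBarrierStage removes a single stage attempt and is a no-op for an unknown one, while onStop clears every state and then the registry.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer

// Hypothetical key and per-attempt state; not Spark's classes.
final case class ToyBarrierKey(stageId: Int, stageAttemptId: Int)

final class ToyAttemptState {
  val pending = ArrayBuffer.empty[String]
  // Stands in for releasing whatever the attempt still holds.
  def clear(): Unit = synchronized(pending.clear())
}

final class ToyBarrierStates {
  private val states = new ConcurrentHashMap[ToyBarrierKey, ToyAttemptState]()

  def getOrRegister(key: ToyBarrierKey): ToyAttemptState =
    states.computeIfAbsent(key, (_: ToyBarrierKey) => new ToyAttemptState)

  // cleanupBarrierStage-like: forget one stage attempt (no-op if unknown).
  def cleanupBarrierStage(key: ToyBarrierKey): Unit = {
    val removed = states.remove(key)
    if (removed != null) removed.clear()
  }

  // onStop-like: clear every remaining state, then the registry itself.
  def onStop(): Unit = {
    val it = states.values().iterator()
    while (it.hasNext) it.next().clear()
    states.clear()
  }

  def size: Int = states.size
}
```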
SparkListener
BarrierCoordinator creates a SparkListener when created.
The SparkListener is used to intercept SparkListenerStageCompleted events.
The SparkListener is added to the LiveListenerBus (using addToStatusQueue) upon startup (onStart) and removed from it in onStop.
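The listener's registration follows the endpoint's own start and stop. A minimal sketch with hypothetical stand-ins for SparkListener and LiveListenerBus (ToyListener, ToyListenerBus and ToyCoordinatorEndpoint; the method names only mirror those mentioned above): once onStop has run, stage-completed events no longer reach the coordinator.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, minimal stand-ins for SparkListener and LiveListenerBus.
trait ToyListener {
  def onStageCompleted(stageId: Int, attemptNumber: Int): Unit
}

final class ToyListenerBus {
  private val listeners = ArrayBuffer.empty[ToyListener]
  def addToStatusQueue(listener: ToyListener): Unit = synchronized(listeners += listener)
  def removeListener(listener: ToyListener): Unit = synchronized(listeners -= listener)
  // Delivers the event to a snapshot of the listeners, outside the lock.
  def postStageCompleted(stageId: Int, attemptNumber: Int): Unit =
    synchronized(listeners.toList).foreach(_.onStageCompleted(stageId, attemptNumber))
  def size: Int = synchronized(listeners.size)
}

final class ToyCoordinatorEndpoint(bus: ToyListenerBus) {
  val completed = ArrayBuffer.empty[(Int, Int)]
  // The listener is created together with the endpoint.
  private val listener = new ToyListener {
    def onStageCompleted(stageId: Int, attemptNumber: Int): Unit =
      completed += ((stageId, attemptNumber))
  }
  def onStart(): Unit = bus.addToStatusQueue(listener)
  def onStop(): Unit = bus.removeListener(listener)
}
```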
onStageCompleted
```scala
onStageCompleted(
  stageCompleted: SparkListenerStageCompleted): Unit
```
onStageCompleted is part of the SparkListenerInterface abstraction.
onStageCompleted calls cleanupBarrierStage for the stage ID and the attempt number of the completed stage (from the StageInfo of the given SparkListenerStageCompleted).
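The interesting part of onStageCompleted is the event-to-key mapping: the stage ID and attempt number of the completed stage form the same key that the first RequestToSync registered. A sketch with hypothetical Toy* types (only the fields used here are modelled, and a String stands in for the per-attempt state):

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical mirrors of SparkListenerStageCompleted and StageInfo (used fields only).
final case class ToyStageInfo(stageId: Int, attemptNumber: Int)
final case class ToyStageCompleted(stageInfo: ToyStageInfo)
final case class ToyBarrierId(stageId: Int, stageAttemptId: Int)

final class ToyCoordinatorStates {
  // A String stands in for the per-attempt barrier state.
  val states = new ConcurrentHashMap[ToyBarrierId, String]()

  private def cleanupBarrierStage(barrierId: ToyBarrierId): Unit =
    states.remove(barrierId)

  // The completed stage's id and attempt number form the registry key.
  def onStageCompleted(stageCompleted: ToyStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    cleanupBarrierStage(ToyBarrierId(info.stageId, info.attemptNumber))
  }
}
```

Only the attempt that completed is removed; state for other attempts of the same stage stays registered.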
Logging
Enable ALL logging level for org.apache.spark.BarrierCoordinator logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
```text
logger.BarrierCoordinator.name = org.apache.spark.BarrierCoordinator
logger.BarrierCoordinator.level = all
```
Refer to Logging.