
Barrier Coordinator RPC Endpoint

BarrierCoordinator is a ThreadSafeRpcEndpoint that is registered as barrierSync RPC Endpoint when TaskSchedulerImpl is requested to maybeInitBarrierCoordinator.

BarrierCoordinator is responsible for handling RequestToSync messages to coordinate Global Syncs of barrier tasks (using allGather and barrier operators).

In other words, the driver (TaskSchedulerImpl, to be precise) sets up a BarrierCoordinator that BarrierTaskContexts talk to using RequestToSync messages. BarrierCoordinator tracks how many tasks of a barrier stage it still has to wait for. Once all of them have reached the barrier, it sends a response back to every task so they can continue. Until then, the tasks stay paused (with an RPC timeout of 365 days (!)).
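The task side of this exchange can be sketched with plain Scala futures. This is a minimal sketch under stated assumptions, not Spark's implementation: a Promise (coordinatorReply) stands in for the coordinator's RPC reply, the reply type Array[String] is assumed, and syncWith is a hypothetical helper that blocks the calling task until the reply arrives or a 365-day timeout expires.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for the driver's reply to a RequestToSync.
// Completing this Promise plays the role of BarrierCoordinator replying.
val coordinatorReply: Promise[Array[String]] = Promise[Array[String]]()

// Hypothetical helper: block the calling task until the coordinator replies,
// with a very long default timeout (365 days), mirroring how long barrier tasks may wait.
def syncWith(reply: Future[Array[String]], timeout: FiniteDuration = 365.days): Array[String] =
  Await.result(reply, timeout)

// Simulate a barrier task that waits for the reply in its own thread.
val task = new Thread(new Runnable {
  def run(): Unit = {
    val messages = syncWith(coordinatorReply.future)
    println(s"task released with ${messages.length} message(s)")
  }
})
task.start()

// Once the coordinator has heard from every task, it replies and the task continues.
coordinatorReply.success(Array("ok"))
task.join()
```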

Creating Instance

BarrierCoordinator takes the following to be created:

BarrierCoordinator is created when TaskSchedulerImpl is requested to maybeInitBarrierCoordinator.

Processing RequestToSync Messages (from Barrier Tasks)

RpcEndpoint
receiveAndReply(
  context: RpcCallContext): PartialFunction[Any, Unit]

receiveAndReply is part of the RpcEndpoint abstraction.

receiveAndReply handles RequestToSync messages.


Unless already registered, receiveAndReply registers a new ContextBarrierState (with the numTasks of the request) under a ContextBarrierId (for the stageId and the stageAttemptId) in the Barrier States registry.
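The register-on-first-request step can be sketched with ConcurrentHashMap.computeIfAbsent. This is a minimal sketch assuming simplified stand-ins for Spark's internal types: ContextBarrierId is modelled as a plain case class, and BarrierState and stateFor are hypothetical names used only for illustration.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-ins for Spark's internal ContextBarrierId and ContextBarrierState.
final case class ContextBarrierId(stageId: Int, stageAttemptId: Int)
final class BarrierState(val barrierId: ContextBarrierId, val numTasks: Int)

val states = new ConcurrentHashMap[ContextBarrierId, BarrierState]()

// Get the state of a stage attempt, registering it on the first request.
// computeIfAbsent is atomic, so racing first requests still create exactly one state;
// the numTasks of later requests is ignored once the state exists.
def stateFor(stageId: Int, stageAttemptId: Int, numTasks: Int): BarrierState = {
  val barrierId = ContextBarrierId(stageId, stageAttemptId)
  states.computeIfAbsent(barrierId, (key: ContextBarrierId) => new BarrierState(key, numTasks))
}

val first = stateFor(stageId = 3, stageAttemptId = 0, numTasks = 4)
val second = stateFor(stageId = 3, stageAttemptId = 0, numTasks = 4)
assert(first eq second) // the same state object is reused for the same stage attempt
```

Because the mapping function is applied at most once per key, every task of a stage attempt ends up talking to the same state object.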

Multiple Tasks and One BarrierCoordinator

receiveAndReply handles RequestToSync messages, one per task in a barrier stage (for every global sync). Of all the properties of a RequestToSync, receiveAndReply itself uses only numTasks, stageId and stageAttemptId.

The very first RequestToSync of a stage attempt registers the stageId and stageAttemptId (as a ContextBarrierId) with numTasks.

receiveAndReply finds the ContextBarrierState for the stage and the stage attempt (in the Barrier States registry) to handle the RequestToSync.
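The counting behind a global sync can be sketched as follows. This is a simplified, hypothetical stand-in for what a ContextBarrierState does, not Spark's code: SyncRequest, SimpleBarrierState and the reply callbacks (which play the role of RpcCallContext.reply) are assumptions made for illustration, and the gathered messages are kept in arrival order.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, reduced RequestToSync: only the fields this sketch needs.
final case class SyncRequest(numTasks: Int, stageId: Int, stageAttemptId: Int,
                             taskAttemptId: Long, message: String)

// Hypothetical per-stage-attempt state that counts requests until numTasks have arrived.
final class SimpleBarrierState(numTasks: Int) {
  private val replies = ArrayBuffer.empty[Array[String] => Unit]
  private val messages = ArrayBuffer.empty[String]

  // Record one task's request; once all numTasks have arrived, reply to every
  // waiting task with all the messages and reset for the next global sync.
  def handleRequest(request: SyncRequest, reply: Array[String] => Unit): Unit = synchronized {
    replies += reply
    messages += request.message
    if (replies.size == numTasks) {
      val gathered = messages.toArray
      replies.foreach(r => r(gathered))
      replies.clear()
      messages.clear()
    }
  }
}

val state = new SimpleBarrierState(numTasks = 2)
var received = Vector.empty[String]
state.handleRequest(SyncRequest(2, 0, 0, 10L, "a"), r => received :+= s"t10:${r.mkString(",")}")
// Nothing has been replied yet: the state is still waiting for the second task.
state.handleRequest(SyncRequest(2, 0, 0, 11L, "b"), r => received :+= s"t11:${r.mkString(",")}")
println(received) // Vector(t10:a,b, t11:a,b)
```

Spark's own ContextBarrierState does more bookkeeping (for example, it tracks a barrier epoch and enforces the sync timeout), which this sketch leaves out.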

Barrier States

states: ConcurrentHashMap[ContextBarrierId, ContextBarrierState]

BarrierCoordinator creates an empty Java ConcurrentHashMap when created.

The states registry keeps track of all the active barrier stage attempts and their corresponding internal ContextBarrierStates.

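states is used when receiveAndReply handles a RequestToSync message, when cleanupBarrierStage removes the state of a completed stage attempt, and when BarrierCoordinator is requested to stop (onStop).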

SparkListener

BarrierCoordinator creates a SparkListener when created.

The SparkListener is used to intercept SparkListenerStageCompleted events.

The SparkListener is added to the status queue of the LiveListenerBus (addToStatusQueue) upon startup (onStart) and removed when BarrierCoordinator stops (onStop).

onStageCompleted

SparkListener
onStageCompleted(
  stageCompleted: SparkListenerStageCompleted): Unit

onStageCompleted is part of the SparkListenerInterface abstraction.

onStageCompleted uses cleanupBarrierStage to remove the barrier state of the completed stage attempt, identified by the stage ID and attempt number of the StageInfo of the given SparkListenerStageCompleted.
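The cleanup triggered by onStageCompleted can be sketched as follows. This is a minimal sketch with hypothetical stand-ins: StageCompleted plays the role of SparkListenerStageCompleted (reduced to the stage ID and attempt number of its StageInfo), and BarrierStates, register and isActive are illustrative names, not Spark APIs. The state value is just a String here.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-ins for ContextBarrierId and a completed-stage event.
final case class ContextBarrierId(stageId: Int, stageAttemptId: Int)
final case class StageCompleted(stageId: Int, attemptNumber: Int)

final class BarrierStates {
  private val states = new ConcurrentHashMap[ContextBarrierId, String]()

  def register(id: ContextBarrierId): Unit = states.putIfAbsent(id, s"state for $id")
  def isActive(id: ContextBarrierId): Boolean = states.containsKey(id)

  // Drop the state of a finished stage attempt (a no-op if it was never registered).
  def cleanupBarrierStage(id: ContextBarrierId): Unit = states.remove(id)

  // Mirrors the shape of onStageCompleted: derive the id from the event and clean up.
  def onStageCompleted(event: StageCompleted): Unit =
    cleanupBarrierStage(ContextBarrierId(event.stageId, event.attemptNumber))
}

val registry = new BarrierStates
val id = ContextBarrierId(stageId = 5, stageAttemptId = 1)
registry.register(id)
registry.onStageCompleted(StageCompleted(stageId = 5, attemptNumber = 1))
println(registry.isActive(id)) // false
```

Keying the cleanup by both stage ID and attempt number means a completed attempt never removes the state of another attempt of the same stage.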

Logging

Enable ALL logging level for org.apache.spark.BarrierCoordinator logger to see what happens inside.

Add the following line to conf/log4j2.properties:

logger.BarrierCoordinator.name = org.apache.spark.BarrierCoordinator
logger.BarrierCoordinator.level = all

Refer to Logging.