
BarrierTaskContext — TaskContext for Barrier Tasks

BarrierTaskContext is a concrete TaskContext for the tasks of a Barrier Stage in Barrier Execution Mode.

Creating Instance

BarrierTaskContext takes the following to be created:

BarrierTaskContext is created when:

Barrier Coordinator RPC Endpoint

barrierCoordinator: RpcEndpointRef

BarrierTaskContext creates an RpcEndpointRef to the Barrier Coordinator RPC Endpoint when created.

barrierCoordinator is used to handle barrier and allGather operators (through runBarrier).

allGather

allGather(
  message: String): Array[String]

allGather calls runBarrier with the given message and the ALL_GATHER request method.

Public API and PySpark

allGather is part of a public API.

allGather is used in BasePythonRunner.WriterThread (PySpark) when requested to barrierAndServe.
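As an illustration of the public API, the following is a minimal sketch of calling allGather from the tasks of a barrier stage. It assumes a SparkContext with at least two task slots (for example `local[2]`), since all tasks of a barrier stage have to be launched together. The object name `AllGatherSketch` and the message format are hypothetical and not part of Spark.

```scala
import org.apache.spark.{BarrierTaskContext, SparkContext}

// Hypothetical helper object; only BarrierTaskContext, RDD.barrier and
// RDDBarrier.mapPartitions are Spark APIs.
object AllGatherSketch {
  def run(sc: SparkContext): Array[String] =
    sc.parallelize(1 to 4, numSlices = 2)
      .barrier() // marks the stage as a barrier stage
      .mapPartitions { _ =>
        val ctx = BarrierTaskContext.get()
        // Blocks until every task in the stage has called allGather,
        // then returns the messages sent by all the tasks.
        val gathered = ctx.allGather(s"partition-${ctx.partitionId()}")
        Iterator.single(gathered.sorted.mkString(","))
      }
      .collect()
}
```

Each of the two tasks should return the same string that contains both messages.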

barrier

barrier(): Unit

barrier calls runBarrier with no message and the BARRIER request method.

Public API and PySpark

barrier is part of a public API.

barrier is used in BasePythonRunner.WriterThread (PySpark) when requested to barrierAndServe.
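A minimal sketch of using barrier as a synchronization point between two phases of a task follows. It assumes a SparkContext with at least two task slots (for example `local[2]`). The object name `BarrierSketch` and the two "phases" are hypothetical and only serve the illustration.

```scala
import org.apache.spark.{BarrierTaskContext, SparkContext}

// Hypothetical helper object; only BarrierTaskContext, RDD.barrier and
// RDDBarrier.mapPartitions are Spark APIs.
object BarrierSketch {
  def run(sc: SparkContext): Array[Int] =
    sc.parallelize(1 to 4, numSlices = 2)
      .barrier()
      .mapPartitions { rows =>
        val ctx = BarrierTaskContext.get()
        // Phase 1: every task does its local work first.
        val partialSum = rows.sum
        // Global sync: no task continues until all tasks have reached this call.
        ctx.barrier()
        // Phase 2: runs only after all tasks have finished phase 1.
        Iterator.single(partialSum)
      }
      .collect()
}
```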

Global Sync

runBarrier(
  message: String,
  requestMethod: RequestMethod.Value): Array[String]

runBarrier prints out the following INFO message to the logs:

Task [taskAttemptId] from Stage [stageId](Attempt [stageAttemptNumber]) has entered the global sync, current barrier epoch is [barrierEpoch].

runBarrier prints out the following TRACE message to the logs:

Current callSite: [callSite]

runBarrier schedules a TimerTask (Java) to print out the following INFO message to the logs every minute:

Task [taskAttemptId] from Stage [stageId](Attempt [stageAttemptNumber]) waiting under the global sync since [startTime],
has been waiting for [duration] seconds,
current barrier epoch is [barrierEpoch].

runBarrier sends a RequestToSync one-off message to the Barrier Coordinator RPC Endpoint and waits for a response (a collection of responses from all the barrier tasks) with a timeout of 365 days (!).

1 Year to Wait for Response from Barrier Coordinator

runBarrier uses a timeout of 1 year when waiting for the response to arrive.

runBarrier checks every second whether the response "bundle" has arrived.
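The waiting pattern described above (a periodic reminder in the logs plus a polling loop on a pending response) can be sketched in plain Scala as follows. This is not Spark's code: `GlobalSyncSketch`, `waitForSync` and all the parameters are hypothetical names, and a plain `Future` stands in for the response from the Barrier Coordinator. The intervals are parameters so the sketch can be exercised with short durations.

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical sketch; not the actual runBarrier implementation.
object GlobalSyncSketch {
  /** Polls `response` every `pollInterval` until it completes and
    * calls `log` with a reminder every `logInterval` in the meantime. */
  def waitForSync[T](
      response: Future[T],
      pollInterval: FiniteDuration,
      logInterval: FiniteDuration,
      log: String => Unit): T = {
    val startTime = System.currentTimeMillis()
    val timer = new Timer("global-sync-reminder", true) // daemon thread
    val reminder = new TimerTask {
      override def run(): Unit = {
        val waited = (System.currentTimeMillis() - startTime) / 1000
        log(s"waiting under the global sync, has been waiting for $waited seconds")
      }
    }
    timer.schedule(reminder, logInterval.toMillis, logInterval.toMillis)
    try {
      while (!response.isCompleted) {
        try Await.ready(response, pollInterval)
        catch { case _: TimeoutException => () } // not ready yet, poll again
      }
      // The future is completed here: return its value or rethrow its failure.
      response.value.get.get
    } finally {
      reminder.cancel()
      timer.cancel()
    }
  }
}
```

With a poll interval of one second and a log interval of one minute, the sketch mirrors the timings mentioned above.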

runBarrier increments the barrierEpoch.

runBarrier prints out the following INFO message to the logs:

Task [taskAttemptId] from Stage [stageId](Attempt [stageAttemptNumber]) finished global sync successfully,
waited for [duration] seconds,
current barrier epoch is [barrierEpoch].

In the end, runBarrier returns the response "bundle" (a collection of responses from all the barrier tasks).


In case of a SparkException, runBarrier prints out the following INFO message to the logs and re-throws the exception (up the call chain):

Task [taskAttemptId] from Stage [stageId](Attempt [stageAttemptNumber]) failed to perform global sync,
waited for [duration] seconds,
current barrier epoch is [barrierEpoch].
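The success and failure paths described above can be sketched in plain Scala as follows. This is only an illustration under stated assumptions: `SyncOutcomeSketch`, `EpochTracker` and `runSync` are hypothetical names, and `SyncFailedException` is a stand-in for SparkException so that the sketch needs no Spark dependency. The epoch is incremented only when the sync succeeds, and a failure is logged and then re-thrown.

```scala
// Hypothetical sketch; not the actual runBarrier implementation.
object SyncOutcomeSketch {
  // Stand-in for org.apache.spark.SparkException (assumption for this sketch).
  final class SyncFailedException(msg: String) extends Exception(msg)

  // Holds the current barrier epoch of a task.
  final class EpochTracker { var barrierEpoch: Int = 0 }

  def runSync[T](tracker: EpochTracker, log: String => Unit)(sync: => T): T = {
    val startTime = System.currentTimeMillis()
    def waitedSeconds: Long = (System.currentTimeMillis() - startTime) / 1000
    try {
      val responses = sync // blocks until the global sync finishes
      tracker.barrierEpoch += 1 // incremented on success only
      log(s"finished global sync successfully, waited for $waitedSeconds seconds, " +
        s"current barrier epoch is ${tracker.barrierEpoch}.")
      responses
    } catch {
      case e: SyncFailedException =>
        log(s"failed to perform global sync, waited for $waitedSeconds seconds, " +
          s"current barrier epoch is ${tracker.barrierEpoch}.")
        throw e // report the failure up the call chain
    }
  }
}
```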

runBarrier is used when BarrierTaskContext is requested to barrier and allGather.

Logging

Enable the ALL logging level for the org.apache.spark.BarrierTaskContext logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

logger.BarrierTaskContext.name = org.apache.spark.BarrierTaskContext
logger.BarrierTaskContext.level = all

Refer to Logging.