BarrierTaskContext — TaskContext for Barrier Tasks¶
BarrierTaskContext is a concrete TaskContext of the tasks in a Barrier Stage in Barrier Execution Mode.
Creating Instance¶
BarrierTaskContext takes a TaskContext to be created.
BarrierTaskContext is created when Task is requested to run (with the isBarrier flag enabled).
Barrier Coordinator RPC Endpoint¶
barrierCoordinator: RpcEndpointRef
BarrierTaskContext creates a RpcEndpointRef to Barrier Coordinator RPC Endpoint when created.
barrierCoordinator is used to handle barrier and allGather operators (through runBarrier).
allGather¶
allGather(
message: String): Array[String]
allGather calls runBarrier with the given message and the ALL_GATHER request method.
Public API and PySpark
allGather is part of a public API.
allGather is used in BasePythonRunner.WriterThread (PySpark) when requested to barrierAndServe.
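As a usage illustration, the following is a minimal sketch of calling allGather from inside a barrier stage. It assumes Spark 3.0 or later on the classpath; the local[2] master, the application name, the sample data and the AllGatherDemo object are illustrative choices, not part of BarrierTaskContext itself.

```scala
import org.apache.spark.{BarrierTaskContext, SparkConf, SparkContext}

object AllGatherDemo {
  // Runs a two-task barrier stage in which every task shares one message
  // with all the other tasks. Returns one row per task.
  def run(): Array[String] = {
    // Assumption: local mode with two slots, one per barrier task.
    val conf = new SparkConf().setMaster("local[2]").setAppName("all-gather-demo")
    val sc = new SparkContext(conf)
    try {
      sc.parallelize(1 to 4, numSlices = 2)
        .barrier()
        .mapPartitions { _ =>
          val ctx = BarrierTaskContext.get()
          // Blocks until all tasks of the stage have called allGather,
          // then returns the messages of all the tasks.
          val messages = ctx.allGather(s"hello from partition ${ctx.partitionId()}")
          Iterator.single(messages.mkString(" | "))
        }
        .collect()
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Every task should see the messages of both tasks, so each returned row contains both greetings.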
barrier¶
barrier(): Unit
barrier calls runBarrier with no message and the BARRIER request method.
Public API and PySpark
barrier is part of a public API.
barrier is used in BasePythonRunner.WriterThread (PySpark) when requested to barrierAndServe.
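A minimal sketch of barrier in user code could look as follows. It assumes Spark 3.0 or later on the classpath; the local[2] master, the artificial Thread.sleep workload and the BarrierDemo object are hypothetical and only serve the illustration.

```scala
import org.apache.spark.{BarrierTaskContext, SparkConf, SparkContext}

object BarrierDemo {
  // Runs a two-task barrier stage and returns the partition ids
  // of the tasks that passed the barrier.
  def run(): Array[Int] = {
    // Assumption: local mode with two slots, one per barrier task.
    val conf = new SparkConf().setMaster("local[2]").setAppName("barrier-demo")
    val sc = new SparkContext(conf)
    try {
      sc.parallelize(1 to 4, numSlices = 2)
        .barrier()
        .mapPartitions { _ =>
          val ctx = BarrierTaskContext.get()
          // Simulate uneven setup work per task (hypothetical workload).
          Thread.sleep(100L * (ctx.partitionId() + 1))
          // Blocks until every task of the stage has reached this call.
          ctx.barrier()
          Iterator.single(ctx.partitionId())
        }
        .collect()
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run().mkString(", "))
}
```

No task proceeds past ctx.barrier() until the slowest task has arrived, which is what makes the call a global sync point.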
Global Sync¶
runBarrier(
message: String,
requestMethod: RequestMethod.Value): Array[String]
runBarrier prints out the following INFO message to the logs:
Task [taskAttemptId] from Stage [stageId](Attempt [stageAttemptNumber]) has entered the global sync, current barrier epoch is [barrierEpoch].
runBarrier prints out the following TRACE message to the logs:
Current callSite: [callSite]
runBarrier schedules a TimerTask (Java) to print out the following INFO message to the logs every minute:
Task [taskAttemptId] from Stage [stageId](Attempt [stageAttemptNumber]) waiting under the global sync since [startTime],
has been waiting for [duration] seconds,
current barrier epoch is [barrierEpoch].
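The periodic progress message can be pictured with a plain java.util.Timer. The sketch below is not Spark's code: the WaitLogger object, the ticks counter and the configurable period are assumptions made so the example runs quickly (runBarrier itself logs every minute).

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.atomic.AtomicInteger

object WaitLogger {
  // Schedules a repeating task that reports how long the caller has been waiting.
  // `periodMs` is configurable only so the sketch can be exercised in milliseconds.
  def start(startTime: Long, periodMs: Long, ticks: AtomicInteger): Timer = {
    val timer = new Timer("barrier-wait-logger", true) // daemon timer thread
    val task = new TimerTask {
      override def run(): Unit = {
        val waitedSec = (System.currentTimeMillis() - startTime) / 1000
        println(s"waiting under the global sync since $startTime, " +
          s"has been waiting for $waitedSec seconds")
        ticks.incrementAndGet()
      }
    }
    // The first run happens after one period, then repeats at that period.
    timer.schedule(task, periodMs, periodMs)
    timer
  }
}
```

The caller is expected to cancel the timer (timer.cancel()) once the sync finishes or fails, otherwise the message would keep being printed.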
runBarrier requests the Barrier Coordinator RPC Endpoint to send a RequestToSync one-off message and waits 365 days (!) for a response (a collection of responses from all the barrier tasks).
1 Year to Wait for Response from Barrier Coordinator
runBarrier waits up to 1 year for the response to arrive.
runBarrier checks every second if the response "bundle" arrived.
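The combination of a very long overall timeout and a short polling interval can be sketched with standard Scala futures. This is an illustration of the pattern only; PollingWait, onTick and the parameter names are hypothetical and do not come from Spark.

```scala
import java.util.concurrent.TimeoutException
import scala.annotation.tailrec
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object PollingWait {
  // Waits for `response`, waking up every `pollInterval` to run `onTick`
  // (a hypothetical hook for periodic checks) until `deadline` passes.
  @tailrec
  def await[T](response: Future[T], pollInterval: FiniteDuration, deadline: Deadline)(
      onTick: () => Unit): T = {
    val attempt =
      try Some(Await.result(response, pollInterval))
      catch { case _: TimeoutException => None } // not ready yet
    attempt match {
      case Some(value) => value
      case None =>
        if (deadline.isOverdue()) throw new TimeoutException("No response before the deadline")
        onTick()
        await(response, pollInterval, deadline)(onTick)
    }
  }
}
```

With pollInterval = 1.second and deadline = 365.days.fromNow the loop mirrors the timings described above: the waiting thread regains control every second, yet the wait as a whole is practically unbounded.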
runBarrier increments the barrierEpoch.
runBarrier prints out the following INFO message to the logs:
Task [taskAttemptId] from Stage [stageId](Attempt [stageAttemptNumber]) finished global sync successfully,
waited for [duration] seconds,
current barrier epoch is [barrierEpoch].
In the end, runBarrier returns the response "bundle" (a collection of responses from all the barrier tasks).
In case of a SparkException, runBarrier prints out the following INFO message to the logs and re-throws the exception (up the call chain):
Task [taskAttemptId] from Stage [stageId](Attempt [stageAttemptNumber]) failed to perform global sync,
waited for [duration] seconds,
current barrier epoch is [barrierEpoch].
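The overall control flow of the global sync (log on entry, advance the epoch on success, log and re-throw on failure) can be summarized in a simplified sketch. Everything below is hypothetical: GlobalSyncSketch, SyncException (a stand-in for SparkException) and the sync function (a stand-in for the RPC round trip to the coordinator) are assumptions, and println replaces the logger.

```scala
object GlobalSyncSketch {
  // Stand-in for SparkException in this sketch.
  final class SyncException(msg: String) extends Exception(msg)

  final class Context {
    private var barrierEpoch = 0
    def epoch: Int = barrierEpoch

    // `sync` stands in for the request to the coordinator; it receives
    // the message and the current epoch and returns all tasks' responses.
    def runBarrier(message: String)(sync: (String, Int) => Array[String]): Array[String] = {
      val startTime = System.currentTimeMillis()
      def waited: Long = (System.currentTimeMillis() - startTime) / 1000
      println(s"entered the global sync, current barrier epoch is $barrierEpoch.")
      try {
        val responses = sync(message, barrierEpoch)
        barrierEpoch += 1 // in this sketch only a successful sync advances the epoch
        println(s"finished global sync successfully, waited for $waited seconds, " +
          s"current barrier epoch is $barrierEpoch.")
        responses
      } catch {
        case e: SyncException =>
          println(s"failed to perform global sync, waited for $waited seconds, " +
            s"current barrier epoch is $barrierEpoch.")
          throw e // report the failure up the call chain
      }
    }
  }
}
```

In this sketch a failed sync leaves the epoch untouched, so the failure message reports the same epoch as the entry message.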
runBarrier is used when BarrierTaskContext is requested to barrier and allGather.
Logging¶
Enable ALL logging level for org.apache.spark.BarrierTaskContext logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.BarrierTaskContext.name = org.apache.spark.BarrierTaskContext
logger.BarrierTaskContext.level = all
Refer to Logging.