BarrierTaskContext — TaskContext for Barrier Tasks¶
BarrierTaskContext is a concrete TaskContext of the tasks in a Barrier Stage in Barrier Execution Mode.
Creating Instance¶
BarrierTaskContext takes the following to be created:
BarrierTaskContext is created when:
Barrier Coordinator RPC Endpoint¶
barrierCoordinator: RpcEndpointRef
BarrierTaskContext creates a RpcEndpointRef to Barrier Coordinator RPC Endpoint when created.
barrierCoordinator is used to handle barrier and allGather operators (through runBarrier).
allGather¶
allGather(
message: String): Array[String]
allGather calls runBarrier with the given message and the ALL_GATHER request method.
Public API and PySpark
allGather is part of a public API.
allGather is used in BasePythonRunner.WriterThread (PySpark) when requested to barrierAndServe.
barrier¶
barrier(): Unit
barrier calls runBarrier with no message and the BARRIER request method.
Public API and PySpark
barrier is part of a public API.
barrier is used in BasePythonRunner.WriterThread (PySpark) when requested to barrierAndServe.
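The two public operators described above can be exercised from user code. The following is a minimal sketch, not taken from the original page: it assumes Spark is on the classpath, and the `BarrierDemo` object, the `run` method, the `local[2]` master and the `partition-N` messages are illustrative choices only.

```scala
import org.apache.spark.BarrierTaskContext
import org.apache.spark.sql.SparkSession

// Hypothetical demo object (not part of Spark)
object BarrierDemo {
  def run(): Array[String] = {
    // Assumption: local[2] provides two task slots, one per barrier task below
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("barrier-demo")
      .getOrCreate()
    try {
      spark.sparkContext
        .parallelize(1 to 4, numSlices = 2)
        .barrier() // marks the stage as a barrier stage
        .mapPartitions { _ =>
          // Inside a barrier stage the task context is a BarrierTaskContext
          val ctx = BarrierTaskContext.get()
          // Blocks until all tasks of the stage have called barrier()
          ctx.barrier()
          // Every task sends one message and receives the messages of all tasks
          val all = ctx.allGather(s"partition-${ctx.partitionId()}")
          Iterator.single(all.mkString(","))
        }
        .collect()
    } finally {
      spark.stop()
    }
  }
}
```

Calling `BarrierDemo.run()` returns one string per partition, each holding the messages gathered from all barrier tasks.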
Global Sync¶
runBarrier(
message: String,
requestMethod: RequestMethod.Value): Array[String]
runBarrier prints out the following INFO message to the logs:
Task [taskAttemptId] from Stage [stageId](Attempt [stageAttemptNumber]) has entered the global sync, current barrier epoch is [barrierEpoch].
runBarrier prints out the following TRACE message to the logs:
Current callSite: [callSite]
runBarrier schedules a TimerTask (Java) to print out the following INFO message to the logs every minute:
Task [taskAttemptId] from Stage [stageId](Attempt [stageAttemptNumber]) waiting under the global sync since [startTime],
has been waiting for [duration] seconds,
current barrier epoch is [barrierEpoch].
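The periodic "still waiting" report can be modelled with the plain JDK timer API. This is a simplified sketch under stated assumptions, not Spark's code: the `WaitReporter` object, its `start` method and the timer thread name are hypothetical, and the report callback stands in for the INFO log statement.

```scala
import java.util.{Timer, TimerTask}

// Hypothetical helper (not part of Spark)
object WaitReporter {
  // Runs `report` every `periodMs` milliseconds with the seconds waited so far.
  // Returns a function that stops the reporting.
  def start(periodMs: Long)(report: Long => Unit): () => Unit = {
    val startTime = System.currentTimeMillis()
    // Daemon timer thread, so it never keeps the JVM alive on its own
    val timer = new Timer("wait-reporter", true)
    val task = new TimerTask {
      override def run(): Unit =
        report((System.currentTimeMillis() - startTime) / 1000)
    }
    // First run after one period, then repeatedly at the same interval
    timer.schedule(task, periodMs, periodMs)
    () => { task.cancel(); timer.cancel() }
  }
}
```

A caller would start the reporter before blocking and stop it afterwards, e.g. `val stop = WaitReporter.start(60000) { secs => println(s"has been waiting for $secs seconds") }` followed by `stop()` once the wait is over.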
runBarrier sends a RequestToSync one-off message to the Barrier Coordinator RPC Endpoint and waits up to 365 days (!) for a response (a collection of responses from all the barrier tasks).
1 Year to Wait for Response from Barrier Coordinator
runBarrier waits up to 1 year for the response to arrive.
runBarrier checks every second whether the response "bundle" has arrived.
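Waiting with a very long overall timeout while checking for completion at a short interval can be sketched with the Scala standard library alone. The `PollingWait` object and its `waitFor` method are hypothetical names, and the code is a model of the idea rather than Spark's implementation.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical helper (not part of Spark)
object PollingWait {
  // Waits for `future` in slices of `pollInterval` and gives up after `timeout`.
  // The overall wait may overshoot `timeout` by at most one poll interval.
  def waitFor[T](future: Future[T], timeout: FiniteDuration, pollInterval: FiniteDuration): T = {
    val deadline = timeout.fromNow
    while (!future.isCompleted) {
      if (deadline.isOverdue()) {
        throw new TimeoutException(s"No response within $timeout")
      }
      try {
        Await.ready(future, pollInterval)
      } catch {
        // Not completed within this slice: a natural place to check
        // whether the waiting task has been interrupted or killed
        case _: TimeoutException =>
      }
    }
    // The future is completed here, so this returns (or rethrows) immediately
    Await.result(future, Duration.Zero)
  }
}
```

Polling in short slices keeps the waiting thread responsive even though the overall timeout is far longer than any realistic wait.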
runBarrier increments the barrierEpoch.
runBarrier prints out the following INFO message to the logs:
Task [taskAttemptId] from Stage [stageId](Attempt [stageAttemptNumber]) finished global sync successfully,
waited for [duration] seconds,
current barrier epoch is [barrierEpoch].
In the end, runBarrier returns the response "bundle" (a collection of responses from all the barrier tasks).
In case of a SparkException, runBarrier prints out the following INFO message to the logs and re-throws the exception up the call chain:
Task [taskAttemptId] from Stage [stageId](Attempt [stageAttemptNumber]) failed to perform global sync,
waited for [duration] seconds,
current barrier epoch is [barrierEpoch].
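The success and failure paths described above can be summarised in a small self-contained model. It is a sketch under assumptions: `GlobalSyncModel`, `runSync` and the `sync` parameter are hypothetical, `println` stands in for logging, and a generic `Exception` replaces SparkException so that the code has no Spark dependency.

```scala
// Hypothetical model (not part of Spark)
object GlobalSyncModel {
  // Stand-in for the barrier epoch kept by a barrier task
  private var barrierEpoch = 0
  def currentEpoch: Int = barrierEpoch

  // `sync` is a placeholder for the blocking exchange with the coordinator
  def runSync(sync: () => Array[String]): Array[String] = {
    val startTime = System.currentTimeMillis()
    def waited: Long = (System.currentTimeMillis() - startTime) / 1000
    println(s"entered the global sync, current barrier epoch is $barrierEpoch")
    try {
      val messages = sync()
      // In this model only a successful sync advances the epoch
      barrierEpoch += 1
      println(s"finished global sync successfully, waited for $waited seconds, " +
        s"current barrier epoch is $barrierEpoch")
      messages
    } catch {
      case e: Exception =>
        println(s"failed to perform global sync, waited for $waited seconds, " +
          s"current barrier epoch is $barrierEpoch")
        throw e // report the failure up the call chain
    }
  }
}
```

For example, `GlobalSyncModel.runSync(() => Array("a", "b"))` returns the two messages and moves the epoch forward by one, while a `sync` function that throws leaves the epoch unchanged and propagates the exception to the caller.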
runBarrier is used when BarrierTaskContext is requested to barrier and allGather.
Logging¶
Enable ALL logging level for the org.apache.spark.BarrierTaskContext logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.BarrierTaskContext.name = org.apache.spark.BarrierTaskContext
logger.BarrierTaskContext.level = all
Refer to Logging.