
Barrier Execution Mode

Barrier Execution Mode (Barrier Scheduling) introduces a strong requirement on Spark Scheduler to launch all tasks of a Barrier Stage at the same time or not at all (and consequently wait until required resources are available). Moreover, a failure of a single task of a barrier stage fails the whole stage (and so the other tasks).

Barrier Execution Mode allows for as many tasks to be executed concurrently as ResourceProfile permits (that is enforced upon scheduling a barrier job).

Barrier Execution Mode aims at making Distributed Deep Learning with Apache Spark easier (or even possible).

Rephrasing dmlc/xgboost, Barrier Execution Mode makes sure that:

  1. All tasks of a barrier stage are launched at once. If there are not enough task slots, an exception is thrown

  2. Tasks either all succeed or all fail. Upon a task failure, Spark aborts all the other tasks (TaskScheduler kills all other running tasks) and restarts the whole barrier stage

  3. Spark does not assume that tasks are independent of each other. On the contrary, Spark provides BarrierTaskContext, which lets tasks synchronize and discover each other (e.g., barrier, allGather)

  4. Training can be restarted from a known state (checkpoint) in case of a failure
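
The coordination described in point 3 can be sketched as follows. This is a minimal sketch, not a definitive implementation: it assumes a local session with at least 3 cores (so that all 3 barrier tasks get a slot at once), and the application name and the task-N messages are made up for illustration.

```scala
import org.apache.spark.BarrierTaskContext
import org.apache.spark.sql.SparkSession

// Assumption: 3 local cores, one slot per barrier task
val spark = SparkSession.builder()
  .master("local[3]")
  .appName("barrier-sketch") // hypothetical name
  .getOrCreate()

val gathered = spark.sparkContext
  .parallelize(0 until 9, numSlices = 3)
  .barrier()
  .mapPartitions { it =>
    val ctx = BarrierTaskContext.get()
    // Every task sends one message and receives the messages of all the tasks
    val messages = ctx.allGather(s"task-${ctx.partitionId()}")
    // Global sync: returns only after all the tasks of the stage have called it
    ctx.barrier()
    Iterator.single(messages.mkString(","))
  }
  .collect()
```

Every element of gathered should mention all three tasks, since allGather blocks until each task of the stage has contributed its message.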

From the Design doc: Barrier Execution Mode:

In Spark, a task in a stage doesn't depend on any other task in the same stage, and hence it can be scheduled independently.

That gives Spark the freedom to schedule tasks in as many task batches as needed. So, 5 tasks can be scheduled on 1 CPU core quite easily in 5 consecutive batches. This is unlike MPI (or other non-MapReduce scheduling systems), which allows for greater flexibility and inter-task dependencies.
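
The difference can be sketched with a single local core. The sketch assumes that no other SparkContext is active in the JVM (so it is meant for a standalone script rather than spark-shell) and shortens the two spark.scheduler.barrier.maxConcurrentTasksCheck settings so that the barrier job gives up quickly instead of retrying the slot check for minutes. The application name is made up.

```scala
import scala.util.Try
import org.apache.spark.{SparkConf, SparkContext}

// Assumptions: one local core and a shortened slot check so the barrier job fails fast
val conf = new SparkConf()
  .setMaster("local[1]")
  .setAppName("barrier-slots-sketch") // hypothetical name
  .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
  .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "1")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(1 to 10, numSlices = 5)

// Regular stage: 5 independent tasks run one after another on the single core
val regular = Try(rdd.mapPartitions(iter => iter).count())

// Barrier stage: 5 tasks need 5 slots at the same time, so the job is expected to fail
val barrier = Try(rdd.barrier().mapPartitions(iter => iter).count())

sc.stop()
```

The regular job should succeed, while the barrier job should end up as a Failure because one core cannot host five tasks at once.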

Later in Design doc: Barrier Execution Mode:

In MPI, all workers start at the same time and pass messages around.

To embed this workload in Spark, we need to introduce a new scheduling model, tentatively named "barrier scheduling", which launches the tasks at the same time and provides users enough information and tooling to embed distributed DL training into a Spark pipeline.

Barrier RDD

Barrier RDD is an RDDBarrier.

Barrier Stage

Barrier Stage is a Stage with at least one Barrier RDD.


RDD.barrier Operator

Barrier Execution Mode is based on the RDD.barrier operator, which indicates that Spark Scheduler must launch the tasks of the current stage together (and marks the current stage as a barrier stage).

barrier(): RDDBarrier[T]

RDD.barrier creates an RDDBarrier that comes with the barrier-aware mapPartitions transformation.

mapPartitions[S: ClassTag](
  f: Iterator[T] => Iterator[S],
  preservesPartitioning: Boolean = false): RDD[S]

Under the covers, RDDBarrier.mapPartitions creates a MapPartitionsRDD like the regular RDD.mapPartitions transformation but with isFromBarrier flag enabled.
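
The two operators can be tried out as follows. This is a minimal sketch that assumes a local session with 2 cores (one per partition); the application name and the value names are made up for illustration.

```scala
import org.apache.spark.rdd.{RDD, RDDBarrier}
import org.apache.spark.sql.SparkSession

// Assumption: 2 local cores, one slot per barrier task
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("rdd-barrier-sketch") // hypothetical name
  .getOrCreate()

val nums: RDD[Int] = spark.sparkContext.parallelize(1 to 4, numSlices = 2)

// RDD.barrier only wraps the RDD; nothing is scheduled yet
val wrapped: RDDBarrier[Int] = nums.barrier()

// RDDBarrier.mapPartitions gives back a regular RDD that is part of a barrier stage
val doubled: RDD[Int] = wrapped.mapPartitions(it => it.map(_ * 2))

val result = doubled.collect().sorted
```

Note that the result of RDDBarrier.mapPartitions is typed as a plain RDD, so any further transformations and actions are the regular ones.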

  • Task has an isBarrier flag that says whether this task belongs to a barrier stage (default: false).

isFromBarrier Flag

An RDD is in a barrier stage if it, or at least one of its parent RDDs, is mapped from an RDDBarrier.

ShuffledRDD has the isBarrier flag always disabled (false).

MapPartitionsRDD is the only RDD that can have the isBarrier flag enabled.

RDDBarrier.mapPartitions is the only transformation that creates a MapPartitionsRDD with the isFromBarrier flag enabled.
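
The rules above can be illustrated with a toy model in plain Scala. The classes below are hypothetical stand-ins and not Spark's own classes; they only mimic how the flag is described to propagate through the RDD lineage.

```scala
// Toy model only: Node, Source, MapPartitions and Shuffled are hypothetical names
sealed trait Node { def isBarrier: Boolean }

// A source with no parents is never in a barrier stage
case object Source extends Node { val isBarrier = false }

// Models MapPartitionsRDD: in a barrier stage if created by RDDBarrier.mapPartitions
// (isFromBarrier) or if its parent is in a barrier stage
final case class MapPartitions(parent: Node, isFromBarrier: Boolean = false) extends Node {
  def isBarrier: Boolean = isFromBarrier || parent.isBarrier
}

// Models ShuffledRDD: the flag is always disabled
final case class Shuffled(parent: Node) extends Node { val isBarrier = false }

val barrierMapped = MapPartitions(Source, isFromBarrier = true)
val narrowChild   = MapPartitions(barrierMapped)          // inherits the flag
val afterShuffle  = MapPartitions(Shuffled(narrowChild))  // the shuffle resets it
```

In this model the flag survives narrow transformations and is lost once a shuffle starts a new stage.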

Unsupported Spark Features

The following Spark features are not supported:


Enable ALL logging level for org.apache.spark.BarrierTaskContext logger to see what happens inside.
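
For quick local experiments, the logging level can also be set programmatically. The sketch below is an assumption-laden alternative to editing the logging configuration file: it assumes a Spark version that uses Log4j 2 (3.3 or later) and local mode, where the driver and the tasks share one JVM. On a cluster the executors would need their own logging configuration.

```scala
import org.apache.logging.log4j.{Level, LogManager}
import org.apache.logging.log4j.core.config.Configurator

// Assumption: Log4j 2 is the logging backend and everything runs in a single JVM
val loggerName = "org.apache.spark.BarrierTaskContext"
Configurator.setLevel(loggerName, Level.ALL)

// Read the level back to confirm the change took effect
val effectiveLevel = LogManager.getLogger(loggerName).getLevel
```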

val tasksNum = 3
val nums = sc.parallelize(seq = 0 until 9, numSlices = tasksNum)
assert(nums.getNumPartitions == tasksNum)

Print out the available partitions and the number of records within each (using Spark SQL for a human-friendlier output).

import org.apache.spark.TaskContext
nums
  .mapPartitions { it => Iterator.single((TaskContext.get.partitionId, it.size)) }
  .toDF("partitionId", "size")
  .show()

+-----------+----+
|partitionId|size|
+-----------+----+
|          0|   3|
|          1|   3|
|          2|   3|
+-----------+----+

RDD.barrier creates an RDDBarrier (and marks the current stage as a Barrier Stage).

import org.apache.spark.rdd.RDDBarrier

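Use RDDBarrier.mapPartitions transformation to access a BarrierTaskContext.

val barrierRdd = nums
  .barrier() // without it, tasks get a regular TaskContext and the cast below fails
  .mapPartitions { ns =>
    import org.apache.spark.{BarrierTaskContext, TaskContext}
    val ctx = TaskContext.get.asInstanceOf[BarrierTaskContext]
    // Blocks until all the barrier tasks have sent their messages
    ctx.allGather("waiting for all the barrier tasks")
    // Pass the records through unchanged
    ns
  }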

Run a distributed computation (using RDD.count action).

barrierRdd.count()

There should be INFO and TRACE messages printed out to the console (given ALL logging level for org.apache.spark.BarrierTaskContext logger).

[Executor task launch worker for task 1.0 in stage 5.0 (TID 13)] INFO  org.apache.spark.BarrierTaskContext:60 - Task 13 from Stage 5(Attempt 0) has entered the global sync, current barrier epoch is 0.
[Executor task launch worker for task 1.0 in stage 5.0 (TID 13)] TRACE org.apache.spark.BarrierTaskContext:68 - Current callSite: CallSite($anonfun$runBarrier$2 at Logging.scala:68,org.apache.spark.BarrierTaskContext.$anonfun$runBarrier$2(BarrierTaskContext.scala:61)
[Executor task launch worker for task 1.0 in stage 5.0 (TID 13)] INFO  org.apache.spark.BarrierTaskContext:60 - Task 13 from Stage 5(Attempt 0) finished global sync successfully, waited for 1 seconds, current barrier epoch is 1.

Open up web UI and explore the execution plans.

Access MapPartitionsRDD

MapPartitionsRDD is a private[spark] class, so code that accesses the RDD.isBarrier method has to be in the org.apache.spark package.

Paste the following code in spark-shell / Scala REPL using :paste -raw mode.

package org.apache.spark

object IsBarrier {
  import org.apache.spark.rdd.RDD
  implicit class BypassPrivateSpark[T](rdd: RDD[T]) {
    // RDD.isBarrier is private[spark], hence the enclosing package
    def isBarrier: Boolean = rdd.isBarrier()
  }
}

import org.apache.spark.IsBarrier._


Projects whose source code is worth reviewing and learning from


SynapseML's LightGBM on Apache Spark can be configured to use Barrier Execution Mode in the following modules:


Learn More

  1. SPIP: Support Barrier Execution Mode in Apache Spark (esp. Design: Barrier execution mode)
  2. Barrier Execution Mode in Spark 3.0 - Part 1 : Introduction