Barrier Execution Mode¶
Barrier Execution Mode (Barrier Scheduling) introduces a strong requirement on the Spark Scheduler to launch all tasks of a Barrier Stage at the same time or not at all (and hence to wait until the required resources are available). Moreover, a failure of a single task of a barrier stage fails the whole stage (and so all the other tasks).
Barrier Execution Mode allows as many tasks to be executed concurrently as the ResourceProfile permits (which is enforced when a barrier job is scheduled).
Barrier Execution Mode aims at making Distributed Deep Learning with Apache Spark easier (or even possible).
Rephrasing dmlc/xgboost, Barrier Execution Mode makes sure that:
- All tasks of a barrier stage are launched at once. If there are not enough task slots, an exception is thrown
- Tasks either all succeed or all fail. Upon a task failure, Spark aborts all the other tasks (TaskScheduler kills all other running tasks) and restarts the whole barrier stage
- Spark makes no assumption that tasks don't talk to each other. Quite the opposite, actually: Spark provides BarrierTaskContext that facilitates task coordination (e.g., barrier, allGather)
- Training can be restarted from a known state (checkpoint) in case of a failure
From the Design doc: Barrier Execution Mode:
In Spark, a task in a stage doesn't depend on any other task in the same stage, and hence it can be scheduled independently.
That gives Spark the freedom to schedule tasks in as many task batches as needed. So, 5 tasks can easily be scheduled on 1 CPU core in 5 consecutive batches. That is unlike MPI (and other non-MapReduce scheduling systems) that allow for greater flexibility and inter-task dependencies.
Later in Design doc: Barrier Execution Mode:
In MPI, all workers start at the same time and pass messages around.
To embed this workload in Spark, we need to introduce a new scheduling model, tentatively named "barrier scheduling", which launches the tasks at the same time and provides users enough information and tooling to embed distributed DL training into a Spark pipeline.
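For a first feel of what a barrier stage looks like in code, here is a minimal sketch (assuming a spark-shell session with at least 3 task slots, e.g. local[3]): all tasks are launched together and each blocks at the global sync point until every task has reached it.
import org.apache.spark.BarrierTaskContext
// A minimal barrier stage: all 3 tasks are launched together or not at all,
// and every task blocks at barrier() until all tasks have reached it.
val out = sc
  .parallelize(0 until 9, numSlices = 3)
  .barrier()
  .mapPartitions { it =>
    val ctx = BarrierTaskContext.get()
    ctx.barrier()  // global sync across all tasks of this barrier stage
    it
  }
  .collect()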
Barrier RDD¶
Barrier RDD is an RDDBarrier.
Barrier Stage¶
Barrier Stage is a Stage with at least one Barrier RDD.
Abstractions¶
RDD.barrier Operator¶
Barrier Execution Mode is based on the RDD.barrier operator that indicates that the Spark Scheduler must launch the tasks of the current stage together (and marks the current stage as a barrier stage).
barrier(): RDDBarrier[T]
RDD.barrier creates an RDDBarrier that comes with the barrier-aware mapPartitions transformation.
mapPartitions[S](
  f: Iterator[T] => Iterator[S],
  preservesPartitioning: Boolean = false): RDD[S]
Under the covers, RDDBarrier.mapPartitions creates a MapPartitionsRDD, just like the regular RDD.mapPartitions transformation, but with the isFromBarrier flag enabled.
Task has an isBarrier flag that says whether this task belongs to a barrier stage (default: false).
isFromBarrier Flag¶
An RDD is in a barrier stage if at least one of its parent RDDs, or itself, is mapped from an RDDBarrier.
ShuffledRDD has the isBarrier flag always disabled (false).
MapPartitionsRDD is the only RDD that can have the isBarrier flag enabled.
RDDBarrier.mapPartitions is the only transformation that creates a MapPartitionsRDD with the isFromBarrier flag enabled.
Unsupported Spark Features¶
The following Spark features are not supported:
Demo¶
Enable ALL logging level for the org.apache.spark.BarrierTaskContext logger to see what happens inside.
val tasksNum = 3
val nums = sc.parallelize(seq = 0 until 9, numSlices = tasksNum)
assert(nums.getNumPartitions == tasksNum)
Print out the available partitions and the number of records within each (using Spark SQL for a human-friendlier output).
import org.apache.spark.TaskContext
nums
  .mapPartitions { it => Iterator.single((TaskContext.get.partitionId, it.size)) }
  .toDF("partitionId", "size")
  .show
+-----------+----+
|partitionId|size|
+-----------+----+
| 0| 3|
| 1| 3|
| 2| 3|
+-----------+----+
Distributed Training¶
RDD.barrier creates an RDDBarrier (and so a Barrier Stage).
import org.apache.spark.rdd.RDDBarrier
assert(nums.barrier.isInstanceOf[RDDBarrier[_]])
Use the RDDBarrier.mapPartitions transformation to access a BarrierTaskContext.
val barrierRdd = nums
  .barrier
  .mapPartitions { ns =>
    import org.apache.spark.{BarrierTaskContext, TaskContext}
    val ctx = TaskContext.get.asInstanceOf[BarrierTaskContext]
    val tid = ctx.partitionId()
    val port = 10000 + tid
    val host = "localhost"
    val message = s"A message from task $tid, e.g. it listens at $host:$port"
    val allTaskMessages = ctx.allGather(message)
    if (tid == 0) { // only Task 0 prints out status
      println(">>> Got host:port's from the other tasks")
      allTaskMessages.foreach(println)
    }
    if (tid == 0) { // only Task 0 prints out status
      println(">>> Starting a distributed training at the nodes...")
    }
    ctx.barrier() // this is BarrierTaskContext.barrier (not RDD.barrier)
                  // which can be confusing
    if (tid == 0) { // only Task 0 prints out status
      println(">>> All tasks have finished")
    }
    // return a model after combining (model) pieces from the nodes
    ns
  }
Run a distributed computation (using RDD.count action).
barrierRdd.count()
There should be INFO and TRACE messages printed out to the console (given ALL logging level for the org.apache.spark.BarrierTaskContext logger).
[Executor task launch worker for task 1.0 in stage 5.0 (TID 13)] INFO org.apache.spark.BarrierTaskContext:60 - Task 13 from Stage 5(Attempt 0) has entered the global sync, current barrier epoch is 0.
...
[Executor task launch worker for task 1.0 in stage 5.0 (TID 13)] TRACE org.apache.spark.BarrierTaskContext:68 - Current callSite: CallSite($anonfun$runBarrier$2 at Logging.scala:68,org.apache.spark.BarrierTaskContext.$anonfun$runBarrier$2(BarrierTaskContext.scala:61)
...
[Executor task launch worker for task 1.0 in stage 5.0 (TID 13)] INFO org.apache.spark.BarrierTaskContext:60 - Task 13 from Stage 5(Attempt 0) finished global sync successfully, waited for 1 seconds, current barrier epoch is 1.
...
Open up web UI and explore the execution plans.
Access MapPartitionsRDD¶
MapPartitionsRDD is a private[spark] class, so accessing the RDD.isBarrier method requires being in the org.apache.spark package.
Paste the following code in spark-shell / Scala REPL using the :paste -raw mode.
package org.apache.spark

object IsBarrier {
  import org.apache.spark.rdd.RDD
  implicit class BypassPrivateSpark[T](rdd: RDD[T]) {
    def isBarrier = rdd.isBarrier
  }
}
import org.apache.spark.IsBarrier._
assert(barrierRdd.isBarrier)
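The isBarrier flag is inherited by RDDs computed from a barrier RDD, while a shuffle always resets it (see the isFromBarrier Flag section above). A quick, hedged check of that behaviour, assuming the barrierRdd from the demo and the IsBarrier helper just defined:
// map produces a MapPartitionsRDD with a barrier parent, so the flag is kept
assert(barrierRdd.map(identity).isBarrier)
// a shuffle (e.g. repartition) breaks the barrier stage; ShuffledRDD is never a barrier RDD
assert(!barrierRdd.repartition(2).isBarrier)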
Examples¶
Projects whose source code is worth reviewing and learning from.
SynapseML¶
SynapseML's LightGBM on Apache Spark can be configured to use Barrier Execution Mode in the following modules:
- synapse.ml.lightgbm.LightGBMClassifier
- synapse.ml.lightgbm.LightGBMRanker
- synapse.ml.lightgbm.LightGBMRegressor
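As an illustration only (a sketch, not verified against any particular SynapseML release; the useBarrierExecutionMode parameter and the Scala package name are assumptions to check against the version in use), enabling barrier execution on LightGBMClassifier might look like this:
// Assumed Scala package; the modules above are listed by their synapse.ml.lightgbm path
import com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier

// useBarrierExecutionMode is assumed to be a Boolean Param (off by default)
val lgbm = new LightGBMClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setUseBarrierExecutionMode(true)  // run LightGBM training as a barrier stage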
XGBoost4J¶
XGBoost4J is the JVM package of xgboost (an optimized distributed gradient boosting library with machine learning algorithms for regression and classification under the Gradient Boosting framework).
The heart of distributed training in xgboost4j-spark (that can run distributed xgboost on Apache Spark) is XGBoost.trainDistributed.
There's a familiar line that creates a barrier stage (using RDD.barrier()):
val boostersAndMetrics = trainingRDD.barrier().mapPartitions {
// distributed training using XGBoost happens here
}
The barrier mapPartitions block is followed by RDD.collect() that gets the XGBoost4J-specific metadata (booster and metrics):
val (booster, metrics) = boostersAndMetrics.collect()(0)
Within the barrier stage (within the mapPartitions block), xgboost4j-spark builds a distributed booster:
- Checkpointing, when enabled, is done only by Task 0
- All tasks initialize the so-called collective Communicator for synchronization
- xgboost4j-spark uses XGBoostJNI to talk to XGBoost using JNI
- Only Task 0 returns a non-empty iterator (and that's why RDD.collect()(0) gets (booster, metrics))
- All tasks execute SXGBoost.train that eventually leads to XGBoost.trainAndSaveCheckpoint
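Putting the bullets above together, the overall shape of that pattern could be sketched as follows (this is not the actual XGBoost4J-Spark code; trainPartition is a hypothetical stand-in for the JNI-backed training call):
import org.apache.spark.BarrierTaskContext

// Hypothetical stand-in for the real JNI-backed training of one partition
def trainPartition(rows: Iterator[Int]): (String, Map[String, Double]) =
  (s"booster-over-${rows.size}-rows", Map("train-error" -> 0.0))

val trainingRDD = sc.parallelize(0 until 90, numSlices = 3)

val boostersAndMetrics = trainingRDD.barrier().mapPartitions { rows =>
  val ctx = BarrierTaskContext.get()
  val result = trainPartition(rows)   // every task takes part in the training
  ctx.barrier()                       // wait until all tasks have finished
  // Only Task 0 returns a non-empty iterator, hence collect()(0) below
  if (ctx.partitionId() == 0) Iterator.single(result) else Iterator.empty
}
val (booster, metrics) = boostersAndMetrics.collect()(0)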