Stage-Level Scheduling¶
Stage-Level Scheduling uses ResourceProfiles for the following:
- Spark developers can specify task and executor resource requirements at stage level
- Spark (Scheduler) uses the stage-level requirements to acquire the necessary resources and executors, and schedules tasks based on the per-stage requirements
Apache Spark 3.1.1
Stage-Level Scheduling was introduced in Apache Spark 3.1.1 (cf. SPARK-27495)
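For example, this is roughly what the API looks like end to end. A minimal sketch: the 4-core / 8g executors, the gpu resource and all amounts are made-up values, and sc is an active SparkContext on a cluster manager that supports custom ResourceProfiles.

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Example per-stage requirements: 4-core / 8g executors with one GPU each,
// and 1 CPU plus 1 GPU per task
val executorReqs = new ExecutorResourceRequests()
  .cores(4)
  .memory("8g")
  .resource("gpu", 1)
val taskReqs = new TaskResourceRequests()
  .cpus(1)
  .resource("gpu", 1.0)

val rp = new ResourceProfileBuilder()
  .require(executorReqs)
  .require(taskReqs)
  .build()

// Associate the requirements with an RDD; stages computing this RDD
// are scheduled using this profile rather than the default one
val data = sc.range(0, 100).withResources(rp)
```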
Resource Profiles¶
Resource Profiles are managed by ResourceProfileManager.
The Default ResourceProfile is known by ID 0.
Custom Resource Profiles are ResourceProfiles with non-0 IDs. Custom Resource Profiles are only supported on YARN, Kubernetes and Spark Standalone.
ResourceProfiles are associated with an RDD using the withResources operator.
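A quick illustration of the IDs and the withResources association (a sketch that assumes an active SparkContext sc and a cluster manager that accepts custom ResourceProfiles):

```scala
import org.apache.spark.resource.{ResourceProfile, ResourceProfileBuilder, TaskResourceRequests}

// The default ResourceProfile always has ID 0
assert(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID == 0)

// A custom ResourceProfile gets a non-0 ID when it is created
val customProfile = new ResourceProfileBuilder()
  .require(new TaskResourceRequests().cpus(2))
  .build()
println(customProfile.id)  // e.g. 1

// withResources associates the profile with the RDD;
// getResourceProfile returns it back (null if none was set)
val nums = sc.range(0, 10).withResources(customProfile)
assert(nums.getResourceProfile() == customProfile)
```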
Resource Requests¶
Executor¶
Executor Resource Requests are specified using executorResources of a ResourceProfile.
Executor Resource Requests can be specified for the following built-in resources:
- cores
- memory
- memoryOverhead
- pyspark.memory
- offHeap
Other (deployment environment-specific) executor resource requests can be defined as Custom Executor Resources.
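For illustration, a sketch of an ExecutorResourceRequests that touches every built-in resource plus one custom resource; the amounts, the gpu name and the discovery-script path are made-up example values:

```scala
import org.apache.spark.resource.ExecutorResourceRequests

val executorReqs = new ExecutorResourceRequests()
  .cores(4)                // cores
  .memory("8g")            // memory (heap)
  .memoryOverhead("1g")    // memoryOverhead
  .pysparkMemory("2g")     // pyspark.memory
  .offHeapMemory("2g")     // offHeap
  .resource("gpu", 1, discoveryScript = "/opt/spark/getGpus.sh")  // custom executor resource
```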
Task¶
Default Task Resources are specified based on spark.task.cpus and spark.task.resource-prefixed configuration properties.
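For example (a sketch with made-up values; gpu is just an example resource name), the defaults could be configured as follows:

```scala
import org.apache.spark.SparkConf

// Default task resources of the default ResourceProfile
val conf = new SparkConf()
  .set("spark.task.cpus", "2")                 // CPUs per task
  .set("spark.task.resource.gpu.amount", "1")  // spark.task.resource.*-prefixed custom resource
```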
SparkListenerResourceProfileAdded¶
ResourceProfiles can be monitored using SparkListenerResourceProfileAdded.
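For example, a minimal listener sketch (the ResourceProfileLogger name is made up) that prints every ResourceProfile registered with the application:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerResourceProfileAdded}

class ResourceProfileLogger extends SparkListener {
  override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = {
    // event.resourceProfile is the newly-registered ResourceProfile
    println(s"ResourceProfile added: ${event.resourceProfile}")
  }
}

sc.addSparkListener(new ResourceProfileLogger)
```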
Dynamic Allocation¶
Stage-Level Scheduling requires Dynamic Allocation of Executors to be enabled. The only exception is ResourceProfiles with task resource requirements only (TaskResourceProfiles), which are supported on Spark Standalone even with Dynamic Allocation disabled.
Demo¶
Describe Distributed Computation¶
Let's describe a distributed computation (using RDD API) over a 10-record dataset.
val rdd = sc.range(0, 10)
Describe Required Resources¶
Optional Step
This demo assumes execution in local deployment mode (which supports the default ResourceProfile only), so this step is considered optional until a supported cluster manager is used.
import org.apache.spark.resource.ResourceProfileBuilder
val rpb = new ResourceProfileBuilder
val rp1 = rpb.build()
scala> println(rp1.toString)
Profile: id = 1, executor resources: , task resources:
Configure Default ResourceProfile¶
FIXME
Use spark.task.resource-prefixed properties per ResourceUtils.
Associate Required Resources to Distributed Computation¶
rdd.withResources(rp1)
scala> rdd.withResources(rp1)
org.apache.spark.SparkException: TaskResourceProfiles are only supported for Standalone cluster for now when dynamic allocation is disabled.
at org.apache.spark.resource.ResourceProfileManager.isSupported(ResourceProfileManager.scala:71)
at org.apache.spark.resource.ResourceProfileManager.addResourceProfile(ResourceProfileManager.scala:126)
at org.apache.spark.rdd.RDD.withResources(RDD.scala:1802)
... 42 elided
SPARK-43912
Reported as SPARK-43912 Incorrect SparkException for Stage-Level Scheduling in local mode.
Until it is fixed, enable Dynamic Allocation.
$ ./bin/spark-shell -c spark.dynamicAllocation.enabled=true