Stage-Level Scheduling¶
Stage-Level Scheduling uses ResourceProfiles for the following:
- Spark developers can specify task and executor resource requirements at stage level
- Spark (Scheduler) uses the stage-level requirements to acquire the necessary resources and executors and schedule tasks based on the per-stage requirements
Apache Spark 3.1.1
Stage-Level Scheduling was introduced in Apache Spark 3.1.1 (cf. SPARK-27495)
Resource Profiles¶
Resource Profiles are managed by ResourceProfileManager.
The Default ResourceProfile is known by ID 0.
Custom Resource Profiles are ResourceProfiles with non-zero IDs and are only supported on YARN, Kubernetes and Spark Standalone.
ResourceProfiles are associated with an RDD using the withResources operator.
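For example, a minimal sketch (assuming an existing RDD rdd; the profile carries no explicit requirements and only illustrates the association):

import org.apache.spark.resource.ResourceProfileBuilder

// Build a custom ResourceProfile (it gets a non-zero ID assigned)
val rp = new ResourceProfileBuilder().build()

// Associate the profile with the RDD and read it back
val rddWithProfile = rdd.withResources(rp)
assert(rddWithProfile.getResourceProfile() == rp)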
Resource Requests¶
Executor¶
Executor Resource Requests are specified using executorResources of a ResourceProfile.
Executor Resource Requests can be the following built-in resources:
- cores
- memory
- memoryOverhead
- pyspark.memory
- offHeap
Other (deployment environment-specific) executor resource requests can be defined as Custom Executor Resources.
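A sketch of an ExecutorResourceRequests covering the built-in resources and one custom resource (the gpu name and the discovery script path are examples only):

import org.apache.spark.resource.ExecutorResourceRequests

val execReqs = new ExecutorResourceRequests()
  .cores(4)
  .memory("6g")
  .memoryOverhead("1g")
  .pysparkMemory("2g")
  .offHeapMemory("1g") // only honored with spark.memory.offHeap.enabled
  .resource("gpu", amount = 1, discoveryScript = "/opt/spark/scripts/getGpus.sh")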
Task¶
Default Task Resources are specified based on spark.task.cpus and spark.task.resource.*-prefixed configuration properties.
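For a custom ResourceProfile, task requirements can be given explicitly with TaskResourceRequests and combined with the executor requests in a ResourceProfileBuilder (a sketch; the gpu resource name and the amounts are examples only):

import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

// Per-task requirements for stages using this profile
val taskReqs = new TaskResourceRequests()
  .cpus(2)
  .resource("gpu", 0.5) // fractional amounts let tasks share one device

val customRp = new ResourceProfileBuilder()
  .require(execReqs) // the ExecutorResourceRequests from the sketch above
  .require(taskReqs)
  .build()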
SparkListenerResourceProfileAdded¶
ResourceProfiles can be monitored using SparkListenerResourceProfileAdded.
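A sketch of a SparkListener that reacts to the event (the ResourceProfileLogger name is made up for this example):

import org.apache.spark.scheduler.{SparkListener, SparkListenerResourceProfileAdded}

class ResourceProfileLogger extends SparkListener {
  override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = {
    val rp = event.resourceProfile
    println(s"ResourceProfile ${rp.id} added (executor resources: ${rp.executorResources.keys.mkString(", ")})")
  }
}

sc.addSparkListener(new ResourceProfileLogger)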
Dynamic Allocation¶
Stage-Level Scheduling with custom ResourceProfiles requires Dynamic Allocation of Executors to be enabled (the only exception being TaskResourceProfiles on Spark Standalone clusters).
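For example, a spark-shell session intended for Stage-Level Scheduling could enable Dynamic Allocation as follows (--master yarn is an example only; shuffle tracking is used here instead of an external shuffle service):

$ ./bin/spark-shell \
    --master yarn \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.dynamicAllocation.shuffleTracking.enabled=true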
Demo¶
Describe Distributed Computation¶
Let's describe a distributed computation (using RDD API) over a 10-record dataset.
val rdd = sc.range(0, 10)
Describe Required Resources¶
Optional Step
This demo is assumed to run in local deployment mode (which supports the default ResourceProfile only), so this step is optional until a supported cluster manager is used.
import org.apache.spark.resource.ResourceProfileBuilder
val rpb = new ResourceProfileBuilder
val rp1 = rpb.build()
scala> println(rp1.toString)
Profile: id = 1, executor resources: , task resources:
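On a cluster manager that supports custom Resource Profiles, the builder could be given executor and task requirements before build() (a sketch only; this demo keeps the profile empty because of local mode):

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

val rpb2 = new ResourceProfileBuilder()
rpb2.require(new ExecutorResourceRequests().cores(2).memory("2g"))
rpb2.require(new TaskResourceRequests().cpus(1))
val rp2 = rpb2.build()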
Configure Default ResourceProfile¶
FIXME
Use spark.task.resource.*-prefixed configuration properties (per ResourceUtils).
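A sketch of what configuring the default ResourceProfile at launch time could look like (the gpu resource and the discovery script path are examples only):

$ ./bin/spark-shell \
    --conf spark.task.cpus=1 \
    --conf spark.task.resource.gpu.amount=0.25 \
    --conf spark.executor.resource.gpu.amount=1 \
    --conf spark.executor.resource.gpu.discoveryScript=/opt/spark/scripts/getGpus.sh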
Associate Required Resources to Distributed Computation¶
rdd.withResources(rp1)
scala> rdd.withResources(rp1)
org.apache.spark.SparkException: TaskResourceProfiles are only supported for Standalone cluster for now when dynamic allocation is disabled.
at org.apache.spark.resource.ResourceProfileManager.isSupported(ResourceProfileManager.scala:71)
at org.apache.spark.resource.ResourceProfileManager.addResourceProfile(ResourceProfileManager.scala:126)
at org.apache.spark.rdd.RDD.withResources(RDD.scala:1802)
... 42 elided
SPARK-43912
Reported as SPARK-43912 Incorrect SparkException for Stage-Level Scheduling in local mode.
Until it is fixed, enable Dynamic Allocation.
$ ./bin/spark-shell -c spark.dynamicAllocation.enabled=true