Stage-Level Scheduling uses ResourceProfiles for the following:
- Spark developers can specify task and executor resource requirements at stage level
- Spark (Scheduler) uses the stage-level requirements to acquire the necessary resources and executors and schedule tasks based on the per-stage requirements (see the sketch below)
Stage-Level Scheduling was introduced in Apache Spark 3.1.1 (cf. SPARK-27495).
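A minimal sketch of the API (assuming a cluster manager that supports custom ResourceProfiles, e.g. YARN; the resource amounts below are arbitrary):

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Stage-level requirements: what every executor and every task needs
val execReqs = new ExecutorResourceRequests().cores(2).memory("4g")
val taskReqs = new TaskResourceRequests().cpus(1)

// A custom ResourceProfile (non-0 ID) built from the requirements
val profile = new ResourceProfileBuilder()
  .require(execReqs)
  .require(taskReqs)
  .build()

// The scheduler acquires executors and schedules this stage per the profile
sc.range(0, 100).withResources(profile).map(_ * 2).collect()
```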
Resource Profiles are managed by ResourceProfileManager.
The Default ResourceProfile is known by ID 0. Custom Resource Profiles are ResourceProfiles with non-0 IDs. Custom Resource Profiles are only supported on YARN, Kubernetes and Spark Standalone.
ResourceProfiles are associated with an RDD using the withResources operator.
Executor Resource Requests are specified using executorResources of a ResourceProfile.
Executor Resource Requests can be the following built-in resources:
- cores
- memory
- memoryOverhead
- offHeap
- pyspark.memory
Other (deployment environment-specific) executor resource requests can be defined as Custom Executor Resources.
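A hedged sketch of declaring built-in executor resources along with a custom one (the gpu resource name and the discovery-script path are placeholder assumptions):

```scala
import org.apache.spark.resource.ExecutorResourceRequests

val execReqs = new ExecutorResourceRequests()
  .cores(4)             // built-in
  .memory("8g")         // built-in
  .memoryOverhead("1g") // built-in
  // A custom (deployment environment-specific) executor resource
  .resource("gpu", amount = 1, discoveryScript = "/opt/spark/bin/getGpus.sh")
```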
Default Task Resources are specified based on spark.task.cpus and spark.task.resource-prefixed configuration properties.
ResourceProfiles can be monitored using SparkListenerResourceProfileAdded.
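For example, a minimal listener sketch (using the sc of spark-shell):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerResourceProfileAdded}

// Print the ID of every ResourceProfile registered at runtime
sc.addSparkListener(new SparkListener {
  override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = {
    println(s"ResourceProfile added: id=${event.resourceProfile.id}")
  }
})
```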
Custom ResourceProfiles require Dynamic Allocation of Executors to be enabled; the exception is TaskResourceProfiles, which are supported on Spark Standalone with Dynamic Allocation disabled.
Describe Distributed Computation
Let's describe a distributed computation (using RDD API) over a 10-record dataset.
```scala
// 10 records: 0 to 9 (the upper bound of sc.range is exclusive)
val rdd = sc.range(0, 10)
```
Describe Required Resources
This demo is assumed to be executed in local deployment mode (which supports the default ResourceProfile only), so this step is considered optional until a supported cluster manager is used.
```scala
import org.apache.spark.resource.ResourceProfileBuilder

val rpb = new ResourceProfileBuilder
val rp1 = rpb.build()
```
```text
scala> println(rp1.toString)
Profile: id = 1, executor resources: , task resources:
```

Note the ID 1: the default ResourceProfile holds ID 0, and custom ResourceProfiles get the next available IDs.
Configure Default ResourceProfile
Use spark.task.resource-prefixed configuration properties (per ResourceUtils) to configure the default ResourceProfile.
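A sketch of what such a configuration could look like (the gpu resource name and the discovery-script path are placeholder assumptions):

```scala
import org.apache.spark.SparkConf

// These spark.task.resource-prefixed properties are read by ResourceUtils at startup
val conf = new SparkConf()
  .set("spark.task.cpus", "1")
  .set("spark.task.resource.gpu.amount", "1")     // every task requests 1 gpu
  .set("spark.executor.resource.gpu.amount", "1") // every executor advertises 1 gpu
  .set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/bin/getGpus.sh")
```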
Associate Required Resources to Distributed Computation
```text
scala> rdd.withResources(rp1)
org.apache.spark.SparkException: TaskResourceProfiles are only supported for Standalone cluster for now when dynamic allocation is disabled.
  at org.apache.spark.resource.ResourceProfileManager.isSupported(ResourceProfileManager.scala:71)
  at org.apache.spark.resource.ResourceProfileManager.addResourceProfile(ResourceProfileManager.scala:126)
  at org.apache.spark.rdd.RDD.withResources(RDD.scala:1802)
  ... 42 elided
```
This issue is reported as SPARK-43912 (Incorrect SparkException for Stage-Level Scheduling in local mode).
Until it is fixed, enable Dynamic Allocation.
```console
$ ./bin/spark-shell -c spark.dynamicAllocation.enabled=true
```
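To confirm the setting took effect in the new session:

```scala
// Should print true
println(sc.getConf.get("spark.dynamicAllocation.enabled"))
```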