Stage-Level Scheduling

Stage-Level Scheduling uses ResourceProfiles for the following:

  • Spark developers can specify task and executor resource requirements at stage level
  • Spark (Scheduler) uses the stage-level requirements to acquire the necessary resources and executors and schedule tasks based on the per-stage requirements

Apache Spark 3.1.1

Stage-Level Scheduling was introduced in Apache Spark 3.1.1 (cf. SPARK-27495)

Resource Profiles

Resource Profiles are managed by ResourceProfileManager.

The Default ResourceProfile is known by ID 0.

Custom Resource Profiles are ResourceProfiles with non-0 IDs. Custom Resource Profiles are only supported on YARN, Kubernetes and Spark Standalone.

ResourceProfiles are associated with an RDD using the withResources operator.
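
A minimal sketch of building a custom ResourceProfile and associating it with an RDD (the resource amounts are illustrative assumptions, and a cluster manager that supports custom profiles is required):

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Executor- and task-level requirements for a stage (illustrative amounts)
val execReqs = new ExecutorResourceRequests().cores(4).memory("6g")
val taskReqs = new TaskResourceRequests().cpus(1)

val rp = new ResourceProfileBuilder()
  .require(execReqs)
  .require(taskReqs)
  .build()

// Spark schedules this RDD's stage using the custom profile
val rdd = sc.range(0, 10).withResources(rp)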

Resource Requests

Executor

Executor Resource Requests are specified using executorResources of a ResourceProfile.

Executor Resource Requests can be made for the following built-in resources:

  • cores
  • memory
  • memoryOverhead
  • pyspark.memory
  • offHeap

Other (deployment environment-specific) executor resource requests can be defined as Custom Executor Resources.
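
A sketch of an ExecutorResourceRequests covering the built-in resources above plus a custom gpu resource (the amounts and the discovery script path are assumptions for illustration):

import org.apache.spark.resource.ExecutorResourceRequests

val execReqs = new ExecutorResourceRequests()
  .cores(2)             // executor CPU cores
  .memory("4g")         // executor heap memory
  .memoryOverhead("1g") // executor memory overhead
  .pysparkMemory("2g")  // PySpark worker memory
  .offHeapMemory("1g")  // off-heap memory
  .resource("gpu", 1, "/opt/spark/getGpus.sh") // custom executor resource (assumed script path)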

Task

Default Task Resources are specified based on the spark.task.cpus and spark.task.resource.*-prefixed configuration properties.
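
A matching sketch for per-task requirements (the fractional gpu amount is an illustrative assumption; fractions let multiple tasks share one resource):

import org.apache.spark.resource.TaskResourceRequests

val taskReqs = new TaskResourceRequests()
  .cpus(1)               // counterpart of spark.task.cpus
  .resource("gpu", 0.25) // counterpart of a spark.task.resource.gpu.amount property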

SparkListenerResourceProfileAdded

ResourceProfiles can be monitored using SparkListenerResourceProfileAdded.
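
A minimal sketch of a listener that reports newly-added ResourceProfiles (the printed message is illustrative):

import org.apache.spark.scheduler.{SparkListener, SparkListenerResourceProfileAdded}

sc.addSparkListener(new SparkListener {
  override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = {
    println(s"ResourceProfile added: id=${event.resourceProfile.id}")
  }
})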

Dynamic Allocation

Stage-Level Scheduling requires Dynamic Allocation of Executors to be enabled. With Dynamic Allocation disabled, custom ResourceProfiles are only supported as TaskResourceProfiles on Spark Standalone (cf. the demo below).

Demo

Describe Distributed Computation

Let's describe a distributed computation (using RDD API) over a 10-record dataset.

val rdd = sc.range(0, 10)

Describe Required Resources

Optional Step

This demo is assumed to run in local deployment mode (which supports the default ResourceProfile only), so this step is optional until a supported cluster manager is used.

import org.apache.spark.resource.ResourceProfileBuilder
val rpb = new ResourceProfileBuilder
val rp1 = rpb.build()
scala> println(rp1.toString)
Profile: id = 1, executor resources: , task resources:

Configure Default ResourceProfile

FIXME

Use spark.task.resource.*-prefixed properties per ResourceUtils.
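
As a sketch (the gpu resource name and the script path are assumptions), the default ResourceProfile can be configured at startup with spark.executor.resource.* and spark.task.resource.* properties:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.resource.gpu.amount", "1")
  .set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/getGpus.sh") // assumed path
  .set("spark.task.resource.gpu.amount", "1")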

Associate Required Resources to Distributed Computation

scala> rdd.withResources(rp1)
org.apache.spark.SparkException: TaskResourceProfiles are only supported for Standalone cluster for now when dynamic allocation is disabled.
  at org.apache.spark.resource.ResourceProfileManager.isSupported(ResourceProfileManager.scala:71)
  at org.apache.spark.resource.ResourceProfileManager.addResourceProfile(ResourceProfileManager.scala:126)
  at org.apache.spark.rdd.RDD.withResources(RDD.scala:1802)
  ... 42 elided
SPARK-43912

Reported as SPARK-43912: Incorrect SparkException for Stage-Level Scheduling in local mode.

Until it is fixed, enable Dynamic Allocation.

$ ./bin/spark-shell -c spark.dynamicAllocation.enabled=true