
ResourceProfile

ResourceProfile describes the executor and task resource requirements of an RDD in Stage-Level Scheduling.

ResourceProfile can be associated with an RDD using the RDD.withResources method.

The ResourceProfile of an RDD is available using the RDD.getResourceProfile method.
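The following is a minimal sketch (e.g. for spark-shell, where sc is the SparkContext) that builds a ResourceProfile with ResourceProfileBuilder and attaches it to an RDD. The resource names and amounts are illustrative, and the cluster manager has to support Stage-Level Scheduling for the profile to take effect.

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Request 2 cores and 1 GPU per executor, with every task claiming 1 GPU
val rprof = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests().cores(2).resource("gpu", 1))
  .require(new TaskResourceRequests().resource("gpu", 1.0))
  .build()

val rdd = sc
  .parallelize(1 to 100, 4)
  .withResources(rprof)

assert(rdd.getResourceProfile == rprof)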

Creating Instance

ResourceProfile takes the following to be created:

  • Executor Resources (Map[String, ExecutorResourceRequest])
  • Task Resources (Map[String, TaskResourceRequest])

ResourceProfile is created (directly or using getOrCreateDefaultProfile) when:

Built-In Executor Resources

ResourceProfile defines the following names as the Supported Executor Resources (among the specified executorResources):

  • cores
  • memory
  • memoryOverhead
  • pyspark.memory
  • offHeap

All other executor resources (names) are considered Custom Executor Resources.

Custom Executor Resources

getCustomExecutorResources(): Map[String, ExecutorResourceRequest]

getCustomExecutorResources returns the Executor Resources that are not among the supported (built-in) executor resources.
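
As an illustration (a sketch of the rule, not Spark's actual code), the classification boils down to checking a resource name against the built-in set:

// Built-in names as listed above; anything else (e.g. "gpu") is custom
val builtInExecutorResources =
  Set("cores", "memory", "memoryOverhead", "pyspark.memory", "offHeap")

def isCustomExecutorResource(name: String): Boolean =
  !builtInExecutorResources(name)

assert(isCustomExecutorResource("gpu"))      // custom
assert(!isCustomExecutorResource("memory"))  // built-in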


getCustomExecutorResources is used when:

Limiting Resource

limitingResource(
  sparkConf: SparkConf): String

limitingResource returns the _limitingResource, if calculated earlier, or falls back on calculateTasksAndLimitingResource.
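
A simplified, self-contained sketch of that memoization (not Spark's exact code; calculateTasksAndLimitingResource is stubbed out):

import org.apache.spark.SparkConf

class LimitingResourceSketch {
  private var _limitingResource: Option[String] = None

  private def calculateTasksAndLimitingResource(conf: SparkConf): Unit = {
    // the real method inspects task and executor resources; stubbed here
    _limitingResource = Some("cpus")
  }

  def limitingResource(conf: SparkConf): String =
    _limitingResource.getOrElse {
      calculateTasksAndLimitingResource(conf) // sets _limitingResource as a side effect
      _limitingResource.getOrElse("")
    }
}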


limitingResource is used when:

_limitingResource

_limitingResource: Option[String] = None

ResourceProfile defines the _limitingResource internal variable that is determined (if there is one) in calculateTasksAndLimitingResource.

_limitingResource can be the following:

Default Profile

ResourceProfile (Scala object) defines the defaultProfile internal registry for the default ResourceProfile (per JVM instance).

defaultProfile is initially undefined (None) and is assigned a new ResourceProfile when first requested.

defaultProfile can be accessed using getOrCreateDefaultProfile.

defaultProfile is cleared (removed) in clearDefaultProfile.

getOrCreateDefaultProfile

getOrCreateDefaultProfile(
  conf: SparkConf): ResourceProfile

getOrCreateDefaultProfile returns the default profile (if already defined) or creates a new one.

Unless already defined, getOrCreateDefaultProfile creates a ResourceProfile with the default task and executor resource requests and registers it as the defaultProfile.

getOrCreateDefaultProfile prints out the following INFO message to the logs:

Default ResourceProfile created, executor resources: [executorResources], task resources: [taskResources]
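
Below is a simplified, self-contained sketch of the defaultProfile registry and getOrCreateDefaultProfile (not Spark's exact code; the default values are illustrative and println stands in for the INFO message above):

import org.apache.spark.SparkConf
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceRequests}

object DefaultProfileSketch {
  private var defaultProfile: Option[ResourceProfile] = None

  def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile = synchronized {
    defaultProfile.getOrElse {
      // default requests built from the usual configuration properties
      val execRequests = new ExecutorResourceRequests()
        .cores(conf.getInt("spark.executor.cores", 1))
        .memory(conf.get("spark.executor.memory", "1g"))
      val taskRequests = new TaskResourceRequests()
        .cpus(conf.getInt("spark.task.cpus", 1))
      val rp = new ResourceProfile(execRequests.requests, taskRequests.requests)
      println(s"Default ResourceProfile created, executor resources: " +
        s"${rp.executorResources}, task resources: ${rp.taskResources}")
      defaultProfile = Some(rp)
      rp
    }
  }

  def clearDefaultProfile(): Unit = synchronized { defaultProfile = None }
}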

getOrCreateDefaultProfile is used when:

Default Executor Resource Requests

getDefaultExecutorResources(
  conf: SparkConf): Map[String, ExecutorResourceRequest]

getDefaultExecutorResources creates an ExecutorResourceRequests with the following:

Property        Configuration Property
--------------  -----------------------------
cores           spark.executor.cores
memory          spark.executor.memory
memoryOverhead  spark.executor.memoryOverhead
pysparkMemory   spark.executor.pyspark.memory
offHeapMemory   spark.memory.offHeap.size

getDefaultExecutorResources then finds the executor resource requests (configured in the given SparkConf under the spark.executor component name) and adds them to the ExecutorResourceRequests.

getDefaultExecutorResources initializes the defaultProfileExecutorResources (with the executor resource requests).

In the end, getDefaultExecutorResources requests the ExecutorResourceRequests for all the resource requests.
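
A sketch of the mapping in the table above, using the public ExecutorResourceRequests API (the configuration values and fallback defaults are made up for the example):

import org.apache.spark.SparkConf
import org.apache.spark.resource.ExecutorResourceRequests

val conf = new SparkConf()
  .set("spark.executor.cores", "4")
  .set("spark.executor.memory", "8g")
  .set("spark.executor.memoryOverhead", "1g")

val ereqs = new ExecutorResourceRequests()
  .cores(conf.getInt("spark.executor.cores", 1))
  .memory(conf.get("spark.executor.memory", "1g"))
  .memoryOverhead(conf.get("spark.executor.memoryOverhead"))
// pysparkMemory and offHeapMemory follow the same pattern when configured:
//   .pysparkMemory(conf.get("spark.executor.pyspark.memory"))
//   .offHeapMemory(conf.get("spark.memory.offHeap.size"))

val requests = ereqs.requests // Map[String, ExecutorResourceRequest]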

Default Task Resource Requests

getDefaultTaskResources(
  conf: SparkConf): Map[String, TaskResourceRequest]

getDefaultTaskResources creates a new TaskResourceRequests with the cpus based on the spark.task.cpus configuration property.

getDefaultTaskResources adds task resource requests (configured in the given SparkConf using spark.task.resource-prefixed properties).

In the end, getDefaultTaskResources requests the TaskResourceRequests for all the resource requests.
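
A sketch of the defaults (the gpu request is illustrative; in Spark it would come from a spark.task.resource-prefixed property):

import org.apache.spark.SparkConf
import org.apache.spark.resource.TaskResourceRequests

val conf = new SparkConf().set("spark.task.cpus", "2")

val treqs = new TaskResourceRequests()
  .cpus(conf.getInt("spark.task.cpus", 1))
  .resource("gpu", 1.0)

val taskRequests = treqs.requests // Map[String, TaskResourceRequest]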

getResourcesForClusterManager

getResourcesForClusterManager(
  rpId: Int,
  execResources: Map[String, ExecutorResourceRequest],
  overheadFactor: Double,
  conf: SparkConf,
  isPythonApp: Boolean,
  resourceMappings: Map[String, String]): ExecutorResourcesOrDefaults

getResourcesForClusterManager takes the DefaultProfileExecutorResources.

getResourcesForClusterManager calculates the overhead memory (see the sketch after this list) with the following:

  • memoryOverheadMiB and executorMemoryMiB of the DefaultProfileExecutorResources
  • Given overheadFactor
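
A sketch of that calculation (the 384 MiB floor is an assumption based on Spark's conventional minimum overhead; the real method also honors an explicitly configured memoryOverhead):

// Overhead is the explicit value if given, otherwise factor * memory with a floor
def overheadMemoryMiB(
    explicitOverheadMiB: Option[Long],
    executorMemoryMiB: Long,
    overheadFactor: Double): Long =
  explicitOverheadMiB.getOrElse(
    math.max((overheadFactor * executorMemoryMiB).toLong, 384L))

// e.g. 8 GiB of executor memory with a 0.10 factor gives 819 MiB of overhead
assert(overheadMemoryMiB(None, 8192L, 0.10) == 819L)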

If the given rpId resource profile ID is not the default ID (0), getResourcesForClusterManager...FIXME (there is so much to "digest")

getResourcesForClusterManager...FIXME

In the end, getResourcesForClusterManager creates an ExecutorResourcesOrDefaults.


getResourcesForClusterManager is used when:

  • BasicExecutorFeatureStep (Spark on Kubernetes) is created
  • YarnAllocator (Spark on YARN) is requested to createYarnResourceForResourceProfile

getDefaultProfileExecutorResources

getDefaultProfileExecutorResources(
  conf: SparkConf): DefaultProfileExecutorResources

getDefaultProfileExecutorResources...FIXME


getDefaultProfileExecutorResources is used when:

Serializable

ResourceProfile is Serializable (java.io.Serializable).

Logging

Enable ALL logging level for org.apache.spark.resource.ResourceProfile logger to see what happens inside.

Add the following line to conf/log4j2.properties:

logger.ResourceProfile.name = org.apache.spark.resource.ResourceProfile
logger.ResourceProfile.level = all

Refer to Logging.