ResourceProfile¶
ResourceProfile describes the executor and task resource requirements of an RDD in Stage-Level Scheduling.
ResourceProfile can be associated with an RDD using RDD.withResources method.
The ResourceProfile of an RDD is available using RDD.getResourceProfile method.
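As a quick illustration, a ResourceProfile can be built with a ResourceProfileBuilder and attached to an RDD. A minimal sketch (assuming an active SparkContext as sc; the resource names and amounts are made up, and Stage-Level Scheduling typically also requires dynamic allocation):

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Made-up requirements: 4 cores and 2 GPUs per executor, 1 CPU and 1 GPU per task
val execReqs = new ExecutorResourceRequests().cores(4).resource("gpu", 2)
val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)

val rp = new ResourceProfileBuilder()
  .require(execReqs)
  .require(taskReqs)
  .build()

val rdd = sc.parallelize(0 until 100).withResources(rp)
assert(rdd.getResourceProfile == rp)
```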
Creating Instance¶
ResourceProfile takes the following to be created:
- Executor Resources (Map[String, ExecutorResourceRequest])
- Task Resources (Map[String, TaskResourceRequest])
ResourceProfile is created (directly or using getOrCreateDefaultProfile) when:
- DriverEndpoint is requested to handle a RetrieveSparkAppConfig message
- ResourceProfileBuilder utility is requested to build
Built-In Executor Resources¶
ResourceProfile defines the following names as the Supported Executor Resources (among the specified executorResources):
- cores
- memory
- memoryOverhead
- pyspark.memory
- offHeap
All other executor resources (names) are considered Custom Executor Resources.
Custom Executor Resources¶
getCustomExecutorResources(): Map[String, ExecutorResourceRequest]
getCustomExecutorResources returns the Executor Resources that are not among the supported (built-in) executor resources.
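Conceptually, getCustomExecutorResources boils down to dropping the built-in names and keeping the rest. A hedged sketch (not the actual implementation; the builtIn set mirrors the Built-In Executor Resources above):

```scala
import org.apache.spark.resource.ExecutorResourceRequest

// The built-in (supported) executor resource names
val builtIn = Set("cores", "memory", "memoryOverhead", "pyspark.memory", "offHeap")

// Keep only the custom resources (e.g. "gpu", "fpga")
def customExecutorResources(
    executorResources: Map[String, ExecutorResourceRequest]): Map[String, ExecutorResourceRequest] =
  executorResources.filterNot { case (name, _) => builtIn(name) }
```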
getCustomExecutorResources is used when:
- ApplicationDescription is requested for resourceReqsPerExecutor
- ApplicationInfo is requested to createResourceDescForResourceProfile
- ResourceProfile is requested to calculateTasksAndLimitingResource
- ResourceUtils is requested to getOrDiscoverAllResourcesForResourceProfile and warnOnWastedResources
Limiting Resource¶
limitingResource(
  sparkConf: SparkConf): String
limitingResource returns the _limitingResource, if calculated earlier, or requests calculateTasksAndLimitingResource to determine it first.
limitingResource is used when:
- ResourceProfileManager is requested to add a new ResourceProfile (to recompute a limiting resource eagerly)
- ResourceUtils is requested to warnOnWastedResources (for reporting purposes only)
_limitingResource¶
_limitingResource: Option[String] = None
ResourceProfile defines the _limitingResource internal variable that is determined (if there is one) in calculateTasksAndLimitingResource.
_limitingResource can be the following:
- A "special" empty resource identifier (that is assumed
cpusin TaskSchedulerImpl) cpusbuilt-in task resource identifier- any custom resource identifier
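To see how a limiting resource arises in calculateTasksAndLimitingResource, consider a worked example with made-up numbers:

```scala
// An executor offers 4 cores and 2 GPUs; every task needs 1 cpu and 1 gpu
val tasksByCpus = 4 / 1 // 4 concurrent tasks, cpus-wise
val tasksByGpus = 2 / 1 // 2 concurrent tasks, gpu-wise

// The resource that allows the fewest concurrent tasks per executor wins,
// so "gpu" would be the limiting resource here (2 tasks per executor)
assert(tasksByGpus < tasksByCpus)
```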
Default Profile¶
ResourceProfile (Scala object) defines the defaultProfile internal registry for the default ResourceProfile (per JVM instance).
defaultProfile is initially undefined (None) and is assigned a new ResourceProfile when first requested.
defaultProfile can be accessed using getOrCreateDefaultProfile.
defaultProfile is cleared (removed) in clearDefaultProfile.
getOrCreateDefaultProfile¶
getOrCreateDefaultProfile(
  conf: SparkConf): ResourceProfile
getOrCreateDefaultProfile returns the default profile (if already defined) or creates a new one.
If not defined yet, getOrCreateDefaultProfile creates a ResourceProfile with the default task and executor resource descriptions and makes it the defaultProfile.
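A minimal sketch of this create-once-per-JVM pattern (hypothetical object and helper shapes; the real logic lives on the ResourceProfile Scala object and uses the default executor and task resource requests described below):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceRequests}

object DefaultProfileRegistry {
  private val lock = new Object
  private var defaultProfile: Option[ResourceProfile] = None

  // Return the default profile, creating and caching it on first access only
  def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile = lock.synchronized {
    defaultProfile.getOrElse {
      // Simplified to cores and cpus only for the sketch
      val execReqs = new ExecutorResourceRequests().cores(conf.getInt("spark.executor.cores", 1))
      val taskReqs = new TaskResourceRequests().cpus(conf.getInt("spark.task.cpus", 1))
      val rp = new ResourceProfile(execReqs.requests, taskReqs.requests)
      defaultProfile = Some(rp)
      rp
    }
  }

  // Mirrors clearDefaultProfile
  def clearDefaultProfile(): Unit = lock.synchronized { defaultProfile = None }
}
```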
getOrCreateDefaultProfile prints out the following INFO message to the logs:
Default ResourceProfile created, executor resources: [executorResources], task resources: [taskResources]
getOrCreateDefaultProfile is used when:
- TaskResourceProfile is requested to getCustomExecutorResources
- ResourceProfile is requested to getDefaultProfileExecutorResources
- ResourceProfileManager is created
- YarnAllocator (Spark on YARN) is requested to initDefaultProfile
Default Executor Resource Requests¶
getDefaultExecutorResources(
  conf: SparkConf): Map[String, ExecutorResourceRequest]
getDefaultExecutorResources creates an ExecutorResourceRequests with the following:

- cores based on spark.executor.cores
- memory based on spark.executor.memory
- memoryOverhead based on spark.executor.memoryOverhead (if defined)
- pysparkMemory based on spark.executor.pyspark.memory (if defined)
- offHeapMemory based on spark.memory.offHeap.size (with spark.memory.offHeap.enabled enabled)
getDefaultExecutorResources finds executor resource requests (with the spark.executor component name in the given SparkConf) for ExecutorResourceRequests.
getDefaultExecutorResources initializes the defaultProfileExecutorResources (with the executor resource requests).
In the end, getDefaultExecutorResources requests the ExecutorResourceRequests for all the resource requests.
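Expressed with the public ExecutorResourceRequests API, the defaults amount to something like the following hedged sketch (made-up configuration values):

```scala
import org.apache.spark.resource.{ExecutorResourceRequest, ExecutorResourceRequests}

// Mirror spark.executor.* settings (made-up values: 4 cores, 2g of memory)
val ereqs = new ExecutorResourceRequests()
  .cores(4)     // spark.executor.cores
  .memory("2g") // spark.executor.memory

// "Requests the ExecutorResourceRequests for all the resource requests"
val defaults: Map[String, ExecutorResourceRequest] = ereqs.requests
```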
Default Task Resource Requests¶
getDefaultTaskResources(
  conf: SparkConf): Map[String, TaskResourceRequest]
getDefaultTaskResources creates a new TaskResourceRequests with the cpus based on spark.task.cpus configuration property.
getDefaultTaskResources adds task resource requests (configured in the given SparkConf using spark.task.resource-prefixed properties).
In the end, getDefaultTaskResources requests the TaskResourceRequests for the requests.
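Analogously, a hedged sketch with the public TaskResourceRequests API (assuming spark.task.cpus is 1 and a made-up spark.task.resource.gpu.amount of 1):

```scala
import org.apache.spark.resource.{TaskResourceRequest, TaskResourceRequests}

// cpus mirrors spark.task.cpus; extra entries come from
// spark.task.resource.*-prefixed properties (gpu is made up here)
val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1.0)
val taskDefaults: Map[String, TaskResourceRequest] = treqs.requests
```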
getResourcesForClusterManager¶
getResourcesForClusterManager(
  rpId: Int,
  execResources: Map[String, ExecutorResourceRequest],
  overheadFactor: Double,
  conf: SparkConf,
  isPythonApp: Boolean,
  resourceMappings: Map[String, String]): ExecutorResourcesOrDefaults
getResourcesForClusterManager takes the DefaultProfileExecutorResources.
getResourcesForClusterManager calculates the overhead memory with the following (see the worked example below):

- memoryOverheadMiB and executorMemoryMiB of the DefaultProfileExecutorResources
- Given overheadFactor
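As a worked example with made-up inputs (the 384 MiB floor is the usual minimum memory overhead in Spark, an assumption here rather than something this page states):

```scala
// Made-up inputs
val executorMemoryMiB = 4096L
val overheadFactor = 0.10

// Assumed formula: overhead factor times executor memory, floored at 384 MiB
val memoryOverheadMiB = math.max((executorMemoryMiB * overheadFactor).toLong, 384L)
assert(memoryOverheadMiB == 409L)
```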
If the given rpId resource profile ID is not the default ID (0), getResourcesForClusterManager...FIXME (there is so much to "digest")
getResourcesForClusterManager...FIXME
In the end, getResourcesForClusterManager creates an ExecutorResourcesOrDefaults.
getResourcesForClusterManager is used when:
- BasicExecutorFeatureStep (Spark on Kubernetes) is created
- YarnAllocator (Spark on YARN) is requested to createYarnResourceForResourceProfile
getDefaultProfileExecutorResources¶
getDefaultProfileExecutorResources(
  conf: SparkConf): DefaultProfileExecutorResources
getDefaultProfileExecutorResources...FIXME
getDefaultProfileExecutorResources is used when:
- ResourceProfile is requested to getResourcesForClusterManager
- YarnAllocator (Spark on YARN) is requested to runAllocatedContainers
Serializable¶
ResourceProfile is a Java Serializable.
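Being Serializable means a ResourceProfile can be shipped over the wire with tasks. A quick round-trip sketch:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder}

val rp = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests().cores(4))
  .build()

// Java-serialize and deserialize the profile
val bos = new ByteArrayOutputStream()
new ObjectOutputStream(bos).writeObject(rp)
val restored = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray)).readObject()
```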
Logging¶
Enable ALL logging level for org.apache.spark.resource.ResourceProfile logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.ResourceProfile.name = org.apache.spark.resource.ResourceProfile
logger.ResourceProfile.level = all
Refer to Logging.