# ResourceProfile

`ResourceProfile` describes the executor and task resource requirements of an RDD in Stage-Level Scheduling.

A `ResourceProfile` can be associated with an RDD using the `RDD.withResources` method. The `ResourceProfile` of an RDD is available using the `RDD.getResourceProfile` method.
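For example, a profile can be built with `ResourceProfileBuilder` and attached to an RDD. A minimal sketch, assuming an active `SparkContext` as `sc` and a cluster manager that supports Stage-Level Scheduling:

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Executor-side requirements of the stages that compute this RDD
val execReqs = new ExecutorResourceRequests().cores(4).memory("6g")
// Task-side requirements
val taskReqs = new TaskResourceRequests().cpus(2)

val rp = new ResourceProfileBuilder()
  .require(execReqs)
  .require(taskReqs)
  .build()

val rdd = sc.parallelize(1 to 100).withResources(rp)
assert(rdd.getResourceProfile() == rp)
```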
## Creating Instance
`ResourceProfile` takes the following to be created:

- Executor Resources (`Map[String, ExecutorResourceRequest]`)
- Task Resources (`Map[String, TaskResourceRequest]`)
`ResourceProfile` is created (directly or using `getOrCreateDefaultProfile`) when:

- `DriverEndpoint` is requested to handle a `RetrieveSparkAppConfig` message
- `ResourceProfileBuilder` utility is requested to build
## Built-In Executor Resources
`ResourceProfile` defines the following names as the Supported Executor Resources (among the specified executorResources):

- `cores`
- `memory`
- `memoryOverhead`
- `pyspark.memory`
- `offHeap`

All other executor resource names are considered Custom Executor Resources.
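For illustration, the built-in resources correspond to dedicated setters on `ExecutorResourceRequests`, while a resource registered under any other name becomes custom. A sketch (the `gpu` name and discovery-script path are made-up examples):

```scala
import org.apache.spark.resource.ExecutorResourceRequests

val execReqs = new ExecutorResourceRequests()
  .cores(4)              // cores
  .memory("4g")          // memory
  .memoryOverhead("1g")  // memoryOverhead
  .pysparkMemory("1g")   // pyspark.memory
  .offHeapMemory("2g")   // offHeap
  // Any other name is a custom executor resource
  .resource("gpu", amount = 2, discoveryScript = "/opt/spark/getGpus.sh")
```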
## Custom Executor Resources
```scala
getCustomExecutorResources(): Map[String, ExecutorResourceRequest]
```
`getCustomExecutorResources` returns the Executor Resources whose names are not among the supported (built-in) executor resources.
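In other words, it is a name-based filter over the executor resources. A minimal standalone sketch of that logic (in Spark, both names below are members of `ResourceProfile`):

```scala
import org.apache.spark.resource.ExecutorResourceRequest

val allSupportedExecutorResources =
  Set("cores", "memory", "memoryOverhead", "pyspark.memory", "offHeap")

// Keep only the executor resources whose names are not built-in
def getCustomExecutorResources(
    executorResources: Map[String, ExecutorResourceRequest]): Map[String, ExecutorResourceRequest] =
  executorResources.filterNot { case (name, _) =>
    allSupportedExecutorResources.contains(name)
  }
```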
`getCustomExecutorResources` is used when:

- `ApplicationDescription` is requested to `resourceReqsPerExecutor`
- `ApplicationInfo` is requested to `createResourceDescForResourceProfile`
- `ResourceProfile` is requested to `calculateTasksAndLimitingResource`
- `ResourceUtils` is requested to `getOrDiscoverAllResourcesForResourceProfile` and `warnOnWastedResources`
## Limiting Resource
```scala
limitingResource(
  sparkConf: SparkConf): String
```
`limitingResource` returns the `_limitingResource`, if calculated earlier, or falls back on `calculateTasksAndLimitingResource`.
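A sketch of the memoization, assuming (per the description above) that `calculateTasksAndLimitingResource` caches its result in `_limitingResource` as a side effect:

```scala
// Inside ResourceProfile, with _limitingResource: Option[String] in scope
def limitingResource(sparkConf: SparkConf): String =
  _limitingResource.getOrElse {
    // Not determined yet: calculate and cache the limiting resource
    calculateTasksAndLimitingResource(sparkConf)
    _limitingResource.getOrElse("")
  }
```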
`limitingResource` is used when:

- `ResourceProfileManager` is requested to add a new ResourceProfile (to recompute a limiting resource eagerly)
- `ResourceUtils` is requested to `warnOnWastedResources` (for reporting purposes only)
## _limitingResource
```scala
_limitingResource: Option[String] = None
```
`ResourceProfile` defines the `_limitingResource` internal variable that is determined (if there is one) while `calculateTasksAndLimitingResource`.
`_limitingResource` can be one of the following:

- A "special" empty resource identifier (that is assumed `cpus` in `TaskSchedulerImpl`)
- The `cpus` built-in task resource identifier
- Any custom resource identifier
## Default Profile
`ResourceProfile` (Scala object) defines the `defaultProfile` internal registry for the default `ResourceProfile` (per JVM instance).

`defaultProfile` is initially undefined (`None`) and a new `ResourceProfile` is created when first requested.

`defaultProfile` can be accessed using `getOrCreateDefaultProfile` and is cleared (removed) in `clearDefaultProfile`.
## getOrCreateDefaultProfile
```scala
getOrCreateDefaultProfile(
  conf: SparkConf): ResourceProfile
```
`getOrCreateDefaultProfile` returns the `defaultProfile` if already defined. Otherwise, `getOrCreateDefaultProfile` creates a `ResourceProfile` with the default task and executor resource descriptions and registers it as the `defaultProfile`.
`getOrCreateDefaultProfile` prints out the following INFO message to the logs:

```text
Default ResourceProfile created, executor resources: [executorResources], task resources: [taskResources]
```
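Taken together, this is a lazily-initialized per-JVM singleton. A simplified sketch of the companion-object logic (the members involved are `private[spark]`, so this is illustrative rather than user-callable; logging elided):

```scala
// Inside the ResourceProfile companion object
private var defaultProfile: Option[ResourceProfile] = None

def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile = synchronized {
  defaultProfile match {
    case Some(prof) => prof  // already created in this JVM
    case None =>
      // First request: build a profile from the default executor
      // and task resource requests and register it
      val rp = new ResourceProfile(
        getDefaultExecutorResources(conf),
        getDefaultTaskResources(conf))
      defaultProfile = Some(rp)
      rp
  }
}
```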
`getOrCreateDefaultProfile` is used when:

- `TaskResourceProfile` is requested to `getCustomExecutorResources`
- `ResourceProfile` is requested to `getDefaultProfileExecutorResources`
- `ResourceProfileManager` is created
- `YarnAllocator` (Spark on YARN) is requested to `initDefaultProfile`
## Default Executor Resource Requests
```scala
getDefaultExecutorResources(
  conf: SparkConf): Map[String, ExecutorResourceRequest]
```
`getDefaultExecutorResources` creates an `ExecutorResourceRequests` based on the given `SparkConf`.

`getDefaultExecutorResources` finds executor resource requests (with the `spark.executor` component name in the given `SparkConf`) for the `ExecutorResourceRequests`.

`getDefaultExecutorResources` initializes the `defaultProfileExecutorResources` (with the executor resource requests).

In the end, `getDefaultExecutorResources` requests the `ExecutorResourceRequests` for all the resource requests.
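A simplified sketch of the above, covering only `cores` and `memory` (the real method also handles memory overhead, PySpark and off-heap memory, and custom `spark.executor.resource.*`-prefixed requests):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.resource.{ExecutorResourceRequest, ExecutorResourceRequests}

def defaultExecutorResources(conf: SparkConf): Map[String, ExecutorResourceRequest] = {
  val ereqs = new ExecutorResourceRequests()
  ereqs.cores(conf.getInt("spark.executor.cores", 1))
  ereqs.memory(conf.get("spark.executor.memory", "1g"))
  ereqs.requests  // all the resource requests, keyed by resource name
}
```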
## Default Task Resource Requests
```scala
getDefaultTaskResources(
  conf: SparkConf): Map[String, TaskResourceRequest]
```
`getDefaultTaskResources` creates a new `TaskResourceRequests` with the `cpus` based on the `spark.task.cpus` configuration property.

`getDefaultTaskResources` adds task resource requests (configured in the given `SparkConf` using `spark.task.resource`-prefixed properties).

In the end, `getDefaultTaskResources` requests the `TaskResourceRequests` for the requests.
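A simplified sketch of these steps (the custom task resources configured with `spark.task.resource.*`-prefixed properties are left out):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.resource.{TaskResourceRequest, TaskResourceRequests}

def defaultTaskResources(conf: SparkConf): Map[String, TaskResourceRequest] = {
  val treqs = new TaskResourceRequests()
  treqs.cpus(conf.getInt("spark.task.cpus", 1))
  treqs.requests  // the task resource requests, keyed by resource name
}
```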
## getResourcesForClusterManager
```scala
getResourcesForClusterManager(
  rpId: Int,
  execResources: Map[String, ExecutorResourceRequest],
  overheadFactor: Double,
  conf: SparkConf,
  isPythonApp: Boolean,
  resourceMappings: Map[String, String]): ExecutorResourcesOrDefaults
```
`getResourcesForClusterManager` takes the `DefaultProfileExecutorResources`.

`getResourcesForClusterManager` calculates the overhead memory (as sketched after this list) with the following:

- `memoryOverheadMiB` and `executorMemoryMiB` of the `DefaultProfileExecutorResources`
- The given `overheadFactor`
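A sketch of that calculation, assuming the 384 MiB minimum overhead (`MEMORY_OVERHEAD_MIN_MIB`) that Spark applies when no explicit overhead is configured:

```scala
// memoryOverheadMiB and executorMemoryMiB come from DefaultProfileExecutorResources
def overheadMemoryMiB(
    memoryOverheadMiB: Option[Long],
    executorMemoryMiB: Long,
    overheadFactor: Double): Long =
  memoryOverheadMiB.getOrElse(
    // No explicit overhead: overheadFactor times the executor memory,
    // floored at 384 MiB
    math.max((executorMemoryMiB * overheadFactor).toLong, 384L))
```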
If the given `rpId` resource profile ID is not the default ID (`0`), `getResourcesForClusterManager`...FIXME (there is so much to "digest")

`getResourcesForClusterManager`...FIXME
In the end, `getResourcesForClusterManager` creates an `ExecutorResourcesOrDefaults`.
`getResourcesForClusterManager` is used when:

- `BasicExecutorFeatureStep` (Spark on Kubernetes) is created
- `YarnAllocator` (Spark on YARN) is requested to `createYarnResourceForResourceProfile`
## getDefaultProfileExecutorResources
```scala
getDefaultProfileExecutorResources(
  conf: SparkConf): DefaultProfileExecutorResources
```
`getDefaultProfileExecutorResources`...FIXME
`getDefaultProfileExecutorResources` is used when:

- `ResourceProfile` is requested to `getResourcesForClusterManager`
- `YarnAllocator` (Spark on YARN) is requested to `runAllocatedContainers`
## Serializable
`ResourceProfile` is a Java `Serializable`.
## Logging
Enable `ALL` logging level for `org.apache.spark.resource.ResourceProfile` logger to see what happens inside.

Add the following lines to `conf/log4j2.properties`:

```text
logger.ResourceProfile.name = org.apache.spark.resource.ResourceProfile
logger.ResourceProfile.level = all
```

Refer to Logging.