Utils Utility¶
getDynamicAllocationInitialExecutors¶
getDynamicAllocationInitialExecutors(
conf: SparkConf): Int
getDynamicAllocationInitialExecutors gives the maximum value of the following configuration properties (for the initial number of executors):
- spark.dynamicAllocation.initialExecutors
- spark.dynamicAllocation.minExecutors
- spark.executor.instances
getDynamicAllocationInitialExecutors prints out the following INFO message to the logs:
Using initial executors = [initialExecutors],
max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
With spark.dynamicAllocation.initialExecutors less than spark.dynamicAllocation.minExecutors, getDynamicAllocationInitialExecutors prints out the following WARN message to the logs:
spark.dynamicAllocation.initialExecutors less than spark.dynamicAllocation.minExecutors is invalid,
ignoring its setting, please update your configs.
With spark.executor.instances less than spark.dynamicAllocation.minExecutors, getDynamicAllocationInitialExecutors prints out the following WARN message to the logs:
spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid,
ignoring its setting, please update your configs.
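The selection itself boils down to a max over the three properties. Below is a minimal sketch of that logic, assuming plain string-key lookups on SparkConf (the initialExecutorsSketch helper is hypothetical, not Spark's code; the actual implementation uses typed configuration entries):

```scala
import org.apache.spark.SparkConf

// Hypothetical helper (not Spark's code): the max-of-three selection described above
def initialExecutorsSketch(conf: SparkConf): Int = {
  val minExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
  val initialExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors", minExecutors)
  val executorInstances = conf.getInt("spark.executor.instances", 0)
  // Values below spark.dynamicAllocation.minExecutors only trigger the WARN messages;
  // max(...) effectively ignores them.
  Seq(minExecutors, initialExecutors, executorInstances).max
}
```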
getDynamicAllocationInitialExecutors is used when:
- ExecutorAllocationManager is created
- SchedulerBackendUtils utility is used to getInitialTargetExecutorNumber
Local Directories for Scratch Space¶
getConfiguredLocalDirs(
conf: SparkConf): Array[String]
getConfiguredLocalDirs returns the local directories where Spark can write files to.
getConfiguredLocalDirs uses the given SparkConf to know if External Shuffle Service is enabled or not (based on spark.shuffle.service.enabled configuration property).
When in a YARN container (CONTAINER_ID), getConfiguredLocalDirs uses LOCAL_DIRS environment variable for YARN-approved local directories.
In non-YARN mode (or for the driver in yarn-client mode), getConfiguredLocalDirs checks the following environment variables (in order) and returns the value of the first found:
- SPARK_EXECUTOR_DIRS
- SPARK_LOCAL_DIRS
- MESOS_DIRECTORY (only when External Shuffle Service is not used)
The environment variables are a comma-separated list of local directory paths.
In the end, when no earlier environment variables were found, getConfiguredLocalDirs uses spark.local.dir configuration property (with java.io.tmpdir System property as the default value).
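The non-YARN lookup order can be sketched as follows, assuming comma-separated values for all the environment variables (a simplification; configuredLocalDirsSketch is a hypothetical helper, not Spark's code):

```scala
import org.apache.spark.SparkConf

// Hypothetical helper: the non-YARN lookup order, simplified to comma-separated values
def configuredLocalDirsSketch(conf: SparkConf): Array[String] = {
  val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
  Seq(
      sys.env.get("SPARK_EXECUTOR_DIRS"),
      sys.env.get("SPARK_LOCAL_DIRS"),
      sys.env.get("MESOS_DIRECTORY").filter(_ => !shuffleServiceEnabled))
    .flatten.headOption
    .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
    .split(",")
}
```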
getConfiguredLocalDirs is used when:
- DiskBlockManager is requested to createLocalDirs and createLocalDirsForMergedShuffleBlocks
- Utils utility is used to get a single random local root directory and create a spark directory in every local root directory
Random Local Directory Path¶
getLocalDir(
conf: SparkConf): String
getLocalDir takes a random directory path out of the configured local root directories.
getLocalDir throws an IOException if no local directory is defined:
Failed to get a temp directory under [configuredLocalDirs].
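A conceptual sketch of the behaviour, assuming the configured local root directories are already available (localDirSketch is a hypothetical helper, not Spark's code):

```scala
import java.io.IOException
import scala.util.Random

// A conceptual sketch: pick one of the configured local root directories at random
def localDirSketch(localRootDirs: Array[String]): String = {
  if (localRootDirs.isEmpty) {
    throw new IOException(
      "Failed to get a temp directory under [" + localRootDirs.mkString(",") + "].")
  }
  localRootDirs(Random.nextInt(localRootDirs.length))
}
```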
getLocalDir is used when:
- SparkEnv utility is used to create a base SparkEnv for the driver
- Utils utility is used to fetchFile
- DriverLogger is created
- RocksDBStateStoreProvider (Spark Structured Streaming) is requested for a RocksDB
- PythonBroadcast (PySpark) is requested to readObject
- AggregateInPandasExec (PySpark) is requested to doExecute
- EvalPythonExec (PySpark) is requested to doExecute
- WindowInPandasExec (PySpark) is requested to doExecute
- PythonForeachWriter (PySpark) is requested for a UnsafeRowBuffer
- Client (Spark on YARN) is requested to prepareLocalResources and createConfArchive
localRootDirs Registry¶
Utils utility uses the localRootDirs internal registry so that getOrCreateLocalRootDirsImpl is executed only once (when first requested).
localRootDirs is available using the getOrCreateLocalRootDirs method.
getOrCreateLocalRootDirs(
conf: SparkConf): Array[String]
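The registry is essentially a cache-on-first-use. A hedged sketch of the pattern (LocalRootDirsCache and computeLocalRootDirs are illustrative names, not Spark's):

```scala
import org.apache.spark.SparkConf

// A sketch of the caching pattern; computeLocalRootDirs stands in for getOrCreateLocalRootDirsImpl
object LocalRootDirsCache {
  @volatile private var localRootDirs: Array[String] = _

  def getOrCreate(conf: SparkConf)(computeLocalRootDirs: SparkConf => Array[String]): Array[String] = {
    if (localRootDirs == null) {
      synchronized {
        if (localRootDirs == null) {
          localRootDirs = computeLocalRootDirs(conf)
        }
      }
    }
    localRootDirs
  }
}
```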
getOrCreateLocalRootDirs is used when:
- Utils is used to getLocalDir
- Worker (Spark Standalone) is requested to launch an executor
Creating spark Directory in Every Local Root Directory¶
getOrCreateLocalRootDirsImpl(
conf: SparkConf): Array[String]
getOrCreateLocalRootDirsImpl creates a spark-[randomUUID] directory under every root directory for local storage (and registers a shutdown hook to delete the directories at shutdown).
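A conceptual sketch of the directory creation, assuming java.io.File is sufficient for the purpose (createSparkDirsSketch is a hypothetical helper; the real code also handles creation failures):

```scala
import java.io.File
import java.util.UUID

// A conceptual sketch: create a spark-[randomUUID] subdirectory under every configured root directory
def createSparkDirsSketch(rootDirs: Seq[String]): Seq[File] = {
  rootDirs.map { root =>
    val dir = new File(root, s"spark-${UUID.randomUUID()}")
    dir.mkdirs() // the real code also registers a shutdown hook to delete the directory at shutdown
    dir
  }
}
```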
getOrCreateLocalRootDirsImpl prints out the following WARN message to the logs when any of the configured local root directories is given as a URI (with a scheme):
The configured local directories are not expected to be URIs;
however, got suspicious values [[uris]].
Please check your configured local directories.
Local URI Scheme¶
Utils defines a local URI scheme for files that are locally available on worker nodes in the cluster.
The local URI scheme is used when:
- Utils is used to isLocalUri
- Client (Spark on YARN) is used
isLocalUri¶
isLocalUri(
uri: String): Boolean
isLocalUri is true when the URI is a local: URI (i.e. the given uri starts with the local: scheme).
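In other words, the check is a simple prefix test (a sketch, with a hypothetical name):

```scala
// A sketch of the prefix test behind isLocalUri
def isLocalUriSketch(uri: String): Boolean = uri.startsWith("local:")
```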
isLocalUri is used when:
- FIXME
getCurrentUserName¶
getCurrentUserName(): String
getCurrentUserName computes the name of the user who has started the SparkContext instance.
NOTE: It is later available as SparkContext.sparkUser.
Internally, getCurrentUserName reads the SPARK_USER environment variable and, if not set, reverts to Hadoop Security API's UserGroupInformation.getCurrentUser().getShortUserName().
NOTE: It is another place where Spark relies on the Hadoop API for its operation.
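A hedged sketch of that resolution order (currentUserNameSketch is a hypothetical helper, not Spark's code):

```scala
import org.apache.hadoop.security.UserGroupInformation

// A sketch of the resolution order: SPARK_USER wins, Hadoop's current user otherwise
def currentUserNameSketch(): String =
  sys.env.getOrElse("SPARK_USER",
    UserGroupInformation.getCurrentUser().getShortUserName())
```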
localHostName¶
localHostName(): String
localHostName computes the local host name.
localHostName starts by checking the SPARK_LOCAL_HOSTNAME environment variable for the value. If it is not defined, it uses SPARK_LOCAL_IP to find the name (using InetAddress.getByName). If that is not defined either, it calls InetAddress.getLocalHost for the name.
NOTE: Utils.localHostName is executed while SparkContext is created and also to compute the default value of the spark.driver.host configuration property.
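A hedged sketch of the precedence (localHostNameSketch is a hypothetical helper; the real implementation handles more details such as caching):

```scala
import java.net.InetAddress

// A sketch of the precedence: SPARK_LOCAL_HOSTNAME, then SPARK_LOCAL_IP, then the JVM's local host
def localHostNameSketch(): String =
  sys.env.get("SPARK_LOCAL_HOSTNAME")
    .orElse(sys.env.get("SPARK_LOCAL_IP").map(ip => InetAddress.getByName(ip).getHostName))
    .getOrElse(InetAddress.getLocalHost.getHostName)
```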
getUserJars¶
getUserJars(
conf: SparkConf): Seq[String]
getUserJars returns the value of the spark.jars configuration property with empty entries removed.
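A minimal sketch, assuming spark.jars is a comma-separated string (userJarsSketch is a hypothetical helper):

```scala
import org.apache.spark.SparkConf

// A sketch: spark.jars as a sequence with blank entries removed
def userJarsSketch(conf: SparkConf): Seq[String] =
  conf.get("spark.jars", "").split(",").filter(_.nonEmpty).toSeq
```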
getUserJars is used when:
- SparkContext is created
extractHostPortFromSparkUrl¶
extractHostPortFromSparkUrl(
sparkUrl: String): (String, Int)
extractHostPortFromSparkUrl creates a Java URI with the input sparkUrl and takes the host and port parts.
extractHostPortFromSparkUrl asserts that the input sparkUrl uses the spark scheme.
extractHostPortFromSparkUrl throws a SparkException for unparseable spark URLs:
Invalid master URL: [sparkUrl]
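A hedged sketch of the parsing, assuming java.net.URI validation is sufficient (extractHostPortSketch is a hypothetical helper; the real code rejects additional malformed forms such as URLs with paths or query strings):

```scala
import java.net.{URI, URISyntaxException}
import org.apache.spark.SparkException

// A sketch of the parsing and validation behind extractHostPortFromSparkUrl
def extractHostPortSketch(sparkUrl: String): (String, Int) = {
  try {
    val uri = new URI(sparkUrl)
    val host = uri.getHost
    if (uri.getScheme != "spark" || host == null || uri.getPort < 0) {
      throw new SparkException(s"Invalid master URL: $sparkUrl")
    }
    (host, uri.getPort)
  } catch {
    case e: URISyntaxException =>
      throw new SparkException(s"Invalid master URL: $sparkUrl", e)
  }
}
```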
extractHostPortFromSparkUrl is used when:
- StandaloneSubmitRequestServlet is requested to buildDriverDescription
- RpcAddress is requested to extract an RpcAddress from a Spark master URL
isDynamicAllocationEnabled¶
isDynamicAllocationEnabled(
conf: SparkConf): Boolean
isDynamicAllocationEnabled checks whether Dynamic Allocation of Executors is enabled (true) or not (false).
isDynamicAllocationEnabled is positive (true) when all the following hold:
- spark.dynamicAllocation.enabled configuration property is true
- spark.master is non-local
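A minimal sketch of the two conditions (isDynamicAllocationEnabledSketch is a hypothetical helper; the real check also honors a testing-only override):

```scala
import org.apache.spark.SparkConf

// A sketch of the two conditions behind isDynamicAllocationEnabled
def isDynamicAllocationEnabledSketch(conf: SparkConf): Boolean =
  conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
    conf.get("spark.master", "") != "local"
```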
isDynamicAllocationEnabled is used when:
- SparkContext is created (to start an ExecutorAllocationManager)
- TaskResourceProfile is requested for custom executor resources
- ResourceProfileManager is created
- DAGScheduler is requested to checkBarrierStageWithDynamicAllocation
- TaskSchedulerImpl is requested to resourceOffers
- SchedulerBackendUtils is requested to getInitialTargetExecutorNumber
- StandaloneSchedulerBackend (Spark Standalone) is requested to start (for reporting purposes)
- ExecutorPodsAllocator (Spark on Kubernetes) is created (maxPVCs)
- ApplicationMaster (Spark on YARN) is created (maxNumExecutorFailures)
- YarnSchedulerBackend (Spark on YARN) is requested to getShufflePushMergerLocations
checkAndGetK8sMasterUrl¶
checkAndGetK8sMasterUrl(
rawMasterURL: String): String
checkAndGetK8sMasterUrl...FIXME
checkAndGetK8sMasterUrl is used when:
- SparkSubmit is requested to prepareSubmitEnvironment (for Kubernetes cluster manager)
Fetching File¶
fetchFile(
url: String,
targetDir: File,
conf: SparkConf,
securityMgr: SecurityManager,
hadoopConf: Configuration,
timestamp: Long,
useCache: Boolean): File
fetchFile...FIXME
fetchFile is used when:
- SparkContext is requested to addFile
- Executor is requested to updateDependencies
- DriverRunner (Spark Standalone) is requested to downloadUserJar
isPushBasedShuffleEnabled¶
isPushBasedShuffleEnabled(
conf: SparkConf,
isDriver: Boolean,
checkSerializer: Boolean = true): Boolean
isPushBasedShuffleEnabled takes the value of spark.shuffle.push.enabled configuration property (from the given SparkConf).
If false, isPushBasedShuffleEnabled simply returns false.
Otherwise, isPushBasedShuffleEnabled returns whether it is even possible to use push-based shuffle or not based on the following:
- External Shuffle Service is used (spark.shuffle.service.enabled is true)
- spark.master is yarn
- (only with checkSerializer enabled) spark.serializer is a Serializer that supportsRelocationOfSerializedObjects
- spark.io.encryption.enabled is false
In case spark.shuffle.push.enabled configuration property is enabled but the above requirements did not hold, isPushBasedShuffleEnabled prints out the following WARN message to the logs:
Push-based shuffle can only be enabled
when the application is submitted to run in YARN mode,
with external shuffle service enabled, IO encryption disabled,
and relocation of serialized objects supported.
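A simplified sketch of the gatekeeping, with the serializer and checkSerializer handling omitted (isPushBasedShuffleEnabledSketch is a hypothetical helper, not Spark's code):

```scala
import org.apache.spark.SparkConf

// A simplified sketch of the conditions behind isPushBasedShuffleEnabled
def isPushBasedShuffleEnabledSketch(conf: SparkConf): Boolean =
  conf.getBoolean("spark.shuffle.push.enabled", false) &&
    conf.getBoolean("spark.shuffle.service.enabled", false) &&
    conf.get("spark.master", "") == "yarn" &&
    !conf.getBoolean("spark.io.encryption.enabled", false)
```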
isPushBasedShuffleEnabled is used when:
- ShuffleDependency is requested to canShuffleMergeBeEnabled
- MapOutputTrackerMaster is created
- MapOutputTrackerWorker is created
- DAGScheduler is created
- ShuffleBlockPusher utility is used to create a BLOCK_PUSHER_POOL thread pool
- BlockManager is requested to initialize and registerWithExternalShuffleServer
- BlockManagerMasterEndpoint is created
- DiskBlockManager is requested to createLocalDirsForMergedShuffleBlocks
Logging¶
Enable ALL logging level for org.apache.spark.util.Utils logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.util.Utils=ALL
Refer to Logging.