Utils Utility

getDynamicAllocationInitialExecutors

getDynamicAllocationInitialExecutors(
  conf: SparkConf): Int

getDynamicAllocationInitialExecutors gives the maximum value of the following configuration properties (for the initial number of executors):

  • spark.dynamicAllocation.initialExecutors
  • spark.dynamicAllocation.minExecutors
  • spark.executor.instances

getDynamicAllocationInitialExecutors prints out the following INFO message to the logs:

Using initial executors = [initialExecutors],
max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances

With spark.dynamicAllocation.initialExecutors less than spark.dynamicAllocation.minExecutors, getDynamicAllocationInitialExecutors prints out the following WARN message to the logs:

spark.dynamicAllocation.initialExecutors less than spark.dynamicAllocation.minExecutors is invalid,
ignoring its setting, please update your configs.

With spark.executor.instances less than spark.dynamicAllocation.minExecutors, getDynamicAllocationInitialExecutors prints out the following WARN message to the logs:

spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid,
ignoring its setting, please update your configs.
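The max-of-three computation above can be sketched as follows (a simplified reconstruction, not the actual implementation; the defaults are simplified to 0):

import org.apache.spark.SparkConf

def initialExecutors(conf: SparkConf): Int = {
  // Take the highest of the three configuration properties
  val initial = conf.getInt("spark.dynamicAllocation.initialExecutors", 0)
  val min = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
  val instances = conf.getInt("spark.executor.instances", 0)
  Seq(initial, min, instances).max
}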

getDynamicAllocationInitialExecutors is used when:

Local Directories for Scratch Space

getConfiguredLocalDirs(
  conf: SparkConf): Array[String]

getConfiguredLocalDirs returns the local directories where Spark can write files.


getConfiguredLocalDirs uses the given SparkConf to know if External Shuffle Service is enabled or not (based on spark.shuffle.service.enabled configuration property).

When in a YARN container (CONTAINER_ID), getConfiguredLocalDirs uses LOCAL_DIRS environment variable for YARN-approved local directories.

In non-YARN mode (or for the driver in yarn-client mode), getConfiguredLocalDirs checks the following environment variables (in order) and returns the value of the first found:

  1. SPARK_EXECUTOR_DIRS
  2. SPARK_LOCAL_DIRS
  3. MESOS_DIRECTORY (only when External Shuffle Service is not used)

Each of the environment variables is expected to hold a comma-separated list of local directory paths.

In the end, when none of the above environment variables is defined, getConfiguredLocalDirs uses the spark.local.dir configuration property (with the java.io.tmpdir System property as the default value).
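The resolution order can be sketched as follows (a hedged simplification that omits the YARN branch and the External Shuffle Service check for MESOS_DIRECTORY):

import org.apache.spark.SparkConf

def configuredLocalDirs(conf: SparkConf): Array[String] = {
  // The first environment variable found wins; spark.local.dir is the fallback
  sys.env.get("SPARK_EXECUTOR_DIRS")
    .orElse(sys.env.get("SPARK_LOCAL_DIRS"))
    .orElse(sys.env.get("MESOS_DIRECTORY"))
    .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
    .split(",")
}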


getConfiguredLocalDirs is used when:

Random Local Directory Path

getLocalDir(
  conf: SparkConf): String

getLocalDir takes a random directory path out of the configured local root directories.

getLocalDir throws an IOException if no local directory is defined:

Failed to get a temp directory under [[configuredLocalDirs]].
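A minimal sketch of the random pick, assuming the configured local directories are already resolved:

import java.io.IOException
import scala.util.Random

def localDir(configuredLocalDirs: Array[String]): String = {
  if (configuredLocalDirs.isEmpty) {
    throw new IOException(
      s"Failed to get a temp directory under [${configuredLocalDirs.mkString(",")}].")
  }
  // Pick one of the root directories at random
  configuredLocalDirs(Random.nextInt(configuredLocalDirs.length))
}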

getLocalDir is used when:

  • SparkEnv utility is used to create a base SparkEnv for the driver
  • Utils utility is used to fetchFile
  • DriverLogger is created
  • RocksDBStateStoreProvider (Spark Structured Streaming) is requested for a RocksDB
  • PythonBroadcast (PySpark) is requested to readObject
  • AggregateInPandasExec (PySpark) is requested to doExecute
  • EvalPythonExec (PySpark) is requested to doExecute
  • WindowInPandasExec (PySpark) is requested to doExecute
  • PythonForeachWriter (PySpark) is requested for a UnsafeRowBuffer
  • Client (Spark on YARN) is requested to prepareLocalResources and createConfArchive

localRootDirs Registry

Utils utility uses the localRootDirs internal registry so that getOrCreateLocalRootDirsImpl is executed just once (when first requested).

localRootDirs is available using the getOrCreateLocalRootDirs method.

getOrCreateLocalRootDirs(
  conf: SparkConf): Array[String]
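The once-only behaviour boils down to a guarded lazy initialization, sketched below (the enclosing object name and the placeholder body are assumptions):

import org.apache.spark.SparkConf

object LocalDirsRegistry {
  @volatile private var localRootDirs: Array[String] = null

  def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = synchronized {
    // Compute the directories once; reuse the cached value afterwards
    if (localRootDirs == null) {
      localRootDirs = Array.empty // stands in for getOrCreateLocalRootDirsImpl(conf)
    }
    localRootDirs
  }
}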

getOrCreateLocalRootDirs is used when:

Creating spark Directory in Every Local Root Directory

getOrCreateLocalRootDirsImpl(
  conf: SparkConf): Array[String]

getOrCreateLocalRootDirsImpl creates a spark-[randomUUID] directory under every root directory for local storage (and registers a shutdown hook to delete the directories at shutdown).

getOrCreateLocalRootDirsImpl prints out the following WARN message to the logs when any of the local root directories is specified as a URI (i.e. with a scheme):

The configured local directories are not expected to be URIs;
however, got suspicious values [[uris]].
Please check your configured local directories.
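A hedged sketch of the directory creation (the function name is an assumption; real Spark deletes the directories recursively via its shutdown-hook manager):

import java.io.File
import java.util.UUID

def createSparkDirs(rootDirs: Array[String]): Array[String] =
  rootDirs.map { root =>
    // One spark-[randomUUID] subdirectory per local root directory
    val dir = new File(root, s"spark-${UUID.randomUUID}")
    dir.mkdirs()
    sys.addShutdownHook(dir.delete())
    dir.getAbsolutePath
  }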

Local URI Scheme

Utils defines a local URI scheme for files that are locally available on worker nodes in the cluster.

The local URI scheme is used when:

  • Utils is used to isLocalUri
  • Client (Spark on YARN) is used

isLocalUri

isLocalUri(
  uri: String): Boolean

isLocalUri is true when the given uri starts with the local: scheme.
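A minimal sketch of the check:

def isLocalUri(uri: String): Boolean =
  uri.startsWith("local:")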

isLocalUri is used when:

  • FIXME

getCurrentUserName

getCurrentUserName(): String

getCurrentUserName computes the name of the user who started the SparkContext instance.

NOTE: It is later available as SparkContext.sparkUser.

Internally, it reads the SPARK_USER environment variable and, if not set, reverts to Hadoop Security API's UserGroupInformation.getCurrentUser().getShortUserName().

NOTE: It is another place where Spark relies on Hadoop API for its operation.
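The lookup order can be sketched as follows (a simplified reconstruction):

import org.apache.hadoop.security.UserGroupInformation

def currentUserName(): String =
  // SPARK_USER wins; otherwise fall back to the Hadoop Security API
  sys.env.getOrElse("SPARK_USER",
    UserGroupInformation.getCurrentUser.getShortUserName)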

localHostName

localHostName(): String

localHostName computes the local host name.

It starts by checking SPARK_LOCAL_HOSTNAME environment variable for the value. If it is not defined, it uses SPARK_LOCAL_IP to find the name (using InetAddress.getByName). If it is not defined either, it calls InetAddress.getLocalHost for the name.

NOTE: Utils.localHostName is executed while SparkContext is created and also to compute the default value of the spark.driver.host configuration property.
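The resolution order can be sketched as follows (a hedged simplification; address-format handling is omitted):

import java.net.InetAddress

def localHostName(): String =
  sys.env.get("SPARK_LOCAL_HOSTNAME")
    .orElse(sys.env.get("SPARK_LOCAL_IP")
      .map(ip => InetAddress.getByName(ip).getHostName))
    .getOrElse(InetAddress.getLocalHost.getHostName)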

getUserJars

getUserJars(
  conf: SparkConf): Seq[String]

getUserJars returns the value of the spark.jars configuration property with empty entries removed.
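A minimal sketch, assuming a comma-separated spark.jars value:

import org.apache.spark.SparkConf

def userJars(conf: SparkConf): Seq[String] =
  // Split spark.jars and drop empty entries
  conf.get("spark.jars", "").split(",").filter(_.nonEmpty).toSeq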

getUserJars is used when:

extractHostPortFromSparkUrl

extractHostPortFromSparkUrl(
  sparkUrl: String): (String, Int)

extractHostPortFromSparkUrl creates a Java URI from the input sparkUrl and takes the host and port parts.

extractHostPortFromSparkUrl asserts that the input sparkUrl uses the spark scheme.

extractHostPortFromSparkUrl throws a SparkException for unparseable spark URLs:

Invalid master URL: [sparkUrl]
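The parsing and validation can be sketched as follows (a simplified reconstruction; real Spark validates more URI parts):

import java.net.URI
import org.apache.spark.SparkException

def extractHostPort(sparkUrl: String): (String, Int) = {
  val uri = new URI(sparkUrl)
  // Require the spark:// scheme with an explicit host and port
  if (uri.getScheme != "spark" || uri.getHost == null || uri.getPort < 0) {
    throw new SparkException(s"Invalid master URL: $sparkUrl")
  }
  (uri.getHost, uri.getPort)
}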

extractHostPortFromSparkUrl is used when:

isDynamicAllocationEnabled

isDynamicAllocationEnabled(
  conf: SparkConf): Boolean

isDynamicAllocationEnabled checks whether Dynamic Allocation of Executors is enabled (true) or not (false).


isDynamicAllocationEnabled is positive (true) when all the following hold:

  1. spark.dynamicAllocation.enabled configuration property is true
  2. spark.master is non-local
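A minimal sketch of the two conditions (real Spark also honors an internal testing flag):

import org.apache.spark.SparkConf

def dynamicAllocationEnabled(conf: SparkConf): Boolean = {
  val master = conf.get("spark.master", "")
  conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
    !master.startsWith("local")
}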

isDynamicAllocationEnabled is used when:

checkAndGetK8sMasterUrl

checkAndGetK8sMasterUrl(
  rawMasterURL: String): String

checkAndGetK8sMasterUrl...FIXME

checkAndGetK8sMasterUrl is used when:

Fetching File

fetchFile(
  url: String,
  targetDir: File,
  conf: SparkConf,
  securityMgr: SecurityManager,
  hadoopConf: Configuration,
  timestamp: Long,
  useCache: Boolean): File

fetchFile...FIXME

fetchFile is used when:

  • SparkContext is requested to addFile
  • Executor is requested to updateDependencies
  • Spark Standalone's DriverRunner is requested to downloadUserJar

isPushBasedShuffleEnabled

isPushBasedShuffleEnabled(
  conf: SparkConf,
  isDriver: Boolean,
  checkSerializer: Boolean = true): Boolean

isPushBasedShuffleEnabled takes the value of spark.shuffle.push.enabled configuration property (from the given SparkConf).

If disabled, isPushBasedShuffleEnabled simply returns false.

Otherwise, isPushBasedShuffleEnabled returns whether it is even possible to use push-based shuffle or not based on the following:

  1. External Shuffle Service is used (spark.shuffle.service.enabled is true)
  2. spark.master is yarn
  3. (only with checkSerializer enabled) spark.serializer is a Serializer that supportsRelocationOfSerializedObjects
  4. spark.io.encryption.enabled is false

When the spark.shuffle.push.enabled configuration property is enabled but the above requirements do not hold, isPushBasedShuffleEnabled prints out the following WARN message to the logs:

Push-based shuffle can only be enabled
when the application is submitted to run in YARN mode,
with external shuffle service enabled, IO encryption disabled,
and relocation of serialized objects supported.
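The gating can be sketched as follows (a hedged simplification that omits the serializer check):

import org.apache.spark.SparkConf

def pushBasedShuffleEnabled(conf: SparkConf): Boolean =
  conf.getBoolean("spark.shuffle.push.enabled", false) &&
    conf.getBoolean("spark.shuffle.service.enabled", false) &&
    conf.get("spark.master", "") == "yarn" &&
    !conf.getBoolean("spark.io.encryption.enabled", false)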

isPushBasedShuffleEnabled is used when:

Logging

Enable ALL logging level for org.apache.spark.util.Utils logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.util.Utils=ALL

Refer to Logging.