Utils Utility¶
getDynamicAllocationInitialExecutors¶
```scala
getDynamicAllocationInitialExecutors(
  conf: SparkConf): Int
```
getDynamicAllocationInitialExecutors gives the maximum value of the following configuration properties (for the initial number of executors):
- spark.dynamicAllocation.initialExecutors
- spark.dynamicAllocation.minExecutors
- spark.executor.instances
getDynamicAllocationInitialExecutors prints out the following INFO message to the logs:

```text
Using initial executors = [initialExecutors],
max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
```
With spark.dynamicAllocation.initialExecutors less than spark.dynamicAllocation.minExecutors, getDynamicAllocationInitialExecutors prints out the following WARN message to the logs:

```text
spark.dynamicAllocation.initialExecutors less than spark.dynamicAllocation.minExecutors is invalid,
ignoring its setting, please update your configs.
```
With spark.executor.instances less than spark.dynamicAllocation.minExecutors, getDynamicAllocationInitialExecutors prints out the following WARN message to the logs:

```text
spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid,
ignoring its setting, please update your configs.
```
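The selection boils down to a max over the three properties. A minimal sketch of the logic, assuming a plain Map[String, Int] in place of SparkConf (names and defaults are illustrative, not the actual Spark source):

```scala
// A sketch of the selection: the result is the max of the three settings.
def initialExecutors(conf: Map[String, Int]): Int = {
  val minExecutors = conf.getOrElse("spark.dynamicAllocation.minExecutors", 0)
  val initial = conf.getOrElse("spark.dynamicAllocation.initialExecutors", minExecutors)
  val instances = conf.getOrElse("spark.executor.instances", 0)
  // Values below the minimum trigger the WARN messages above and are
  // effectively ignored, since max picks the minimum anyway.
  Seq(minExecutors, initial, instances).max
}
```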
getDynamicAllocationInitialExecutors is used when:

- ExecutorAllocationManager is created
- SchedulerBackendUtils utility is used to getInitialTargetExecutorNumber
Local Directories for Scratch Space¶
```scala
getConfiguredLocalDirs(
  conf: SparkConf): Array[String]
```
getConfiguredLocalDirs returns the local directories where Spark can write files.

getConfiguredLocalDirs uses the given SparkConf to know if External Shuffle Service is enabled or not (based on spark.shuffle.service.enabled configuration property).
When in a YARN container (based on the CONTAINER_ID environment variable), getConfiguredLocalDirs uses the LOCAL_DIRS environment variable for YARN-approved local directories.
In non-YARN mode (or for the driver in yarn-client mode), getConfiguredLocalDirs checks the following environment variables (in order) and returns the value of the first found:

1. SPARK_EXECUTOR_DIRS
2. SPARK_LOCAL_DIRS
3. MESOS_DIRECTORY (only when External Shuffle Service is not used)
Each environment variable holds a comma-separated list of local directory paths.
In the end, when no earlier environment variables were found, getConfiguredLocalDirs uses the spark.local.dir configuration property (with the java.io.tmpdir System property as the default value).
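The non-YARN resolution order can be pictured as follows. A minimal sketch, assuming a plain Map[String, String] in place of SparkConf and leaving out the YARN and Mesos special cases:

```scala
// Resolution order (simplified): SPARK_EXECUTOR_DIRS, then SPARK_LOCAL_DIRS,
// then spark.local.dir, and finally java.io.tmpdir.
def configuredLocalDirs(conf: Map[String, String]): Array[String] = {
  val fromEnv = Seq("SPARK_EXECUTOR_DIRS", "SPARK_LOCAL_DIRS")
    .flatMap(name => Option(System.getenv(name)))
    .headOption
  fromEnv
    .orElse(conf.get("spark.local.dir"))
    .getOrElse(System.getProperty("java.io.tmpdir"))
    .split(",")
}
```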
getConfiguredLocalDirs is used when:

- DiskBlockManager is requested to createLocalDirs and createLocalDirsForMergedShuffleBlocks
- Utils utility is used to get a single random local root directory and create a spark directory in every local root directory
Random Local Directory Path¶
```scala
getLocalDir(
  conf: SparkConf): String
```
getLocalDir takes a random directory path out of the configured local root directories.
getLocalDir throws an IOException if no local directory is defined:

```text
Failed to get a temp directory under [[configuredLocalDirs]].
```
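A minimal sketch of the behavior (the helper name is illustrative, not the actual Spark source):

```scala
import java.io.IOException
import scala.util.Random

// Pick one of the configured local root directories at random,
// or fail with an IOException when none is available.
def randomLocalDir(localDirs: Array[String]): String =
  if (localDirs.isEmpty)
    throw new IOException("Failed to get a temp directory.")
  else
    localDirs(Random.nextInt(localDirs.length))
```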
getLocalDir is used when:

- SparkEnv utility is used to create a base SparkEnv for the driver
- Utils utility is used to fetchFile
- DriverLogger is created
- RocksDBStateStoreProvider (Spark Structured Streaming) is requested for a RocksDB
- PythonBroadcast (PySpark) is requested to readObject
- AggregateInPandasExec (PySpark) is requested to doExecute
- EvalPythonExec (PySpark) is requested to doExecute
- WindowInPandasExec (PySpark) is requested to doExecute
- PythonForeachWriter (PySpark) is requested for a UnsafeRowBuffer
- Client (Spark on YARN) is requested to prepareLocalResources and createConfArchive
localRootDirs Registry¶
Utils utility uses the localRootDirs internal registry so that getOrCreateLocalRootDirsImpl is executed just once (when first requested).

localRootDirs is available using the getOrCreateLocalRootDirs method.
```scala
getOrCreateLocalRootDirs(
  conf: SparkConf): Array[String]
```
getOrCreateLocalRootDirs is used when:

- Utils is used to getLocalDir
- Worker (Spark Standalone) is requested to launch an executor
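The once-only initialization amounts to a memoized field. A minimal self-contained sketch (field and method names mirror the description above; the Impl body is a stub, not the actual Spark code):

```scala
object LocalDirsRegistry {
  @volatile private var localRootDirs: Array[String] = _

  // Stub standing in for the real getOrCreateLocalRootDirsImpl.
  private def getOrCreateLocalRootDirsImpl(): Array[String] =
    Array(System.getProperty("java.io.tmpdir"))

  // Executes the Impl method only on the first request and caches the result.
  def getOrCreateLocalRootDirs(): Array[String] = synchronized {
    if (localRootDirs == null) {
      localRootDirs = getOrCreateLocalRootDirsImpl()
    }
    localRootDirs
  }
}
```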
Creating spark Directory in Every Local Root Directory¶
```scala
getOrCreateLocalRootDirsImpl(
  conf: SparkConf): Array[String]
```
getOrCreateLocalRootDirsImpl creates a spark-[randomUUID] directory under every root directory for local storage (and registers a shutdown hook to delete the directories at shutdown).
getOrCreateLocalRootDirsImpl prints out the following WARN message to the logs when any of the local root directories is specified as a URI (with a scheme):

```text
The configured local directories are not expected to be URIs;
however, got suspicious values [[uris]].
Please check your configured local directories.
```
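A minimal sketch of the directory creation and the shutdown hook, using plain JDK APIs (simplified: no error handling and a non-recursive delete, unlike the real implementation):

```scala
import java.io.File
import java.nio.file.Files
import java.util.UUID

// Create a spark-[randomUUID] directory under every root directory and
// register a shutdown hook that removes them at JVM exit.
def createSparkDirs(rootDirs: Array[String]): Array[File] = {
  val sparkDirs = rootDirs.map { root =>
    val dir = new File(root, s"spark-${UUID.randomUUID()}")
    Files.createDirectories(dir.toPath)
    dir
  }
  Runtime.getRuntime.addShutdownHook(new Thread(() => sparkDirs.foreach(_.delete())))
  sparkDirs
}
```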
Local URI Scheme¶
Utils defines a local URI scheme for files that are locally available on worker nodes in the cluster.
The local URI scheme is used when:

- Utils is used to isLocalUri
- Client (Spark on YARN) is used
isLocalUri¶
```scala
isLocalUri(
  uri: String): Boolean
```
isLocalUri is true when the URI is a local: URI, i.e. the given uri starts with the local: scheme.
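Which amounts to a simple prefix check (a sketch):

```scala
// true for e.g. "local:/opt/spark/jars/app.jar"
def isLocalUri(uri: String): Boolean = uri.startsWith("local:")
```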
isLocalUri is used when:
- FIXME
getCurrentUserName¶
```scala
getCurrentUserName(): String
```
getCurrentUserName computes the user name who has started the SparkContext instance.

NOTE: It is later available as SparkContext.sparkUser.

Internally, it reads the SPARK_USER environment variable and, if not set, reverts to Hadoop Security API's UserGroupInformation.getCurrentUser().getShortUserName().

NOTE: It is another place where Spark relies on Hadoop API for its operation.
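A minimal sketch of this resolution order (requires hadoop-common on the classpath; simplified, not the exact Spark source):

```scala
import org.apache.hadoop.security.UserGroupInformation

// SPARK_USER environment variable wins; otherwise ask Hadoop Security API.
def currentUserName(): String =
  Option(System.getenv("SPARK_USER"))
    .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName)
```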
localHostName¶
```scala
localHostName(): String
```
localHostName computes the local host name.

It starts by checking the SPARK_LOCAL_HOSTNAME environment variable for the value. If it is not defined, it uses SPARK_LOCAL_IP to find the name (using InetAddress.getByName). If that is not defined either, it calls InetAddress.getLocalHost for the name.
NOTE: Utils.localHostName is executed while SparkContext is created and also to compute the default value of the spark.driver.host Spark property.
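A minimal sketch of the lookup order (simplified error handling; not the exact Spark source):

```scala
import java.net.InetAddress

// SPARK_LOCAL_HOSTNAME first, then a reverse lookup of SPARK_LOCAL_IP,
// and finally the JVM's view of the local host.
def localHostName(): String =
  sys.env.get("SPARK_LOCAL_HOSTNAME")
    .orElse(sys.env.get("SPARK_LOCAL_IP")
      .map(ip => InetAddress.getByName(ip).getHostName))
    .getOrElse(InetAddress.getLocalHost.getHostName)
```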
getUserJars¶
```scala
getUserJars(
  conf: SparkConf): Seq[String]
```
getUserJars is the spark.jars configuration property with non-empty entries.
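That is, a comma-separated spark.jars value with blank entries dropped. A sketch, assuming a plain Map in place of SparkConf:

```scala
// Split spark.jars on commas and keep only non-empty entries.
def userJars(conf: Map[String, String]): Seq[String] =
  conf.getOrElse("spark.jars", "").split(",").filter(_.nonEmpty).toSeq
```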
getUserJars is used when:

- SparkContext is created
extractHostPortFromSparkUrl¶
```scala
extractHostPortFromSparkUrl(
  sparkUrl: String): (String, Int)
```
extractHostPortFromSparkUrl creates a Java URI with the input sparkUrl and takes the host and port parts.
extractHostPortFromSparkUrl asserts that the input sparkUrl uses the spark scheme.
extractHostPortFromSparkUrl throws a SparkException for unparseable Spark URLs:

```text
Invalid master URL: [sparkUrl]
```
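A minimal sketch of the parsing (simplified: the real method performs more validation, and SparkException is replaced by IllegalArgumentException here to keep the snippet self-contained):

```scala
import java.net.URI

// Parse a spark://host:port URL into its host and port parts.
def extractHostPort(sparkUrl: String): (String, Int) = {
  val uri = new URI(sparkUrl)
  if (uri.getScheme != "spark" || uri.getHost == null || uri.getPort < 0)
    throw new IllegalArgumentException(s"Invalid master URL: $sparkUrl")
  (uri.getHost, uri.getPort)
}
```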
extractHostPortFromSparkUrl is used when:

- StandaloneSubmitRequestServlet is requested to buildDriverDescription
- RpcAddress is requested to extract an RpcAddress from a Spark master URL
isDynamicAllocationEnabled¶
```scala
isDynamicAllocationEnabled(
  conf: SparkConf): Boolean
```
isDynamicAllocationEnabled checks whether Dynamic Allocation of Executors is enabled (true) or not (false).
isDynamicAllocationEnabled is positive (true) when all the following hold:

- spark.dynamicAllocation.enabled configuration property is true
- spark.master is non-local
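A minimal sketch of the two conditions, assuming a plain Map in place of SparkConf:

```scala
// Dynamic allocation requires the flag to be on and a non-local master.
def isDynamicAllocationEnabled(conf: Map[String, String]): Boolean = {
  val enabled = conf.getOrElse("spark.dynamicAllocation.enabled", "false").toBoolean
  val master = conf.getOrElse("spark.master", "")
  enabled && !master.startsWith("local")
}
```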
isDynamicAllocationEnabled is used when:

- SparkContext is created (to start an ExecutorAllocationManager)
- TaskResourceProfile is requested for custom executor resources
- ResourceProfileManager is created
- DAGScheduler is requested to checkBarrierStageWithDynamicAllocation
- TaskSchedulerImpl is requested to resourceOffers
- SchedulerBackendUtils is requested to getInitialTargetExecutorNumber
- StandaloneSchedulerBackend (Spark Standalone) is requested to start (for reporting purposes)
- ExecutorPodsAllocator (Spark on Kubernetes) is created (maxPVCs)
- ApplicationMaster (Spark on YARN) is created (maxNumExecutorFailures)
- YarnSchedulerBackend (Spark on YARN) is requested to getShufflePushMergerLocations
checkAndGetK8sMasterUrl¶
```scala
checkAndGetK8sMasterUrl(
  rawMasterURL: String): String
```
checkAndGetK8sMasterUrl...FIXME
checkAndGetK8sMasterUrl is used when:

- SparkSubmit is requested to prepareSubmitEnvironment (for Kubernetes cluster manager)
Fetching File¶
```scala
fetchFile(
  url: String,
  targetDir: File,
  conf: SparkConf,
  securityMgr: SecurityManager,
  hadoopConf: Configuration,
  timestamp: Long,
  useCache: Boolean): File
```
fetchFile...FIXME
fetchFile
is used when:
-
SparkContext
is requested to SparkContext.md#addFile[addFile] -
Executor
is requested to executor:Executor.md#updateDependencies[updateDependencies] -
Spark Standalone's
DriverRunner
is requested todownloadUserJar
isPushBasedShuffleEnabled¶
```scala
isPushBasedShuffleEnabled(
  conf: SparkConf,
  isDriver: Boolean,
  checkSerializer: Boolean = true): Boolean
```
isPushBasedShuffleEnabled takes the value of the spark.shuffle.push.enabled configuration property (from the given SparkConf).

If false, isPushBasedShuffleEnabled returns false immediately.
Otherwise, isPushBasedShuffleEnabled returns whether it is even possible to use push-based shuffle based on the following:

- External Shuffle Service is used (based on spark.shuffle.service.enabled that should be true)
- spark.master is yarn
- (only with checkSerializer enabled) spark.serializer is a Serializer that supportsRelocationOfSerializedObjects
- spark.io.encryption.enabled is false
In case the spark.shuffle.push.enabled configuration property is enabled but the above requirements do not hold, isPushBasedShuffleEnabled prints out the following WARN message to the logs:

```text
Push-based shuffle can only be enabled
when the application is submitted to run in YARN mode,
with external shuffle service enabled, IO encryption disabled,
and relocation of serialized objects supported.
```
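A minimal sketch of the gating conditions, assuming a plain Map in place of SparkConf and a hypothetical supportsRelocation helper standing in for the serializer check:

```scala
// All of the requirements listed above must hold for push-based shuffle.
def isPushBasedShuffleEnabled(
    conf: Map[String, String],
    checkSerializer: Boolean = true): Boolean = {
  def flag(key: String) = conf.getOrElse(key, "false").toBoolean
  flag("spark.shuffle.push.enabled") &&
    flag("spark.shuffle.service.enabled") &&
    conf.getOrElse("spark.master", "") == "yarn" &&
    !flag("spark.io.encryption.enabled") &&
    (!checkSerializer || supportsRelocation(conf))
}

// Hypothetical stand-in for Serializer.supportsRelocationOfSerializedObjects.
def supportsRelocation(conf: Map[String, String]): Boolean =
  conf.getOrElse("spark.serializer", "").endsWith("KryoSerializer")
```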
isPushBasedShuffleEnabled is used when:

- ShuffleDependency is requested to canShuffleMergeBeEnabled
- MapOutputTrackerMaster is created
- MapOutputTrackerWorker is created
- DAGScheduler is created
- ShuffleBlockPusher utility is used to create a BLOCK_PUSHER_POOL thread pool
- BlockManager is requested to initialize and registerWithExternalShuffleServer
- BlockManagerMasterEndpoint is created
- DiskBlockManager is requested to createLocalDirsForMergedShuffleBlocks
Logging¶
Enable ALL logging level for the org.apache.spark.util.Utils logger to see what happens inside.

Add the following line to conf/log4j.properties:

```text
log4j.logger.org.apache.spark.util.Utils=ALL
```
Refer to Logging.