ApplicationMaster (aka ExecutorLauncher)

ApplicationMaster is the YARN ApplicationMaster for a Spark application submitted to a YARN cluster (which is commonly called Spark on YARN).

Figure 1. ApplicationMaster’s Dependencies

ApplicationMaster is a standalone application that YARN NodeManager runs in a YARN container to manage a Spark application running in a YARN cluster.

From the official documentation of Apache Hadoop YARN (with some minor changes of mine):

The per-application ApplicationMaster is actually a framework-specific library and is tasked with negotiating cluster resources from the YARN ResourceManager and working with the YARN NodeManager(s) to execute and monitor the tasks.

ApplicationMaster (and ExecutorLauncher) is launched as a result of Client creating a ContainerLaunchContext to launch a Spark application on YARN.

Figure 2. Launching ApplicationMaster

NOTE: ContainerLaunchContext represents all of the information needed by the YARN NodeManager to launch a container.

ExecutorLauncher is a custom ApplicationMaster for client deploy mode that exists only to make it easy to distinguish client and cluster deploy modes when using ps or jps.

$ jps -lm

71253 org.apache.spark.deploy.yarn.ExecutorLauncher --arg 192.168.99.1:50188 --properties-file /tmp/hadoop-jacek/nm-local-dir/usercache/jacek/appcache/.../__spark_conf__/__spark_conf__.properties

When created, ApplicationMaster takes a YarnRMClient (to handle communication with YARN ResourceManager about YARN containers for ApplicationMaster and executors).

ApplicationMaster uses YarnAllocator to manage YARN containers with executors.

Table 1. ApplicationMaster’s Internal Properties

amEndpoint
  Initial value: (uninitialized)
  RpcEndpointRef to the YarnAM RPC endpoint, initialized when ApplicationMaster executes runAMEndpoint.
  CAUTION: FIXME When, in a Spark application’s lifecycle, does runAMEndpoint really happen?
  Used exclusively when ApplicationMaster registers the web UI security filters (in client deploy mode when the driver runs outside ApplicationMaster).

sparkConf
  Initial value: New SparkConf
  FIXME

finished
  Initial value: false
  Flag to…FIXME

yarnConf
  Initial value: Hadoop’s YarnConfiguration
  Created using SparkHadoopUtil.newConfiguration

exitCode
  Initial value: 0
  FIXME

userClassThread
  Initial value: (uninitialized)
  FIXME

sparkContextPromise
  Initial value: Scala Promise of SparkContext
  Used only in cluster deploy mode (when the driver and ApplicationMaster run together in a YARN container) as a communication bus between ApplicationMaster and the separate Driver thread that runs a Spark application.
  Used to inform ApplicationMaster when a Spark application’s SparkContext has been initialized successfully or failed.
  A non-null value allows ApplicationMaster to access the driver’s RpcEnv (available as rpcEnv).
  NOTE: A successful initialization of a Spark application’s SparkContext is when YARN-specific TaskScheduler, i.e. YarnClusterScheduler, gets informed that the Spark application has started. What a clever solution!

rpcEnv
  Initial value: (uninitialized)
  RpcEnv which is:
    • the sparkYarnAM RPC environment that runExecutorLauncher creates (client deploy mode)
    • the driver’s RpcEnv that runDriver accesses once the SparkContext is available (cluster deploy mode)

isClusterMode
  Initial value: true (when --class was specified)
  Flag…FIXME

maxNumExecutorFailures
  FIXME

maxNumExecutorFailures Property

FIXME

Computed using the optional spark.yarn.max.executor.failures if set. Otherwise, it is twice spark.dynamicAllocation.maxExecutors (with dynamic allocation enabled) or twice spark.executor.instances (without it), but never less than 3.
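To make the fallback order concrete, here is a small Scala sketch of the computation. It is not Spark’s code: a plain Map[String, String] stands in for SparkConf, and I assume that an unset spark.dynamicAllocation.maxExecutors means an unbounded (Int.MaxValue) number of executors, which is why the doubling saturates instead of overflowing.

```scala
// Sketch only: a plain Map[String, String] stands in for SparkConf
object MaxExecutorFailuresSketch {
  private def intSetting(conf: Map[String, String], key: String): Option[Int] =
    conf.get(key).map(_.trim.toInt)

  def maxNumExecutorFailures(conf: Map[String, String]): Int = {
    val dynamicAllocation =
      conf.get("spark.dynamicAllocation.enabled").exists(_.trim.toBoolean)
    // Assumption: an unset maxExecutors means "unbounded", i.e. Int.MaxValue
    val effectiveNumExecutors =
      if (dynamicAllocation)
        intSetting(conf, "spark.dynamicAllocation.maxExecutors").getOrElse(Int.MaxValue)
      else
        intSetting(conf, "spark.executor.instances").getOrElse(0)
    // Twice the executors, saturating at Int.MaxValue instead of overflowing
    val doubled =
      if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue
      else 2 * effectiveNumExecutors
    // Never fewer than 3 failures unless explicitly configured
    val fallback = math.max(3, doubled)
    // An explicit spark.yarn.max.executor.failures always wins
    intSetting(conf, "spark.yarn.max.executor.failures").getOrElse(fallback)
  }
}
```

With spark.executor.instances set to 10 and nothing else, the limit is 20; with a single executor it is still 3.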

Creating ApplicationMaster Instance

ApplicationMaster takes the following when created:

  • ApplicationMasterArguments

  • YarnRMClient

ApplicationMaster initializes the internal registries and counters.

FIXME Review the initialization again

reporterThread Method

FIXME

Launching Progress Reporter Thread — launchReporterThread Method

FIXME

Setting Internal SparkContext Reference — sparkContextInitialized Method

sparkContextInitialized(sc: SparkContext): Unit

sparkContextInitialized passes the call on to the ApplicationMaster.sparkContextInitialized that sets the internal sparkContextRef reference (to be sc).

Clearing Internal SparkContext Reference — sparkContextStopped Method

sparkContextStopped(sc: SparkContext): Boolean

sparkContextStopped passes the call on to the ApplicationMaster.sparkContextStopped that clears the internal sparkContextRef reference (i.e. sets it to null).
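sparkContextStopped returns a Boolean, which fits a compare-and-set on the reference. The following sketch assumes sparkContextRef is a java.util.concurrent.atomic.AtomicReference; FakeSparkContext is a hypothetical stand-in for SparkContext so the code runs without Spark.

```scala
import java.util.concurrent.atomic.AtomicReference

object SparkContextRefSketch {
  // Hypothetical stand-in for SparkContext
  final class FakeSparkContext(val appName: String)

  private val sparkContextRef = new AtomicReference[FakeSparkContext](null)

  // Remember the SparkContext once it has been initialized
  def sparkContextInitialized(sc: FakeSparkContext): Unit = sparkContextRef.set(sc)

  // Clear the reference only if it still points at sc; true when it was cleared
  def sparkContextStopped(sc: FakeSparkContext): Boolean =
    sparkContextRef.compareAndSet(sc, null)

  def current: Option[FakeSparkContext] = Option(sparkContextRef.get())
}
```

The compare-and-set makes a stop request for a stale SparkContext a no-op that returns false.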

Registering web UI Security Filters — addAmIpFilter Method

addAmIpFilter(): Unit

addAmIpFilter is a helper method that registers YARN’s AmIpFilter as the web UI security filter, in a way that depends on the deploy mode.

In cluster deploy mode (when ApplicationMaster runs with the web UI), it sets the spark.ui.filters system property to org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter. It also sets one system property per key-value pair of the AmIpFilter configuration (computed earlier), named spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.[key] with [value] as its value.

In client deploy mode (when ApplicationMaster runs in a different JVM, possibly on a different host, than the web UI), it simply sends an AddWebUIFilter message to ApplicationMaster (namely to the AMEndpoint RPC endpoint).
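The cluster deploy mode branch boils down to computing a handful of system property names. A minimal sketch, where the object and method names are hypothetical and params stands in for the AmIpFilter configuration:

```scala
object AmIpFilterSketch {
  val AmFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"

  // System properties that cluster deploy mode would set for the web UI filter
  def clusterModeProperties(params: Map[String, String]): Map[String, String] =
    Map("spark.ui.filters" -> AmFilter) ++
      params.map { case (k, v) => s"spark.$AmFilter.param.$k" -> v }
}
```

Each entry would then be set with System.setProperty; the client deploy mode branch sends the filter class and its parameters in an AddWebUIFilter message instead.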

finish Method

FIXME

allocator Internal Reference to YarnAllocator

allocator is the internal reference to YarnAllocator that ApplicationMaster uses to request new or release outstanding containers for executors.

allocator is created when ApplicationMaster is registered (using the internal YarnRMClient reference).

Launching ApplicationMaster Standalone Application — main Method

ApplicationMaster is started as a standalone application inside a YARN container on a node.

The ApplicationMaster standalone application is launched as a result of Client sending a ContainerLaunchContext request (to launch ApplicationMaster for a Spark application) to YARN ResourceManager.

Figure 3. Submitting ApplicationMaster to YARN NodeManager

When executed, main first parses command-line parameters and then uses SparkHadoopUtil.runAsSparkUser to run the main code with a Hadoop UserGroupInformation as a thread local variable (distributed to child threads) for authenticating HDFS and YARN calls.

Enable DEBUG logging level for org.apache.spark.deploy.SparkHadoopUtil logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.deploy.SparkHadoopUtil=DEBUG

Refer to Logging.

You should see the following message in the logs:

DEBUG running as user: [user]

SparkHadoopUtil.runAsSparkUser function executes a block that creates an ApplicationMaster (passing in the ApplicationMasterArguments instance and a new YarnRMClient) and then runs it.

Running ApplicationMaster — run Method

run(): Int

run reads the application attempt id.

(only in cluster deploy mode) run sets cluster deploy mode-specific settings and sets the application attempt id (from YARN).

run sets a CallerContext for APPMASTER.

FIXME Why is CallerContext required? It’s only executed when hadoop.caller.context.enabled is enabled and org.apache.hadoop.ipc.CallerContext class is on CLASSPATH.

You should see the following INFO message in the logs:

INFO ApplicationAttemptId: [appAttemptId]

run creates a Hadoop FileSystem (using the internal YarnConfiguration).

run registers the cleanup shutdown hook.

run creates a SecurityManager.

(only when spark.yarn.credentials.file is defined) run creates a ConfigurableCredentialManager to get an AMCredentialRenewer and schedules login from a keytab.

FIXME Security stuff begs for more details.

In the end, run registers ApplicationMaster (with YARN ResourceManager) for the Spark application — either calling runDriver (in cluster deploy mode) or runExecutorLauncher (for client deploy mode).

run exits with 0 exit code.

In case of an exception, you should see the following ERROR message in the logs and run finishes with FAILED final application status.

ERROR Uncaught exception: [exception]

run is used exclusively when ApplicationMaster is launched as a standalone application (inside a YARN container on a YARN cluster).

Creating sparkYarnAM RPC Environment and Registering ApplicationMaster with YARN ResourceManager (Client Deploy Mode)

runExecutorLauncher(
  securityMgr: SecurityManager): Unit

runExecutorLauncher creates the sparkYarnAM RPC environment (on the spark.yarn.am.port port, with the internal SparkConf and clientMode enabled).

Read the note in Creating RpcEnv to learn the meaning of the clientMode input argument.

clientMode is enabled for a so-called client-mode ApplicationMaster, i.e. when a Spark application is submitted to YARN in client deploy mode.

runExecutorLauncher registers web UI security filters.

FIXME Why is this needed? addAmIpFilter

In the end, runExecutorLauncher registers ApplicationMaster with YARN ResourceManager and requests resources and then pauses until reporterThread finishes.

runExecutorLauncher is used exclusively when ApplicationMaster is started in client deploy mode.

Running Spark Application’s Driver and Registering ApplicationMaster with YARN ResourceManager (Cluster Deploy Mode)

runDriver(
  securityMgr: SecurityManager): Unit

runDriver starts a Spark application on a separate thread, registers YarnAM endpoint in the application’s RpcEnv followed by registering ApplicationMaster with YARN ResourceManager. In the end, runDriver waits for the Spark application to finish.

You should see the following INFO message in the logs:

Waiting for spark context initialization...

runDriver waits up to spark.yarn.am.waitTime for the Spark application’s SparkContext to become available and then accesses the current RpcEnv (and saves it as the internal rpcEnv).

runDriver creates RpcEndpointRef to the driver’s YarnScheduler endpoint and registers YarnAM endpoint (using spark.driver.host and spark.driver.port properties for the driver’s host and port and isClusterMode enabled).

runDriver registers ApplicationMaster with YARN ResourceManager and requests cluster resources (using the Spark application’s RpcEnv, the driver’s RPC endpoint reference, webUrl if web UI is enabled and the input securityMgr).

runDriver pauses until the Spark application finishes.

runDriver uses Java’s Thread.join on the internal Thread reference to the Spark application running on it.

If the SparkContext is null (i.e. the Spark application did not create one) while the Spark application is still running, runDriver reports an IllegalStateException:

SparkContext is null but app is still running!

If a TimeoutException is reported while waiting for the Spark application to start, you should see the following ERROR message in the logs and runDriver finishes with FAILED final application status and the error code 13.

SparkContext did not initialize after waiting for [spark.yarn.am.waitTime] ms. Please check earlier log output for errors. Failing the application.

runDriver is used when ApplicationMaster is started (in cluster deploy mode).
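The waiting part of runDriver can be sketched with Scala’s Promise and Await. This is a simplified model, not Spark’s code: FakeSparkContext and awaitSparkContext are hypothetical, waitTime plays the role of spark.yarn.am.waitTime, and the Left value carries the exit code 13 mentioned above.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object RunDriverWaitSketch {
  // Hypothetical stand-in for SparkContext
  final case class FakeSparkContext(appName: String)

  // Exit code reported when the SparkContext does not show up in time
  val ExitScNotInited = 13

  // Left(exitCode) on timeout; Right(Option(sc)) once the promise is completed
  def awaitSparkContext(
      sparkContextPromise: Promise[FakeSparkContext],
      waitTime: FiniteDuration,
      finished: () => Boolean): Either[Int, Option[FakeSparkContext]] =
    try {
      val sc = Await.result(sparkContextPromise.future, waitTime)
      // A null SparkContext is only acceptable once the application has finished
      if (sc == null && !finished())
        throw new IllegalStateException("SparkContext is null but app is still running!")
      Right(Option(sc))
    } catch {
      case _: TimeoutException =>
        Console.err.println(
          s"SparkContext did not initialize after waiting for ${waitTime.toMillis} ms. " +
            "Please check earlier log output for errors. Failing the application.")
        Left(ExitScNotInited)
    }
}
```

The Driver thread is the only party that completes the promise, so the ApplicationMaster side never polls; it simply blocks with an upper bound.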

Starting Spark Application (in Separate Driver Thread) — startUserApplication Method

startUserApplication(): Thread

startUserApplication starts a Spark application as a separate Driver thread.

Internally, when startUserApplication is executed, you should see the following INFO message in the logs:

INFO Starting the user application in a separate Thread

startUserApplication takes the user-specified jars and maps them to use the file: protocol.

startUserApplication then creates a class loader to load the main class of the Spark application given the precedence of the Spark system jars and the user-specified jars.

startUserApplication works on custom configurations for Python and R applications (which I don’t bother including here).

startUserApplication loads the main class (using the custom class loader created above with the user-specified jars) and creates a reference to the main method.

The main class is specified as userClass in ApplicationMasterArguments when ApplicationMaster was created.

startUserApplication starts a Java Thread (with the name Driver) that invokes the main method (with the application arguments from userArgs from ApplicationMasterArguments). The Driver thread uses the internal sparkContextPromise to notify ApplicationMaster about the execution status of the main method (success or failure).

When the main method (of the Spark application) finishes successfully, the Driver thread finishes with SUCCEEDED final application status and exit code 0, and you should see the following DEBUG message in the logs:

DEBUG Done running users class

Any exception in the Driver thread is reported with a corresponding ERROR message in the logs, FAILED final application status and an appropriate exit code.

// SparkUserAppException
ERROR User application exited with status [exitCode]

// non-SparkUserAppException
ERROR User class threw exception: [cause]

A Spark application’s exit codes are passed directly to ApplicationMaster’s finish method and recorded as exitCode for future reference.

startUserApplication is used exclusively when ApplicationMaster runs a Spark application’s driver and registers itself with YARN ResourceManager for cluster deploy mode.
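Here is a sketch of the Driver thread and of how its outcome could be reported back through a Promise. It assumes the main method has already been loaded (userMain stands in for the reflectively obtained method), UserAppException is a hypothetical stand-in for SparkUserAppException, and the exit code 15 for other exceptions is an assumption.

```scala
import scala.concurrent.Promise
import scala.util.control.NonFatal

object DriverThreadSketch {
  sealed trait FinalStatus
  case object Succeeded extends FinalStatus
  case object Failed extends FinalStatus

  // Assumption: exit code used for an exception thrown by the user class
  val ExitExceptionUserClass = 15

  // Hypothetical exception carrying the user application's own exit code
  final class UserAppException(val exitCode: Int)
    extends Exception(s"User application exited with status $exitCode")

  final case class Result(status: FinalStatus, exitCode: Int, message: String)

  def startUserApplication(
      userMain: Array[String] => Unit,
      userArgs: Seq[String],
      outcome: Promise[Result]): Thread = {
    val driver = new Thread(new Runnable {
      override def run(): Unit = {
        val result =
          try {
            userMain(userArgs.toArray)
            Result(Succeeded, 0, "Done running users class")
          } catch {
            case e: UserAppException =>
              Result(Failed, e.exitCode, s"User application exited with status ${e.exitCode}")
            case NonFatal(e) =>
              Result(Failed, ExitExceptionUserClass, s"User class threw exception: $e")
          }
        // Hand the outcome back to the waiting ApplicationMaster side
        outcome.trySuccess(result)
      }
    }, "Driver")
    driver.start()
    driver
  }
}
```

Naming the thread Driver makes it easy to spot in thread dumps of the ApplicationMaster JVM.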

Registering ApplicationMaster with YARN ResourceManager and Requesting YARN Cluster Resources — registerAM Internal Method

registerAM(
  _sparkConf: SparkConf,
  _rpcEnv: RpcEnv,
  driverRef: RpcEndpointRef,
  uiAddress: String,
  securityMgr: SecurityManager): Unit

Figure 4. Registering ApplicationMaster with YARN ResourceManager

Internally, registerAM first takes the application and attempt ids, and creates the URL of Spark History Server for the Spark application, i.e. [address]/history/[appId]/[attemptId], by substituting Hadoop variables (using the internal YarnConfiguration) in the optional spark.yarn.historyServer.address setting.
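The substitution can be sketched with a regular expression. I assume Hadoop variables take the ${hadoopconf-KEY} form that the Spark on YARN documentation describes for spark.yarn.historyServer.address, that unknown variables are left untouched, and that an unset address yields an empty URL; the object and method names are hypothetical.

```scala
import scala.util.matching.Regex

object HistoryUrlSketch {
  // Assumption: variables look like ${hadoopconf-yarn.resourcemanager.hostname}
  private val HadoopVar: Regex = """\$\{hadoopconf-([^}$\s]+)\}""".r

  // Replaces every known variable with its Hadoop configuration value
  def substituteHadoopVariables(text: String, hadoopConf: Map[String, String]): String =
    HadoopVar.replaceAllIn(text, m =>
      Regex.quoteReplacement(hadoopConf.getOrElse(m.group(1), m.matched)))

  def historyUrl(
      address: Option[String],
      hadoopConf: Map[String, String],
      appId: String,
      attemptId: String): String =
    address
      .map(a => s"${substituteHadoopVariables(a, hadoopConf)}/history/$appId/$attemptId")
      .getOrElse("")
}
```

For example, with the history server on the ResourceManager node, ${hadoopconf-yarn.resourcemanager.hostname}:18080 resolves to that host name followed by :18080.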

registerAM then creates an RpcEndpointAddress for the driver’s CoarseGrainedScheduler RPC endpoint available at spark.driver.host and spark.driver.port.

registerAM requests YarnRMClient to register ApplicationMaster (with YARN ResourceManager) and the internal YarnAllocator to allocate required cluster resources (given placement hints about where to allocate resource containers for executors to be as close to the data as possible).

registerAM uses the YarnRMClient that was given when ApplicationMaster was created.

In the end, registerAM launches the reporter thread.

registerAM is used when ApplicationMaster runs a Spark application in cluster deploy mode and client deploy mode.

Command-Line Parameters — ApplicationMasterArguments class

ApplicationMaster uses ApplicationMasterArguments class to handle command-line parameters.

ApplicationMasterArguments is created right at the start of the main method for the args command-line parameters.

It accepts the following command-line parameters:

  • --jar JAR_PATH — the path to the Spark application’s JAR file

  • --class CLASS_NAME — the name of the Spark application’s main class

  • --arg ARG — an argument to be passed to the Spark application’s main class. There can be multiple --arg arguments that are passed in order.

  • --properties-file FILE — the path to a custom Spark properties file.

  • --primary-py-file FILE — the main Python file to run.

  • --primary-r-file FILE — the main R file to run.

When an unsupported parameter is found, the following message is printed out to standard error and ApplicationMaster exits with exit code 1.

Unknown/unsupported param [unknownParam]

Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
Options:
  --jar JAR_PATH       Path to your application's JAR file
  --class CLASS_NAME   Name of your application's main class
  --primary-py-file    A main Python file
  --primary-r-file     A main R file
  --arg ARG            Argument to be passed to your application's main class.
                       Multiple invocations are possible, each will be passed in order.
  --properties-file FILE Path to a custom Spark properties file.
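A parser for these parameters fits naturally into a recursive pattern match over the argument list. The sketch below is hypothetical (AmArgs and parse are not Spark’s names) and returns the error message instead of printing the usage and exiting with code 1, which keeps it testable. Repeated --arg parameters are accumulated in order.

```scala
object AmArgumentsSketch {
  final case class AmArgs(
      userJar: Option[String] = None,
      userClass: Option[String] = None,
      primaryPyFile: Option[String] = None,
      primaryRFile: Option[String] = None,
      userArgs: Vector[String] = Vector.empty,
      propertiesFile: Option[String] = None)

  // Left(message) for an unknown parameter (or a parameter missing its value)
  def parse(args: List[String], acc: AmArgs = AmArgs()): Either[String, AmArgs] = args match {
    case "--jar" :: value :: tail => parse(tail, acc.copy(userJar = Some(value)))
    case "--class" :: value :: tail => parse(tail, acc.copy(userClass = Some(value)))
    case "--primary-py-file" :: value :: tail => parse(tail, acc.copy(primaryPyFile = Some(value)))
    case "--primary-r-file" :: value :: tail => parse(tail, acc.copy(primaryRFile = Some(value)))
    case "--arg" :: value :: tail => parse(tail, acc.copy(userArgs = acc.userArgs :+ value))
    case "--properties-file" :: value :: tail => parse(tail, acc.copy(propertiesFile = Some(value)))
    case Nil => Right(acc)
    case unknown :: _ => Left(s"Unknown/unsupported param $unknown")
  }
}
```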

localResources Property

When ApplicationMaster is instantiated, it computes the internal localResources collection (YARN LocalResources by name) based on the internal spark.yarn.cache.* configuration settings.

localResources: Map[String, LocalResource]

You should see the following INFO message in the logs:

INFO ApplicationMaster: Preparing Local resources

It starts by reading the internal Spark configuration settings (that were earlier set when Client prepared local resources to distribute):

For each file name in spark.yarn.cache.filenames, it maps the corresponding entry of spark.yarn.cache.types to the appropriate YARN LocalResourceType and creates a new YARN LocalResource.

LocalResource represents a local resource required to run a container.

If spark.yarn.cache.confArchive is set, it is added to localResources as ARCHIVE resource type and PRIVATE visibility.

ARCHIVE is an archive file that is automatically unarchived by the NodeManager.

PRIVATE visibility means the resource is shared among all applications of the same user on the node.

Ultimately, it removes the cache-related settings from the Spark configuration and system properties.

You should see the following INFO message in the logs:

INFO ApplicationMaster: Prepared Local resources [resources]
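A sketch of pairing spark.yarn.cache.filenames with spark.yarn.cache.types. It assumes both settings are comma-separated lists of the same length, that the types are stored as FILE or ARCHIVE, and that each resource is keyed by the fragment of its URI (its link name in the container). Resource and the other names are hypothetical, and sizes, timestamps and visibilities are left out.

```scala
import java.net.URI

object LocalResourcesSketch {
  sealed trait ResourceType
  case object FileType extends ResourceType
  case object ArchiveType extends ResourceType

  final case class Resource(uri: URI, rtype: ResourceType)

  def localResources(conf: Map[String, String]): Map[String, Resource] = {
    // Comma-separated setting as a list of trimmed, non-empty entries
    def list(key: String): Seq[String] =
      conf.get(key).map(_.split(",").toSeq.map(_.trim).filter(_.nonEmpty)).getOrElse(Seq.empty)

    val names = list("spark.yarn.cache.filenames")
    val types = list("spark.yarn.cache.types")
    require(names.size == types.size, "filenames and types must line up")

    names.zip(types).map { case (file, t) =>
      val uri = new URI(file)
      val rtype = t match {
        case "FILE" => FileType
        case "ARCHIVE" => ArchiveType
        case other => throw new IllegalArgumentException(s"unsupported type $other")
      }
      // The URI fragment (after #) is the name the resource gets in the container
      val name = Option(uri.getFragment).getOrElse(
        throw new IllegalArgumentException(s"missing link name fragment in $file"))
      name -> Resource(uri, rtype)
    }.toMap
  }
}
```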

Cluster Mode Settings

When in cluster deploy mode, ApplicationMaster sets the following system properties (in run):

FIXME Why are the system properties required? Who’s expecting them?

isClusterMode Internal Flag

FIXME Since org.apache.spark.deploy.yarn.ExecutorLauncher is used for client deploy mode, the isClusterMode flag could be set there (not depending on --class which is correct yet not very obvious).

isClusterMode is an internal flag that is enabled (i.e. true) for cluster mode.

Specifically, it says whether the main class of the Spark application (through --class command-line argument) was specified or not. That is how the developers decided to inform ApplicationMaster about being run in cluster mode when Client creates YARN’s ContainerLaunchContext (to launch the ApplicationMaster for a Spark application).

isClusterMode is used in run to set additional system properties and to choose between runDriver (when the flag is enabled) and runExecutorLauncher (when disabled).

Besides, isClusterMode controls the default final status of a Spark application: FinalApplicationStatus.FAILED (when the flag is enabled) or FinalApplicationStatus.UNDEFINED (when disabled).

isClusterMode also controls whether addAmIpFilter sets system properties (when the flag is enabled) or sends an AddWebUIFilter message instead.
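The flag and the default final status it drives can be summarised in a few lines. All names are hypothetical; userClass models the --class parameter.

```scala
object ClusterModeSketch {
  sealed trait FinalStatus
  case object Failed extends FinalStatus
  case object Undefined extends FinalStatus

  // Cluster deploy mode is inferred from the presence of --class
  def isClusterMode(userClass: Option[String]): Boolean = userClass.isDefined

  // Default final status before the application reports anything
  def defaultFinalStatus(clusterMode: Boolean): FinalStatus =
    if (clusterMode) Failed else Undefined
}
```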

Unregistering ApplicationMaster from YARN ResourceManager — unregister Method

unregister unregisters the ApplicationMaster for the Spark application from the YARN ResourceManager.

unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit

It is called from the cleanup shutdown hook (that was registered in ApplicationMaster when it started running) and only when the application’s final result is successful or it was the last attempt to run the application.

It first checks that the ApplicationMaster has not already been unregistered (using the internal unregistered flag). If it has not, you should see the following INFO message in the logs:

INFO ApplicationMaster: Unregistering ApplicationMaster with [status]

There can also be an optional diagnostic message in the logs:

(diag message: [msg])

The internal unregistered flag is set to be enabled, i.e. true.

It then requests YarnRMClient to unregister.

Cleanup Shutdown Hook

Internally, the hook checks the internal finished flag and, if it is disabled, marks the Spark application as failed with EXIT_EARLY.

If the internal unregistered flag is disabled, the hook unregisters the Spark application and cleans up the staging directory afterwards, but only when the final status of the Spark application is FinalApplicationStatus.SUCCEEDED or this is the last allowed application attempt.

The shutdown hook runs after the SparkContext is shut down, i.e. the shutdown priority is one less than SparkContext’s.

The shutdown hook is registered using Spark’s own ShutdownHookManager.addShutdownHook.
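The decision logic of the hook, together with the unregister-once guard described in the previous section, can be sketched as follows. AmState, doUnregister and cleanupStagingDir are hypothetical, and I assume "last attempt" means attemptId >= maxAttempts.

```scala
object CleanupHookSketch {
  sealed trait FinalStatus
  case object Succeeded extends FinalStatus
  case object Failed extends FinalStatus
  case object Undefined extends FinalStatus

  // Hypothetical mutable stand-in for ApplicationMaster's internal flags
  final class AmState(
      var finished: Boolean = false,
      var unregistered: Boolean = false,
      var finalStatus: FinalStatus = Undefined)

  // Unregisters at most once, guarded by the unregistered flag
  def unregister(state: AmState, doUnregister: FinalStatus => Unit): Unit =
    if (!state.unregistered) {
      state.unregistered = true
      doUnregister(state.finalStatus)
    }

  def onShutdown(
      state: AmState,
      attemptId: Int,
      maxAttempts: Int,
      doUnregister: FinalStatus => Unit,
      cleanupStagingDir: () => Unit): Unit = {
    if (!state.finished) {
      state.finished = true
      state.finalStatus = Failed // EXIT_EARLY in the text above
    }
    val isLastAttempt = attemptId >= maxAttempts
    if (!state.unregistered && (state.finalStatus == Succeeded || isLastAttempt)) {
      unregister(state, doUnregister)
      cleanupStagingDir()
    }
  }
}
```

Skipping unregistration on a failed, non-final attempt is what lets YARN start another attempt of the same application.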

ExecutorLauncher

ExecutorLauncher comes with no extra functionality when compared to ApplicationMaster. It serves as a helper class to run ApplicationMaster under another class name in client deploy mode.

With the two different class names (pointing at the same class ApplicationMaster), it is easier to tell ExecutorLauncher (which is really an ApplicationMaster) in client deploy mode apart from the ApplicationMaster in cluster deploy mode using tools like ps or jps.

Consider ExecutorLauncher an ApplicationMaster for client deploy mode.
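Since ExecutorLauncher adds nothing, it can be nothing more than an entry point that delegates to ApplicationMaster’s main. A sketch with hypothetical stub objects in place of the real classes:

```scala
// Hypothetical stand-in for the real ApplicationMaster entry point
object ApplicationMasterStub {
  @volatile var lastArgs: Seq[String] = Seq.empty

  def main(args: Array[String]): Unit = {
    lastArgs = args.toSeq
  }
}

// A differently named entry point that only delegates
object ExecutorLauncherStub {
  def main(args: Array[String]): Unit = ApplicationMasterStub.main(args)
}
```

Because the JVM is started with the delegating class as its main class, ps and jps report that name even though all of the work happens in ApplicationMaster.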

Obtain Application Attempt Id — getAttemptId Method

getAttemptId(): ApplicationAttemptId

getAttemptId returns YARN’s ApplicationAttemptId (of the Spark application to which the container was assigned).

Internally, it queries YARN by means of YarnRMClient.

Waiting Until Driver is Network-Accessible and Creating RpcEndpointRef to Communicate — waitForSparkDriver Internal Method

waitForSparkDriver(): RpcEndpointRef

waitForSparkDriver waits until the driver is network-accessible, i.e. accepts connections on a given host and port, and returns an RpcEndpointRef to the driver.

When executed, you should see the following INFO message in the logs:

INFO yarn.ApplicationMaster: Waiting for Spark driver to be reachable.

waitForSparkDriver takes the driver’s host and port (using ApplicationMasterArguments passed in when ApplicationMaster was created).

FIXME waitForSparkDriver expects the driver’s host and port as the 0-th element in ApplicationMasterArguments.userArgs. Why?

waitForSparkDriver tries to connect to the driver’s host and port until the driver accepts the connection, but no longer than the spark.yarn.am.waitTime setting and only while the internal finished flag is disabled.

You should see the following INFO message in the logs:

INFO yarn.ApplicationMaster: Driver now available: [driverHost]:[driverPort]

While waitForSparkDriver tries to connect (while the socket is down), you can see the following ERROR message and waitForSparkDriver pauses for 100 ms and tries to connect again (until the waitTime elapses).

ERROR Failed to connect to driver at [driverHost]:[driverPort], retrying ...

Once waitForSparkDriver could connect to the driver, waitForSparkDriver sets spark.driver.host and spark.driver.port properties to driverHost and driverPort, respectively (using the internal SparkConf).

In the end, waitForSparkDriver calls runAMEndpoint.

If waitForSparkDriver did not manage to connect (before waitTime elapses or finished internal flag was enabled), waitForSparkDriver reports a SparkException:

Failed to connect to driver!

waitForSparkDriver is used exclusively when client-mode ApplicationMaster creates the sparkYarnAM RPC environment and registers itself with YARN ResourceManager.
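The retry loop can be sketched with a plain java.net.Socket. The 100 ms pause and the waitTime limit follow the description above, while the 1000 ms connect timeout, the finished callback and the method name are assumptions of this sketch.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object WaitForDriverSketch {
  // True once host:port accepts a TCP connection; false when waitTimeMs elapses
  // or finished() returns true first
  def waitForDriver(
      host: String,
      port: Int,
      waitTimeMs: Long,
      finished: () => Boolean = () => false): Boolean = {
    val deadline = System.currentTimeMillis() + waitTimeMs
    var connected = false
    while (!connected && !finished() && System.currentTimeMillis() < deadline) {
      val socket = new Socket()
      try {
        socket.connect(new InetSocketAddress(host, port), 1000)
        connected = true
      } catch {
        case _: IOException =>
          Console.err.println(s"Failed to connect to driver at $host:$port, retrying ...")
          Thread.sleep(100)
      } finally {
        // Only reachability matters, so the probe connection is closed right away
        socket.close()
      }
    }
    connected
  }
}
```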

Creating RpcEndpointRef to Driver’s YarnScheduler Endpoint and Registering YarnAM Endpoint — runAMEndpoint Internal Method

runAMEndpoint(host: String, port: String, isClusterMode: Boolean): RpcEndpointRef

runAMEndpoint sets up an RpcEndpointRef to the driver’s YarnScheduler endpoint and registers YarnAM endpoint.

NOTE: In cluster deploy mode (when the driver lives in the YARN cluster), the RPC environment in use is the driver’s sparkDriver RPC environment.

Figure 5. Registering YarnAM Endpoint

Internally, runAMEndpoint gets a RpcEndpointRef to the driver’s YarnScheduler endpoint (available on the host and port).

YarnScheduler RPC endpoint is registered when the Spark coarse-grained scheduler backends for YARN are created.

runAMEndpoint then registers the RPC endpoint as YarnAM (an AMEndpoint created with ApplicationMaster’s RpcEnv, the YarnScheduler endpoint reference, and the isClusterMode flag).

runAMEndpoint is used when ApplicationMaster waits for the driver (in client deploy mode) and runs the driver (in cluster deploy mode).

createAllocator Method

createAllocator(
  driverRef: RpcEndpointRef,
  _sparkConf: SparkConf): Unit

createAllocator…FIXME

createAllocator is used when…FIXME