# Master

`Master` is the manager of a Spark Standalone cluster.

`Master` can be launched from the command line.
## StandaloneRestServer

`Master` can start a `StandaloneRestServer` when enabled using the `spark.master.rest.enabled` configuration property.

`StandaloneRestServer` is requested to start in `onStart` and to stop in `onStop`.
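Since `spark.master.rest.enabled` is a regular configuration property, it can be set like any other Spark setting, e.g. in `conf/spark-defaults.conf` or on a `SparkConf`. A minimal sketch (assuming spark-core on the classpath):

```scala
import org.apache.spark.SparkConf

// enable the REST server of the standalone Master
val conf = new SparkConf()
  .set("spark.master.rest.enabled", "true")
```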
## Master RPC Endpoint

`Master` is a `ThreadSafeRpcEndpoint` that is registered under the name `Master` (when launched as a command-line application and requested to start up an RPC environment).
## Launching Standalone Master

`Master` can be launched as a standalone application using `spark-class`:

```text
./bin/spark-class org.apache.spark.deploy.master.Master
```
## main Entry Point

```scala
main(
  argStrings: Array[String]): Unit
```

`main` is the entry point of the `Master` standalone application.

`main` prints out the following INFO message to the logs:

```text
Started daemon with process name: [processName]
```

`main` registers signal handlers for the `TERM`, `HUP` and `INT` signals.

`main` parses the command-line options (using `MasterArguments`) and initializes an `RpcEnv`.

In the end, `main` requests the `RpcEnv` to be notified when terminated.
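The flow above can be summarized with the following self-contained sketch; all names below are illustrative stand-ins, not Spark's internal API:

```scala
import java.lang.management.ManagementFactory

// Illustrative sketch of the main startup sequence described above
object MasterMainSketch {
  def main(argStrings: Array[String]): Unit = {
    // the INFO message reports the JVM process name, e.g. 12345@hostname
    val processName = ManagementFactory.getRuntimeMXBean.getName
    println(s"Started daemon with process name: $processName")

    // In the real Master:
    // 1. register signal handlers for TERM, HUP, INT
    // 2. parse the command-line options with MasterArguments
    // 3. startRpcEnvAndEndpoint(host, port, webUiPort, conf)
    // 4. rpcEnv.awaitTermination()
  }
}
```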
## Command-Line Options

`Master` supports the following command-line options:

```text
Usage: Master [options]

Options:
  -i HOST, --ip HOST      Hostname to listen on (deprecated, please use --host or -h)
  -h HOST, --host HOST    Hostname to listen on
  -p PORT, --port PORT    Port to listen on (default: 7077)
  --webui-port PORT       Port for web UI (default: 8080)
  --properties-file FILE  Path to a custom Spark properties file.
                          Default is conf/spark-defaults.conf.
```
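For example, to start a Master on a given host with explicit ports (the values are illustrative):

```text
./bin/spark-class org.apache.spark.deploy.master.Master \
  --host 192.168.1.4 \
  --port 7077 \
  --webui-port 8080
```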
## Creating Instance

`Master` takes the following to be created:

- RpcEnv
- RpcAddress
- Web UI's port
- SecurityManager
- SparkConf

`Master` is created when:

- the `Master` utility is requested to start up an RPC environment
## Starting Up RPC Environment

```scala
startRpcEnvAndEndpoint(
  host: String,
  port: Int,
  webUiPort: Int,
  conf: SparkConf): (RpcEnv, Int, Option[Int])
```

`startRpcEnvAndEndpoint` creates an `RpcEnv` with the name `sparkMaster` (and the input arguments) and registers the `Master` endpoint under the name `Master`.

In the end, `startRpcEnvAndEndpoint` sends a `BoundPortsRequest` message (synchronously) to the `Master` endpoint and returns the `RpcEnv` together with the ports of the web UI and the REST Server (from the `BoundPortsResponse` reply).

`startRpcEnvAndEndpoint` is used when the `Master` standalone application is launched (via `main`).
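Based on the description above, the body boils down to something along these lines (a paraphrased sketch of internal APIs, not user-facing code):

```scala
// creates the RPC environment and registers the Master endpoint,
// then asks (synchronously) for the bound ports
val rpcEnv = RpcEnv.create("sparkMaster", host, port, conf, securityMgr)
val masterEndpoint = rpcEnv.setupEndpoint("Master",
  new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
```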
## spreadOutApps

`Master` uses the `spark.deploy.spreadOut` configuration property when requested to startExecutorsOnWorkers.
## Scheduling Resources Among Waiting Applications

```scala
schedule(): Unit
```

`schedule`...FIXME

`schedule` is used when `Master` is requested to schedule resources among waiting applications.
## startExecutorsOnWorkers

```scala
startExecutorsOnWorkers(): Unit
```

`startExecutorsOnWorkers`...FIXME
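The core idea behind spreading executors out can be illustrated with a self-contained sketch (not the actual Spark source): walk the usable workers round-robin, handing out one core at a time, until the application's demand is met.

```scala
// each worker offers some free cores
final case class WorkerSlot(id: String, var freeCores: Int)

// round-robin allocation of coresNeeded cores across the workers
def spreadOut(workers: Seq[WorkerSlot], coresNeeded: Int): Map[String, Int] = {
  val assigned = scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)
  var toAssign = coresNeeded
  var progressed = true
  while (toAssign > 0 && progressed) {
    progressed = false
    for (w <- workers if toAssign > 0 && w.freeCores > 0) {
      w.freeCores -= 1
      assigned(w.id) += 1
      toAssign -= 1
      progressed = true
    }
  }
  assigned.toMap
}

// 5 cores across 3 workers land as 2/2/1 rather than all on one worker:
// spreadOut(Seq(WorkerSlot("w1", 8), WorkerSlot("w2", 8), WorkerSlot("w3", 8)), 5)
// => Map(w1 -> 2, w2 -> 2, w3 -> 1)
```

With `spark.deploy.spreadOut` disabled, the allocation instead packs as many cores as possible onto as few workers as possible.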
## WebUI

`MasterWebUI` is the web UI server for the standalone Master. `Master` starts the web UI to listen to `http://[master's hostname]:[webUIPort]` (e.g. `http://localhost:8080`):

```text
Successfully started service 'MasterUI' on port 8080.
Started MasterWebUI at http://192.168.1.4:8080
```
## States

`Master` can be in the following states:

- `STANDBY` - the initial state while `Master` is initializing
- `ALIVE` - start scheduling resources among applications
- `RECOVERING`
- `COMPLETING_RECOVERY`
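These states correspond to a simple Scala enumeration, along the lines of `org.apache.spark.deploy.master.RecoveryState`:

```scala
// the possible states of a standalone Master
object RecoveryState extends Enumeration {
  type MasterState = Value
  val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
}
```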
## LeaderElectable

`Master` is a `LeaderElectable`.
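Judging by the `ElectedLeader` and `RevokedLeadership` RPC messages listed below, the contract can be sketched as follows (an assumed shape, not the verified source):

```scala
// callbacks a leader-election agent invokes on the Master
trait LeaderElectable {
  def electedLeader(): Unit
  def revokedLeadership(): Unit
}
```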
## To be Reviewed

Application IDs follow the pattern `app-[yyyyMMddHHmmss]-[4-digit number]` (see the RegisterApplication event section below).
## REST Server

The standalone Master starts the REST Server service for alternative application submission that is supposed to work across Spark versions. It is enabled by default (see `spark.master.rest.enabled` above) and is used when `--deploy-mode` is `cluster`.

`RestSubmissionClient` is the client.

The server includes a JSON representation of `SubmitRestProtocolResponse` in the HTTP body.

The following INFO messages show up in the logs when the Master endpoint starts up (`Master#onStart` is called) with the REST Server enabled:

```text
INFO Utils: Successfully started service on port 6066.
INFO StandaloneRestServer: Started REST server for submitting applications on port 6066
```
## Recovery Mode

A standalone Master can run with recovery mode enabled and be able to recover state among the available swarm of masters. By default, there is no recovery, i.e. no persistence and no election.

NOTE: Only a master can schedule tasks so having one always on is important for cases where you want to launch new tasks. Running tasks are unaffected by the state of the master.

Master uses `spark.deploy.recoveryMode` to set up the recovery mode (see the Settings section below).

The Recovery Mode enables persistence of the Master's state and election of a new leader.

TIP: Check out the exercise [Spark Standalone - Using ZooKeeper for High-Availability of Master](exercises/spark-exercise-standalone-master-ha.md).
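A minimal sketch of enabling recovery, using the two properties from the Settings section below (the directory path is illustrative):

```scala
import org.apache.spark.SparkConf

// persist the Master's state to a local directory (FILESYSTEM mode)
val conf = new SparkConf()
  .set("spark.deploy.recoveryMode", "FILESYSTEM")
  .set("spark.deploy.recoveryDirectory", "/var/spark/recovery")
```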
## RPC Messages

Master communicates with drivers, executors and configures itself using RPC messages.

The following message types are accepted by master (see the `Master#receive` or `Master#receiveAndReply` methods; a dispatch sketch follows the list):

- `ElectedLeader`
- `CompleteRecovery`
- `RevokedLeadership`
- `ExecutorStateChanged`
- `DriverStateChanged`
- `Heartbeat`
- `MasterChangeAcknowledged`
- `WorkerSchedulerStateResponse`
- `UnregisterApplication`
- `CheckForWorkerTimeOut`
- `RegisterWorker`
- `RequestSubmitDriver`
- `RequestKillDriver`
- `RequestDriverStatus`
- `RequestMasterState`
- `BoundPortsRequest`
- `RequestExecutors`
- `KillExecutors`
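A self-contained sketch of the receive-style dispatch (not the Master source; the message payloads are illustrative stand-ins):

```scala
// each RPC message is a case object/class the endpoint pattern-matches on
sealed trait MasterMessage
case object ElectedLeader extends MasterMessage
case object CheckForWorkerTimeOut extends MasterMessage
final case class Heartbeat(workerId: String) extends MasterMessage

def receive(message: MasterMessage): Unit = message match {
  case ElectedLeader         => println("elected leader: completing recovery")
  case CheckForWorkerTimeOut => println("checking for timed-out workers")
  case Heartbeat(workerId)   => println(s"heartbeat from worker $workerId")
}
```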
### RegisterApplication event

A `RegisterApplication` event is sent by [AppClient](spark-standalone.md#AppClient) to the standalone Master. The event holds information about the application being deployed (`ApplicationDescription`) and the driver's endpoint reference.

`ApplicationDescription` describes an application by its name, maximum number of cores, executor's memory, command, appUiUrl, and user, with optional eventLogDir and eventLogCodec for Event Logs, and the number of cores per executor.
CAUTION: FIXME Finish
A standalone Master receives `RegisterApplication` with an `ApplicationDescription` and the driver's [RpcEndpointRef](rpc/RpcEndpointRef.md).

```text
INFO Registering app [description.name]
```

Application IDs in Spark Standalone are in the format of `app-[yyyyMMddHHmmss]-[4-digit nextAppNumber]`.

Master keeps track of the number of already-scheduled applications (`nextAppNumber`).
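A short sketch of how an ID in that format can be produced (the format string is assumed from the pattern above):

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

val createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
val nextAppNumber = 0
val appId = "app-%s-%04d".format(createDateFormat.format(new Date()), nextAppNumber)
// e.g. app-20240101121314-0000
```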
ApplicationDescription (AppClient) → ApplicationInfo (Master) - application structure enrichment

`ApplicationSource` metrics + `applicationMetricsSystem`

```text
INFO Registered app [description.name] with ID [app.id]
```

CAUTION: FIXME persistenceEngine.addApplication(app)

`schedule()` schedules the currently available resources among waiting apps.
FIXME When is `schedule()` called?

It is only executed when the Master is in `RecoveryState.ALIVE` state.

A Worker in `WorkerState.ALIVE` state can accept applications.

A driver has a state (`driver.state`); when it is in `DriverState.RUNNING` state, the driver has been assigned to a worker for execution.
### LaunchDriver RPC message

WARNING: It seems a dead message. Disregard it for now.

A `LaunchDriver` message is sent by an active standalone Master to a worker to launch a driver.

![Master finds a place for a driver (posts LaunchDriver)](spark-standalone-master-worker-LaunchDriver.png)

You should see the following INFO in the logs right before the message is sent out to a worker:

```text
INFO Launching driver [driver.id] on worker [worker.id]
```

The message holds information about the id and name of the driver.

A driver can be running on a single worker while a worker can have many drivers running.

When a worker receives a `LaunchDriver` message, it prints out the following INFO:

```text
Asked to launch driver [driver.id]
```

It then creates a `DriverRunner` and starts it, which spawns a separate JVM process.

Workers' free memory and cores are considered when assigning some to waiting drivers (applications).

CAUTION: FIXME Go over `waitingDrivers`...
## Internals of org.apache.spark.deploy.master.Master

When `Master` starts, it first creates the default `SparkConf` configuration whose values it then overrides using environment variables and command-line options.

A fully-configured Master instance requires `host`, `port` (default: `7077`), and `webUiPort` (default: `8080`) settings defined.

TIP: When in trouble, consult the [Spark Tips and Tricks](spark-tips-and-tricks.md) document.

It then starts the RPC environment and the web UI.
## Worker Management

Master uses the `master-forward-message-thread` to schedule a check every `spark.worker.timeout` seconds for workers' availability, removing the timed-out ones.

Internally, Master sends the `CheckForWorkerTimeOut` message to itself to trigger the verification.

When a worker has not responded for `spark.worker.timeout` seconds, it is assumed dead and the following WARN message appears in the logs:

```text
WARN Removing [worker.id] because we got no heartbeat in [spark.worker.timeout] seconds
```
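The check itself can be sketched as follows (a self-contained illustration, not the Master source): any worker whose last heartbeat is older than the timeout is considered dead and removed.

```scala
final case class WorkerInfo(id: String, lastHeartbeatMs: Long)

// returns the workers that are still alive, logging the removed ones
def timeOutDeadWorkers(workers: Seq[WorkerInfo], timeoutMs: Long): Seq[WorkerInfo] = {
  val now = System.currentTimeMillis()
  val (dead, alive) = workers.partition(_.lastHeartbeatMs < now - timeoutMs)
  dead.foreach { w =>
    println(s"Removing ${w.id} because we got no heartbeat in ${timeoutMs / 1000} seconds")
  }
  alive
}
```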
## System Environment Variables

Master uses the following system environment variables (directly or indirectly):

- `SPARK_LOCAL_HOSTNAME` - the custom host name
- `SPARK_LOCAL_IP` - the custom IP to use when `SPARK_LOCAL_HOSTNAME` is not set
- `SPARK_MASTER_HOST` (not `SPARK_MASTER_IP` as used in the `start-master.sh` script above!) - the master's custom host
- `SPARK_MASTER_PORT` (default: `7077`) - the master's custom port
- `SPARK_MASTER_IP` (default: the `hostname` command's output)
- `SPARK_MASTER_WEBUI_PORT` (default: `8080`) - the port of the master's web UI. Overridden by `spark.master.ui.port` if set in the properties file.
- `SPARK_PUBLIC_DNS` (default: hostname) - the custom master hostname for the web UI's HTTP URL and the master's address.
- `SPARK_CONF_DIR` (default: `$SPARK_HOME/conf`) - the directory of the default properties file [spark-defaults.conf](spark-properties.md#spark-defaults-conf) from which all properties that start with the `spark.` prefix are loaded.
## Settings

Master uses the following properties:

- `spark.cores.max` (default: `0`) - the total expected number of cores. When set, an application could get executors of different sizes (in terms of cores).
- `spark.dead.worker.persistence` (default: `15`)
- `spark.deploy.retainedApplications` (default: `200`)
- `spark.deploy.retainedDrivers` (default: `200`)
- `spark.deploy.recoveryMode` (default: `NONE`) - possible modes: `ZOOKEEPER`, `FILESYSTEM`, or `CUSTOM`. Refer to the Recovery Mode section above.
- `spark.deploy.recoveryMode.factory` - the class name of the custom `StandaloneRecoveryModeFactory`.
- `spark.deploy.recoveryDirectory` (default: empty) - the directory to persist recovery state
- [spark.deploy.spreadOut](spark-standalone.md#spark.deploy.spreadOut) - whether to perform [round-robin scheduling across the nodes](spark-standalone.md#round-robin-scheduling)
- `spark.deploy.defaultCores` (default: `Int.MaxValue`, i.e. unbounded) - the number of maxCores for applications that don't specify it
- `spark.worker.timeout` (default: `60`) - time (in seconds) when no heartbeat from a worker means it is lost. See Worker Management above.