Worker¶
Worker is a logical worker node in a Spark Standalone cluster.
Worker can be launched from the command line.
Worker RPC Endpoint¶
Worker is a ThreadSafeRpcEndpoint and is registered under Worker name (when launched as a command-line application and requested to set up an RPC environment).
Launching Standalone Worker¶
Worker can be launched as a standalone application using spark-class.
./bin/spark-class org.apache.spark.deploy.worker.Worker
Note
At least one master URL is required.
main Entry Point¶
main(
args: Array[String]): Unit
main is the entry point of Worker standalone application.
main prints out the following INFO message to the logs:
Started daemon with process name: [processName]
main registers signal handlers for TERM, HUP, INT signals.
main parses command-line options (using WorkerArguments) and initializes an RpcEnv.
main asserts that either of the following holds:
- External Shuffle Service is not used (based on the spark.shuffle.service.enabled configuration property)
- Number of worker instances is 1 (based on the SPARK_WORKER_INSTANCES environment variable)
main throws an IllegalArgumentException when neither holds (i.e. the external shuffle service is enabled and more than one worker instance is requested):
Starting multiple workers on one host is failed because we may launch no more than one external shuffle service on each host, please set spark.shuffle.service.enabled to false or set SPARK_WORKER_INSTANCES to 1 to resolve the conflict.
In the end, main requests the RpcEnv to be notified when terminated.
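The steps above correspond, roughly, to the following condensed sketch of main (a sketch only, not the actual implementation; the helper calls and the way the configuration property and environment variable are read are approximations based on the description above):

def main(argStrings: Array[String]): Unit = {
  Utils.initDaemon(log)  // prints the "Started daemon..." message and registers TERM, HUP, INT handlers
  val conf = new SparkConf
  val args = new WorkerArguments(argStrings, conf)  // parses the command-line options
  val rpcEnv = startRpcEnvAndEndpoint(
    args.host, args.port, args.webUiPort, args.cores, args.memory,
    args.masters, args.workDir, conf = conf)
  // Either the External Shuffle Service is disabled or there is a single worker instance
  val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
  val workerInstances = sys.env.getOrElse("SPARK_WORKER_INSTANCES", "1").toInt
  require(!shuffleServiceEnabled || workerInstances <= 1,
    "Starting multiple workers on one host is failed because ...")  // full message quoted above
  rpcEnv.awaitTermination()
}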
Command-Line Options¶
Worker supports command-line options.
Usage: Worker [options] <master>
Master must be a URL of the form spark://hostname:port
Options:
-c CORES, --cores CORES Number of cores to use
-m MEM, --memory MEM Amount of memory to use (e.g. 1000M, 2G)
-d DIR, --work-dir DIR Directory to run apps in (default: SPARK_HOME/work)
-i HOST, --ip IP Hostname to listen on (deprecated, please use --host or -h)
-h HOST, --host HOST Hostname to listen on
-p PORT, --port PORT Port to listen on (default: random)
--webui-port PORT Port for web UI (default: 8081)
--properties-file FILE Path to a custom Spark properties file.
Default is conf/spark-defaults.conf.
cores¶
host¶
ip¶
Master URLs¶
(required) Comma-separated standalone Master URLs in the form:
spark://host1:port1,host2:port2,...
memory¶
port¶
properties-file¶
webui-port¶
work-dir¶
Creating Instance¶
Worker takes the following to be created:
- RpcEnv
- web UI's Port
- Number of CPU cores
- Memory
- RpcAddresses of the Masters
- Endpoint Name
- Work Dir Path (default: null)
- SparkConf
- SecurityManager
- Optional Resource File (default: (undefined))
- Supplier of ExternalShuffleService (default: null)
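These arguments map onto the Worker class declaration along the following lines (a sketch; the exact parameter names, defaults and visibility modifiers are approximations):

class Worker(
    override val rpcEnv: RpcEnv,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterRpcAddresses: Array[RpcAddress],
    endpointName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    securityMgr: SecurityManager,
    resourceFileOpt: Option[String] = None,
    externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null)
  extends ThreadSafeRpcEndpoint with Logging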
Worker is created when:
- Worker utility is requested to startRpcEnvAndEndpoint
ExternalShuffleService¶
Worker initializes an ExternalShuffleService (directly or indirectly using a Supplier if given).
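A minimal sketch of that initialization, assuming a shuffleService field and the constructor arguments listed under Creating Instance:

// Use the given Supplier when available or create an ExternalShuffleService directly
private val shuffleService: ExternalShuffleService =
  if (externalShuffleServiceSupplier != null) externalShuffleServiceSupplier.get()
  else new ExternalShuffleService(conf, securityMgr)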
ExternalShuffleService is started when Worker is requested to startExternalShuffleService.
ExternalShuffleService is used as follows:
- Informed about an application removed when Worker handles a WorkDirCleanup message or maybeCleanupApplication
- Informed about an executor removed when Worker is requested to handleExecutorStateChanged
ExternalShuffleService is stopped when Worker is requested to stop.
Starting Up RPC Environment¶
startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
masterUrls: Array[String],
workDir: String,
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf,
resourceFileOpt: Option[String] = None): RpcEnv
startRpcEnvAndEndpoint creates an RpcEnv (with the name sparkWorker and the given host and port).
startRpcEnvAndEndpoint translates the given masterUrls to RpcAddresses.
startRpcEnvAndEndpoint creates a Worker and requests the RpcEnv to set it up as an RPC endpoint under the Worker name.
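Put together, the above amounts to roughly the following (a sketch; the SecurityManager handling and the Worker constructor argument order follow the Creating Instance section and are approximations):

val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create("sparkWorker", host, port, conf, securityMgr)
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
rpcEnv.setupEndpoint("Worker",
  new Worker(rpcEnv, webUiPort, cores, memory, masterAddresses, "Worker",
    workDir, conf, securityMgr, resourceFileOpt))
rpcEnv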
startRpcEnvAndEndpoint is used when:
- LocalSparkCluster is requested to start
- Worker standalone application is launched
Creating Work Directory¶
createWorkDir(): Unit
createWorkDir sets the Worker's work directory to the given Work Dir Path (if defined) or the work subdirectory of the Spark home directory.
In the end, createWorkDir creates the work directory (unless it already exists).
createWorkDir reports...FIXME
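Pending the details above, a rough sketch of the behaviour, assuming workDir, workDirPath and sparkHome fields and java.io.File, with SPARK_HOME/work as the default (per the --work-dir option):

// Sketch only (assumed): use the configured work dir or SPARK_HOME/work
private def createWorkDir(): Unit = {
  workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
  if (!workDir.isDirectory && !workDir.mkdirs()) {
    logError(s"Failed to create work directory $workDir")
    System.exit(1)
  }
}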
Messages¶
ApplicationFinished¶
DriverStateChanged¶
ExecutorStateChanged¶
ExecutorStateChanged(
appId: String,
execId: Int,
state: ExecutorState,
message: Option[String],
exitStatus: Option[Int])
Message Handler: handleExecutorStateChanged
Posted when:
- ExecutorRunner is requested to killProcess and fetchAndRunExecutor
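For illustration, ExecutorRunner could post the message along these lines (a sketch; worker is assumed to be the Worker's RpcEndpointRef and exitCode a local value):

// Sketch only
worker.send(ExecutorStateChanged(appId, execId, ExecutorState.EXITED,
  Some("Command exited with code " + exitCode), Some(exitCode)))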
KillDriver¶
KillExecutor¶
LaunchDriver¶
LaunchExecutor¶
MasterChanged¶
ReconnectWorker¶
RegisterWorkerResponse¶
ReregisterWithMaster¶
RequestWorkerState¶
SendHeartbeat¶
WorkDirCleanup¶
handleExecutorStateChanged¶
handleExecutorStateChanged(
executorStateChanged: ExecutorStateChanged): Unit
handleExecutorStateChanged...FIXME
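Pending the FIXME above, a rough sketch of the handling, based on the cross-references on this page (the Master is informed, and a finished executor triggers shuffle-service and application cleanup); the executors, sendToMaster and shuffleService names are assumptions:

def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged): Unit = {
  sendToMaster(executorStateChanged)                       // forward the state change to the Master
  if (ExecutorState.isFinished(executorStateChanged.state)) {
    val fullId = executorStateChanged.appId + "/" + executorStateChanged.execId
    executors.remove(fullId)                               // forget the finished executor
    shuffleService.executorRemoved(                        // "executor removed" (see ExternalShuffleService)
      executorStateChanged.execId.toString, executorStateChanged.appId)
    maybeCleanupApplication(executorStateChanged.appId)
  }
}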
handleExecutorStateChanged is used when:
- Worker is requested to handle an ExecutorStateChanged message
maybeCleanupApplication¶
maybeCleanupApplication(
id: String): Unit
maybeCleanupApplication...FIXME
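Pending the FIXME above, a rough sketch of the cleanup, with finishedApps and executors assumed to be Worker-internal state:

def maybeCleanupApplication(id: String): Unit = {
  val shouldCleanUp = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
  if (shouldCleanUp) {
    finishedApps -= id
    // remove the application's directories under the work dir (when cleanup is enabled)
    shuffleService.applicationRemoved(id)  // "application removed" (see ExternalShuffleService)
  }
}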
maybeCleanupApplication is used when:
- Worker is requested to handle an ApplicationFinished message and handleExecutorStateChanged
Logging¶
Enable ALL logging level for org.apache.spark.deploy.worker.Worker logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.deploy.worker.Worker=ALL
Refer to Logging.