
Worker

Worker is a logical worker node in a Spark Standalone cluster.

Worker can be launched from the command line.

Worker RPC Endpoint

Worker is a ThreadSafeRpcEndpoint and is registered under the Worker name (when launched as a command-line application and requested to set up an RPC environment).

Launching Standalone Worker

Worker can be launched as a standalone application using spark-class.

./bin/spark-class org.apache.spark.deploy.worker.Worker

Note

At least one master URL is required.

main Entry Point

main(
  args: Array[String]): Unit

main is the entry point of the Worker standalone application.

main prints out the following INFO message to the logs:

Started daemon with process name: [processName]

main registers signal handlers for the TERM, HUP and INT signals.

main parses the command-line options (using WorkerArguments) and starts the RpcEnv with the Worker endpoint (using startRpcEnvAndEndpoint).

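main asserts that at least one of the following holds:

  1. External shuffle service is disabled (based on the spark.shuffle.service.enabled configuration property)
  2. Number of worker instances is at most 1 (based on the SPARK_WORKER_INSTANCES environment variable, 1 by default)

main throws an IllegalArgumentException when neither holds, i.e. when the external shuffle service is enabled and more than one worker instance is requested: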

Starting multiple workers on one host is failed because we may launch no more than one external shuffle service on each host, please set spark.shuffle.service.enabled to false or set SPARK_WORKER_INSTANCES to 1 to resolve the conflict.

In the end, main requests the RpcEnv to wait until it terminates (awaitTermination).
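The shuffle service assertion in main boils down to a single require call. The following is a minimal, hypothetical reconstruction: the WorkerInstancesCheck object and its check method are made up for illustration, and the environment is passed in as a Map (instead of being read from sys.env) so the logic is easy to test. The worker may start when the external shuffle service is disabled or at most one worker instance is requested; otherwise require throws an IllegalArgumentException.

```scala
// Hypothetical sketch of the check performed by Worker.main (not Spark's code).
object WorkerInstancesCheck {
  // The error message quoted in the text above
  val ErrorMessage: String =
    "Starting multiple workers on one host is failed because we may launch no more than one " +
      "external shuffle service on each host, please set spark.shuffle.service.enabled to " +
      "false or set SPARK_WORKER_INSTANCES to 1 to resolve the conflict."

  /** Throws IllegalArgumentException when the external shuffle service is enabled
    * and more than one worker instance is requested on this host. */
  def check(shuffleServiceEnabled: Boolean, env: Map[String, String]): Unit = {
    // SPARK_WORKER_INSTANCES is treated as 1 when the variable is not set
    val workerInstances = env.getOrElse("SPARK_WORKER_INSTANCES", "1").toInt
    // Either condition alone is enough for the worker to start
    require(!shuffleServiceEnabled || workerInstances <= 1, ErrorMessage)
  }
}
```

For example, check(shuffleServiceEnabled = false, env = Map("SPARK_WORKER_INSTANCES" -> "3")) passes, while check(shuffleServiceEnabled = true, env = Map("SPARK_WORKER_INSTANCES" -> "2")) throws.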

Command-Line Options

Worker supports command-line options.

Usage: Worker [options] <master>

Master must be a URL of the form spark://hostname:port

Options:
  -c CORES, --cores CORES  Number of cores to use
  -m MEM, --memory MEM     Amount of memory to use (e.g. 1000M, 2G)
  -d DIR, --work-dir DIR   Directory to run apps in (default: SPARK_HOME/work)
  -i HOST, --ip IP         Hostname to listen on (deprecated, please use --host or -h)
  -h HOST, --host HOST     Hostname to listen on
  -p PORT, --port PORT     Port to listen on (default: random)
  --webui-port PORT        Port for web UI (default: 8081)
  --properties-file FILE   Path to a custom Spark properties file.
                           Default is conf/spark-defaults.conf.
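To make the option list concrete, here is a hypothetical, simplified parser for a subset of the options. It is not Spark's WorkerArguments; the WorkerOptions case class, the WorkerOptionsParser object, the use of 0 for a random port and the error handling are assumptions of this sketch, and only the M and G memory suffixes shown in the usage text are handled.

```scala
// Hypothetical, simplified parser for a subset of the Worker options listed above.
// It is NOT Spark's WorkerArguments; names, defaults and error handling are illustrative.
final case class WorkerOptions(
    cores: Option[Int] = None,
    memoryMb: Option[Int] = None,
    host: Option[String] = None,
    port: Int = 0,              // 0 is used here to mean "random port"
    webUiPort: Int = 8081,      // default web UI port from the usage text
    workDir: Option[String] = None,
    master: Option[String] = None)

object WorkerOptionsParser {
  // Converts memory strings such as "1000M" or "2G" to megabytes.
  // Only the M and G suffixes from the usage text are supported in this sketch.
  def memoryToMb(s: String): Int = {
    val (digits, unit) = s.trim.toLowerCase.span(_.isDigit)
    unit match {
      case "m"   => digits.toInt
      case "g"   => digits.toInt * 1024
      case other => throw new IllegalArgumentException(s"Unsupported memory unit '$other' in $s")
    }
  }

  @annotation.tailrec
  def parse(args: List[String], opts: WorkerOptions = WorkerOptions()): WorkerOptions =
    args match {
      case ("-c" | "--cores") :: value :: tail =>
        parse(tail, opts.copy(cores = Some(value.toInt)))
      case ("-m" | "--memory") :: value :: tail =>
        parse(tail, opts.copy(memoryMb = Some(memoryToMb(value))))
      case ("-h" | "--host") :: value :: tail =>
        parse(tail, opts.copy(host = Some(value)))
      case ("-p" | "--port") :: value :: tail =>
        parse(tail, opts.copy(port = value.toInt))
      case "--webui-port" :: value :: tail =>
        parse(tail, opts.copy(webUiPort = value.toInt))
      case ("-d" | "--work-dir") :: value :: tail =>
        parse(tail, opts.copy(workDir = Some(value)))
      // The single positional argument is the (possibly comma-separated) master URL
      case value :: tail if !value.startsWith("-") && opts.master.isEmpty =>
        parse(tail, opts.copy(master = Some(value)))
      case Nil =>
        require(opts.master.isDefined, "At least one master URL is required")
        opts
      case unexpected :: _ =>
        throw new IllegalArgumentException(s"Unexpected argument: $unexpected")
    }
}
```

For example, WorkerOptionsParser.parse(List("-c", "4", "--memory", "2G", "spark://localhost:7077")) yields 4 cores, 2048 MB of memory and the given master URL. Matching recursively on a List keeps the parsing of each option in a single case clause.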

cores

host

ip

Master URLs

(required) Comma-separated URLs of the standalone Masters in the form:

spark://host1:port1,host2:port2,...
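Since several Masters share a single spark:// prefix in this form, the value has to be expanded into one URL per Master before it can be used. A minimal sketch of that expansion, assuming the input always starts with spark:// (the MasterUrls object and its split method are hypothetical):

```scala
// Hypothetical helper: expands "spark://host1:port1,host2:port2" into one URL per Master.
object MasterUrls {
  private val Prefix = "spark://"

  def split(masterUrls: String): List[String] = {
    require(masterUrls.startsWith(Prefix), s"Invalid master URL: $masterUrls")
    // Drop the shared prefix, split on commas and re-attach the prefix to every host:port
    masterUrls.stripPrefix(Prefix).split(",").toList.map(hostPort => Prefix + hostPort)
  }
}
```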

memory

port

properties-file

webui-port

work-dir

Creating Instance

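Worker takes the following to be created:

  • RpcEnv
  • Port of the web UI
  • Number of CPU cores
  • Amount of memory
  • RpcAddresses of the Masters
  • Endpoint name
  • Work directory path (default: null)
  • SparkConf
  • SecurityManager
  • Optional resource file (default: None)
  • Supplier of an ExternalShuffleService (default: null)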

Worker is created when:

  • startRpcEnvAndEndpoint is used

ExternalShuffleService

Worker creates an ExternalShuffleService, either directly or using the given Supplier (if defined).

ExternalShuffleService is started when Worker is requested to startExternalShuffleService.

ExternalShuffleService is used as follows:

ExternalShuffleService is stopped when Worker is requested to stop.

Starting Up RPC Environment

startRpcEnvAndEndpoint(
  host: String,
  port: Int,
  webUiPort: Int,
  cores: Int,
  memory: Int,
  masterUrls: Array[String],
  workDir: String,
  workerNumber: Option[Int] = None,
  conf: SparkConf = new SparkConf,
  resourceFileOpt: Option[String] = None): RpcEnv

startRpcEnvAndEndpoint creates an RpcEnv named sparkWorker (followed by the workerNumber, if defined) with the given host and port.

startRpcEnvAndEndpoint translates the given masterUrls to RpcAddresses.

startRpcEnvAndEndpoint creates a Worker and requests the RpcEnv to set it up as an RPC endpoint under the Worker name.
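The naming and address translation steps can be sketched without Spark as follows. The WorkerRpcSetup object and its Address case class are hypothetical stand-ins (Address plays the role of RpcAddress). The sketch only shows how the RPC environment name follows from the optional workerNumber (sparkWorker, suffixed with the number if one is given) and how a spark://host:port URL can be turned into a host and port using java.net.URI.

```scala
import java.net.URI

// Hypothetical sketch of two steps of startRpcEnvAndEndpoint (not Spark's code).
object WorkerRpcSetup {
  // Minimal stand-in for an RPC address: just a host and a port
  final case class Address(host: String, port: Int)

  // sparkWorker, followed by the worker number when one is given
  def systemName(workerNumber: Option[Int]): String =
    "sparkWorker" + workerNumber.map(_.toString).getOrElse("")

  // Translates a single spark://host:port URL into an Address
  def toAddress(sparkUrl: String): Address = {
    val uri = new URI(sparkUrl)
    require(uri.getScheme == "spark" && uri.getHost != null && uri.getPort != -1,
      s"Invalid master URL: $sparkUrl")
    Address(uri.getHost, uri.getPort)
  }
}
```

For instance, Seq("spark://host1:7077", "spark://host2:7077").map(WorkerRpcSetup.toAddress) gives one Address per Master.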

startRpcEnvAndEndpoint is used when:

  • LocalSparkCluster is requested to start
  • Worker standalone application is launched

onStart

onStart(): Unit

onStart is part of the RpcEndpoint abstraction.

onStart...FIXME

Creating Work Directory

createWorkDir(): Unit

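createWorkDir sets the work directory (workDir) to be either the work directory path (workDirPath), if defined, or the work subdirectory of the Spark home directory (sparkHome).

In the end, createWorkDir creates the work directory (including any necessary but nonexistent parent directories).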

createWorkDir reports...FIXME
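The work directory logic can be sketched with java.io.File. This is a hypothetical helper, not Spark's createWorkDir: it takes the optional work directory path and the Spark home directory as parameters, and it throws an IOException on failure, which may differ from how the real method reports errors (left as FIXME above).

```scala
import java.io.{File, IOException}

// Hypothetical sketch of createWorkDir's logic; not Spark's implementation.
object WorkDir {
  /** Resolves the work directory (workDirPath if defined, else sparkHome/work)
    * and creates it, including any missing parent directories. */
  def create(workDirPath: Option[String], sparkHome: File): File = {
    val workDir = workDirPath.map(new File(_)).getOrElse(new File(sparkHome, "work"))
    // mkdirs returns false if the directory already exists, so check isDirectory instead
    workDir.mkdirs()
    if (!workDir.isDirectory) {
      throw new IOException(s"Failed to create work directory $workDir")
    }
    workDir
  }
}
```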

Messages

ApplicationFinished

DriverStateChanged

ExecutorStateChanged

ExecutorStateChanged(
  appId: String,
  execId: Int,
  state: ExecutorState,
  message: Option[String],
  exitStatus: Option[Int])

Message Handler: handleExecutorStateChanged
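The ExecutorStateChanged message is a plain data carrier, and RPC endpoints typically dispatch on message types with pattern matching. The following self-contained sketch models that. ExecutorState (modeled here as a sealed trait), WorkerMessage, WorkerMessages and the placeholder handler body are all hypothetical; the actual handleExecutorStateChanged is not described on this page, and the list of states is an assumption of the sketch.

```scala
// Hypothetical, self-contained model of the ExecutorStateChanged message and its dispatch.
sealed trait ExecutorState
object ExecutorState {
  case object LAUNCHING extends ExecutorState
  case object RUNNING extends ExecutorState
  case object KILLED extends ExecutorState
  case object FAILED extends ExecutorState
  case object LOST extends ExecutorState
  case object EXITED extends ExecutorState

  // A state is final once the executor can no longer run
  def isFinished(state: ExecutorState): Boolean = state match {
    case KILLED | FAILED | LOST | EXITED => true
    case _                               => false
  }
}

sealed trait WorkerMessage

// Same fields as the ExecutorStateChanged signature above
final case class ExecutorStateChanged(
    appId: String,
    execId: Int,
    state: ExecutorState,
    message: Option[String],
    exitStatus: Option[Int]) extends WorkerMessage

object WorkerMessages {
  // Stand-in for an RPC endpoint's receive: routes each message to its handler
  def receive(msg: WorkerMessage): String = msg match {
    case esc: ExecutorStateChanged => handleExecutorStateChanged(esc)
  }

  // Placeholder handler: only describes the state change
  def handleExecutorStateChanged(esc: ExecutorStateChanged): String = {
    val id = s"${esc.appId}/${esc.execId}"
    if (ExecutorState.isFinished(esc.state))
      s"executor $id finished with exit status ${esc.exitStatus.getOrElse("unknown")}"
    else s"executor $id is now ${esc.state}"
  }
}
```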

Posted when:

KillDriver

KillExecutor

LaunchDriver

LaunchExecutor

MasterChanged

ReconnectWorker

RegisterWorkerResponse

ReregisterWithMaster

RequestWorkerState

SendHeartbeat

WorkDirCleanup

handleExecutorStateChanged

handleExecutorStateChanged(
  executorStateChanged: ExecutorStateChanged): Unit

handleExecutorStateChanged...FIXME

handleExecutorStateChanged is used when:

  • Worker receives an ExecutorStateChanged message

maybeCleanupApplication

maybeCleanupApplication(
  id: String): Unit

maybeCleanupApplication...FIXME

maybeCleanupApplication is used when:

Logging

Enable the ALL logging level for the org.apache.spark.deploy.worker.Worker logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.deploy.worker.Worker=ALL

Refer to Logging.


Last update: 2020-12-19