Spark Clustered

Spark can be run in distributed mode on a cluster. The following (open source) cluster managers (aka task schedulers aka resource managers) are currently supported:

Here is a very brief list of pros and cons of using one cluster manager versus the other options supported by Spark:

  1. Spark Standalone is included in the official distribution of Apache Spark.

  2. Hadoop YARN has a very good support for HDFS with data locality.

  3. Apache Mesos makes resource offers that a framework can accept or reject. It is Spark (as a Mesos framework) to decide what resources to accept. It is a push-based resource management model.

  4. Hadoop YARN responds to a YARN framework’s resource requests. Spark (as a YARN framework) requests CPU and memory from YARN. It is a pull-based resource management model.

  5. Hadoop YARN supports Kerberos for a secured HDFS.

Running Spark on a cluster requires workload and resource management on distributed systems.

Spark driver requests resources from a cluster manager. Currently only CPU and memory are requested resources. It is a cluster manager’s responsibility to spawn Spark executors in the cluster (on its workers).

FIXME

  • Spark execution in cluster - Diagram of the communication between driver, cluster manager, workers with executors and tasks. See Cluster Mode Overview.

    • Show Spark’s driver with the main code in Scala in the box

    • Nodes with executors with tasks

  • Hosts drivers

  • Manages a cluster

The workers are in charge of communicating the cluster manager the availability of their resources.

Communication with a driver is through a RPC interface (at the moment Akka), except Mesos in fine-grained mode.

Executors remain alive after jobs are finished for future ones. This allows for better data utilization as intermediate data is cached in memory.

Spark reuses resources in a cluster for:

  • efficient data sharing

  • fine-grained partitioning

  • low-latency scheduling

Reusing also means the the resources can be hold onto for a long time.

Spark reuses long-running executors for speed (contrary to Hadoop MapReduce using short-lived containers for each task).

Spark Application Submission to Cluster

When you submit a Spark application to the cluster this is what happens (see the answers to the answer to What are workers, executors, cores in Spark Standalone cluster? on StackOverflow):

  • The Spark driver is launched to invoke the main method of the Spark application.

  • The driver asks the cluster manager for resources to run the application, i.e. to launch executors that run tasks.

  • The cluster manager launches executors.

  • The driver runs the Spark application and sends tasks to the executors.

  • Executors run the tasks and save the results.

  • Right after SparkContext.stop() is executed from the driver or the main method has exited all the executors are terminated and the cluster resources are released by the cluster manager.

"There’s not a good reason to run more than one worker per machine." by Sean Owen in What is the relationship between workers, worker instances, and executors?
One executor per node may not always be ideal, esp. when your nodes have lots of RAM. On the other hand, using fewer executors has benefits like more efficient broadcasts.

Two modes of launching executors

Review core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Others

Spark application can be split into the part written in Scala, Java, and Python with the cluster itself in which the application is going to run.

Spark application runs on a cluster with the help of cluster manager.

A Spark application consists of a single driver process and a set of executor processes scattered across nodes on the cluster.

Both the driver and the executors usually run as long as the application. The concept of dynamic resource allocation has changed it.

FIXME Figure

A node is a machine, and there’s not a good reason to run more than one worker per machine. So two worker nodes typically means two machines, each a Spark worker.

Workers hold many executors for many applications. One application has executors on many workers.