SparkEnv

SparkEnv is the Spark Execution Environment that holds the core services of Apache Spark, which interact with one another to establish a distributed computing platform for a Spark application.

There are two separate SparkEnv execution environments, one for the driver and one for executors.

Accessing Core Services

Property                  Service

blockManager              BlockManager
broadcastManager          BroadcastManager
closureSerializer         Serializer
conf                      SparkConf
mapOutputTracker          MapOutputTracker
memoryManager             MemoryManager
metricsSystem             MetricsSystem
outputCommitCoordinator   OutputCommitCoordinator
rpcEnv                    RpcEnv
securityManager           SecurityManager
serializer                Serializer
serializerManager         SerializerManager
shuffleManager            ShuffleManager
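
A quick way to see these accessors in action is the following minimal sketch, which assumes an active SparkContext (so a SparkEnv has already been created and set):

import org.apache.spark.SparkEnv

val env = SparkEnv.get
val conf = env.conf                  // the application's SparkConf
val blockManager = env.blockManager  // BlockManager
val serializer = env.serializer      // default Serializer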

Creating Instance

SparkEnv takes the following to be created: the executor ID, RpcEnv, Serializer, closure Serializer, SerializerManager, MapOutputTracker, ShuffleManager, BroadcastManager, BlockManager, SecurityManager, MetricsSystem, MemoryManager, OutputCommitCoordinator and SparkConf.

SparkEnv is created when the "base" SparkEnv is created, i.e. for the driver (createDriverEnv) and for executors (createExecutorEnv).

Accessing SparkEnv

get: SparkEnv

get returns the SparkEnv on the driver and executors.

import org.apache.spark.SparkEnv
assert(SparkEnv.get.isInstanceOf[SparkEnv])
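
SparkEnv.get also works inside tasks, where it returns the executor-side environment. The following sketch (assuming an active SparkContext sc) reads the executor ID on the driver and inside a task; note that in local mode both report "driver":

import org.apache.spark.SparkEnv

// On the driver
SparkEnv.get.executorId  // "driver"

// Inside a task running on an executor
val ids = sc.parallelize(1 to 1, numSlices = 1)
  .map(_ => SparkEnv.get.executorId)
  .collect()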

Creating "Base" SparkEnv

create(
  conf: SparkConf,
  executorId: String,
  bindAddress: String,
  advertiseAddress: String,
  port: Option[Int],
  isLocal: Boolean,
  numUsableCores: Int,
  ioEncryptionKey: Option[Array[Byte]],
  listenerBus: LiveListenerBus = null,
  mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv

create is a utility to create the "base" SparkEnv (that is "enhanced" for the driver and executors later on).

Table 1. create's Input Arguments and Their Usage

Input Argument     Usage

bindAddress        Used to create RpcEnv and NettyBlockTransferService
advertiseAddress   Used to create RpcEnv and NettyBlockTransferService
numUsableCores     Used to create MemoryManager, NettyBlockTransferService and BlockManager

create creates a Serializer (based on the spark.serializer setting). You should see the following DEBUG message in the logs:

DEBUG SparkEnv: Using serializer: [serializer]
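
For example, the serializer can be switched to Kryo through spark.serializer before the SparkContext (and hence the SparkEnv) is created; the snippet below is illustrative only and the application name is hypothetical:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("SparkEnv demo")  // hypothetical application name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")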

create creates a closure Serializer (based on spark.closure.serializer).

create creates a ShuffleManager given the value of spark.shuffle.manager configuration property.

create creates a MemoryManager based on spark.memory.useLegacyMode setting (with UnifiedMemoryManager being the default and numCores the input numUsableCores).

create creates a NettyBlockTransferService with the following ports:

spark.driver.blockManager.port for the driver (default: the value of spark.blockManager.port)
spark.blockManager.port for executors
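
The ports can be fixed explicitly if needed, e.g. to open them in a firewall; the port numbers below are arbitrary examples:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.blockManager.port", "31000")  // driver-side BlockManager port
  .set("spark.blockManager.port", "32000")         // executor-side BlockManager port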

create uses the NettyBlockTransferService to create a BlockManager.
FIXME A picture with SparkEnv, NettyBlockTransferService and the ports "armed".

create creates a BlockManagerMaster object with the BlockManagerMaster RPC endpoint reference (by registering or looking it up by name and BlockManagerMasterEndpoint), the input SparkConf, and the input isDriver flag.

Figure 1. Creating BlockManager for the Driver
create registers the BlockManagerMaster RPC endpoint for the driver and looks it up for executors.
Figure 2. Creating BlockManager for Executor

create creates a BlockManager (using the above BlockManagerMaster, NettyBlockTransferService and other services).

create creates a BroadcastManager.

create creates a MapOutputTrackerMaster or MapOutputTrackerWorker for the driver and executors, respectively.

The choice of the real implementation of MapOutputTracker is based on whether the input executorId is driver or not.

create registers or looks up the RpcEndpoint named MapOutputTracker. It registers the MapOutputTrackerMasterEndpoint on the driver and creates an RPC endpoint reference to it on executors. The RPC endpoint reference gets assigned as the MapOutputTracker RPC endpoint.

FIXME

It creates a CacheManager.

It creates a MetricsSystem, separately for the driver and executors.

It initializes the userFiles temporary directory that is used for downloading dependencies on the driver; on executors, it is the executor's current working directory.

An OutputCommitCoordinator is created.

create is used when SparkEnv is requested for the SparkEnv for the driver and executors.

Registering or Looking up RPC Endpoint by Name

registerOrLookupEndpoint(
  name: String,
  endpointCreator: => RpcEndpoint)

registerOrLookupEndpoint registers or looks up an RPC endpoint by name.

If called from the driver, you should see the following INFO message in the logs:

Registering [name]

And the RPC endpoint is registered in the RPC environment.

Otherwise, it obtains an RPC endpoint reference by name.
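
The logic can be sketched as follows. This is a simplified, illustrative reconstruction rather than the actual Spark source; it assumes the code lives inside the org.apache.spark package (RpcEndpoint and RpcUtils are private[spark]) and that isDriver, conf, rpcEnv and a logInfo helper are in scope:

import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef}
import org.apache.spark.util.RpcUtils

def registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef =
  if (isDriver) {
    // Driver: create and register the endpoint in the local RpcEnv
    logInfo(s"Registering $name")
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    // Executor: look up the endpoint registered on the driver
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }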

Creating SparkEnv for Driver

createDriverEnv(
  conf: SparkConf,
  isLocal: Boolean,
  listenerBus: LiveListenerBus,
  numCores: Int,
  mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv

createDriverEnv creates a SparkEnv execution environment for the driver.

Figure 3. Spark Environment for driver

createDriverEnv accepts an instance of SparkConf, whether it runs in local mode or not, LiveListenerBus, the number of cores to use for execution in local mode or 0 otherwise, and an OutputCommitCoordinator (default: none).

createDriverEnv ensures that spark.driver.host and spark.driver.port settings are defined.
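
Both settings can be set explicitly on the SparkConf before the SparkContext is created; the values below are arbitrary examples:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.host", "192.168.1.15")  // example address reachable by executors
  .set("spark.driver.port", "35000")         // example fixed port (0 means a random port)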

It then passes the call straight on to the create helper method (with the driver executor ID, isDriver enabled, and the input parameters).

createDriverEnv is exclusively used by SparkContext to create a SparkEnv (while a SparkContext is being created for the driver).

Creating SparkEnv for Executor

createExecutorEnv(
  conf: SparkConf,
  executorId: String,
  hostname: String,
  numCores: Int,
  ioEncryptionKey: Option[Array[Byte]],
  isLocal: Boolean): SparkEnv

createExecutorEnv creates the Spark execution environment for an executor.

Figure 4. Spark Environment for executor
createExecutorEnv is a private[spark] method.

createExecutorEnv simply creates the base SparkEnv (passing in all the input parameters) and sets it as the current SparkEnv.

The number of cores (numCores) is configured using the --cores command-line option of CoarseGrainedExecutorBackend and is specific to a cluster manager.

Stopping SparkEnv

stop(): Unit

stop checks the isStopped internal flag and does nothing when it is already enabled.

Otherwise, stop turns the isStopped flag on, stops all pythonWorkers and requests the following services to stop: MapOutputTracker, ShuffleManager, BroadcastManager, BlockManager, BlockManagerMaster, MetricsSystem and OutputCommitCoordinator. In the end, stop shuts the RpcEnv down.

Only on the driver, stop deletes the temporary directory. You can see the following WARN message in the logs if the deletion fails.

Exception while deleting Spark temp dir: [path]

stop is used when SparkContext stops (on the driver) and Executor stops.

set Method

set(e: SparkEnv): Unit

set saves the input SparkEnv to the env internal registry (as the default SparkEnv).
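
A minimal illustration follows; normally Spark itself calls set when it creates the driver or executor environment, so calling it manually is rarely needed:

import org.apache.spark.SparkEnv

val env: SparkEnv = SparkEnv.get  // whatever SparkEnv is currently the default
SparkEnv.set(env)                 // install it (again) as the default
assert(SparkEnv.get eq env)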

set is used when…​FIXME

environmentDetails Utility

environmentDetails(
  conf: SparkConf,
  schedulingMode: String,
  addedJars: Seq[String],
  addedFiles: Seq[String]): Map[String, Seq[(String, String)]]

environmentDetails…​FIXME

environmentDetails is used when SparkContext is requested to post a SparkListenerEnvironmentUpdate event.

Logging

Enable ALL logging level for org.apache.spark.SparkEnv logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.SparkEnv=ALL

Refer to Logging.

Internal Properties

Name           Description

isStopped      Used to mark SparkEnv stopped (default: false)

driverTmpDir   The driver's temporary directory (used for downloading dependencies; deleted when SparkEnv stops)