Skip to content


SparkEnv is a handle to Spark Execution Environment with the core services of Apache Spark (that interact with each other to establish a distributed computing platform for a Spark application).

There are two separate SparkEnvs of the driver and executors.

Core Services

Property Service
blockManager BlockManager
broadcastManager BroadcastManager
closureSerializer Serializer
conf SparkConf
mapOutputTracker MapOutputTracker
memoryManager MemoryManager
metricsSystem MetricsSystem
outputCommitCoordinator OutputCommitCoordinator
rpcEnv RpcEnv
securityManager SecurityManager
serializer Serializer
serializerManager SerializerManager
shuffleManager ShuffleManager

Creating Instance

SparkEnv takes the following to be created:

SparkEnv is created using create utility.

Temporary Directory of Driver

driverTmpDir: Option[String]

SparkEnv defines driverTmpDir internal registry for the driver to be used as the root directory of files added using SparkContext.addFile.

driverTmpDir is undefined initially and is defined for the driver only when SparkEnv utility is used to create a "base" SparkEnv.

Creating SparkEnv for Driver

  conf: SparkConf,
  isLocal: Boolean,
  listenerBus: LiveListenerBus,
  numCores: Int,
  mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv

createDriverEnv creates a SparkEnv execution environment for the driver.

Spark Environment for driver

createDriverEnv accepts an instance of[SparkConf],[whether it runs in local mode or not],[], the number of cores to use for execution in local mode or 0 otherwise, and a[OutputCommitCoordinator] (default: none).

createDriverEnv ensures that[] and[spark.driver.port] settings are defined.

It then passes the call straight on to the <> (with driver executor id, isDriver enabled, and the input parameters).

createDriverEnv is used when SparkContext is created.

Creating SparkEnv for Executor

  conf: SparkConf,
  executorId: String,
  hostname: String,
  numCores: Int,
  ioEncryptionKey: Option[Array[Byte]],
  isLocal: Boolean): SparkEnv
  conf: SparkConf,
  executorId: String,
  bindAddress: String,
  hostname: String,
  numCores: Int,
  ioEncryptionKey: Option[Array[Byte]],
  isLocal: Boolean): SparkEnv

createExecutorEnv creates an executor's (execution) environment that is the Spark execution environment for an executor.

Spark Environment for executor

createExecutorEnv simply <> (passing in all the input parameters) and <>.

NOTE: The number of cores numCores is configured using --cores command-line option of CoarseGrainedExecutorBackend and is specific to a cluster manager.

createExecutorEnv is used when CoarseGrainedExecutorBackend utility is requested to run.

Creating "Base" SparkEnv (for Driver and Executors)

  conf: SparkConf,
  executorId: String,
  bindAddress: String,
  advertiseAddress: String,
  port: Option[Int],
  isLocal: Boolean,
  numUsableCores: Int,
  ioEncryptionKey: Option[Array[Byte]],
  listenerBus: LiveListenerBus = null,
  mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv

create is an utility to create the "base" SparkEnv (that is "enhanced" for the driver and executors later on).

.create's Input Arguments and Their Usage [cols="1,2",options="header",width="100%"] |=== | Input Argument | Usage

| bindAddress | Used to create[RpcEnv] and[NettyBlockTransferService].

| advertiseAddress | Used to create[RpcEnv] and[NettyBlockTransferService].

| numUsableCores | Used to create[MemoryManager],[NettyBlockTransferService] and[BlockManager]. |===

[[create-Serializer]] create creates a Serializer (based on <> setting). You should see the following DEBUG message in the logs:

Using serializer: [serializer]

[[create-closure-Serializer]] create creates a closure Serializer (based on <>).

[[ShuffleManager]][[create-ShuffleManager]] create creates a[ShuffleManager] given the value of[spark.shuffle.manager] configuration property.

[[MemoryManager]][[create-MemoryManager]] create creates a[MemoryManager] based on[spark.memory.useLegacyMode] setting (with[UnifiedMemoryManager] being the default and numCores the input numUsableCores).

[[NettyBlockTransferService]][[create-NettyBlockTransferService]] create creates a[NettyBlockTransferService] with the following ports:

  •[spark.driver.blockManager.port] for the driver (default: 0)

  •[spark.blockManager.port] for an executor (default: 0)

NOTE: create uses the NettyBlockTransferService to <>.

CAUTION: FIXME A picture with SparkEnv, NettyBlockTransferService and the ports "armed".

[[BlockManagerMaster]][[create-BlockManagerMaster]] create creates a[BlockManagerMaster] object with the BlockManagerMaster RPC endpoint reference (by <> and[]), the input[SparkConf], and the input isDriver flag.

.Creating BlockManager for the Driver image::sparkenv-driver-blockmanager.png[align="center"]

NOTE: create registers the BlockManagerMaster RPC endpoint for the driver and looks it up for executors.

.Creating BlockManager for Executor image::sparkenv-executor-blockmanager.png[align="center"]

[[BlockManager]][[create-BlockManager]] create creates a[BlockManager] (using the above <>, <> and other services).

create creates a[].

[[MapOutputTracker]][[create-MapOutputTracker]] create creates a[MapOutputTrackerMaster] or[MapOutputTrackerWorker] for the driver and executors, respectively.

NOTE: The choice of the real implementation of[MapOutputTracker] is based on whether the input executorId is driver or not.

[[MapOutputTrackerMasterEndpoint]][[create-MapOutputTrackerMasterEndpoint]] create <RpcEndpoint>> as MapOutputTracker. It registers[MapOutputTrackerMasterEndpoint] on the driver and creates a RPC endpoint reference on executors. The RPC endpoint reference gets assigned as the[MapOutputTracker RPC endpoint].


[[create-CacheManager]] It creates a CacheManager.

[[create-MetricsSystem]] It creates a MetricsSystem for a driver and a worker separately.

It initializes userFiles temporary directory used for downloading dependencies for a driver while this is the executor's current working directory for an executor.

[[create-OutputCommitCoordinator]] An OutputCommitCoordinator is created.


create is used when SparkEnv utility is used to create a SparkEnv for the driver and executors.

== [[get]] Accessing SparkEnv

[source, scala]

get: SparkEnv

get returns the SparkEnv on the driver and executors.

[source, scala]

import org.apache.spark.SparkEnv assert(SparkEnv.get.isInstanceOf[SparkEnv])

== [[registerOrLookupEndpoint]] Registering or Looking up RPC Endpoint by Name

[source, scala]

registerOrLookupEndpoint( name: String, endpointCreator: => RpcEndpoint)

registerOrLookupEndpoint registers or looks up a RPC endpoint by name.

If called from the driver, you should see the following INFO message in the logs:

Registering [name]

And the RPC endpoint is registered in the RPC environment.

Otherwise, it obtains a RPC endpoint reference by name.

== [[stop]] Stopping SparkEnv

[source, scala]

stop(): Unit

stop checks <> internal flag and does nothing when enabled already.

Otherwise, stop turns isStopped flag on, stops all pythonWorkers and requests the following services to stop:

  6. MetricsSystem

stop[requests RpcEnv to shut down] and[waits till it terminates].

Only on the driver, stop deletes the <>. You can see the following WARN message in the logs if the deletion fails.

Exception while deleting Spark temp dir: [path]

NOTE: stop is used when[SparkContext stops] (on the driver) and[Executor stops].

== [[set]] set Method

[source, scala]

set(e: SparkEnv): Unit

set saves the input SparkEnv to <> internal registry (as the default SparkEnv).

NOTE: set is used when...FIXME

== [[environmentDetails]] environmentDetails Utility

[source, scala]

environmentDetails( conf: SparkConf, schedulingMode: String, addedJars: Seq[String], addedFiles: Seq[String]): Map[String, Seq[(String, String)]]


environmentDetails is used when SparkContext is requested to[post a SparkListenerEnvironmentUpdate event].

== [[logging]] Logging

Enable ALL logging level for org.apache.spark.SparkEnv logger to see what happens inside.

Add the following line to conf/


Refer to[Logging].

== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"] |=== | Name | Description

| isStopped | [[isStopped]] Used to mark SparkEnv stopped

Default: false


Last update: 2021-01-18