

SparkContext is the entry point to all of the components of Apache Spark (execution engine) and so the heart of a Spark application. In fact, you can consider an application a Spark application only when it uses a SparkContext (directly or indirectly).

Spark context acts as the master of your Spark application


There should be one active SparkContext per JVM and Spark developers should use SparkContext.getOrCreate utility for sharing it (e.g. across threads).

Creating Instance

SparkContext takes the following to be created:

SparkContext is created (directly or indirectly using getOrCreate utility).

While being created, SparkContext sets up core services and establishes a connection to a cluster manager.

Checkpoint Directory

SparkContext defines checkpointDir internal registry for the path to a checkpoint directory.

checkpointDir is undefined (None) when SparkContext is created and is set using setCheckpointDir.

checkpointDir is required for Reliable Checkpointing.

checkpointDir is available using getCheckpointDir.


getCheckpointDir: Option[String]

getCheckpointDir returns the checkpointDir.
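As a minimal sketch (assuming an active `SparkContext` `sc`; the directory path is an arbitrary example), reliable checkpointing hinges on `setCheckpointDir` being called before an RDD is checkpointed:

```scala
// Reliable checkpointing requires a checkpoint directory to be set first.
sc.setCheckpointDir("/tmp/spark-checkpoints") // path is an assumption
assert(sc.getCheckpointDir.isDefined)

val rdd = sc.parallelize(1 to 100)
rdd.checkpoint() // marks the RDD for reliable checkpointing
rdd.count()      // the first job on the RDD materializes the checkpoint
```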

getCheckpointDir is used when:

Submitting MapStage for Execution

submitMapStage[K, V, C](
  dependency: ShuffleDependency[K, V, C]): SimpleFutureAction[MapOutputStatistics]

submitMapStage requests the DAGScheduler to submit the given ShuffleDependency for execution (that eventually produces a MapOutputStatistics).

submitMapStage is used when:

  • ShuffleExchangeExec (Spark SQL) unary physical operator is executed


SparkContext creates an ExecutorMetricsSource when created with spark.metrics.executorMetricsSource.enabled enabled.

SparkContext requests the ExecutorMetricsSource to register with the MetricsSystem.

SparkContext uses the ExecutorMetricsSource to create the Heartbeater.



SparkContext creates a ResourceProfileManager when created.


resourceProfileManager: ResourceProfileManager

resourceProfileManager returns the ResourceProfileManager.

resourceProfileManager is used when:


SparkContext can create a DriverLogger when created.

SparkContext requests the DriverLogger to startSync in postApplicationStart.


SparkContext can create an AppStatusSource when created (based on the spark.metrics.appStatusSource.enabled configuration property).

SparkContext uses the AppStatusSource to create the AppStatusStore.

If configured, SparkContext registers the AppStatusSource with the MetricsSystem.


SparkContext creates an AppStatusStore when created (with itself and the AppStatusSource).

SparkContext requests AppStatusStore for the AppStatusListener and requests the LiveListenerBus to add it to the application status queue.

SparkContext uses the AppStatusStore to create the following:

AppStatusStore is requested to close when SparkContext is requested to stop.


statusStore: AppStatusStore

statusStore returns the AppStatusStore.

statusStore is used when:

  • SparkContext is requested to getRDDStorageInfo
  • ConsoleProgressBar is requested to refresh
  • HiveThriftServer2 is requested to createListenerAndUI
  • SharedState (Spark SQL) is requested for a SQLAppStatusStore and a StreamingQueryStatusListener


SparkContext creates a SparkStatusTracker when created (with itself and the AppStatusStore).


statusTracker: SparkStatusTracker

statusTracker returns the SparkStatusTracker.

Local Properties

localProperties: InheritableThreadLocal[Properties]

SparkContext uses an InheritableThreadLocal (Java) of key-value pairs of thread-local properties to pass extra information from a parent thread (on the driver) to child threads.

localProperties is meant to be used by developers using SparkContext.setLocalProperty and SparkContext.getLocalProperty.

Local Properties are available using TaskContext.getLocalProperty.

Local Properties are available to SparkListeners using the following events:

localProperties are passed down when SparkContext is requested for the following:

DAGScheduler passes down local properties when scheduling:

Spark (Core) defines the following local properties.

Name                          Default Value    Setter
callSite.short                                 SparkContext.setCallSite
spark.job.description         callSite.short   SparkContext.setJobDescription
spark.jobGroup.id                              SparkContext.setJobGroup
spark.job.interruptOnCancel                    SparkContext.setJobGroup
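As a sketch (assuming an active `SparkContext` `sc`), a local property set on the driver thread is visible inside tasks through `TaskContext`:

```scala
// Pass a thread-local property from the driver thread to tasks.
sc.setLocalProperty("spark.job.description", "demo job")
assert(sc.getLocalProperty("spark.job.description") == "demo job")

sc.parallelize(1 to 4).foreach { _ =>
  // Inside a task, local properties are read through TaskContext.
  val desc = org.apache.spark.TaskContext.get.getLocalProperty("spark.job.description")
  println(desc)
}

sc.setLocalProperty("spark.job.description", null) // null removes the property
```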


SparkContext creates a ShuffleDriverComponents when created.

SparkContext loads the ShuffleDataIO that is in turn requested for the ShuffleDriverComponents. SparkContext requests the ShuffleDriverComponents to initialize.

The ShuffleDriverComponents is used when:

SparkContext requests the ShuffleDriverComponents to clean up when stopping.

Static Files


addFile(
  path: String,
  recursive: Boolean): Unit
// recursive = false
addFile(
  path: String): Unit

addFile creates a Hadoop Path from the given path. For a no-schema path, addFile converts it to a canonical form.

For a path with the local scheme, addFile prints out the following WARN message to the logs and returns.

File with 'local' scheme is not supported to add to file server, since it is already available on every node.


In the end, addFile adds the file to the addedFiles internal registry (with the current timestamp):

  • For new files, addFile prints out the following INFO message to the logs, fetches the file (to the root directory and without using the cache) and postEnvironmentUpdate.

    Added file [path] at [key] with timestamp [timestamp]
  • For files that were already added, addFile prints out the following WARN message to the logs:

    The path [path] has been added already. Overwriting of added paths is not supported in the current version.
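A sketch of the round trip (the file path is an assumption): a file added on the driver with `addFile` is resolved inside tasks with `SparkFiles.get`:

```scala
import org.apache.spark.SparkFiles

// Distribute a file to every node (path is an arbitrary example).
sc.addFile("/tmp/lookup.txt")

sc.parallelize(1 to 2).foreach { _ =>
  // Tasks resolve the per-executor local copy via SparkFiles.
  val localPath = SparkFiles.get("lookup.txt")
  println(localPath)
}
```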

addFile is used when:


listFiles(): Seq[String]

listFiles returns the files added (the keys of the addedFiles internal registry).

addedFiles Internal Registry

addedFiles: Map[String, Long]

addedFiles is a collection of static files by the timestamp they were added at.

addedFiles is used when:


files: Seq[String]

files is a collection of file paths defined by spark.files configuration property.

Posting SparkListenerEnvironmentUpdate Event

postEnvironmentUpdate(): Unit


postEnvironmentUpdate is used when:

getOrCreate Utility

getOrCreate(): SparkContext
getOrCreate(
  config: SparkConf): SparkContext



SparkContext creates a PluginContainer when created.

PluginContainer is created (for the driver where SparkContext lives) using PluginContainer.apply utility.

PluginContainer is then requested to registerMetrics with the applicationId.

PluginContainer is requested to shutdown when SparkContext is requested to stop.

Creating SchedulerBackend and TaskScheduler

createTaskScheduler(
  sc: SparkContext,
  master: String,
  deployMode: String): (SchedulerBackend, TaskScheduler)

createTaskScheduler creates a SchedulerBackend and a TaskScheduler for the given master URL and deployment mode.

SparkContext creates Task Scheduler and Scheduler Backend

Internally, createTaskScheduler branches off per the given master URL to select the requested implementations.

createTaskScheduler accepts the following master URLs:

  • local - local mode with 1 thread only
  • local[n] or local[*] - local mode with n threads (local[*] uses as many threads as CPU cores)
  • local[n, m] or local[*, m] - local mode with n threads and m number of failures
  • spark://hostname:port for Spark Standalone
  • local-cluster[n, m, z] - local cluster with n workers, m cores per worker, and z memory per worker
  • Other URLs are simply handed over to getClusterManager to load an external cluster manager if available

createTaskScheduler is used when SparkContext is created.
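The branching above is driven by the master URL in SparkConf. A sketch (the application name is an arbitrary example):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// The master URL selects the SchedulerBackend/TaskScheduler pair.
val conf = new SparkConf()
  .setAppName("Master URL Demo")
  .setMaster("local[2, 3]") // local mode: 2 threads, up to 3 task failures
val sc = new SparkContext(conf)
```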

Loading ExternalClusterManager

getClusterManager(
  url: String): Option[ExternalClusterManager]

getClusterManager uses Java's ServiceLoader to find and load an ExternalClusterManager that supports the given master URL.

ExternalClusterManager Service Discovery

For ServiceLoader to find ExternalClusterManagers, they have to be registered using the following file (per the Java ServiceLoader provider-configuration convention):

META-INF/services/org.apache.spark.scheduler.ExternalClusterManager

getClusterManager throws a SparkException when multiple cluster managers were found:

Multiple external cluster managers registered for the url [url]: [serviceLoaders]

getClusterManager is used when SparkContext is requested for a SchedulerBackend and TaskScheduler.

Running Job Synchronously

runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U): Array[U]
runJob[T, U: ClassTag](
  rdd: RDD[T],
  processPartition: (TaskContext, Iterator[T]) => U,
  resultHandler: (Int, U) => Unit): Unit
runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int]): Array[U]
runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit): Unit
runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: Iterator[T] => U): Array[U]
runJob[T, U: ClassTag](
  rdd: RDD[T],
  processPartition: Iterator[T] => U,
  resultHandler: (Int, U) => Unit): Unit
runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: Iterator[T] => U,
  partitions: Seq[Int]): Array[U]

Executing action

runJob finds the call site and cleans up the given func function.

runJob prints out the following INFO message to the logs:

Starting job: [callSite]

With spark.logLineage enabled, runJob requests the given RDD for the recursive dependencies and prints out the following INFO message to the logs:

RDD's recursive dependencies:

runJob requests the DAGScheduler to run a job.

runJob requests the ConsoleProgressBar to finishAll if defined.

In the end, runJob requests the given RDD to doCheckpoint.

runJob throws an IllegalStateException when SparkContext is stopped:

SparkContext has been shutdown


runJob is essentially executing a func function on all or a subset of partitions of an RDD and returning the result as an array (with elements being the results per partition).

sc.setLocalProperty("callSite.short", "runJob Demo")

val partitionsNumber = 4
val rdd = sc.parallelize(
  Seq("hello world", "nice to see you"),
  numSlices = partitionsNumber)

import org.apache.spark.TaskContext
val func = (t: TaskContext, ss: Iterator[String]) => 1
val result = sc.runJob(rdd, func)
assert(result.length == partitionsNumber)


Call Site

getCallSite(): CallSite


getCallSite is used when:

Closure Cleaning

clean[F <: AnyRef](
  f: F,
  checkSerializable: Boolean = true): F

clean cleans up the given f closure (using ClosureCleaner.clean utility).


Enable DEBUG logging level for org.apache.spark.util.ClosureCleaner logger to see what happens inside the class.

Add the following line to conf/

Refer to Logging.

With DEBUG logging level you should see the following messages in the logs:

+++ Cleaning closure [func] ([func.getClass.getName]) +++
 + declared fields: [declaredFields.size]
+++ closure [func] ([func.getClass.getName]) is now cleaned +++


Enable ALL logging level for org.apache.spark.SparkContext logger to see what happens inside.

Add the following line to conf/

Refer to Logging.

To Be Reviewed

SparkContext offers the following functions:

  • Getting the current status of a Spark application, e.g. the default level of parallelism that specifies the number of partitions in RDDs when they are created without the number being given explicitly by a user
  • Setting configuration, e.g. Local Properties
  • Creating distributed entities, e.g. RDDs, accumulators and broadcast variables
  • Accessing services, e.g. BlockManager, SchedulerBackends and ShuffleManager

TIP: Read the scaladoc of org.apache.spark.SparkContext.

Removing RDD Blocks from BlockManagerMaster

unpersistRDD(
  rddId: Int,
  blocking: Boolean = true): Unit

unpersistRDD requests BlockManagerMaster to remove the blocks for the RDD (given rddId).

NOTE: unpersistRDD uses SparkEnv to access the current BlockManager that is in turn used to access the current BlockManagerMaster.

unpersistRDD removes rddId from the persistentRdds internal registry.

In the end, unpersistRDD posts a SparkListenerUnpersistRDD (with rddId) to LiveListenerBus.


unpersistRDD is used when:

  • ContextCleaner does doCleanupRDD
  • SparkContext unpersists an RDD (i.e. marks an RDD as non-persistent)

== [[applicationId]] Unique Identifier of Spark Application -- applicationId Method


== [[postApplicationStart]] postApplicationStart Internal Method

[source, scala]

postApplicationStart(): Unit


postApplicationStart is used exclusively when SparkContext is created.

== [[postApplicationEnd]] postApplicationEnd Method


== [[clearActiveContext]] clearActiveContext Method


== [[getPersistentRDDs]] Accessing persistent RDDs -- getPersistentRDDs Method

[source, scala]

getPersistentRDDs: Map[Int, RDD[_]]

getPersistentRDDs returns the collection of RDDs that have marked themselves as persistent via cache.

Internally, getPersistentRDDs returns the persistentRdds internal registry.

== [[cancelJob]] Cancelling Job -- cancelJob Method

[source, scala]

cancelJob(jobId: Int)

cancelJob requests DAGScheduler to cancel a Spark job.

== [[cancelStage]] Cancelling Stage -- cancelStage Methods

[source, scala]

cancelStage(stageId: Int): Unit
cancelStage(
  stageId: Int,
  reason: String): Unit

cancelStage simply requests DAGScheduler to cancel a Spark stage (with an optional reason).

NOTE: cancelStage is used when StagesTab handles a kill request (from a user in web UI).

Programmable Dynamic Allocation

SparkContext offers the following methods as the developer API for Dynamic Allocation of Executors:

  • requestExecutors
  • killExecutors
  • requestTotalExecutors
  • (private!) getExecutorIds

=== [[requestExecutors]] Requesting New Executors -- requestExecutors Method

[source, scala]

requestExecutors(numAdditionalExecutors: Int): Boolean

requestExecutors requests numAdditionalExecutors executors from CoarseGrainedSchedulerBackend.

=== [[killExecutors]] Requesting to Kill Executors -- killExecutors Method

[source, scala]

killExecutors(executorIds: Seq[String]): Boolean


=== [[requestTotalExecutors]] Requesting Total Executors -- requestTotalExecutors Method

[source, scala]

requestTotalExecutors(
  numExecutors: Int,
  localityAwareTasks: Int,
  hostToLocalTaskCount: Map[String, Int]): Boolean

requestTotalExecutors is a private[spark] method that requests the exact number of executors from a coarse-grained scheduler backend.

NOTE: It works for coarse-grained scheduler backends only.

When called for other scheduler backends you should see the following WARN message in the logs:

Requesting executors is only supported in coarse-grained mode

Executor IDs

getExecutorIds is a private[spark] method that is part of ExecutorAllocationClient contract. It simply passes the call on to the current coarse-grained scheduler backend, i.e. calls getExecutorIds.


It works for coarse-grained scheduler backends only.

When called for other scheduler backends you should see the following WARN message in the logs:

Requesting executors is only supported in coarse-grained mode

CAUTION: FIXME Why does SparkContext implement the method for coarse-grained scheduler backends? Why doesn't SparkContext throw an exception when the method is called? Nobody seems to be using it (!)

=== [[getOrCreate]] Getting Existing or Creating New SparkContext -- getOrCreate Methods

[source, scala]

getOrCreate(): SparkContext
getOrCreate(
  conf: SparkConf): SparkContext

getOrCreate methods allow you to get the existing SparkContext or create a new one.

[source, scala]

import org.apache.spark.SparkContext
val sc = SparkContext.getOrCreate()

// Using an explicit SparkConf object
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("SparkMe App")
val sc = SparkContext.getOrCreate(conf)

The no-param getOrCreate method requires that the two mandatory Spark settings - the master URL and the application name - are specified using spark-submit.

=== [[constructors]] Constructors

[source, scala]

SparkContext()
SparkContext(conf: SparkConf)
SparkContext(master: String, appName: String, conf: SparkConf)
SparkContext(
  master: String,
  appName: String,
  sparkHome: String = null,
  jars: Seq[String] = Nil,
  environment: Map[String, String] = Map())

You can create a SparkContext instance using the four constructors.

[source, scala]

import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("SparkMe App")

import org.apache.spark.SparkContext
val sc = new SparkContext(conf)

When a Spark context starts up you should see the following INFO in the logs (amongst the other messages that come from the Spark services):

Running Spark version 2.0.0-SNAPSHOT

NOTE: Only one SparkContext may be running in a single JVM (check out SPARK-2243 Support multiple SparkContexts in the same JVM). Sharing access to a SparkContext in the JVM is the solution to share data within Spark (without relying on other means of data sharing using external data stores).

== [[appName]] Application Name -- appName Method

[source, scala]

appName: String

appName gives the value of the mandatory spark.app.name setting.

NOTE: appName is used when SparkDeploySchedulerBackend starts, SparkUI creates a web UI, when postApplicationStart is executed, and for Mesos and checkpointing in Spark Streaming.

== [[applicationAttemptId]] Unique Identifier of Execution Attempt -- applicationAttemptId Method

[source, scala]

applicationAttemptId: Option[String]

applicationAttemptId gives the unique identifier of the execution attempt of a Spark application.


applicationAttemptId is used when:

  • ShuffleMapTask and ResultTask are created
  • SparkContext <>

== [[getExecutorStorageStatus]] Storage Status (of All BlockManagers) -- getExecutorStorageStatus Method

[source, scala]

getExecutorStorageStatus: Array[StorageStatus]

getExecutorStorageStatus requests BlockManagerMaster for storage status (of all BlockManagers).

NOTE: getExecutorStorageStatus is a developer API.

getExecutorStorageStatus is used when:

== [[getPoolForName]] Schedulable (Pool) by Name -- getPoolForName Method

[source, scala]

getPoolForName(pool: String): Option[Schedulable]

getPoolForName returns a Schedulable by the pool name, if one exists.

NOTE: getPoolForName is part of the Developer's API and may change in the future.

Internally, it requests the TaskScheduler for the root pool and looks up the Schedulable by the pool name.

It is exclusively used to show pool details in web UI (for a stage).

== [[getAllPools]] All Schedulable Pools -- getAllPools Method

[source, scala]

getAllPools: Seq[Schedulable]

getAllPools collects the Pools in TaskScheduler.rootPool.

NOTE: TaskScheduler.rootPool is part of the TaskScheduler Contract.

NOTE: getAllPools is part of the Developer's API.

CAUTION: FIXME Where is the method used?

NOTE: getAllPools is used to calculate pool names for Stages tab in web UI with FAIR scheduling mode used.

== [[defaultParallelism]] Default Level of Parallelism

[source, scala]

defaultParallelism: Int

defaultParallelism requests the TaskScheduler for the default level of parallelism.

NOTE: Default level of parallelism specifies the number of partitions in RDDs when they are created without specifying them explicitly by a user.


defaultParallelism is used in SparkContext.parallelize, SparkContext.range and SparkContext.makeRDD (as well as Spark Streaming's DStream.countByValue and DStream.countByValueAndWindow et al.).

defaultParallelism is also used to instantiate HashPartitioner and for the minimum number of partitions in HadoopRDDs.

== [[taskScheduler]] Current Spark Scheduler (aka TaskScheduler) -- taskScheduler Property

[source, scala]

taskScheduler: TaskScheduler
taskScheduler_=(ts: TaskScheduler): Unit

taskScheduler manages (i.e. reads or writes) <<_taskScheduler, _taskScheduler>> internal property.

== [[version]] Getting Spark Version -- version Property

[source, scala]

version: String

version returns the Spark version this SparkContext uses.

== [[makeRDD]] makeRDD Method


== [[submitJob]] Submitting Jobs Asynchronously -- submitJob Method

[source, scala]

submitJob[T, U, R](
  rdd: RDD[T],
  processPartition: Iterator[T] => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit,
  resultFunc: => R): SimpleFutureAction[R]

submitJob submits a job in an asynchronous, non-blocking way to DAGScheduler.

It cleans the processPartition input function argument and returns an instance of SimpleFutureAction that holds the JobWaiter instance.

CAUTION: FIXME What is resultFunc?

It is used in:

  • AsyncRDDActions methods
  • Spark Streaming for ReceiverTrackerEndpoint.startReceiver
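As a sketch of the asynchronous flow above (assuming an active `SparkContext` `sc`), sum only the first two partitions of an RDD and wait for the resulting future:

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

val rdd = sc.parallelize(1 to 100, numSlices = 4)

// submitJob returns immediately with a SimpleFutureAction.
val action = sc.submitJob(
  rdd,
  (it: Iterator[Int]) => it.sum,                                // processPartition
  Seq(0, 1),                                                    // partitions to compute
  (index: Int, sum: Int) => println(s"partition $index: $sum"), // resultHandler
  ())                                                           // resultFunc (here: Unit)

// SimpleFutureAction is a scala.concurrent.Future, so it can be awaited.
Await.result(action, 1.minute)
```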

== [[spark-configuration]] Spark Configuration


== [[sparkcontext-and-rdd]] SparkContext and RDDs

You use a Spark context to create RDDs (see <<creating-rdds, Creating RDD>>).

When an RDD is created, it belongs to and is completely owned by the Spark context it originated from. RDDs can't by design be shared between SparkContexts.

A Spark context creates a living space for RDDs.

== [[creating-rdds]][[parallelize]] Creating RDD -- parallelize Method

SparkContext allows you to create many different RDDs from input sources like:

  • Scala's collections, i.e. sc.parallelize(0 to 100)
  • local or remote filesystems, i.e. sc.textFile("")
  • Any Hadoop InputSource using sc.newAPIHadoopFile

Read Creating RDDs in RDD - Resilient Distributed Dataset.

== [[unpersist]] Unpersisting RDD (Marking RDD as Non-Persistent) -- unpersist Method


unpersist removes an RDD from the master's Block Manager (calls removeRdd(rddId: Int, blocking: Boolean)) and the internal persistentRdds mapping.

It finally posts a SparkListenerUnpersistRDD message to listenerBus.

== [[setCheckpointDir]] Setting Checkpoint Directory -- setCheckpointDir Method

[source, scala]

setCheckpointDir(directory: String)

setCheckpointDir method is used to set up the checkpoint directory...FIXME


== [[register]] Registering Accumulator -- register Methods

[source, scala]

register(acc: AccumulatorV2[_, _]): Unit
register(
  acc: AccumulatorV2[_, _],
  name: String): Unit

register registers the acc accumulator. You can optionally give an accumulator a name.

TIP: You can create built-in accumulators for longs, doubles, and collection types using longAccumulator, doubleAccumulator and collectionAccumulator.

Internally, register registers acc accumulator (with the current SparkContext).

== [[creating-accumulators]][[longAccumulator]][[doubleAccumulator]][[collectionAccumulator]] Creating Built-In Accumulators

[source, scala]

longAccumulator: LongAccumulator
longAccumulator(name: String): LongAccumulator
doubleAccumulator: DoubleAccumulator
doubleAccumulator(name: String): DoubleAccumulator
collectionAccumulator[T]: CollectionAccumulator[T]
collectionAccumulator[T](name: String): CollectionAccumulator[T]

You can use longAccumulator, doubleAccumulator or collectionAccumulator to create and register accumulators for simple and collection values.

longAccumulator returns LongAccumulator with the zero value 0.

doubleAccumulator returns DoubleAccumulator with the zero value 0.0.

collectionAccumulator returns CollectionAccumulator with the zero value being an empty java.util.List[T].

scala> val acc = sc.longAccumulator
acc: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: None, value: 0)

scala> val counter = sc.longAccumulator("counter")
counter: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 1, name: Some(counter), value: 0)

scala> counter.value
res0: Long = 0

scala> sc.parallelize(0 to 9).foreach(n => counter.add(n))

scala> counter.value
res3: Long = 45

The name input parameter allows you to give a name to an accumulator and have it displayed in Spark UI (under the Stages tab for a given stage).

Accumulators in the Spark UI


You can register custom accumulators using register methods.
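A hypothetical custom accumulator (a sketch, not an API of Spark itself): a max-tracker built by implementing the AccumulatorV2 contract (isZero, copy, reset, add, merge, value) and registered with `sc.register`:

```scala
import org.apache.spark.util.AccumulatorV2

// Tracks the maximum Long seen across all tasks.
class MaxAccumulator extends AccumulatorV2[Long, Long] {
  private var _max = Long.MinValue
  def isZero: Boolean = _max == Long.MinValue
  def copy(): MaxAccumulator = { val acc = new MaxAccumulator; acc._max = _max; acc }
  def reset(): Unit = _max = Long.MinValue
  def add(v: Long): Unit = _max = math.max(_max, v)
  def merge(other: AccumulatorV2[Long, Long]): Unit = _max = math.max(_max, other.value)
  def value: Long = _max
}

val maxAcc = new MaxAccumulator
sc.register(maxAcc, "max")
sc.parallelize(Seq(3L, 7L, 5L)).foreach(maxAcc.add)
// maxAcc.value on the driver holds the merged maximum
```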

== [[broadcast]] Creating Broadcast Variable -- broadcast Method

[source, scala]

broadcast[T](value: T): Broadcast[T]

broadcast method creates a broadcast variable. It is a shared memory with value (as broadcast blocks) on the driver and later on all Spark executors.

val sc: SparkContext = ???
scala> val hello = sc.broadcast("hello")
hello: org.apache.spark.broadcast.Broadcast[String] = Broadcast(0)

Spark transfers the value to Spark executors once, and tasks can share it without incurring repetitive network transmissions when the broadcast variable is used multiple times.

Broadcasting a value to executors

Internally, broadcast requests BroadcastManager for a new broadcast variable.

NOTE: The current BroadcastManager is available using SparkEnv.broadcastManager attribute and is always BroadcastManager (with a few internal configuration changes to reflect where it runs, i.e. inside the driver or executors).

You should see the following INFO message in the logs:

Created broadcast [id] from [callSite]

If ContextCleaner is defined, the new broadcast variable is registered for cleanup.


Spark does not support broadcasting RDDs.

scala> sc.broadcast(sc.range(0, 10))
java.lang.IllegalArgumentException: requirement failed: Can not directly broadcast RDDs; instead, call collect() and broadcast the result.
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1392)
  ... 48 elided

Once created, the broadcast variable (and other blocks) are displayed per executor and the driver in web UI.

Broadcast Variables In web UI's Executors Tab

== [[jars]] Distribute JARs to workers

The jar you specify with SparkContext.addJar will be copied to all the worker nodes.

The configuration setting spark.jars is a comma-separated list of jar paths to be included in all tasks executed from this SparkContext. A path can either be a local file, a file in HDFS (or other Hadoop-supported filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.

scala> sc.addJar("build.sbt")
15/11/11 21:54:54 Added JAR build.sbt at with timestamp 1447275294457

CAUTION: FIXME Why is HttpFileServer used for addJar?

=== SparkContext as Application-Wide Counter

SparkContext keeps track of:

  • [[nextShuffleId]] shuffle ids, using the nextShuffleId internal counter for registering shuffle dependencies to Shuffle Service.

== [[stop]][[stopping]] Stopping SparkContext -- stop Method

[source, scala]

stop(): Unit

stop stops the SparkContext.

Internally, stop enables stopped internal flag. If already stopped, you should see the following INFO message in the logs:

SparkContext already stopped.

stop then does the following:

  1. Removes _shutdownHookRef from ShutdownHookManager
  2. Posts SparkListenerApplicationEnd (to LiveListenerBus)
  3. Stops web UI
  4. Requests MetricSystem to report metrics (from all registered sinks)
  5. Stops ContextCleaner
  6. Requests ExecutorAllocationManager to stop
  7. If LiveListenerBus was started, requests LiveListenerBus to stop
  8. Requests EventLoggingListener to stop
  9. Requests DAGScheduler to stop
  10. Requests RpcEnv to stop HeartbeatReceiver endpoint
  11. Requests ConsoleProgressBar to stop
  12. Clears the reference to TaskScheduler, i.e. _taskScheduler is null
  13. Requests SparkEnv to stop and clears SparkEnv
  14. Clears SPARK_YARN_MODE flag
  15. Clears the active SparkContext

Ultimately, you should see the following INFO message in the logs:

Successfully stopped SparkContext

Registering SparkListener

addSparkListener(
  listener: SparkListenerInterface): Unit

addSparkListener registers a custom SparkListenerInterface.


Custom listeners can also be registered declaratively using spark.extraListeners configuration property.
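A hypothetical listener (a sketch; the class and counter are illustrative only) that counts finished jobs via the SparkListener callbacks:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// Counts completed jobs observed on the listener bus.
class JobCounter extends SparkListener {
  @volatile var jobsEnded = 0
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = jobsEnded += 1
}

val counter = new JobCounter
sc.addSparkListener(counter)
sc.parallelize(1 to 10).count()
// counter.jobsEnded increases once the (asynchronous) event is delivered
```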

== [[custom-schedulers]] Custom SchedulerBackend, TaskScheduler and DAGScheduler

By default, SparkContext uses (private[spark] class) org.apache.spark.scheduler.DAGScheduler, but you can develop your own custom DAGScheduler implementation, and use (private[spark]) SparkContext.dagScheduler_=(ds: DAGScheduler) method to assign yours.

It is also applicable to SchedulerBackend and TaskScheduler using schedulerBackend_=(sb: SchedulerBackend) and taskScheduler_=(ts: TaskScheduler) methods, respectively.

CAUTION: FIXME Make it an advanced exercise.

== [[events]] Events

When a Spark context starts, it triggers SparkListenerEnvironmentUpdate and SparkListenerApplicationStart messages.

Refer to the section <>.

== [[setLogLevel]][[setting-default-log-level]] Setting Default Logging Level -- setLogLevel Method

[source, scala]

setLogLevel(logLevel: String)

setLogLevel allows you to set the root logging level in a Spark application, e.g. Spark shell.

Internally, setLogLevel calls org.apache.log4j.Level.toLevel(logLevel) and then sets the result using org.apache.log4j.LogManager.getRootLogger().setLevel(level).


You can directly set the logging level using org.apache.log4j.LogManager.getLogger().
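A minimal sketch (assuming an active `SparkContext` `sc`):

```scala
// Valid levels include ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN.
sc.setLogLevel("WARN")
```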


== [[hadoopConfiguration]] Hadoop Configuration

While a SparkContext is being created, so is a Hadoop configuration (as an instance of org.apache.hadoop.conf.Configuration that is available as _hadoopConfiguration).

NOTE: SparkHadoopUtil.get.newConfiguration is used.

If a SparkConf is provided it is used to build the configuration as described. Otherwise, the default Configuration object is returned.

If AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are both available, the following settings are set for the Hadoop configuration:

  • fs.s3.awsAccessKeyId, fs.s3n.awsAccessKeyId, fs.s3a.access.key are set to the value of AWS_ACCESS_KEY_ID
  • fs.s3.awsSecretAccessKey, fs.s3n.awsSecretAccessKey, and fs.s3a.secret.key are set to the value of AWS_SECRET_ACCESS_KEY

Every spark.hadoop. setting becomes a setting of the configuration with the prefix spark.hadoop. removed for the key.

The value of spark.buffer.size (default: 65536) is used as the value of io.file.buffer.size.
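A sketch of the prefix-stripping rule above (the property key `custom.lookup.path` is a hypothetical example, not a real Hadoop setting):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Every spark.hadoop.-prefixed setting lands in hadoopConfiguration
// with the spark.hadoop. prefix removed.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("Hadoop Config Demo")
  .set("spark.hadoop.custom.lookup.path", "/data/lookup") // hypothetical key
val sc = new SparkContext(conf)

assert(sc.hadoopConfiguration.get("custom.lookup.path") == "/data/lookup")
```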

== [[listenerBus]] listenerBus -- LiveListenerBus Event Bus

listenerBus is a LiveListenerBus object that acts as a mechanism to announce events to other services on the driver.

LiveListenerBus is created and started when SparkContext is created and, since it is a single-JVM event bus, is exclusively used on the driver.

== [[startTime]] Time when SparkContext was Created -- startTime Property

[source, scala]

startTime: Long

startTime is the time in milliseconds when SparkContext was created.

[source, scala]

scala> sc.startTime
res0: Long = 1464425605653

== [[cancelJobGroup]] Cancelling Job Group -- cancelJobGroup Method

[source, scala]

cancelJobGroup(groupId: String)

cancelJobGroup requests DAGScheduler to cancel a group of active Spark jobs.

NOTE: cancelJobGroup is used exclusively when SparkExecuteStatementOperation does cancel.

== [[cancelAllJobs]] Cancelling All Running and Scheduled Jobs -- cancelAllJobs Method


NOTE: cancelAllJobs is used when spark-shell is terminated (e.g. using Ctrl+C, so it can in turn terminate all active Spark jobs) or SparkSQLCLIDriver is terminated.

== [[cleaner]] ContextCleaner

[source, scala]

cleaner: Option[ContextCleaner]

SparkContext may have a ContextCleaner defined.

ContextCleaner is created when SparkContext is created with the spark.cleaner.referenceTracking configuration property enabled.

== [[getPreferredLocs]] Finding Preferred Locations (Placement Preferences) for RDD Partition

[source, scala]

getPreferredLocs(
  rdd: RDD[_],
  partition: Int): Seq[TaskLocation]

getPreferredLocs simply requests DAGScheduler for the preferred locations for partition.

NOTE: Preferred locations of a partition of an RDD are also called placement preferences or locality preferences.

getPreferredLocs is used in CoalescedRDDPartition, DefaultPartitionCoalescer and PartitionerAwareUnionRDD.

== [[persistRDD]] Registering RDD in persistentRdds Internal Registry -- persistRDD Internal Method

[source, scala]

persistRDD(rdd: RDD[_]): Unit

persistRDD registers rdd in the persistentRdds internal registry.

NOTE: persistRDD is used exclusively when RDD is persisted or locally checkpointed.

== [[getRDDStorageInfo]] Getting Storage Status of Cached RDDs (as RDDInfos) -- getRDDStorageInfo Methods

[source, scala]

getRDDStorageInfo: Array[RDDInfo] // <1>
getRDDStorageInfo(
  filter: RDD[_] => Boolean): Array[RDDInfo] // <2>

<1> Part of Spark's Developer API that uses <2> with a filter that accepts all RDDs

getRDDStorageInfo takes all the RDDs (from the persistentRdds registry) that match filter and creates a collection of RDDInfo instances.


In the end, getRDDStorageInfo gives only the RDDs that are cached (i.e. the sum of memory and disk sizes as well as the number of partitions cached are greater than 0).

getRDDStorageInfo is used when:
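A sketch (assuming an active `SparkContext` `sc`; the RDD name is an arbitrary example) showing that only cached RDDs appear:

```scala
// Cache and materialize an RDD so it has storage to report.
val rdd = sc.parallelize(1 to 100, 4).setName("demo").cache()
rdd.count()

val infos = sc.getRDDStorageInfo // only RDDs with cached partitions
infos.foreach { info =>
  println(s"${info.name}: ${info.numCachedPartitions} partitions cached")
}
```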


== [[uiWebUrl]] Requesting URL of web UI -- uiWebUrl Method

[source, scala]

uiWebUrl: Option[String]

uiWebUrl requests the SparkUI for webUrl.

== [[maxNumConcurrentTasks]] maxNumConcurrentTasks Method

[source, scala]

maxNumConcurrentTasks(): Int

maxNumConcurrentTasks simply requests the SchedulerBackend for the maximum number of tasks that can be launched concurrently.

NOTE: maxNumConcurrentTasks is used exclusively when DAGScheduler is requested to checkBarrierStageWithNumSlots.

== [[environment-variables]] Environment Variables

Environment Variable    Default Value   Description
SPARK_EXECUTOR_MEMORY   1024            Amount of memory to allocate for a Spark executor in MB. See Executor Memory.
SPARK_USER                              The user who is running SparkContext. Available later as sparkUser.

== [[addJar-internals]] addJar Method

[source, scala]

addJar(path: String): Unit


NOTE: addJar is used when...FIXME

== [[runApproximateJob]] Running Approximate Job

[source, scala]

runApproximateJob[T, U, R](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  evaluator: ApproximateEvaluator[U, R],
  timeout: Long): PartialResult[R]


runApproximateJob is used when:

  • DoubleRDDFunctions is requested to meanApprox and sumApprox

  • RDD is requested to countApprox and countByValueApprox

== [[killTaskAttempt]] Killing Task

[source, scala]

killTaskAttempt(
  taskId: Long,
  interruptThread: Boolean = true,
  reason: String = "killed via SparkContext.killTaskAttempt"): Boolean

killTaskAttempt requests the DAGScheduler to kill a task.

== [[checkpointFile]] checkpointFile Internal Method

[source, scala]

checkpointFile[T: ClassTag](
  path: String): RDD[T]


== [[logging]] Logging

Enable ALL logging level for org.apache.spark.SparkContext logger to see what happens inside.

Add the following line to conf/


Refer to Logging.

== [[internal-properties]] Internal Properties

=== [[persistentRdds]] persistentRdds Lookup Table

Lookup table of persistent/cached RDDs per their ids.

Used when SparkContext is requested to:

  • unpersistRDD
  • persistRDD
  • getPersistentRDDs
  • getRDDStorageInfo

Creating SparkEnv for Driver

createSparkEnv(
  conf: SparkConf,
  isLocal: Boolean,
  listenerBus: LiveListenerBus): SparkEnv

createSparkEnv uses the SparkEnv utility to create a SparkEnv for the driver (with the arguments and numDriverCores).


numDriverCores(
  master: String,
  conf: SparkConf): Int