SparkContext — Entry Point to Spark Core

SparkContext (aka Spark context) is the entry point to the services of Apache Spark (execution engine) and so the heart of a Spark application. In fact, you can consider an application a Spark application only when it uses a SparkContext (directly or indirectly).

Table 1. SparkContext’s Developer API (Public Methods)
Method Description


addJar(path: String): Unit

More to be added soon

Spark context sets up internal services and establishes a connection to a Spark execution environment.

Once a SparkContext is created you can use it to create RDDs, accumulators and broadcast variables, access Spark services and run jobs (until SparkContext is stopped).

A Spark context is essentially a client of Spark’s execution environment and acts as the master of your Spark application (don’t get confused with the other meaning of Master in Spark, though).

sparkcontext services
Figure 1. Spark context acts as the master of your Spark application

SparkContext offers the following functions:

Table 2. SparkContext’s Internal Registries and Counters
Name Description


Lookup table of persistent/cached RDDs per their ids.

Used when SparkContext is requested to:


Flag that says whether…​FIXME (true) or not (false)

Table 3. SparkContext’s Internal Properties
Name Initial Value Description




Read the scaladoc of org.apache.spark.SparkContext.

Enable INFO logging level for org.apache.spark.SparkContext logger to see what happens inside.

Add the following line to conf/

Refer to Logging.

addFile Method

addFile(path: String): Unit (1)
addFile(path: String, recursive: Boolean): Unit
1 recursive flag is off

addFile adds the path file to be downloaded…​FIXME

addFile is used when:

  • SparkContext is initialized (and files were defined)

  • Spark SQL’s AddFileCommand is executed

  • Spark SQL’s SessionResourceLoader is requested to load a file resource

Removing RDD Blocks from BlockManagerMaster — unpersistRDD Internal Method

unpersistRDD(rddId: Int, blocking: Boolean = true): Unit

unpersistRDD requests BlockManagerMaster to remove the blocks for the RDD (given rddId).

unpersistRDD uses SparkEnv to access the current BlockManager that is in turn used to access the current BlockManagerMaster.

unpersistRDD removes rddId from persistentRdds registry.

In the end, unpersistRDD posts a SparkListenerUnpersistRDD (with rddId) to LiveListenerBus Event Bus.

unpersistRDD is used when:

Unique Identifier of Spark Application — applicationId Method


postApplicationStart Internal Method

postApplicationStart(): Unit


postApplicationStart is used exclusively while SparkContext is being created

postApplicationEnd Method


clearActiveContext Method


Accessing persistent RDDs — getPersistentRDDs Method

getPersistentRDDs: Map[Int, RDD[_]]

getPersistentRDDs returns the collection of RDDs that have marked themselves as persistent via cache.

Internally, getPersistentRDDs returns persistentRdds internal registry.

Cancelling Job — cancelJob Method

cancelJob(jobId: Int)

cancelJob requests DAGScheduler to cancel a Spark job.

Cancelling Stage — cancelStage Methods

cancelStage(stageId: Int): Unit
cancelStage(stageId: Int, reason: String): Unit

cancelStage simply requests DAGScheduler to cancel a Spark stage (with an optional reason).

cancelStage is used when StagesTab handles a kill request (from a user in web UI).

Programmable Dynamic Allocation

SparkContext offers the following methods as the developer API for dynamic allocation of executors:

Requesting New Executors — requestExecutors Method

requestExecutors(numAdditionalExecutors: Int): Boolean

requestExecutors requests numAdditionalExecutors executors from CoarseGrainedSchedulerBackend.

Requesting to Kill Executors — killExecutors Method

killExecutors(executorIds: Seq[String]): Boolean

Requesting Total Executors — requestTotalExecutors Method

  numExecutors: Int,
  localityAwareTasks: Int,
  hostToLocalTaskCount: Map[String, Int]): Boolean

requestTotalExecutors is a private[spark] method that requests the exact number of executors from a coarse-grained scheduler backend.

When called for other scheduler backends you should see the following WARN message in the logs:

WARN Requesting executors is only supported in coarse-grained mode

Getting Executor Ids — getExecutorIds Method

When called for other scheduler backends you should see the following WARN message in the logs:

WARN Requesting executors is only supported in coarse-grained mode
FIXME Why does SparkContext implement the method for coarse-grained scheduler backends? Why doesn’t SparkContext throw an exception when the method is called? Nobody seems to be using it (!)

Creating SparkContext Instance

You can create a SparkContext instance with or without creating a SparkConf object first.

You may want to read Inside Creating SparkContext to learn what happens behind the scenes when SparkContext is created.

Getting Existing or Creating New SparkContext — getOrCreate Methods

getOrCreate(): SparkContext
getOrCreate(conf: SparkConf): SparkContext

getOrCreate methods allow you to get the existing SparkContext or create a new one.

import org.apache.spark.SparkContext
val sc = SparkContext.getOrCreate()

// Using an explicit SparkConf object
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setAppName("SparkMe App")
val sc = SparkContext.getOrCreate(conf)

The no-param getOrCreate method requires that the two mandatory Spark settings - master and application name - are specified using spark-submit.


SparkContext(conf: SparkConf)
SparkContext(master: String, appName: String, conf: SparkConf)
  master: String,
  appName: String,
  sparkHome: String = null,
  jars: Seq[String] = Nil,
  environment: Map[String, String] = Map())

You can create a SparkContext instance using the four constructors.

import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setAppName("SparkMe App")

import org.apache.spark.SparkContext
val sc = new SparkContext(conf)

When a Spark context starts up you should see the following INFO in the logs (amongst the other messages that come from the Spark services):

INFO SparkContext: Running Spark version 2.0.0-SNAPSHOT
Only one SparkContext may be running in a single JVM (check out SPARK-2243 Support multiple SparkContexts in the same JVM). Sharing access to a SparkContext in the JVM is the solution to share data within Spark (without relying on other means of data sharing using external data stores).

Accessing Current SparkEnv — env Method


Getting Current SparkConf — getConf Method

getConf: SparkConf

getConf returns the current SparkConf.

Changing the SparkConf object does not change the current configuration (as the method returns a copy).

Deployment Environment — master Method

master: String

master method returns the current value of spark.master which is the deployment environment in use.

Application Name — appName Method

appName: String

appName gives the value of the mandatory setting.

appName is used when SparkDeploySchedulerBackend starts, SparkUI creates a web UI, when postApplicationStart is executed, and for Mesos and checkpointing in Spark Streaming.

Unique Identifier of Execution Attempt — applicationAttemptId Method

applicationAttemptId: Option[String]

applicationAttemptId gives the unique identifier of the execution attempt of a Spark application.

applicationAttemptId is used when:

Storage Status (of All BlockManagers) — getExecutorStorageStatus Method

getExecutorStorageStatus: Array[StorageStatus]
getExecutorStorageStatus is a developer API.

getExecutorStorageStatus is used when:

Deploy Mode — deployMode Method

deployMode: String

deployMode returns the current value of spark.submit.deployMode setting or client if not set.

Scheduling Mode — getSchedulingMode Method

getSchedulingMode: SchedulingMode.SchedulingMode

getSchedulingMode returns the current Scheduling Mode.

Schedulable (Pool) by Name — getPoolForName Method

getPoolForName(pool: String): Option[Schedulable]

getPoolForName returns a Schedulable by the pool name, if one exists.

getPoolForName is part of the Developer’s API and may change in the future.

It is exclusively used to show pool details in web UI (for a stage).

All Schedulable Pools — getAllPools Method

getAllPools: Seq[Schedulable]

getAllPools collects the Pools in TaskScheduler.rootPool.

TaskScheduler.rootPool is part of the TaskScheduler Contract.
getAllPools is part of the Developer’s API.
FIXME Where is the method used?
getAllPools is used to calculate pool names for Stages tab in web UI with FAIR scheduling mode used.

Default Level of Parallelism

defaultParallelism: Int

defaultParallelism requests TaskScheduler for the default level of parallelism.

Default level of parallelism specifies the number of partitions in RDDs when created without specifying them explicitly by a user.

defaultParallelism is used in SparkContext.parallelize, SparkContext.range and SparkContext.makeRDD (as well as Spark Streaming’s DStream.countByValue and DStream.countByValueAndWindow et al.).

defaultParallelism is also used to instantiate HashPartitioner and for the minimum number of partitions in HadoopRDDs.

Current Spark Scheduler (aka TaskScheduler) — taskScheduler Property

taskScheduler: TaskScheduler
taskScheduler_=(ts: TaskScheduler): Unit

taskScheduler manages (i.e. reads or writes) _taskScheduler internal property.

Getting Spark Version — version Property

version: String

version returns the Spark version this SparkContext uses.

makeRDD Method


Submitting Jobs Asynchronously — submitJob Method

submitJob[T, U, R](
  rdd: RDD[T],
  processPartition: Iterator[T] => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit,
  resultFunc: => R): SimpleFutureAction[R]

submitJob submits a job in an asynchronous, non-blocking way to DAGScheduler.

It cleans the processPartition input function argument and returns an instance of SimpleFutureAction that holds the JobWaiter instance.

FIXME What are resultFunc?

It is used in:

Spark Configuration


SparkContext and RDDs

You use a Spark context to create RDDs (see Creating RDD).

When an RDD is created, it belongs to and is completely owned by the Spark context it originated from. RDDs can’t by design be shared between SparkContexts.

sparkcontext rdds
Figure 2. A Spark context creates a living space for RDDs.

Creating RDD — parallelize Method

SparkContext allows you to create many different RDDs from input sources like:

  • Scala’s collections, i.e. sc.parallelize(0 to 100)

  • local or remote filesystems, i.e. sc.textFile("")

  • Any Hadoop InputSource using sc.newAPIHadoopFile

Unpersisting RDD (Marking RDD as Non-Persistent) — unpersist Method


unpersist removes an RDD from the master’s Block Manager (calls removeRdd(rddId: Int, blocking: Boolean)) and the internal persistentRdds mapping.

It finally posts SparkListenerUnpersistRDD message to listenerBus.

Setting Checkpoint Directory — setCheckpointDir Method

setCheckpointDir(directory: String)

setCheckpointDir method is used to set up the checkpoint directory…​FIXME


Registering Accumulator — register Methods

register(acc: AccumulatorV2[_, _]): Unit
register(acc: AccumulatorV2[_, _], name: String): Unit

register registers the acc accumulator. You can optionally give an accumulator a name.

You can create built-in accumulators for longs, doubles, and collection types using specialized methods.

Internally, register registers acc accumulator (with the current SparkContext).

Creating Built-In Accumulators

longAccumulator: LongAccumulator
longAccumulator(name: String): LongAccumulator
doubleAccumulator: DoubleAccumulator
doubleAccumulator(name: String): DoubleAccumulator
collectionAccumulator[T]: CollectionAccumulator[T]
collectionAccumulator[T](name: String): CollectionAccumulator[T]

You can use longAccumulator, doubleAccumulator or collectionAccumulator to create and register accumulators for simple and collection values.

longAccumulator returns LongAccumulator with the zero value 0.

doubleAccumulator returns DoubleAccumulator with the zero value 0.0.

collectionAccumulator returns CollectionAccumulator with the zero value java.util.List[T].

scala> val acc = sc.longAccumulator
acc: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: None, value: 0)

scala> val counter = sc.longAccumulator("counter")
counter: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 1, name: Some(counter), value: 0)

scala> counter.value
res0: Long = 0

scala> sc.parallelize(0 to 9).foreach(n => counter.add(n))

scala> counter.value
res3: Long = 45

The name input parameter allows you to give a name to an accumulator and have it displayed in Spark UI (under Stages tab for a given stage).

spark webui accumulators
Figure 3. Accumulators in the Spark UI
You can register custom accumulators using register methods.

Creating Broadcast Variable — broadcast Method

broadcast[T](value: T): Broadcast[T]

broadcast method creates a broadcast variable. It is a shared memory with value (as broadcast blocks) on the driver and later on all Spark executors.

val sc: SparkContext = ???
scala> val hello = sc.broadcast("hello")
hello: org.apache.spark.broadcast.Broadcast[String] = Broadcast(0)

Spark transfers the value to Spark executors once, and tasks can share it without incurring repetitive network transmissions when the broadcast variable is used multiple times.

sparkcontext broadcast executors
Figure 4. Broadcasting a value to executors
The current BroadcastManager is available using SparkEnv.broadcastManager attribute and is always BroadcastManager (with few internal configuration changes to reflect where it runs, i.e. inside the driver or executors).

You should see the following INFO message in the logs:

INFO SparkContext: Created broadcast [id] from [callSite]

If ContextCleaner is defined, the new broadcast variable is registered for cleanup.

Spark does not support broadcasting RDDs.

scala> sc.broadcast(sc.range(0, 10))
java.lang.IllegalArgumentException: requirement failed: Can not directly broadcast RDDs; instead, call collect() and broadcast the result.
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1392)
  ... 48 elided

Once created, the broadcast variable (and other blocks) are displayed per executor and the driver in web UI (under Executors tab).

spark broadcast webui executors rdd blocks
Figure 5. Broadcast Variables In web UI’s Executors Tab

Distribute JARs to workers

The jar you specify with SparkContext.addJar will be copied to all the worker nodes.

The configuration setting spark.jars is a comma-separated list of jar paths to be included in all tasks executed from this SparkContext. A path can either be a local file, a file in HDFS (or other Hadoop-supported filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.

scala> sc.addJar("build.sbt")
15/11/11 21:54:54 INFO SparkContext: Added JAR build.sbt at with timestamp 1447275294457
FIXME Why is HttpFileServer used for addJar?

SparkContext as Application-Wide Counter

SparkContext keeps track of:

Running Job Synchronously — runJob Methods

RDD actions run jobs using one of runJob methods.

runJob[T, U](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit): Unit
runJob[T, U](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int]): Array[U]
runJob[T, U](
  rdd: RDD[T],
  func: Iterator[T] => U,
  partitions: Seq[Int]): Array[U]
runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U]
runJob[T, U](rdd: RDD[T], func: Iterator[T] => U): Array[U]
runJob[T, U](
  rdd: RDD[T],
  processPartition: (TaskContext, Iterator[T]) => U,
  resultHandler: (Int, U) => Unit)
runJob[T, U: ClassTag](
  rdd: RDD[T],
  processPartition: Iterator[T] => U,
  resultHandler: (Int, U) => Unit)

runJob executes a function on one or many partitions of a RDD (in a SparkContext space) to produce a collection of values per partition.

runJob can only work when a SparkContext is not stopped.

Internally, runJob first makes sure that the SparkContext is not stopped. If it is, you should see the following IllegalStateException exception in the logs:

java.lang.IllegalStateException: SparkContext has been shutdown
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1893)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1934)
  ... 48 elided

You should see the following INFO message in the logs:

INFO SparkContext: Starting job: [callSite]

With spark.logLineage enabled (which is not by default), you should see the following INFO message with toDebugString (executed on rdd):

INFO SparkContext: RDD's recursive dependencies:

runJob requests DAGScheduler to run a job.

runJob just prepares input parameters for DAGScheduler to run a job.

After DAGScheduler is done and the job has finished, runJob stops ConsoleProgressBar and performs RDD checkpointing of rdd.

For some actions, e.g. first() and lookup(), there is no need to compute all the partitions of the RDD in a job. And Spark knows it.
// RDD to work with
val lines = sc.parallelize(Seq("hello world", "nice to see you"))

import org.apache.spark.TaskContext
scala> sc.runJob(lines, (t: TaskContext, i: Iterator[String]) => 1) (1)
res0: Array[Int] = Array(1, 1)  (2)
1 Run a job using runJob on lines RDD with a function that returns 1 for every partition (of lines RDD).
2 What can you say about the number of partitions of the lines RDD? Is your result res0 different than mine? Why?
Read TaskContext.

Running a job is essentially executing a func function on all or a subset of partitions in an rdd RDD and returning the result as an array (with elements being the results per partition).

spark runjob
Figure 6. Executing action

Stopping SparkContext — stop Method

stop(): Unit

stop stops the SparkContext.

Internally, stop enables stopped internal flag. If already stopped, you should see the following INFO message in the logs:

INFO SparkContext: SparkContext already stopped.

stop then does the following:

  1. Removes _shutdownHookRef from ShutdownHookManager

  2. Posts a SparkListenerApplicationEnd (to LiveListenerBus Event Bus)

  3. Stops web UI

  4. Requests MetricSystem to report metrics (from all registered sinks)

  5. Stops ContextCleaner

  6. Requests ExecutorAllocationManager to stop

  7. If LiveListenerBus was started, requests LiveListenerBus to stop

  8. Requests EventLoggingListener to stop

  9. Requests DAGScheduler to stop

  10. Requests RpcEnv to stop HeartbeatReceiver endpoint

  11. Requests ConsoleProgressBar to stop

  12. Clears the reference to TaskScheduler, i.e. _taskScheduler is null

  13. Requests SparkEnv to stop and clears SparkEnv

  14. Clears SPARK_YARN_MODE flag

  15. Clears an active SparkContext

Ultimately, you should see the following INFO message in the logs:

INFO SparkContext: Successfully stopped SparkContext

Registering SparkListener — addSparkListener Method

addSparkListener(listener: SparkListenerInterface): Unit

You can register a custom SparkListenerInterface using addSparkListener method

You can also register custom listeners using spark.extraListeners setting.

Custom SchedulerBackend, TaskScheduler and DAGScheduler

By default, SparkContext uses (private[spark] class) org.apache.spark.scheduler.DAGScheduler, but you can develop your own custom DAGScheduler implementation, and use (private[spark]) SparkContext.dagScheduler_=(ds: DAGScheduler) method to assign yours.

It is also applicable to SchedulerBackend and TaskScheduler using schedulerBackend_=(sb: SchedulerBackend) and taskScheduler_=(ts: TaskScheduler) methods, respectively.

FIXME Make it an advanced exercise.


When a Spark context starts, it triggers SparkListenerEnvironmentUpdate and SparkListenerApplicationStart messages.

Refer to the section SparkContext’s initialization.

Setting Default Logging Level — setLogLevel Method

setLogLevel(logLevel: String)

setLogLevel allows you to set the root logging level in a Spark application, e.g. Spark shell.

You can directly set the logging level using org.apache.log4j.LogManager.getLogger().


Closure Cleaning — clean Method

clean(f: F, checkSerializable: Boolean = true): F

Every time an action is called, Spark cleans up the closure, i.e. the body of the action, before it is serialized and sent over the wire to executors.

SparkContext comes with clean(f: F, checkSerializable: Boolean = true) method that does this. It in turn calls ClosureCleaner.clean method.

Not only does ClosureCleaner.clean method clean the closure, but also does it transitively, i.e. referenced closures are cleaned transitively.

A closure is considered serializable as long as it does not explicitly reference unserializable objects. It does so by traversing the hierarchy of enclosing closures and null out any references that are not actually used by the starting closure.

Enable DEBUG logging level for org.apache.spark.util.ClosureCleaner logger to see what happens inside the class.

Add the following line to conf/

Refer to Logging.

With DEBUG logging level you should see the following messages in the logs:

+++ Cleaning closure [func] ([func.getClass.getName]) +++
 + declared fields: [declaredFields.size]
+++ closure [func] ([func.getClass.getName]) is now cleaned +++

Serialization is verified using a new instance of Serializer (as closure Serializer). Refer to Serialization.

FIXME an example, please.

Hadoop Configuration

While a SparkContext is being created, so is a Hadoop configuration (as an instance of org.apache.hadoop.conf.Configuration that is available as _hadoopConfiguration).

If a SparkConf is provided it is used to build the configuration as described. Otherwise, the default Configuration object is returned.

If AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are both available, the following settings are set for the Hadoop configuration:

  • fs.s3.awsAccessKeyId, fs.s3n.awsAccessKeyId, fs.s3a.access.key are set to the value of AWS_ACCESS_KEY_ID

  • fs.s3.awsSecretAccessKey, fs.s3n.awsSecretAccessKey, and fs.s3a.secret.key are set to the value of AWS_SECRET_ACCESS_KEY

Every spark.hadoop. setting becomes a setting of the configuration with the prefix spark.hadoop. removed for the key.

The value of spark.buffer.size (default: 65536) is used as the value of io.file.buffer.size.

listenerBus — LiveListenerBus Event Bus

listenerBus is a LiveListenerBus object that acts as a mechanism to announce events to other services on the driver.

It is created and started when SparkContext starts and, since it is a single-JVM event bus, is exclusively used on the driver.
listenerBus is a private[spark] value in SparkContext.

Time when SparkContext was Created — startTime Property

startTime: Long

startTime is the time in milliseconds when SparkContext was created.

scala> sc.startTime
res0: Long = 1464425605653

Spark User — sparkUser Property

sparkUser: String

sparkUser is the user who started the SparkContext instance.

Submitting ShuffleDependency for Execution — submitMapStage Internal Method

submitMapStage[K, V, C](
  dependency: ShuffleDependency[K, V, C]): SimpleFutureAction[MapOutputStatistics]

submitMapStage submits the input ShuffleDependency to DAGScheduler for execution and returns a SimpleFutureAction.

Internally, submitMapStage calculates the call site first and submits it with localProperties.

Interestingly, submitMapStage is used exclusively when Spark SQL’s ShuffleExchange physical operator is executed.
submitMapStage seems related to Adaptive Query Planning / Adaptive Scheduling.

Calculating Call Site — getCallSite Method


Cancelling Job Group — cancelJobGroup Method

cancelJobGroup(groupId: String)

cancelJobGroup requests DAGScheduler to cancel a group of active Spark jobs.

cancelJobGroup is used exclusively when SparkExecuteStatementOperation does cancel.

Cancelling All Running and Scheduled Jobs — cancelAllJobs Method

cancelAllJobs is used when spark-shell is terminated (e.g. using Ctrl+C, so it can in turn terminate all active Spark jobs) or SparkSQLCLIDriver is terminated.

Setting Local Properties to Group Spark Jobs — setJobGroup Method

  groupId: String,
  description: String,
  interruptOnCancel: Boolean = false): Unit

setJobGroup sets local properties:

  • as groupId

  • spark.job.description as description

  • spark.job.interruptOnCancel as interruptOnCancel

setJobGroup is used when:

  • Spark Thrift Server’s SparkExecuteStatementOperation runs a query

  • Structured Streaming’s StreamExecution runs batches

cleaner Method

cleaner: Option[ContextCleaner]

cleaner is a private[spark] method to get the optional application-wide ContextCleaner.

Finding Preferred Locations (Placement Preferences) for RDD Partition — getPreferredLocs Method

getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation]
Preferred locations of a partition of a RDD are also called placement preferences or locality preferences.
getPreferredLocs is used in CoalescedRDDPartition, DefaultPartitionCoalescer and PartitionerAwareUnionRDD.

Registering RDD in persistentRdds Internal Registry — persistRDD Internal Method

persistRDD(rdd: RDD[_]): Unit

persistRDD registers rdd in persistentRdds internal registry.

persistRDD is used exclusively when RDD is persisted or locally checkpointed.

Getting Storage Status of Cached RDDs (as RDDInfos) — getRDDStorageInfo Methods

getRDDStorageInfo: Array[RDDInfo] (1)
getRDDStorageInfo(filter: RDD[_] => Boolean): Array[RDDInfo]  (2)
1 Part of Spark’s Developer API that uses <2> filtering no RDDs

getRDDStorageInfo takes all the RDDs (from persistentRdds registry) that match filter and creates a collection of RDDInfo instances.

getRDDStorageInfo then updates the RDDInfos with the current status of all BlockManagers (in a Spark application).

In the end, getRDDStorageInfo gives only the RDD that are cached (i.e. the sum of memory and disk sizes as well as the number of partitions cached are greater than 0).

getRDDStorageInfo is used when RDD is requested for RDD lineage graph.



Quoting the scaladoc of org.apache.spark.SparkContext:

Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one.

You can however control the behaviour using spark.driver.allowMultipleContexts flag.

It is disabled, i.e. false, by default.

If enabled (i.e. true), Spark prints the following WARN message to the logs:

WARN Multiple running SparkContexts detected in the same JVM!

If disabled (default), it will throw an SparkException exception:

Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:

When creating an instance of SparkContext, Spark marks the current thread as having it being created (very early in the instantiation process).

It’s not guaranteed that Spark will work properly with two or more SparkContexts. Consider the feature a work in progress.

Accessing AppStatusStore — statusStore Method

statusStore: AppStatusStore

statusStore gives the current AppStatusStore.

statusStore is used when:

  • ConsoleProgressBar is requested to refresh

  • Spark SQL’s SharedState is requested for a SQLAppStatusStore (as statusStore)

Requesting URL of web UI — uiWebUrl Method

uiWebUrl: Option[String]

uiWebUrl requests the SparkUI for webUrl.

maxNumConcurrentTasks Method

maxNumConcurrentTasks(): Int

maxNumConcurrentTasks simply requests the SchedulerBackend for the maximum number of tasks that can be launched concurrently.

maxNumConcurrentTasks is used exclusively when DAGScheduler is requested to checkBarrierStageWithNumSlots.

Creating SchedulerBackend and TaskScheduler — createTaskScheduler Internal Factory Method

  sc: SparkContext,
  master: String,
  deployMode: String): (SchedulerBackend, TaskScheduler)

createTaskScheduler creates the SchedulerBackend and the TaskScheduler for the given master URL and deployment mode.

sparkcontext createtaskscheduler
Figure 7. SparkContext creates Task Scheduler and Scheduler Backend

Internally, createTaskScheduler branches off per the given master URL (master URL) to select the requested implementations.

createTaskScheduler understands the following master URLs:

  • local - local mode with 1 thread only

  • local[n] or local[*] - local mode with n threads

  • local[n, m] or local[*, m] — local mode with n threads and m number of failures

  • spark://hostname:port for Spark Standalone

  • local-cluster[n, m, z] — local cluster with n workers, m cores per worker, and z memory per worker

  • any other URL is passed to getClusterManager to load an external cluster manager.


Environment Variables

Table 4. Environment Variables
Environment Variable Default Value Description



Amount of memory to allocate for a Spark executor in MB.

See Executor Memory.


The user who is running SparkContext. Available later as sparkUser.

Posting SparkListenerEnvironmentUpdate Event to LiveListenerBus — postEnvironmentUpdate Internal Method

postEnvironmentUpdate(): Unit


postEnvironmentUpdate is used when SparkContext is created, and requested to addFile and addJar.

addJar Method

addJar(path: String): Unit


addJar is used when…​FIXME