

SparkContext is the entry point to all of the components of Apache Spark (execution engine) and so the heart of a Spark application. In fact, you can consider an application a Spark application only when it uses a SparkContext (directly or indirectly).

Spark context acts as the master of your Spark application


There should be one active SparkContext per JVM and Spark developers should use SparkContext.getOrCreate utility for sharing it (e.g. across threads).
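A minimal sketch of sharing a single SparkContext with getOrCreate. The local[2] master URL and the application name are assumptions for illustration only; in spark-shell the call simply returns the already-running context.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative settings only (assumed for this sketch)
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("getOrCreate-demo")

// Creates a SparkContext or returns the one that is already active in this JVM
val first = SparkContext.getOrCreate(conf)

// The no-argument variant returns the active instance
val second = SparkContext.getOrCreate()

// Both references point at the same SparkContext
assert(first eq second)
```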

Creating Instance

SparkContext takes the following to be created:

SparkContext is created (directly or indirectly using getOrCreate utility).

While being created, SparkContext sets up core services and establishes a connection to a cluster manager.

Checkpoint Directory

SparkContext defines checkpointDir internal registry for the path to a checkpoint directory.

checkpointDir is undefined (None) when SparkContext is created and is set using setCheckpointDir.

checkpointDir is required for Reliable Checkpointing.

checkpointDir is available using getCheckpointDir.


getCheckpointDir: Option[String]

getCheckpointDir returns the checkpointDir.
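The following sketch shows how the checkpoint directory could be set and read back, and how an RDD is reliably checkpointed once the directory is defined. The temporary directory, the master URL and the application name are assumptions made for the example.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative local context (getOrCreate reuses an active one, if any)
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("checkpoint-demo"))

// A throwaway directory for the demo (an assumption of this sketch)
val dir = Files.createTempDirectory("checkpoint-demo").toString
sc.setCheckpointDir(dir)

// The registry is now defined; the exact path is chosen by Spark
val checkpointDir = sc.getCheckpointDir
assert(checkpointDir.isDefined)

// Reliable checkpointing requires the directory to be set first
val rdd = sc.parallelize(1 to 10, numSlices = 2)
rdd.checkpoint()
rdd.count() // an action triggers the actual checkpointing
assert(rdd.isCheckpointed)
```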

getCheckpointDir is used when:

Submitting MapStage for Execution

submitMapStage[K, V, C](
  dependency: ShuffleDependency[K, V, C]): SimpleFutureAction[MapOutputStatistics]

submitMapStage requests the DAGScheduler to submit the given ShuffleDependency for execution (that eventually produces a MapOutputStatistics).

submitMapStage is used when:

  • ShuffleExchangeExec (Spark SQL) unary physical operator is executed


SparkContext creates an ExecutorMetricsSource when created with spark.metrics.executorMetricsSource.enabled enabled.

SparkContext requests the ExecutorMetricsSource to register with the MetricsSystem.

SparkContext uses the ExecutorMetricsSource to create the Heartbeater.



SparkContext creates a ResourceProfileManager when created.


resourceProfileManager: ResourceProfileManager

resourceProfileManager returns the ResourceProfileManager.

resourceProfileManager is used when:


SparkContext can create a DriverLogger when created.

SparkContext requests the DriverLogger to startSync in postApplicationStart.


SparkContext can create an AppStatusSource when created (based on the spark.metrics.appStatusSource.enabled configuration property).

SparkContext uses the AppStatusSource to create the AppStatusStore.

If configured, SparkContext registers the AppStatusSource with the MetricsSystem.


SparkContext creates an AppStatusStore when created (with itself and the AppStatusSource).

SparkContext requests AppStatusStore for the AppStatusListener and requests the LiveListenerBus to add it to the application status queue.

SparkContext uses the AppStatusStore to create the following:

AppStatusStore is requested to status/ in stop.


statusStore: AppStatusStore

statusStore returns the AppStatusStore.

statusStore is used when:

  • SparkContext is requested to getRDDStorageInfo
  • ConsoleProgressBar is requested to refresh
  • HiveThriftServer2 is requested to createListenerAndUI
  • SharedState (Spark SQL) is requested for a SQLAppStatusStore and a StreamingQueryStatusListener


SparkContext creates a SparkStatusTracker when created (with itself and the AppStatusStore).


statusTracker: SparkStatusTracker

statusTracker returns the SparkStatusTracker.

Local Properties

localProperties: InheritableThreadLocal[Properties]

SparkContext uses an InheritableThreadLocal (Java) of key-value pairs of thread-local properties to pass extra information from a parent thread (on the driver) to child threads.

localProperties is meant to be used by developers using SparkContext.setLocalProperty and SparkContext.getLocalProperty.

Local Properties are available using TaskContext.getLocalProperty.
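As a rough illustration, a local property set on the driver thread can be read inside tasks through TaskContext. The property name demo.owner and its value are hypothetical, as are the master URL and the application name.

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("local-properties-demo"))

// Hypothetical property name and value, used only for this demo
sc.setLocalProperty("demo.owner", "demo-value")
assert(sc.getLocalProperty("demo.owner") == "demo-value")

// Every task reads the property that was set on the submitting thread
val seen = sc
  .parallelize(1 to 4, numSlices = 2)
  .map(_ => TaskContext.get().getLocalProperty("demo.owner"))
  .collect()
assert(seen.length == 4)
assert(seen.forall(_ == "demo-value"))

// Passing null removes the property again
sc.setLocalProperty("demo.owner", null)
assert(sc.getLocalProperty("demo.owner") == null)
```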

Local Properties are available to SparkListeners using the following events:

localProperties are passed down when SparkContext is requested for the following:

DAGScheduler passes down local properties when scheduling:

Spark (Core) defines the following local properties.

Name                         Default Value   Setter
callSite.short               (none)          SparkContext.setCallSite
spark.job.description        callSite.short  SparkContext.setJobDescription
spark.job.interruptOnCancel  (none)          SparkContext.setJobGroup


SparkContext creates a ShuffleDriverComponents when created.

SparkContext loads the ShuffleDataIO that is in turn requested for the ShuffleDriverComponents. SparkContext requests the ShuffleDriverComponents to initialize.

The ShuffleDriverComponents is used when:

SparkContext requests the ShuffleDriverComponents to clean up when stopping.

Static Files


addFile(
  path: String,
  recursive: Boolean): Unit
// recursive = false
addFile(
  path: String): Unit

First, addFile validates the scheme of the given path. For a path with no scheme, addFile converts it to a canonical form. For a path with the local scheme, addFile prints out the following WARN message to the logs and exits.

File with 'local' scheme is not supported to add to file server, since it is already available on every node.

For any other scheme, addFile creates a Hadoop Path from the given path.

addFile validates the URL if the path is an HTTP, HTTPS or FTP URI.

addFile throws a SparkException with the following message if the path is a local directory and Spark is not running in local mode.

addFile does not support local directories when not running local mode.

addFile throws a SparkException with the following message if the path is a directory and the recursive flag is not turned on.

Added file $hadoopPath is a directory and recursive is not turned on.

In the end, addFile adds the file to the addedFiles internal registry (with the current timestamp):

  • For new files, addFile prints out the following INFO message to the logs, fetches the file (to the root directory and without using the cache) and posts a SparkListenerEnvironmentUpdate event (postEnvironmentUpdate).

    Added file [path] at [key] with timestamp [timestamp]
  • For files that were already added, addFile prints out the following WARN message to the logs:

    The path [path] has been added already. Overwriting of added paths is not supported in the current version.

addFile is used when:


listFiles(): Seq[String]

listFiles returns the paths of the files that have been added.
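A small sketch of addFile and listFiles working together in local mode. The temporary file, the master URL and the application name are assumptions for the example; adding the same path twice is only expected to log the warning described above, not to register the file again.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("addFile-demo"))

// A throwaway file to distribute (an assumption of this sketch)
val file = Files.createTempFile("addFile-demo", ".txt")
Files.write(file, "hello".getBytes(StandardCharsets.UTF_8))
val fileName = file.getFileName.toString

sc.addFile(file.toString)
sc.addFile(file.toString) // already added: a warning is logged, nothing else changes

// The file is registered exactly once
val registered = sc.listFiles().filter(_.endsWith(fileName))
assert(registered.length == 1)

// SparkFiles.get resolves the fetched copy by file name
val localCopy = new File(SparkFiles.get(fileName))
assert(localCopy.exists())
```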

addedFiles Internal Registry

addedFiles: Map[String, Long]

addedFiles is a collection of static files with the timestamp they were added at.

addedFiles is used when:


files: Seq[String]

files is a collection of file paths defined by spark.files configuration property.

Posting SparkListenerEnvironmentUpdate Event

postEnvironmentUpdate(): Unit


postEnvironmentUpdate is used when:

getOrCreate Utility

getOrCreate(): SparkContext
getOrCreate(
  config: SparkConf): SparkContext



SparkContext creates a PluginContainer when created.

PluginContainer is created (for the driver where SparkContext lives) using PluginContainer.apply utility.

PluginContainer is then requested to registerMetrics with the applicationId.

PluginContainer is requested to shutdown when SparkContext is requested to stop.

Creating SchedulerBackend and TaskScheduler

createTaskScheduler(
  sc: SparkContext,
  master: String,
  deployMode: String): (SchedulerBackend, TaskScheduler)

createTaskScheduler creates a SchedulerBackend and a TaskScheduler for the given master URL and deployment mode.

SparkContext creates Task Scheduler and Scheduler Backend

Internally, createTaskScheduler branches off per the given master URL to select the requested implementations.

createTaskScheduler accepts the following master URLs:

  • local - local mode with 1 thread only
  • local[n] or local[*] - local mode with n threads (or as many threads as there are available cores for *)
  • local[n, m] or local[*, m] - local mode with n threads and up to m task failures
  • spark://hostname:port - Spark Standalone
  • local-cluster[n, m, z] - local cluster with n workers, m cores per worker, and z memory per worker
  • Other URLs are handed over to getClusterManager to load an external cluster manager, if available

createTaskScheduler is used when SparkContext is created.
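The branching on the master URL can be pictured with a standalone sketch like the one below. It is not Spark's code: the MasterUrlDemo object, the Mode types and the regular expressions are all hypothetical names written for this illustration, and they only classify a URL rather than create any scheduler.

```scala
object MasterUrlDemo {
  // Hypothetical result types, one per kind of master URL
  sealed trait Mode
  case object LocalSingle extends Mode
  final case class LocalN(threads: String) extends Mode
  final case class LocalNFailures(threads: String, maxFailures: Int) extends Mode
  final case class Standalone(hostPort: String) extends Mode
  final case class LocalCluster(workers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Mode
  final case class External(url: String) extends Mode

  // Assumed patterns for this sketch
  private val LocalNRe = """local\[([0-9]+|\*)\]""".r
  private val LocalNFailuresRe = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
  private val LocalClusterRe =
    """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*\]""".r
  private val SparkRe = """spark://(.*)""".r

  // A regex extractor in a pattern match must match the whole string
  def classify(master: String): Mode = master match {
    case "local"                 => LocalSingle
    case LocalNRe(n)             => LocalN(n)
    case LocalNFailuresRe(n, m)  => LocalNFailures(n, m.toInt)
    case SparkRe(hostPort)       => Standalone(hostPort)
    case LocalClusterRe(n, m, z) => LocalCluster(n.toInt, m.toInt, z.toInt)
    case other                   => External(other) // left to an external cluster manager
  }
}
```

For example, MasterUrlDemo.classify("local[2, 3]") yields LocalNFailures("2", 3), while an unrecognized URL such as "yarn" falls through to External("yarn").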

Loading ExternalClusterManager

getClusterManager(
  url: String): Option[ExternalClusterManager]

getClusterManager uses Java's ServiceLoader to find and load an ExternalClusterManager that supports the given master URL.

ExternalClusterManager Service Discovery

For ServiceLoader to find ExternalClusterManagers, they have to be registered using the following file:


getClusterManager throws a SparkException when multiple cluster managers were found:

Multiple external cluster managers registered for the url [url]: [serviceLoaders]

getClusterManager is used when SparkContext is requested for a SchedulerBackend and TaskScheduler.
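The lookup described above can be approximated with Java's ServiceLoader and a stand-in service trait. ClusterManagerPlugin and ClusterManagerLookup are hypothetical names for this sketch (Spark's own ExternalClusterManager is not meant to be implemented this way here), and the exception type is also an assumption. The code assumes Scala 2.13 or later for scala.jdk.CollectionConverters.

```scala
import java.util.ServiceLoader
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for a pluggable cluster manager
trait ClusterManagerPlugin {
  def canCreate(masterURL: String): Boolean
}

object ClusterManagerLookup {
  // Picks the single candidate that supports the URL; more than one is an error
  def select(
      candidates: Iterable[ClusterManagerPlugin],
      url: String): Option[ClusterManagerPlugin] = {
    val matching = candidates.filter(_.canCreate(url)).toList
    if (matching.size > 1) {
      throw new IllegalStateException(
        s"Multiple external cluster managers registered for the url $url: $matching")
    }
    matching.headOption
  }

  // Discovers implementations registered for ServiceLoader on the classpath
  def load(url: String): Option[ClusterManagerPlugin] =
    select(ServiceLoader.load(classOf[ClusterManagerPlugin]).asScala, url)
}
```

With no implementation registered for the hypothetical trait, ClusterManagerLookup.load returns None for any URL. Per the ServiceLoader contract, implementations are discovered through a provider-configuration file under META-INF/services that is named after the service type.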

Running Job (Synchronously)

runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U): Array[U]
runJob[T, U: ClassTag](
  rdd: RDD[T],
  processPartition: (TaskContext, Iterator[T]) => U,
  resultHandler: (Int, U) => Unit): Unit
runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int]): Array[U]
runJob[T, U: ClassTag]( // Requests the DAGScheduler to run a job
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit): Unit
runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: Iterator[T] => U): Array[U]
runJob[T, U: ClassTag](
  rdd: RDD[T],
  processPartition: Iterator[T] => U,
  resultHandler: (Int, U) => Unit): Unit
runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: Iterator[T] => U,
  partitions: Seq[Int]): Array[U]

Executing action

runJob determines the call site and cleans up the given func function.

runJob prints out the following INFO message to the logs:

Starting job: [callSite]

With spark.logLineage enabled, runJob requests the given RDD for the recursive dependencies and prints out the following INFO message to the logs:

RDD's recursive dependencies:

runJob requests the DAGScheduler to run a job with the following:


runJob blocks until the job has finished (regardless of the result, successful or not).

runJob requests the ConsoleProgressBar (if available) to finishAll.

In the end, runJob requests the given RDD to doCheckpoint.


runJob is essentially executing a func function on all or a subset of partitions of an RDD and returning the result as an array (with elements being the results per partition).

import org.apache.spark.TaskContext

// Label the job using the callSite.short local property
sc.setLocalProperty("callSite.short", "runJob Demo")

val partitionsNumber = 4
val rdd = sc.parallelize(
  Seq("hello world", "nice to see you"),
  numSlices = partitionsNumber)

// Ignores the partition's input; every task simply returns 1
val func = (t: TaskContext, ss: Iterator[String]) => 1
val result = sc.runJob(rdd, func)

// One result per partition
assert(result.length == partitionsNumber)
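The variant that runs on a subset of partitions can be sketched in a similar way. The master URL, the application name and the choice of partitions 0 and 3 are assumptions for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("runJob-partitions-demo"))

val numbers = sc.parallelize(1 to 10, numSlices = 4)

// Sum the elements of every partition: one result per partition
val allSums = sc.runJob(numbers, (it: Iterator[Int]) => it.sum)
assert(allSums.length == 4)
assert(allSums.sum == 55) // 1 + 2 + ... + 10

// Sum the elements of the first and the last partition only
val someSums = sc.runJob(numbers, (it: Iterator[Int]) => it.sum, Seq(0, 3))
assert(someSums.length == 2)
```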


Call Site

getCallSite(): CallSite


getCallSite is used when:

Closure Cleaning

clean[F <: AnyRef](
  f: F,
  checkSerializable: Boolean = true): F

clean cleans up the given f closure (using ClosureCleaner.clean utility).
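One visible effect of cleaning with checkSerializable enabled is that a closure capturing a non-serializable value is rejected on the driver, before any task is launched. The sketch below relies on RDD.map passing its function through clean; the Helper class and the mapWithCapturedHelper method are hypothetical, as are the master URL and the application name.

```scala
import scala.util.Try
import org.apache.spark.{SparkConf, SparkContext, SparkException}

// Hypothetical helper that is deliberately NOT Serializable
class Helper {
  def inc(x: Int): Int = x + 1
}

def mapWithCapturedHelper(sc: SparkContext): Try[Array[Int]] = {
  val helper = new Helper // local value captured by the closure below
  Try(sc.parallelize(1 to 3).map(x => helper.inc(x)).collect())
}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("closure-cleaning-demo"))

// The closure cannot be serialized, so the job is never submitted
val outcome = mapWithCapturedHelper(sc)
assert(outcome.isFailure)
assert(outcome.failed.get.isInstanceOf[SparkException])
```

Making Helper extend Serializable (or creating it inside the closure) is the usual way to let such a job run.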


Enable DEBUG logging level for org.apache.spark.util.ClosureCleaner logger to see what happens inside the class.

Add the following line to conf/

Refer to Logging.

With DEBUG logging level you should see the following messages in the logs:

+++ Cleaning closure [func] ([func.getClass.getName]) +++
 + declared fields: [declaredFields.size]
+++ closure [func] ([func.getClass.getName]) is now cleaned +++

Maximum Number of Concurrent Tasks

maxNumConcurrentTasks(
  rp: ResourceProfile): Int

maxNumConcurrentTasks requests the SchedulerBackend for the maximum number of tasks that can be launched concurrently (with the given ResourceProfile).

maxNumConcurrentTasks is used when:


withScope[U](
  body: => U): U

withScope executes the given body inside an RDD operation scope (using RDDOperationScope.withScope) with this SparkContext.


withScope is used for most (if not all) SparkContext API operators.

Finding Preferred Locations for RDD Partition

getPreferredLocs(
  rdd: RDD[_],
  partition: Int): Seq[TaskLocation]

getPreferredLocs requests the DAGScheduler for the preferred locations of the given partition (of the given RDD).


Preferred locations of an RDD partition are also referred to as placement preferences or locality preferences.

getPreferredLocs is used when:

  • CoalescedRDDPartition is requested to localFraction
  • DefaultPartitionCoalescer is requested to currPrefLocs
  • PartitionerAwareUnionRDD is requested to currPrefLocs


Enable ALL logging level for org.apache.spark.SparkContext logger to see what happens inside.

Add the following line to conf/ = org.apache.spark.SparkContext
logger.SparkContext.level = all

Refer to Logging.