Skip to content

Inside Creating SparkContext

This document describes the internals of what happens when a new SparkContext is created.

import org.apache.spark.{SparkConf, SparkContext}

// 1. Create Spark configuration
val conf = new SparkConf()
  .setAppName("SparkMe Application")
  .setMaster("local[*]")

// 2. Create Spark context
val sc = new SparkContext(conf)

creationSite

creationSite: CallSite

SparkContext determines call site.

assertOnDriver

SparkContext...FIXME

markPartiallyConstructed

SparkContext...FIXME

startTime

startTime: Long

SparkContext records the current time (in ms).

stopped

stopped: AtomicBoolean

SparkContext initializes stopped flag to false.

Printing Out Spark Version

SparkContext prints out the following INFO message to the logs:

Running Spark version [SPARK_VERSION]

sparkUser

sparkUser: String

SparkContext determines Spark user.

SparkConf

_conf: SparkConf

SparkContext clones the SparkConf and requests it to validateSettings.

Enforcing Mandatory Configuration Properties

SparkContext asserts that spark.master and spark.app.name are defined (in the SparkConf).

A master URL must be set in your configuration
An application name must be set in your configuration

DriverLogger

_driverLogger: Option[DriverLogger]

SparkContext creates a DriverLogger.

ResourceInformation

_resources: Map[String, ResourceInformation]

SparkContext uses spark.driver.resourcesFile configuration property to discovery driver resources and prints out the following INFO message to the logs:

==============================================================
Resources for [componentName]:
[resources]
==============================================================

Submitted Application

SparkContext prints out the following INFO message to the logs (with the value of spark.app.name configuration property):

Submitted application: [appName]

Spark on YARN and spark.yarn.app.id

For Spark on YARN in cluster deploy mode], SparkContext checks whether spark.yarn.app.id configuration property is defined. SparkException is thrown if it does not exist.

Detected yarn cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.

Displaying Spark Configuration

With spark.logConf configuration property enabled, SparkContext prints out the following INFO message to the logs:

Spark configuration:
[conf.toDebugString]

Note

SparkConf.toDebugString is used very early in the initialization process and other settings configured afterwards are not included. Use SparkContext.getConf.toDebugString once SparkContext is initialized.

Setting Configuration Properties

User-Defined Jar Files

_jars: Seq[String]

SparkContext sets the _jars to spark.jars configuration property.

User-Defined Files

_files: Seq[String]

SparkContext sets the _files to spark.files configuration property.

spark.eventLog.dir

_eventLogDir: Option[URI]

If spark-history-server:EventLoggingListener.md[event logging] is enabled, i.e. EventLoggingListener.md#spark_eventLog_enabled[spark.eventLog.enabled] flag is true, the internal field _eventLogDir is set to the value of EventLoggingListener.md#spark_eventLog_dir[spark.eventLog.dir] setting or the default value /tmp/spark-events.

spark.eventLog.compress

_eventLogCodec: Option[String]

Also, if spark-history-server:EventLoggingListener.md#spark_eventLog_compress[spark.eventLog.compress] is enabled (it is not by default), the short name of the CompressionCodec is assigned to _eventLogCodec. The config key is spark.io.compression.codec (default: lz4).

Creating LiveListenerBus

_listenerBus: LiveListenerBus

SparkContext creates a LiveListenerBus.

Creating AppStatusStore (and AppStatusSource)

_statusStore: AppStatusStore

SparkContext creates an in-memory store (with an optional AppStatusSource if enabled) and requests the LiveListenerBus to register the AppStatusListener with the status queue.

The AppStatusStore is available using the statusStore property of the SparkContext.

Creating SparkEnv

_env: SparkEnv

SparkContext creates a SparkEnv and requests SparkEnv to use the instance as the default SparkEnv.

spark.repl.class.uri

With spark.repl.class.outputDir configuration property defined, SparkContext sets spark.repl.class.uri configuration property to be...FIXME

Creating SparkStatusTracker

_statusTracker: SparkStatusTracker

SparkContext creates a SparkStatusTracker (with itself and the AppStatusStore).

Creating ConsoleProgressBar

_progressBar: Option[ConsoleProgressBar]

SparkContext creates a ConsoleProgressBar only when spark.ui.showConsoleProgress configuration property is enabled.

Creating SparkUI

_ui: Option[SparkUI]

SparkContext creates a SparkUI only when spark.ui.enabled configuration property is enabled.

SparkContext requests the SparkUI to bind.

Hadoop Configuration

_hadoopConfiguration: Configuration

SparkContext creates a new Hadoop Configuration.

Adding User-Defined Jar Files

If there are jars given through the SparkContext constructor, they are added using addJar.

Adding User-Defined Files

SparkContext adds the files in spark.files configuration property.

_executorMemory

_executorMemory: Int

SparkContext determines the amount of memory to allocate to each executor. It is the value of executor:Executor.md#spark.executor.memory[spark.executor.memory] setting, or SparkContext.md#environment-variables[SPARK_EXECUTOR_MEMORY] environment variable (or currently-deprecated SPARK_MEM), or defaults to 1024.

_executorMemory is later available as sc.executorMemory and used for LOCAL_CLUSTER_REGEX, SparkDeploySchedulerBackend, to set executorEnvs("SPARK_EXECUTOR_MEMORY"), MesosSchedulerBackend, CoarseMesosSchedulerBackend.

SPARK_PREPEND_CLASSES Environment Variable

The value of SPARK_PREPEND_CLASSES environment variable is included in executorEnvs.

For Mesos SchedulerBackend Only

The Mesos scheduler backend's configuration is included in executorEnvs, i.e. SparkContext.md#environment-variables[SPARK_EXECUTOR_MEMORY], _conf.getExecutorEnv, and SPARK_USER.

ShuffleDriverComponents

_shuffleDriverComponents: ShuffleDriverComponents

SparkContext...FIXME

Registering HeartbeatReceiver

SparkContext registers HeartbeatReceiver RPC endpoint.

PluginContainer

_plugins: Option[PluginContainer]

SparkContext creates a PluginContainer (with itself and the _resources).

Creating SchedulerBackend and TaskScheduler

SparkContext object is requested to SparkContext.md#createTaskScheduler[create the SchedulerBackend with the TaskScheduler] (for the given master URL) and the result becomes the internal _schedulerBackend and _taskScheduler.

scheduler:DAGScheduler.md#creating-instance[DAGScheduler is created] (as _dagScheduler).

Sending Blocking TaskSchedulerIsSet

SparkContext sends a blocking TaskSchedulerIsSet message to HeartbeatReceiver RPC endpoint (to inform that the TaskScheduler is now available).

ExecutorMetricsSource

SparkContext creates an ExecutorMetricsSource when the spark.metrics.executorMetricsSource.enabled is enabled.

Heartbeater

SparkContext creates a Heartbeater and starts it.

Starting TaskScheduler

SparkContext requests the TaskScheduler to start.

Setting Spark Application's and Execution Attempt's IDs

SparkContext sets the internal fields -- _applicationId and _applicationAttemptId -- (using applicationId and applicationAttemptId methods from the scheduler:TaskScheduler.md#contract[TaskScheduler Contract]).

NOTE: SparkContext requests TaskScheduler for the scheduler:TaskScheduler.md#applicationId[unique identifier of a Spark application] (that is currently only implemented by scheduler:TaskSchedulerImpl.md#applicationId[TaskSchedulerImpl] that uses SchedulerBackend to scheduler:SchedulerBackend.md#applicationId[request the identifier]).

NOTE: The unique identifier of a Spark application is used to initialize spark-webui-SparkUI.md#setAppId[SparkUI] and storage:BlockManager.md#initialize[BlockManager].

NOTE: _applicationAttemptId is used when SparkContext is requested for the SparkContext.md#applicationAttemptId[unique identifier of execution attempt of a Spark application] and when EventLoggingListener spark-history-server:EventLoggingListener.md#creating-instance[is created].

Setting spark.app.id Spark Property in SparkConf

SparkContext sets SparkConf.md#spark.app.id[spark.app.id] property to be the <<_applicationId, unique identifier of a Spark application>> and, if enabled, spark-webui-SparkUI.md#setAppId[passes it on to SparkUI].

spark.ui.proxyBase

Initializing SparkUI

SparkContext requests the SparkUI (if defined) to setAppId with the _applicationId.

Initializing BlockManager

The storage:BlockManager.md#initialize[BlockManager (for the driver) is initialized] (with _applicationId).

Starting MetricsSystem

SparkContext requests the MetricsSystem to start (with the value of thespark.metrics.staticSources.enabled configuration property).

Note

SparkContext starts the MetricsSystem after <> as MetricsSystem uses it to build unique identifiers fo metrics sources.

Attaching Servlet Handlers to web UI

SparkContext requests the MetricsSystem for servlet handlers and requests the SparkUI to attach them.

Starting EventLoggingListener (with Event Log Enabled)

_eventLogger: Option[EventLoggingListener]

With spark.eventLog.enabled configuration property enabled, SparkContext creates an EventLoggingListener and requests it to start.

SparkContext requests the LiveListenerBus to add the EventLoggingListener to eventLog event queue.

With spark.eventLog.enabled disabled, _eventLogger is None (undefined).

ContextCleaner

_cleaner: Option[ContextCleaner]

With spark.cleaner.referenceTracking configuration property enabled, SparkContext creates a ContextCleaner (with itself and the _shuffleDriverComponents).

SparkContext requests the ContextCleaner to start

ExecutorAllocationManager

_executorAllocationManager: Option[ExecutorAllocationManager]

SparkContext initializes _executorAllocationManager internal registry.

SparkContext creates an ExecutorAllocationManager when:

The ExecutorAllocationManager is requested to start.

Registering User-Defined SparkListeners

SparkContext registers user-defined listeners and starts SparkListenerEvent event delivery to the listeners.

postEnvironmentUpdate

postEnvironmentUpdate is called that posts SparkListener.md#SparkListenerEnvironmentUpdate[SparkListenerEnvironmentUpdate] message on scheduler:LiveListenerBus.md[] with information about Task Scheduler's scheduling mode, added jar and file paths, and other environmental details.

postApplicationStart

SparkListener.md#SparkListenerApplicationStart[SparkListenerApplicationStart] message is posted to scheduler:LiveListenerBus.md[] (using the internal postApplicationStart method).

postStartHook

TaskScheduler scheduler:TaskScheduler.md#postStartHook[is notified that SparkContext is almost fully initialized].

NOTE: scheduler:TaskScheduler.md#postStartHook[TaskScheduler.postStartHook] does nothing by default, but custom implementations offer more advanced features, i.e. TaskSchedulerImpl scheduler:TaskSchedulerImpl.md#postStartHook[blocks the current thread until SchedulerBackend is ready]. There is also YarnClusterScheduler for Spark on YARN in cluster deploy mode.

Registering Metrics Sources

SparkContext requests MetricsSystem to register metrics sources for the following services:

Adding Shutdown Hook

SparkContext adds a shutdown hook (using ShutdownHookManager.addShutdownHook()).

SparkContext prints out the following DEBUG message to the logs:

Adding shutdown hook

CAUTION: FIXME ShutdownHookManager.addShutdownHook()

Any non-fatal Exception leads to termination of the Spark context instance.

CAUTION: FIXME What does NonFatal represent in Scala?

CAUTION: FIXME Finish me

Initializing nextShuffleId and nextRddId Internal Counters

nextShuffleId and nextRddId start with 0.

CAUTION: FIXME Where are nextShuffleId and nextRddId used?

A new instance of Spark context is created and ready for operation.

Loading External Cluster Manager for URL (getClusterManager method)

getClusterManager(
  url: String): Option[ExternalClusterManager]

getClusterManager loads scheduler:ExternalClusterManager.md[] that scheduler:ExternalClusterManager.md#canCreate[can handle the input url].

If there are two or more external cluster managers that could handle url, a SparkException is thrown:

Multiple Cluster Managers ([serviceLoaders]) registered for the url [url].

NOTE: getClusterManager uses Java's ++https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html#load-java.lang.Class-java.lang.ClassLoader-++[ServiceLoader.load] method.

NOTE: getClusterManager is used to find a cluster manager for a master URL when SparkContext.md#createTaskScheduler[creating a SchedulerBackend and a TaskScheduler for the driver].

setupAndStartListenerBus

setupAndStartListenerBus(): Unit

setupAndStartListenerBus is an internal method that reads configuration-properties.md#spark.extraListeners[spark.extraListeners] configuration property from the current SparkConf.md[SparkConf] to create and register SparkListenerInterface listeners.

It expects that the class name represents a SparkListenerInterface listener with one of the following constructors (in this order):

  • a single-argument constructor that accepts SparkConf.md[SparkConf]
  • a zero-argument constructor

setupAndStartListenerBus scheduler:LiveListenerBus.md#ListenerBus-addListener[registers every listener class].

You should see the following INFO message in the logs:

INFO Registered listener [className]

It scheduler:LiveListenerBus.md#start[starts LiveListenerBus] and records it in the internal _listenerBusStarted.

When no single-SparkConf or zero-argument constructor could be found for a class name in configuration-properties.md#spark.extraListeners[spark.extraListeners] configuration property, a SparkException is thrown with the message:

[className] did not have a zero-argument constructor or a single-argument constructor that accepts SparkConf. Note: if the class is defined inside of another Scala class, then its constructors may accept an implicit parameter that references the enclosing class; in this case, you must define the listener as a top-level class in order to prevent this extra parameter from breaking Spark's ability to find a valid constructor.

Any exception while registering a SparkListenerInterface listener stops the SparkContext and a SparkException is thrown and the source exception's message.

Exception when registering SparkListener

Tip

Set INFO logging level for org.apache.spark.SparkContext logger to see the extra listeners being registered.

Registered listener pl.japila.spark.CustomSparkListener