Inside Creating SparkContext¶
This document describes the internals of what happens when a new SparkContext is created.
import org.apache.spark.{SparkConf, SparkContext}
// 1. Create Spark configuration
val conf = new SparkConf()
  .setAppName("SparkMe Application")
  .setMaster("local[*]")
// 2. Create Spark context
val sc = new SparkContext(conf)
creationSite¶
creationSite: CallSite
SparkContext determines call site.
assertOnDriver¶
SparkContext...FIXME
markPartiallyConstructed¶
SparkContext...FIXME
startTime¶
startTime: Long
SparkContext records the current time (in ms).
stopped¶
stopped: AtomicBoolean
SparkContext initializes stopped flag to false.
Printing Out Spark Version¶
SparkContext prints out the following INFO message to the logs:
Running Spark version [SPARK_VERSION]
sparkUser¶
sparkUser: String
SparkContext determines Spark user.
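Several of these early values can be read back through public members of a running SparkContext. The following is a minimal sketch, assuming a local master and no other active SparkContext in the JVM (the application name and the `early*` identifiers are illustrative only):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val earlyConf = new SparkConf()
  .setAppName("Early Fields Demo") // illustrative name
  .setMaster("local[1]")
val earlySc = new SparkContext(earlyConf)

// Values recorded at the very beginning of the initialization
val version = earlySc.version         // the version shown in the "Running Spark version" message
val startedAt = earlySc.startTime     // milliseconds since the epoch
val user = earlySc.sparkUser
val stoppedBefore = earlySc.isStopped // reflects the stopped flag

println(s"Spark $version started at $startedAt by $user (stopped = $stoppedBefore)")

earlySc.stop()
val stoppedAfter = earlySc.isStopped
```

The stopped flag is only flipped by `stop()`, so it should read `false` right after creation and `true` afterwards.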
SparkConf¶
_conf: SparkConf
SparkContext clones the SparkConf and requests it to validateSettings.
Enforcing Mandatory Configuration Properties¶
SparkContext asserts that spark.master and spark.app.name are defined (in the SparkConf).
A master URL must be set in your configuration
An application name must be set in your configuration
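The two checks can be observed from user code. This is a sketch under the assumption that no other SparkContext is active in the JVM and that no spark.* system properties are picked up (hence `new SparkConf(false)`); the value names are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try

// loadDefaults = false: ignore spark.* JVM system properties,
// so neither spark.master nor spark.app.name is defined
val emptyConf = new SparkConf(false)

// No master URL: the first check should fail
val noMaster = Try(new SparkContext(emptyConf))
noMaster.failed.foreach(e => println(e.getMessage))

// A master URL but no application name: the second check should fail
val noAppName = Try(new SparkContext(new SparkConf(false).setMaster("local[1]")))
noAppName.failed.foreach(e => println(e.getMessage))
```

Both attempts are expected to fail with a SparkException carrying one of the two messages above.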
DriverLogger¶
_driverLogger: Option[DriverLogger]
SparkContext creates a DriverLogger.
ResourceInformation¶
_resources: Map[String, ResourceInformation]
SparkContext uses spark.driver.resourcesFile configuration property to discover driver resources and prints out the following INFO message to the logs:
==============================================================
Resources for [componentName]:
[resources]
==============================================================
Submitted Application¶
SparkContext prints out the following INFO message to the logs (with the value of spark.app.name configuration property):
Submitted application: [appName]
Spark on YARN and spark.yarn.app.id¶
For Spark on YARN in cluster deploy mode, SparkContext checks whether spark.yarn.app.id configuration property is defined. A SparkException is thrown if it is not defined:
Detected yarn cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.
Displaying Spark Configuration¶
With spark.logConf configuration property enabled, SparkContext prints out the following INFO message to the logs:
Spark configuration:
[conf.toDebugString]
Note
SparkConf.toDebugString is used very early in the initialization process and other settings configured afterwards are not included. Use SparkContext.getConf.toDebugString once SparkContext is initialized.
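The difference between the two configurations can be sketched as follows. The example assumes a local master; the application name and value names are illustrative. Since SparkContext works on its own clone of the SparkConf, properties set during initialization (e.g. spark.app.id) are expected only in the configuration returned by `getConf`:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val userConf = new SparkConf()
  .setAppName("LogConf Demo") // illustrative name
  .setMaster("local[1]")
  .set("spark.logConf", "true") // print the configuration at INFO level during startup

val logConfSc = new SparkContext(userConf)

// The original SparkConf is untouched by the initialization
val original = userConf.toDebugString
// The context's own configuration includes settings made while initializing
val effective = logConfSc.getConf.toDebugString

println(effective)
logConfSc.stop()
```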
Setting Configuration Properties¶
SparkContext sets the following configuration properties:
- spark.driver.host to the current value of the property (to override the default)
- spark.driver.port to 0 unless defined already
- spark.executor.id to driver
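These properties can be inspected once the context is up. A minimal sketch, assuming a local master (names are illustrative); note that by the time initialization has finished the port value may already have been replaced with the actual port the driver is bound to:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val propsSc = new SparkContext(
  new SparkConf(false)
    .setAppName("Driver Props Demo") // illustrative name
    .setMaster("local[1]"))

// getConf returns a copy of the configuration the context works with
val effectiveConf = propsSc.getConf
val driverHost = effectiveConf.get("spark.driver.host")
val driverPort = effectiveConf.get("spark.driver.port")
val executorId = effectiveConf.get("spark.executor.id")

println(s"host=$driverHost port=$driverPort executorId=$executorId")
propsSc.stop()
```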
User-Defined Jar Files¶
_jars: Seq[String]
SparkContext sets the _jars to spark.jars configuration property.
User-Defined Files¶
_files: Seq[String]
SparkContext sets the _files to spark.files configuration property.
spark.eventLog.dir¶
_eventLogDir: Option[URI]
If event logging is enabled, i.e. spark.eventLog.enabled flag is true, the internal field _eventLogDir is set to the value of spark.eventLog.dir setting or the default value /tmp/spark-events.
spark.eventLog.compress¶
_eventLogCodec: Option[String]
Also, if spark.eventLog.compress is enabled (it is not by default), the short name of the CompressionCodec is assigned to _eventLogCodec. The codec is selected using the spark.io.compression.codec configuration property (default: lz4).
Creating LiveListenerBus¶
_listenerBus: LiveListenerBus
SparkContext creates a LiveListenerBus.
Creating AppStatusStore (and AppStatusSource)¶
_statusStore: AppStatusStore
SparkContext creates an in-memory store (with an optional AppStatusSource if enabled) and requests the LiveListenerBus to register the AppStatusListener with the status queue.
The AppStatusStore is available using the statusStore property of the SparkContext.
Creating SparkEnv¶
_env: SparkEnv
SparkContext creates a SparkEnv and requests SparkEnv to use the instance as the default SparkEnv.
spark.repl.class.uri¶
With spark.repl.class.outputDir configuration property defined, SparkContext sets spark.repl.class.uri configuration property to be...FIXME
Creating SparkStatusTracker¶
_statusTracker: SparkStatusTracker
SparkContext creates a SparkStatusTracker (with itself and the AppStatusStore).
Creating ConsoleProgressBar¶
_progressBar: Option[ConsoleProgressBar]
SparkContext creates a ConsoleProgressBar only when spark.ui.showConsoleProgress configuration property is enabled.
Creating SparkUI¶
_ui: Option[SparkUI]
SparkContext creates a SparkUI only when spark.ui.enabled configuration property is enabled.
SparkContext requests the SparkUI to bind.
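Whether a SparkUI was created can be checked through `SparkContext.uiWebUrl`. A short sketch, assuming a local master, with the web UI turned off (names are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val noUiConf = new SparkConf()
  .setAppName("No UI Demo") // illustrative name
  .setMaster("local[1]")
  .set("spark.ui.enabled", "false") // skip creating (and binding) the SparkUI

val noUiSc = new SparkContext(noUiConf)

// With no SparkUI there is no URL to report
val webUrl: Option[String] = noUiSc.uiWebUrl
println(webUrl)

noUiSc.stop()
```

With spark.ui.enabled left at its default, `uiWebUrl` should instead hold the address the SparkUI was bound to.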
Hadoop Configuration¶
_hadoopConfiguration: Configuration
SparkContext creates a new Hadoop Configuration.
Adding User-Defined Jar Files¶
If there are jars given through the SparkContext constructor, they are added using addJar.
Adding User-Defined Files¶
SparkContext adds the files in spark.files configuration property.
_executorMemory¶
_executorMemory: Int
SparkContext determines the amount of memory to allocate to each executor. It is the value of spark.executor.memory setting, or SPARK_EXECUTOR_MEMORY environment variable (or currently-deprecated SPARK_MEM), or defaults to 1024 (MB).
_executorMemory is later available as sc.executorMemory. It is used for LOCAL_CLUSTER_REGEX, by SparkDeploySchedulerBackend, MesosSchedulerBackend and CoarseMesosSchedulerBackend, and to set executorEnvs("SPARK_EXECUTOR_MEMORY").
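The order of precedence can be modelled with plain Scala. This is a simplified sketch, not Spark's code: `resolveExecutorMemoryMb` and `parseMemoryMb` are hypothetical helpers, the two maps stand in for SparkConf and the process environment, and only the `m` and `g` suffixes are handled:

```scala
// Simplified stand-in for Spark's memory-string parsing: only "<n>m" and "<n>g" are supported
def parseMemoryMb(value: String): Int = {
  val v = value.trim.toLowerCase
  if (v.endsWith("g")) v.dropRight(1).toInt * 1024
  else if (v.endsWith("m")) v.dropRight(1).toInt
  else throw new IllegalArgumentException(s"Unsupported memory string: $value")
}

// Hypothetical helper: configuration property first, then the environment
// variables, then the default of 1024 MB
def resolveExecutorMemoryMb(conf: Map[String, String], env: Map[String, String]): Int =
  conf.get("spark.executor.memory")
    .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
    .orElse(env.get("SPARK_MEM"))
    .map(parseMemoryMb)
    .getOrElse(1024)

val fromConf = resolveExecutorMemoryMb(
  Map("spark.executor.memory" -> "2g"),
  Map("SPARK_EXECUTOR_MEMORY" -> "512m"))
val fromEnv = resolveExecutorMemoryMb(Map.empty, Map("SPARK_MEM" -> "512m"))
val fallback = resolveExecutorMemoryMb(Map.empty, Map.empty)

println(s"$fromConf $fromEnv $fallback") // prints "2048 512 1024"
```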
SPARK_PREPEND_CLASSES Environment Variable¶
The value of SPARK_PREPEND_CLASSES environment variable is included in executorEnvs.
For Mesos SchedulerBackend Only¶
The Mesos scheduler backend's configuration is included in executorEnvs, i.e. SPARK_EXECUTOR_MEMORY, _conf.getExecutorEnv, and SPARK_USER.
ShuffleDriverComponents¶
_shuffleDriverComponents: ShuffleDriverComponents
SparkContext...FIXME
Registering HeartbeatReceiver¶
SparkContext registers HeartbeatReceiver RPC endpoint.
PluginContainer¶
_plugins: Option[PluginContainer]
SparkContext creates a PluginContainer (with itself and the _resources).
Creating SchedulerBackend and TaskScheduler¶
SparkContext object is requested to create the SchedulerBackend with the TaskScheduler (for the given master URL) and the result becomes the internal _schedulerBackend and _taskScheduler.
A DAGScheduler is created (as _dagScheduler).
Sending Blocking TaskSchedulerIsSet¶
SparkContext sends a blocking TaskSchedulerIsSet message to HeartbeatReceiver RPC endpoint (to inform that the TaskScheduler is now available).
ExecutorMetricsSource¶
SparkContext creates an ExecutorMetricsSource when the spark.metrics.executorMetricsSource.enabled is enabled.
Heartbeater¶
SparkContext creates a Heartbeater and starts it.
Starting TaskScheduler¶
SparkContext requests the TaskScheduler to start.
Setting Spark Application's and Execution Attempt's IDs¶
SparkContext sets the _applicationId and _applicationAttemptId internal fields (using the applicationId and applicationAttemptId methods of the TaskScheduler contract).
NOTE: SparkContext requests TaskScheduler for the unique identifier of a Spark application (that is currently only implemented by TaskSchedulerImpl that uses SchedulerBackend to request the identifier).
NOTE: The unique identifier of a Spark application is used to initialize SparkUI and BlockManager.
NOTE: _applicationAttemptId is used when SparkContext is requested for the unique identifier of execution attempt of a Spark application and when EventLoggingListener is created.
Setting spark.app.id Spark Property in SparkConf¶
SparkContext sets the spark.app.id property to be the unique identifier of a Spark application (_applicationId) and, if enabled, passes it on to SparkUI.
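The relationship between the application ID and the spark.app.id property can be checked from user code. A minimal sketch, assuming a local master (names are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val idSc = new SparkContext(
  new SparkConf()
    .setAppName("App ID Demo") // illustrative name
    .setMaster("local[1]"))

val appId = idSc.applicationId                      // unique identifier of the application
val attemptId = idSc.applicationAttemptId           // Option[String], may be empty
val appIdInConf = idSc.getConf.get("spark.app.id")  // set during initialization

println(s"appId=$appId attemptId=$attemptId spark.app.id=$appIdInConf")
idSc.stop()
```

Both values are expected to be identical since the property is set from the application ID.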
spark.ui.proxyBase¶
Initializing SparkUI¶
SparkContext requests the SparkUI (if defined) to setAppId with the _applicationId.
Initializing BlockManager¶
The BlockManager (for the driver) is initialized (with _applicationId).
Starting MetricsSystem¶
SparkContext requests the MetricsSystem to start (with the value of the spark.metrics.staticSources.enabled configuration property).
Note
SparkContext starts the MetricsSystem after setting the spark.app.id configuration property because MetricsSystem uses it to build unique identifiers for metrics sources.
Attaching Servlet Handlers to web UI¶
SparkContext requests the MetricsSystem for servlet handlers and requests the SparkUI to attach them.
Starting EventLoggingListener (with Event Log Enabled)¶
_eventLogger: Option[EventLoggingListener]
With spark.eventLog.enabled configuration property enabled, SparkContext creates an EventLoggingListener and requests it to start.
SparkContext requests the LiveListenerBus to add the EventLoggingListener to eventLog event queue.
With spark.eventLog.enabled disabled, _eventLogger is None (undefined).
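The following sketch turns event logging on for a local application and lists what ends up in the event log directory. It assumes a local master and uses a temporary directory (created up front, since the directory is expected to exist before the listener starts); all names are illustrative:

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Create the event log directory before the context starts
val eventDir = Files.createTempDirectory("spark-events")

val eventConf = new SparkConf()
  .setAppName("Event Log Demo") // illustrative name
  .setMaster("local[1]")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", eventDir.toUri.toString)

val eventSc = new SparkContext(eventConf)
eventSc.parallelize(1 to 10).count() // run a job so there are events to log
eventSc.stop()

// File names written by the EventLoggingListener
val eventLogs = eventDir.toFile.list().toSeq
println(eventLogs.mkString("\n"))
```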
ContextCleaner¶
_cleaner: Option[ContextCleaner]
With spark.cleaner.referenceTracking configuration property enabled, SparkContext creates a ContextCleaner (with itself and the _shuffleDriverComponents).
SparkContext requests the ContextCleaner to start.
ExecutorAllocationManager¶
_executorAllocationManager: Option[ExecutorAllocationManager]
SparkContext initializes _executorAllocationManager internal registry.
SparkContext creates an ExecutorAllocationManager when:
- Dynamic Allocation of Executors is enabled (based on spark.dynamicAllocation.enabled configuration property and the master URL)
The ExecutorAllocationManager is requested to start.
Registering User-Defined SparkListeners¶
SparkContext registers user-defined listeners and starts SparkListenerEvent event delivery to the listeners.
postEnvironmentUpdate¶
postEnvironmentUpdate is called and posts a SparkListenerEnvironmentUpdate message to the LiveListenerBus with information about Task Scheduler's scheduling mode, added jar and file paths, and other environmental details.
postApplicationStart¶
A SparkListenerApplicationStart message is posted to the LiveListenerBus (using the internal postApplicationStart method).
postStartHook¶
TaskScheduler is notified that SparkContext is almost fully initialized.
NOTE: TaskScheduler.postStartHook does nothing by default, but custom implementations offer more advanced features, e.g. TaskSchedulerImpl blocks the current thread until SchedulerBackend is ready. There is also YarnClusterScheduler for Spark on YARN in cluster deploy mode.
Registering Metrics Sources¶
SparkContext requests MetricsSystem to register metrics sources for the following services:
Adding Shutdown Hook¶
SparkContext adds a shutdown hook (using ShutdownHookManager.addShutdownHook()).
SparkContext prints out the following DEBUG message to the logs:
Adding shutdown hook
CAUTION: FIXME ShutdownHookManager.addShutdownHook()
Any non-fatal Exception leads to termination of the Spark context instance.
CAUTION: FIXME What does NonFatal represent in Scala?
CAUTION: FIXME Finish me
Initializing nextShuffleId and nextRddId Internal Counters¶
nextShuffleId and nextRddId start with 0.
CAUTION: FIXME Where are nextShuffleId and nextRddId used?
A new instance of Spark context is created and ready for operation.
Loading External Cluster Manager for URL (getClusterManager method)¶
getClusterManager(
url: String): Option[ExternalClusterManager]
getClusterManager loads the ExternalClusterManager that can handle the input url.
If there are two or more external cluster managers that could handle url, a SparkException is thrown:
Multiple Cluster Managers ([serviceLoaders]) registered for the url [url].
NOTE: getClusterManager uses Java's ServiceLoader.load method (https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html#load-java.lang.Class-java.lang.ClassLoader-).
NOTE: getClusterManager is used to find a cluster manager for a master URL when creating a SchedulerBackend and a TaskScheduler for the driver.
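The selection logic can be modelled without Spark. In this simplified sketch `ClusterManagerLike`, `PrefixManager` and `selectClusterManager` are hypothetical stand-ins: the candidates are passed in as a plain sequence instead of being discovered with ServiceLoader, and IllegalStateException replaces SparkException to keep the example dependency-free:

```scala
import scala.util.Try

// Hypothetical stand-in for ExternalClusterManager: only the canCreate check is modelled
trait ClusterManagerLike {
  def canCreate(masterUrl: String): Boolean
}

// A toy manager that claims every master URL starting with the given prefix
final case class PrefixManager(prefix: String) extends ClusterManagerLike {
  def canCreate(masterUrl: String): Boolean = masterUrl.startsWith(prefix)
}

def selectClusterManager(
    candidates: Seq[ClusterManagerLike],
    url: String): Option[ClusterManagerLike] = {
  val matching = candidates.filter(_.canCreate(url))
  if (matching.size > 1) {
    throw new IllegalStateException(
      s"Multiple Cluster Managers (${matching.mkString(", ")}) registered for the url $url.")
  }
  matching.headOption
}

val managers = Seq(PrefixManager("yarn"), PrefixManager("k8s://"))

val single = selectClusterManager(managers, "yarn")        // exactly one match
val noMatch = selectClusterManager(managers, "local[*]")   // no external manager applies
val ambiguous = Try(selectClusterManager(managers :+ PrefixManager("y"), "yarn")) // two matches
```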
setupAndStartListenerBus¶
setupAndStartListenerBus(): Unit
setupAndStartListenerBus is an internal method that reads spark.extraListeners configuration property from the current SparkConf to create and register SparkListenerInterface listeners.
It expects that the class name represents a SparkListenerInterface listener with one of the following constructors (in this order):
- a single-argument constructor that accepts SparkConf
- a zero-argument constructor
setupAndStartListenerBus registers every listener class.
You should see the following INFO message in the logs:
INFO Registered listener [className]
It starts LiveListenerBus and records it in the internal _listenerBusStarted.
When no single-SparkConf or zero-argument constructor could be found for a class name in spark.extraListeners configuration property, a SparkException is thrown with the message:
[className] did not have a zero-argument constructor or a single-argument constructor that accepts SparkConf. Note: if the class is defined inside of another Scala class, then its constructors may accept an implicit parameter that references the enclosing class; in this case, you must define the listener as a top-level class in order to prevent this extra parameter from breaking Spark's ability to find a valid constructor.
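The constructor lookup order can be illustrated with plain Java reflection. This is a sketch of the idea only, not Spark's implementation: `instantiate` is a hypothetical helper, `java.lang.String` stands in for SparkConf, and JDK classes stand in for listener classes so the example runs without Spark:

```scala
import scala.util.Try

// Hypothetical helper: java.lang.String plays the role of SparkConf
def instantiate(className: String, conf: String): Any = {
  val constructors = Class.forName(className).getConstructors // public constructors only

  // 1. Prefer a single-argument constructor that accepts the "conf" type
  val withConf = constructors.find { c =>
    c.getParameterTypes.sameElements(Array[Class[_]](classOf[String]))
  }
  // 2. Fall back to a zero-argument constructor
  val zeroArg = constructors.find(_.getParameterCount == 0)

  withConf.map[Any](_.newInstance(conf))
    .orElse(zeroArg.map[Any](_.newInstance()))
    .getOrElse(throw new IllegalArgumentException(
      s"$className did not have a zero-argument constructor " +
        "or a single-argument constructor that accepts the conf type"))
}

val viaConf = instantiate("java.lang.StringBuilder", "demo") // has a (String) constructor
val viaDefault = instantiate("java.util.ArrayList", "demo")  // only the zero-argument one fits
val unsupported = Try(instantiate("java.lang.Math", "demo")) // no public constructor at all
```

A class nested in another Scala class gets an extra constructor parameter for the enclosing instance, which is why neither lookup would match it.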
Any exception while registering a SparkListenerInterface listener stops the SparkContext, and a SparkException is thrown with the following message, wrapping the source exception:
Exception when registering SparkListener
Tip
Set INFO logging level for org.apache.spark.SparkContext logger to see the extra listeners being registered.
Registered listener pl.japila.spark.CustomSparkListener