Inside Creating SparkContext¶
This document describes the internals of what happens when a new SparkContext
is created.
import org.apache.spark.{SparkConf, SparkContext}
// 1. Create Spark configuration
val conf = new SparkConf()
.setAppName("SparkMe Application")
.setMaster("local[*]")
// 2. Create Spark context
val sc = new SparkContext(conf)
creationSite¶
creationSite: CallSite
SparkContext determines the call site.
assertOnDriver¶
SparkContext
...FIXME
markPartiallyConstructed¶
SparkContext
...FIXME
startTime¶
startTime: Long
SparkContext records the current time (in ms).
stopped¶
stopped: AtomicBoolean
SparkContext initializes the stopped flag to false.
Printing Out Spark Version¶
SparkContext prints out the following INFO message to the logs:
Running Spark version [SPARK_VERSION]
sparkUser¶
sparkUser: String
SparkContext determines the Spark user.
SparkConf¶
_conf: SparkConf
SparkContext clones the SparkConf and requests it to validateSettings.
Enforcing Mandatory Configuration Properties¶
SparkContext asserts that spark.master and spark.app.name are defined (in the SparkConf). Otherwise, a SparkException is thrown with one of the following messages:
A master URL must be set in your configuration
An application name must be set in your configuration
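The check amounts to verifying that both keys are present in the SparkConf. A minimal sketch of the assertion (the helper name is illustrative, not Spark's own code):

```scala
import org.apache.spark.{SparkConf, SparkException}

// Hypothetical helper mirroring the mandatory-settings check described above.
def assertMandatorySettings(conf: SparkConf): Unit = {
  if (!conf.contains("spark.master")) {
    throw new SparkException("A master URL must be set in your configuration")
  }
  if (!conf.contains("spark.app.name")) {
    throw new SparkException("An application name must be set in your configuration")
  }
}
```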
DriverLogger¶
_driverLogger: Option[DriverLogger]
SparkContext creates a DriverLogger.
ResourceInformation¶
_resources: Map[String, ResourceInformation]
SparkContext uses the spark.driver.resourcesFile configuration property to discover driver resources and prints out the following INFO message to the logs:
==============================================================
Resources for [componentName]:
[resources]
==============================================================
Submitted Application¶
SparkContext prints out the following INFO message to the logs (with the value of the spark.app.name configuration property):
Submitted application: [appName]
Spark on YARN and spark.yarn.app.id¶
For Spark on YARN in cluster deploy mode, SparkContext checks whether the spark.yarn.app.id configuration property is defined. A SparkException is thrown if it does not exist:
Detected yarn cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.
Displaying Spark Configuration¶
With spark.logConf configuration property enabled, SparkContext prints out the following INFO message to the logs:
Spark configuration:
[conf.toDebugString]
Note
SparkConf.toDebugString is used very early in the initialization process and other settings configured afterwards are not included. Use SparkContext.getConf.toDebugString once SparkContext is initialized.
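To see the effective configuration at startup, enable spark.logConf when building the SparkConf, e.g.:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// With spark.logConf enabled, SparkContext logs conf.toDebugString at INFO level.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("SparkMe Application")
  .set("spark.logConf", "true")
val sc = new SparkContext(conf)
```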
Setting Configuration Properties¶
SparkContext sets the following configuration properties on the cloned SparkConf (see the sketch below):

- spark.driver.host to the current value of the property (to override the default)
- spark.driver.port to 0, unless defined already
- spark.executor.id to driver
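A rough sketch of these assignments (an approximation for illustration, not Spark's verbatim code; the host fallback shown is a stand-in):

```scala
import java.net.InetAddress
import org.apache.spark.SparkConf

def setDriverProperties(conf: SparkConf): SparkConf = {
  // Pin spark.driver.host to its current (possibly default) value
  conf.set("spark.driver.host",
    conf.get("spark.driver.host", InetAddress.getLocalHost.getHostAddress))
  // A driver port of 0 means "pick any free port", unless one was configured explicitly
  conf.setIfMissing("spark.driver.port", "0")
  // The driver always identifies itself with the executor ID "driver"
  conf.set("spark.executor.id", "driver")
}
```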
User-Defined Jar Files¶
_jars: Seq[String]
SparkContext sets _jars to the value of the spark.jars configuration property.
User-Defined Files¶
_files: Seq[String]
SparkContext sets _files to the value of the spark.files configuration property.
spark.eventLog.dir¶
_eventLogDir: Option[URI]
If spark-history-server:EventLoggingListener.md[event logging] is enabled, i.e. the EventLoggingListener.md#spark_eventLog_enabled[spark.eventLog.enabled] flag is true, the internal field _eventLogDir is set to the value of the EventLoggingListener.md#spark_eventLog_dir[spark.eventLog.dir] setting or the default value /tmp/spark-events.
spark.eventLog.compress¶
_eventLogCodec: Option[String]
Also, if spark-history-server:EventLoggingListener.md#spark_eventLog_compress[spark.eventLog.compress] is enabled (it is not by default), the short name of the CompressionCodec is assigned to _eventLogCodec. The config key is spark.io.compression.codec (default: lz4).
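An illustrative SparkConf that enables event logging with compression (the directory shown is the default mentioned above):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")          // turn event logging on
  .set("spark.eventLog.dir", "/tmp/spark-events") // default when unset
  .set("spark.eventLog.compress", "true")         // off by default
  .set("spark.io.compression.codec", "lz4")       // codec short name used for event logs
```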
Creating LiveListenerBus¶
_listenerBus: LiveListenerBus
SparkContext creates a LiveListenerBus.
Creating AppStatusStore (and AppStatusSource)¶
_statusStore: AppStatusStore
SparkContext creates an in-memory store (with an optional AppStatusSource if enabled) and requests the LiveListenerBus to register the AppStatusListener with the status queue.

The AppStatusStore is available using the statusStore property of the SparkContext.
Creating SparkEnv¶
_env: SparkEnv
SparkContext creates a SparkEnv and requests SparkEnv to use the instance as the default SparkEnv.
spark.repl.class.uri¶
With spark.repl.class.outputDir configuration property defined, SparkContext sets spark.repl.class.uri configuration property to be...FIXME
Creating SparkStatusTracker¶
_statusTracker: SparkStatusTracker
SparkContext creates a SparkStatusTracker (with itself and the AppStatusStore).
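Given an initialized SparkContext (e.g. the sc from the example at the top of this page), the tracker is available to user code as a low-overhead monitoring API:

```scala
// SparkStatusTracker exposes job, stage and executor status.
val tracker = sc.statusTracker
println(tracker.getActiveJobIds().mkString(", ")) // IDs of currently running jobs
println(tracker.getExecutorInfos.length)          // number of known executors (incl. the driver)
```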
Creating ConsoleProgressBar¶
_progressBar: Option[ConsoleProgressBar]
SparkContext creates a ConsoleProgressBar only when spark.ui.showConsoleProgress configuration property is enabled.
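For example, to enable the console progress bar explicitly:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.ui.showConsoleProgress", "true") // show stage progress in the console
```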
Creating SparkUI¶
_ui: Option[SparkUI]
SparkContext creates a SparkUI only when spark.ui.enabled configuration property is enabled.

SparkContext requests the SparkUI to bind.
Hadoop Configuration¶
_hadoopConfiguration: Configuration
SparkContext creates a new Hadoop Configuration.
Adding User-Defined Jar Files¶
If there are jars given through the SparkContext constructor, they are added using addJar.
Adding User-Defined Files¶
SparkContext adds the files in the spark.files configuration property.
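Jars and files can be supplied up front through configuration (which is what this step picks up) or added later on a live SparkContext. The paths below are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("SparkMe Application")
  .set("spark.jars", "/path/to/app-extras.jar")    // placeholder path
  .set("spark.files", "/path/to/lookup-table.csv") // placeholder path

val sc = new SparkContext(conf)
sc.addJar("/path/to/more.jar")  // same mechanism, after initialization
sc.addFile("/path/to/more.txt")
```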
_executorMemory¶
_executorMemory: Int
SparkContext determines the amount of memory to allocate to each executor. It is the value of the executor:Executor.md#spark.executor.memory[spark.executor.memory] setting, or the SparkContext.md#environment-variables[SPARK_EXECUTOR_MEMORY] environment variable (or the currently-deprecated SPARK_MEM), or defaults to 1024 (MiB).

_executorMemory is later available as sc.executorMemory and is used for LOCAL_CLUSTER_REGEX, SparkDeploySchedulerBackend, to set executorEnvs("SPARK_EXECUTOR_MEMORY"), MesosSchedulerBackend, and CoarseMesosSchedulerBackend.
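A sketch of the precedence described above; toMiB is a hypothetical helper standing in for Spark's internal memory-string parsing (e.g. "2g" -> 2048), not a Spark API:

```scala
import org.apache.spark.SparkConf

def executorMemoryMiB(conf: SparkConf, toMiB: String => Int): Int =
  conf.getOption("spark.executor.memory")     // 1. explicit setting
    .orElse(sys.env.get("SPARK_EXECUTOR_MEMORY")) // 2. environment variable
    .orElse(sys.env.get("SPARK_MEM"))             // 3. deprecated environment variable
    .map(toMiB)
    .getOrElse(1024)                              // 4. default: 1024 MiB
```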
SPARK_PREPEND_CLASSES Environment Variable¶
The value of SPARK_PREPEND_CLASSES environment variable is included in executorEnvs.
For Mesos SchedulerBackend Only¶
The Mesos scheduler backend's configuration is included in executorEnvs, i.e. SparkContext.md#environment-variables[SPARK_EXECUTOR_MEMORY], _conf.getExecutorEnv, and SPARK_USER.
ShuffleDriverComponents¶
_shuffleDriverComponents: ShuffleDriverComponents
SparkContext
...FIXME
Registering HeartbeatReceiver¶
SparkContext registers the HeartbeatReceiver RPC endpoint.
PluginContainer¶
_plugins: Option[PluginContainer]
SparkContext creates a PluginContainer (with itself and the _resources).
Creating SchedulerBackend and TaskScheduler¶
The SparkContext object is requested to SparkContext.md#createTaskScheduler[create the SchedulerBackend with the TaskScheduler] (for the given master URL) and the result becomes the internal _schedulerBackend and _taskScheduler.

scheduler:DAGScheduler.md#creating-instance[DAGScheduler is created] (as _dagScheduler).
Sending Blocking TaskSchedulerIsSet¶
SparkContext sends a blocking TaskSchedulerIsSet message to the HeartbeatReceiver RPC endpoint (to inform it that the TaskScheduler is now available).
ExecutorMetricsSource¶
SparkContext creates an ExecutorMetricsSource when the spark.metrics.executorMetricsSource.enabled configuration property is enabled.
Heartbeater¶
SparkContext creates a Heartbeater and starts it.
Starting TaskScheduler¶
SparkContext requests the TaskScheduler to start.
Setting Spark Application's and Execution Attempt's IDs¶
SparkContext sets the internal fields _applicationId and _applicationAttemptId (using the applicationId and applicationAttemptId methods of the scheduler:TaskScheduler.md#contract[TaskScheduler Contract]).
NOTE: SparkContext requests the TaskScheduler for the scheduler:TaskScheduler.md#applicationId[unique identifier of a Spark application] (that is currently only implemented by scheduler:TaskSchedulerImpl.md#applicationId[TaskSchedulerImpl], which uses the SchedulerBackend to scheduler:SchedulerBackend.md#applicationId[request the identifier]).
NOTE: The unique identifier of a Spark application is used to initialize spark-webui-SparkUI.md#setAppId[SparkUI] and storage:BlockManager.md#initialize[BlockManager].
NOTE: _applicationAttemptId is used when SparkContext is requested for the SparkContext.md#applicationAttemptId[unique identifier of execution attempt of a Spark application] and when EventLoggingListener spark-history-server:EventLoggingListener.md#creating-instance[is created].
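Once initialization completes, both identifiers are available to user code, e.g.:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("ids-demo"))
println(sc.applicationId)        // e.g. "local-1700000000000" in local mode (example value)
println(sc.applicationAttemptId) // Option[String]; typically defined only on YARN
```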
Setting spark.app.id Spark Property in SparkConf¶
SparkContext sets the SparkConf.md#spark.app.id[spark.app.id] property to be the <<_applicationId, unique identifier of a Spark application>> and, if enabled, spark-webui-SparkUI.md#setAppId[passes it on to SparkUI].
spark.ui.proxyBase¶
Initializing SparkUI¶
SparkContext requests the SparkUI (if defined) to setAppId with the _applicationId.
Initializing BlockManager¶
The storage:BlockManager.md#initialize[BlockManager (for the driver) is initialized] (with _applicationId).
Starting MetricsSystem¶
SparkContext requests the MetricsSystem to start (with the value of the spark.metrics.staticSources.enabled configuration property).
Note
SparkContext starts the MetricsSystem only after setting the spark.app.id property (above) as the MetricsSystem uses it to build unique identifiers for metrics sources.
Attaching Servlet Handlers to web UI¶
SparkContext requests the MetricsSystem for servlet handlers and requests the SparkUI to attach them.
Starting EventLoggingListener (with Event Log Enabled)¶
_eventLogger: Option[EventLoggingListener]
With spark.eventLog.enabled configuration property enabled, SparkContext creates an EventLoggingListener and requests it to start.

SparkContext requests the LiveListenerBus to add the EventLoggingListener to the eventLog event queue.
With spark.eventLog.enabled disabled, _eventLogger is None (undefined).
ContextCleaner¶
_cleaner: Option[ContextCleaner]
With spark.cleaner.referenceTracking configuration property enabled, SparkContext creates a ContextCleaner (with itself and the _shuffleDriverComponents).

SparkContext requests the ContextCleaner to start.
ExecutorAllocationManager¶
_executorAllocationManager: Option[ExecutorAllocationManager]
SparkContext initializes the _executorAllocationManager internal registry.

SparkContext creates an ExecutorAllocationManager when:

- Dynamic Allocation of Executors is enabled (based on spark.dynamicAllocation.enabled configuration property and the master URL)

The ExecutorAllocationManager is requested to start.
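An illustrative configuration that turns dynamic allocation on (real deployments also need an external shuffle service or shuffle tracking; the executor bounds are example values):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")               // example value
  .set("spark.dynamicAllocation.maxExecutors", "10")              // example value
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true") // avoids an external shuffle service
```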
Registering User-Defined SparkListeners¶
SparkContext registers user-defined listeners and starts SparkListenerEvent event delivery to the listeners.
postEnvironmentUpdate¶
postEnvironmentUpdate is called, which posts a SparkListener.md#SparkListenerEnvironmentUpdate[SparkListenerEnvironmentUpdate] message on scheduler:LiveListenerBus.md[] with information about the Task Scheduler's scheduling mode, added jar and file paths, and other environmental details.
postApplicationStart¶
SparkListener.md#SparkListenerApplicationStart[SparkListenerApplicationStart] message is posted to scheduler:LiveListenerBus.md[] (using the internal postApplicationStart method).
postStartHook¶
TaskScheduler scheduler:TaskScheduler.md#postStartHook[is notified that SparkContext is almost fully initialized].
NOTE: scheduler:TaskScheduler.md#postStartHook[TaskScheduler.postStartHook] does nothing by default, but custom implementations offer more advanced features, e.g. TaskSchedulerImpl scheduler:TaskSchedulerImpl.md#postStartHook[blocks the current thread until SchedulerBackend is ready]. There is also YarnClusterScheduler for Spark on YARN in cluster deploy mode.
Registering Metrics Sources¶
SparkContext requests the MetricsSystem to register metrics sources for the following services:
Adding Shutdown Hook¶
SparkContext adds a shutdown hook (using ShutdownHookManager.addShutdownHook()).

SparkContext prints out the following DEBUG message to the logs:
Adding shutdown hook
CAUTION: FIXME ShutdownHookManager.addShutdownHook()
Any non-fatal Exception leads to termination of the Spark context instance.
CAUTION: FIXME What does NonFatal represent in Scala?
CAUTION: FIXME Finish me
Initializing nextShuffleId and nextRddId Internal Counters¶
nextShuffleId and nextRddId start with 0.

CAUTION: FIXME Where are nextShuffleId and nextRddId used?
A new instance of Spark context is created and ready for operation.
Loading External Cluster Manager for URL (getClusterManager method)¶
getClusterManager(url: String): Option[ExternalClusterManager]
getClusterManager loads scheduler:ExternalClusterManager.md[] that scheduler:ExternalClusterManager.md#canCreate[can handle the input url].
If there are two or more external cluster managers that could handle url, a SparkException is thrown:
Multiple Cluster Managers ([serviceLoaders]) registered for the url [url].
NOTE: getClusterManager uses Java's ++https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html#load-java.lang.Class-java.lang.ClassLoader-++[ServiceLoader.load] method.
NOTE: getClusterManager is used to find a cluster manager for a master URL when SparkContext.md#createTaskScheduler[creating a SchedulerBackend and a TaskScheduler for the driver].
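A hedged sketch of the lookup (close to, but not a verbatim copy of, Spark's code; ExternalClusterManager is a private[spark] trait, so this only compiles inside the org.apache.spark namespace):

```scala
import java.util.ServiceLoader
import scala.jdk.CollectionConverters._

import org.apache.spark.SparkException
import org.apache.spark.scheduler.ExternalClusterManager

def getClusterManager(url: String): Option[ExternalClusterManager] = {
  val loader = Thread.currentThread().getContextClassLoader
  // ServiceLoader finds every ExternalClusterManager registered in META-INF/services
  val serviceLoaders = ServiceLoader.load(classOf[ExternalClusterManager], loader)
    .asScala.filter(_.canCreate(url))
  if (serviceLoaders.size > 1) {
    throw new SparkException(
      s"Multiple external cluster managers registered for the url $url: $serviceLoaders")
  }
  serviceLoaders.headOption
}
```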
setupAndStartListenerBus¶
setupAndStartListenerBus(): Unit
setupAndStartListenerBus is an internal method that reads configuration-properties.md#spark.extraListeners[spark.extraListeners] configuration property from the current SparkConf.md[SparkConf] to create and register SparkListenerInterface listeners.
It expects that the class name represents a SparkListenerInterface listener with one of the following constructors (in this order):

- a single-argument constructor that accepts SparkConf.md[SparkConf]
- a zero-argument constructor
setupAndStartListenerBus scheduler:LiveListenerBus.md#ListenerBus-addListener[registers every listener class].
You should see the following INFO message in the logs:
INFO Registered listener [className]
It scheduler:LiveListenerBus.md#start[starts LiveListenerBus] and records it in the internal _listenerBusStarted.
When no single-SparkConf or zero-argument constructor could be found for a class name in configuration-properties.md#spark.extraListeners[spark.extraListeners] configuration property, a SparkException is thrown with the message:
[className] did not have a zero-argument constructor or a single-argument constructor that accepts SparkConf. Note: if the class is defined inside of another Scala class, then its constructors may accept an implicit parameter that references the enclosing class; in this case, you must define the listener as a top-level class in order to prevent this extra parameter from breaking Spark's ability to find a valid constructor.
Any exception while registering a SparkListenerInterface listener stops the SparkContext, and a SparkException is thrown with the source exception's message:
Exception when registering SparkListener
Tip
Set INFO logging level for org.apache.spark.SparkContext logger to see the extra listeners being registered:
Registered listener pl.japila.spark.CustomSparkListener
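A sketch of such a listener with the single-SparkConf-argument constructor (the package and class name mirror the example log line above; the overridden callback is just for illustration):

```scala
package pl.japila.spark

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

// One of the two constructor shapes setupAndStartListenerBus accepts.
class CustomSparkListener(conf: SparkConf) extends SparkListener {
  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    println(s"Application ${event.appName} started (appId=${event.appId})")
  }
}
```

The listener is then registered via spark.extraListeners, e.g. --conf spark.extraListeners=pl.japila.spark.CustomSparkListener on spark-submit.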