Inside Creating SparkContext¶
This document describes the internals of what happens when a new SparkContext
is created.
```scala
import org.apache.spark.{SparkConf, SparkContext}

// 1. Create Spark configuration
val conf = new SparkConf()
  .setAppName("SparkMe Application")
  .setMaster("local[*]")

// 2. Create Spark context
val sc = new SparkContext(conf)
```
creationSite¶
creationSite: CallSite
SparkContext determines the call site.
assertOnDriver¶
SparkContext ...FIXME
markPartiallyConstructed¶
SparkContext ...FIXME
startTime¶
startTime: Long
SparkContext records the current time (in ms).
stopped¶
stopped: AtomicBoolean
SparkContext initializes the stopped flag to false.
Printing Out Spark Version¶
SparkContext prints out the following INFO message to the logs:
Running Spark version [SPARK_VERSION]
sparkUser¶
sparkUser: String
SparkContext determines the Spark user.
SparkConf¶
_conf: SparkConf
SparkContext clones the SparkConf and requests it to validateSettings.
Enforcing Mandatory Configuration Properties¶
SparkContext asserts that spark.master and spark.app.name are defined (in the SparkConf). Otherwise, a SparkException is thrown:
A master URL must be set in your configuration
An application name must be set in your configuration
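For illustration, constructing a SparkContext from an empty SparkConf fails fast with the first of these messages (a minimal sketch, assuming spark.master is not supplied through system properties either):
```scala
import org.apache.spark.{SparkConf, SparkContext}

// An empty SparkConf defines neither spark.master nor spark.app.name,
// so SparkContext creation fails during configuration validation.
try {
  new SparkContext(new SparkConf())
} catch {
  case e: org.apache.spark.SparkException =>
    println(e.getMessage) // A master URL must be set in your configuration
}
```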
DriverLogger¶
_driverLogger: Option[DriverLogger]
SparkContext creates a DriverLogger.
ResourceInformation¶
_resources: Map[String, ResourceInformation]
SparkContext uses the spark.driver.resourcesFile configuration property to discover driver resources and prints out the following INFO message to the logs:
==============================================================
Resources for [componentName]:
[resources]
==============================================================
Submitted Application¶
SparkContext prints out the following INFO message to the logs (with the value of the spark.app.name configuration property):
Submitted application: [appName]
Spark on YARN and spark.yarn.app.id¶
For Spark on YARN in cluster deploy mode, SparkContext checks whether the spark.yarn.app.id configuration property is defined. A SparkException is thrown if it does not exist:
Detected yarn cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.
Displaying Spark Configuration¶
With the spark.logConf configuration property enabled, SparkContext prints out the following INFO message to the logs:
Spark configuration:
[conf.toDebugString]
Note
SparkConf.toDebugString is used very early in the initialization process, so settings configured afterwards are not included. Use SparkContext.getConf.toDebugString once SparkContext is initialized.
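A small sketch of both points (the application name and master URL are arbitrary):
```scala
import org.apache.spark.{SparkConf, SparkContext}

// With spark.logConf enabled, SparkContext logs the configuration at startup.
val conf = new SparkConf()
  .setAppName("log-conf-demo")
  .setMaster("local[*]")
  .set("spark.logConf", "true")
val sc = new SparkContext(conf)

// getConf.toDebugString gives the complete, post-initialization view.
println(sc.getConf.toDebugString)
```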
Setting Configuration Properties¶
SparkContext sets the following configuration properties:
- spark.driver.host to the current value of the property (to override the default)
- spark.driver.port to 0 unless defined already
- spark.executor.id to driver
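The effect is visible through SparkContext.getConf once the context is up (a sketch with an arbitrary application name):
```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("conf-props-demo").setMaster("local[*]"))

// The properties set during initialization can be read back from the SparkConf.
println(sc.getConf.get("spark.executor.id")) // driver
println(sc.getConf.get("spark.driver.host")) // the driver's host
println(sc.getConf.get("spark.driver.port")) // 0 asks for an ephemeral port; typically the bound port appears here
```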
User-Defined Jar Files¶
_jars: Seq[String]
SparkContext sets _jars to the value of the spark.jars configuration property.
User-Defined Files¶
_files: Seq[String]
SparkContext sets _files to the value of the spark.files configuration property.
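Both properties are usually supplied through spark-submit (--jars, --files) but can also be set directly on the SparkConf; the paths below are placeholders:
```scala
import org.apache.spark.{SparkConf, SparkContext}

// Placeholder paths, only to show where _jars and _files come from.
val conf = new SparkConf()
  .setAppName("jars-and-files-demo")
  .setMaster("local[*]")
  .set("spark.jars", "/tmp/extra-lib.jar")
  .set("spark.files", "/tmp/lookup.csv")
val sc = new SparkContext(conf)

println(sc.jars)  // user-defined jar files (from spark.jars)
println(sc.files) // user-defined files (from spark.files)
```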
spark.eventLog.dir¶
_eventLogDir: Option[URI]
If spark-history-server:EventLoggingListener.md[event logging] is enabled, i.e. the EventLoggingListener.md#spark_eventLog_enabled[spark.eventLog.enabled] flag is true, the internal field _eventLogDir is set to the value of the EventLoggingListener.md#spark_eventLog_dir[spark.eventLog.dir] setting or the default value /tmp/spark-events.
spark.eventLog.compress¶
_eventLogCodec: Option[String]
Also, if spark-history-server:EventLoggingListener.md#spark_eventLog_compress[spark.eventLog.compress] is enabled (it is not by default), the short name of the io:CompressionCodec.md[CompressionCodec] is assigned to _eventLogCodec. The config key is core:BroadcastManager.md#spark_io_compression_codec[spark.io.compression.codec] (default: lz4).
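A configuration sketch that enables event logging (the directory is the default and must already exist):
```scala
import org.apache.spark.{SparkConf, SparkContext}

// Event logging for the History Server: enable it, point it at an existing
// directory, and optionally compress the logs.
val conf = new SparkConf()
  .setAppName("event-log-demo")
  .setMaster("local[*]")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "/tmp/spark-events")  // must exist
  .set("spark.eventLog.compress", "true")          // uses spark.io.compression.codec (lz4 by default)
val sc = new SparkContext(conf)
```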
Creating LiveListenerBus¶
_listenerBus: LiveListenerBus
SparkContext creates a LiveListenerBus.
Creating AppStatusStore¶
_statusStore: AppStatusStore
SparkContext requests AppStatusStore to create a core:AppStatusStore.md#createLiveStore[live store] (i.e. the AppStatusStore for a live Spark application) and requests the LiveListenerBus to add the AppStatusStore's listener to the status queue.
NOTE: The current AppStatusStore is available as the SparkContext.md#statusStore[statusStore] property of the SparkContext.
Creating SparkEnv¶
_env: SparkEnv
SparkContext creates a SparkEnv and requests SparkEnv to use the instance as the default SparkEnv.
spark.repl.class.uri¶
With the spark.repl.class.outputDir configuration property defined, SparkContext sets the spark.repl.class.uri configuration property to be...FIXME
Creating SparkStatusTracker¶
_statusTracker: SparkStatusTracker
SparkContext creates a SparkStatusTracker (with itself and the AppStatusStore).
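The tracker backs the public SparkContext.statusTracker API, e.g. (a sketch; the job is only there to have something to observe):
```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("status-tracker-demo").setMaster("local[*]"))

// Kick off a job asynchronously so the tracker has something to report.
sc.parallelize(1 to 1000000, 8).map(_ * 2).countAsync()

// SparkStatusTracker offers a read-only view built on the AppStatusStore.
val tracker = sc.statusTracker
println(tracker.getActiveJobIds().toSeq)   // may be empty if the job already finished
println(tracker.getActiveStageIds().toSeq)
println(tracker.getExecutorInfos.map(_.host()).toSeq)
```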
Creating ConsoleProgressBar¶
_progressBar: Option[ConsoleProgressBar]
SparkContext creates a ConsoleProgressBar only when the spark.ui.showConsoleProgress configuration property is enabled.
Creating SparkUI¶
_ui: Option[SparkUI]
SparkContext creates a SparkUI only when the spark.ui.enabled configuration property is enabled.
SparkContext then requests the SparkUI to bind.
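Both toggles (and the console progress bar above) are plain configuration properties, e.g.:
```scala
import org.apache.spark.{SparkConf, SparkContext}

// Keep the web UI on, or disable it (and the console progress bar) for headless runs.
val conf = new SparkConf()
  .setAppName("ui-demo")
  .setMaster("local[*]")
  .set("spark.ui.enabled", "true")              // default: true
  .set("spark.ui.showConsoleProgress", "false") // no console progress bar
val sc = new SparkContext(conf)

// When the UI is enabled, its URL is available once SparkUI has bound.
println(sc.uiWebUrl) // e.g. Some(http://<host>:4040)
```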
Hadoop Configuration¶
_hadoopConfiguration: Configuration
SparkContext creates a new Hadoop Configuration.
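The resulting Configuration is exposed as SparkContext.hadoopConfiguration and is what Spark's Hadoop-based I/O uses; the fs.s3a key below is only an example:
```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("hadoop-conf-demo").setMaster("local[*]"))

// The driver-side Hadoop Configuration; spark.hadoop.* properties from the
// SparkConf end up here as well.
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.connection.maximum", "64")
println(hadoopConf.get("fs.defaultFS"))
```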
Adding User-Defined Jar Files¶
If there are jars given through the SparkContext constructor, they are added using addJar.
Adding User-Defined Files¶
SparkContext adds the files in the spark.files configuration property.
_executorMemory¶
_executorMemory: Int
SparkContext determines the amount of memory to allocate to each executor. It is the value of the executor:Executor.md#spark.executor.memory[spark.executor.memory] setting, or the SparkContext.md#environment-variables[SPARK_EXECUTOR_MEMORY] environment variable (or the currently-deprecated SPARK_MEM), or defaults to 1024.
_executorMemory is later available as sc.executorMemory and used for LOCAL_CLUSTER_REGEX, spark-standalone.md#SparkDeploySchedulerBackend[Spark Standalone's SparkDeploySchedulerBackend], to set executorEnvs("SPARK_EXECUTOR_MEMORY"), MesosSchedulerBackend, and CoarseMesosSchedulerBackend.
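In practice the value usually comes from spark.executor.memory, e.g. (a sketch):
```scala
import org.apache.spark.{SparkConf, SparkContext}

// spark.executor.memory accepts a size string; SparkContext converts it to MiB
// for _executorMemory (1024 MiB when nothing is configured).
val conf = new SparkConf()
  .setAppName("executor-memory-demo")
  .setMaster("local[*]")
  .set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)

println(sc.getConf.get("spark.executor.memory")) // 2g
```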
SPARK_PREPEND_CLASSES Environment Variable¶
The value of the SPARK_PREPEND_CLASSES environment variable is included in executorEnvs.
For Mesos SchedulerBackend Only¶
The Mesos scheduler backend's configuration is included in executorEnvs, i.e. SparkContext.md#environment-variables[SPARK_EXECUTOR_MEMORY], _conf.getExecutorEnv, and SPARK_USER.
ShuffleDriverComponents¶
_shuffleDriverComponents: ShuffleDriverComponents
SparkContext ...FIXME
Registering HeartbeatReceiver¶
SparkContext registers the HeartbeatReceiver RPC endpoint.
PluginContainer¶
_plugins: Option[PluginContainer]
SparkContext creates a PluginContainer (with itself and the _resources).
Creating SchedulerBackend and TaskScheduler¶
The SparkContext object is requested to SparkContext.md#createTaskScheduler[create the SchedulerBackend with the TaskScheduler] (for the given master URL), and the result becomes the internal _schedulerBackend and _taskScheduler.
scheduler:DAGScheduler.md#creating-instance[DAGScheduler is created] (as _dagScheduler).
Sending Blocking TaskSchedulerIsSet¶
SparkContext sends a blocking TaskSchedulerIsSet message to the HeartbeatReceiver RPC endpoint (to inform it that the TaskScheduler is now available).
Heartbeater¶
_heartbeater: Heartbeater
SparkContext creates a Heartbeater and starts it.
Starting TaskScheduler¶
SparkContext requests the TaskScheduler to start.
Setting Spark Application's and Execution Attempt's IDs¶
SparkContext sets the internal fields -- _applicationId and _applicationAttemptId -- (using the applicationId and applicationAttemptId methods from the scheduler:TaskScheduler.md#contract[TaskScheduler Contract]).
NOTE: SparkContext requests the TaskScheduler for the scheduler:TaskScheduler.md#applicationId[unique identifier of a Spark application] (currently only implemented by scheduler:TaskSchedulerImpl.md#applicationId[TaskSchedulerImpl], which uses the SchedulerBackend to scheduler:SchedulerBackend.md#applicationId[request the identifier]).
NOTE: The unique identifier of a Spark application is used to initialize spark-webui-SparkUI.md#setAppId[SparkUI] and storage:BlockManager.md#initialize[BlockManager].
NOTE: _applicationAttemptId is used when SparkContext is requested for the SparkContext.md#applicationAttemptId[unique identifier of execution attempt of a Spark application] and when an EventLoggingListener spark-history-server:EventLoggingListener.md#creating-instance[is created].
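Both identifiers are available on SparkContext:
```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("app-id-demo").setMaster("local[*]"))

// The unique identifier assigned via the TaskScheduler, e.g. local-1234567890123
// in local mode, app-... in Spark Standalone, application_... on YARN.
println(sc.applicationId)

// Defined only on cluster managers that support multiple attempts (e.g. YARN).
println(sc.applicationAttemptId) // None in local mode
```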
Setting spark.app.id Spark Property in SparkConf¶
SparkContext sets the SparkConf.md#spark.app.id[spark.app.id] property to be the <<_applicationId, unique identifier of a Spark application>> and, if enabled, spark-webui-SparkUI.md#setAppId[passes it on to SparkUI].
spark.ui.proxyBase¶
Initializing SparkUI¶
SparkContext requests the SparkUI (if defined) to setAppId with the _applicationId.
Initializing BlockManager¶
The storage:BlockManager.md#initialize[BlockManager (for the driver) is initialized] (with the _applicationId).
Starting MetricsSystem¶
SparkContext requests the MetricsSystem to start.
NOTE: SparkContext starts the MetricsSystem after setting the spark.app.id Spark property, as MetricsSystem uses it to build unique identifiers for metrics sources.
Attaching JSON Servlet Handler¶
SparkContext requests the MetricsSystem for a JSON servlet handler and requests the <<_ui, SparkUI>> to spark-webui-WebUI.md#attachHandler[attach it].
Starting EventLoggingListener (with Event Log Enabled)¶
_eventLogger: Option[EventLoggingListener]
With the spark.eventLog.enabled configuration property enabled, SparkContext creates an EventLoggingListener and requests it to start.
SparkContext requests the LiveListenerBus to add the EventLoggingListener to the eventLog event queue.
With spark.eventLog.enabled disabled, _eventLogger is None (undefined).
ContextCleaner¶
_cleaner: Option[ContextCleaner]
With the spark.cleaner.referenceTracking configuration property enabled, SparkContext creates a ContextCleaner (with itself and the _shuffleDriverComponents).
SparkContext requests the ContextCleaner to start.
ExecutorAllocationManager¶
_executorAllocationManager: Option[ExecutorAllocationManager]
SparkContext initializes the _executorAllocationManager internal registry.
SparkContext creates an ExecutorAllocationManager when:
- Dynamic Allocation of Executors is enabled (based on the spark.dynamicAllocation.enabled configuration property and the master URL)
The ExecutorAllocationManager is requested to start.
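A configuration sketch that enables Dynamic Allocation of Executors (property values are examples; shuffle tracking is one way to satisfy its shuffle-data requirement):
```scala
import org.apache.spark.SparkConf

// Dynamic allocation requires a cluster manager that supports it; with a local
// master no ExecutorAllocationManager is created.
val conf = new SparkConf()
  .setAppName("dynamic-allocation-demo")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")
```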
Registering User-Defined SparkListeners¶
SparkContext registers the user-defined listeners and starts SparkListenerEvent event delivery to the listeners.
postEnvironmentUpdate¶
postEnvironmentUpdate is called to post a SparkListener.md#SparkListenerEnvironmentUpdate[SparkListenerEnvironmentUpdate] message on the scheduler:LiveListenerBus.md[] with information about the Task Scheduler's scheduling mode, added jar and file paths, and other environmental details. They are displayed in web UI's spark-webui-environment.md[Environment tab].
postApplicationStart¶
A SparkListener.md#SparkListenerApplicationStart[SparkListenerApplicationStart] message is posted to the scheduler:LiveListenerBus.md[] (using the internal postApplicationStart method).
postStartHook¶
The TaskScheduler scheduler:TaskScheduler.md#postStartHook[is notified that SparkContext is almost fully initialized].
NOTE: scheduler:TaskScheduler.md#postStartHook[TaskScheduler.postStartHook] does nothing by default, but custom implementations offer more advanced features, e.g. TaskSchedulerImpl scheduler:TaskSchedulerImpl.md#postStartHook[blocks the current thread until SchedulerBackend is ready]. There is also YarnClusterScheduler for Spark on YARN in cluster deploy mode.
Registering Metrics Sources¶
SparkContext requests the MetricsSystem to register metrics sources for several services, including the DAGScheduler, the BlockManager, and the ExecutorAllocationManager (when defined).
Adding Shutdown Hook¶
SparkContext adds a shutdown hook (using ShutdownHookManager.addShutdownHook()).
SparkContext prints out the following DEBUG message to the logs:
Adding shutdown hook
CAUTION: FIXME ShutdownHookManager.addShutdownHook()
Any non-fatal Exception leads to termination of the Spark context instance.
CAUTION: FIXME What does NonFatal represent in Scala?
CAUTION: FIXME Finish me
Initializing nextShuffleId and nextRddId Internal Counters¶
nextShuffleId and nextRddId start with 0.
CAUTION: FIXME Where are nextShuffleId and nextRddId used?
A new instance of Spark context is created and ready for operation.
Loading External Cluster Manager for URL (getClusterManager method)¶
getClusterManager(url: String): Option[ExternalClusterManager]
getClusterManager loads the scheduler:ExternalClusterManager.md[ExternalClusterManager] that scheduler:ExternalClusterManager.md#canCreate[can handle the input url].
If there are two or more external cluster managers that could handle the url, a SparkException is thrown:
Multiple Cluster Managers ([serviceLoaders]) registered for the url [url].
NOTE: getClusterManager uses Java's https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html#load-java.lang.Class-java.lang.ClassLoader-[ServiceLoader.load] method.
NOTE: getClusterManager is used to find a cluster manager for a master URL when SparkContext.md#createTaskScheduler[creating a SchedulerBackend and a TaskScheduler for the driver].
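Conceptually, getClusterManager is a ServiceLoader lookup filtered by canCreate. The sketch below mirrors that logic with a stand-in trait, since ExternalClusterManager itself is private[spark] (Scala 2.13 collection converters assumed):
```scala
import java.util.ServiceLoader
import scala.jdk.CollectionConverters._

// Stand-in for the ExternalClusterManager contract, reduced to the one method
// getClusterManager cares about here.
trait ClusterManagerLike {
  def canCreate(masterURL: String): Boolean
}

// Mirrors getClusterManager: load every registered implementation (from
// META-INF/services) and keep the single one that can handle the master URL.
def findClusterManager(url: String): Option[ClusterManagerLike] = {
  val loader = Thread.currentThread().getContextClassLoader
  val candidates = ServiceLoader.load(classOf[ClusterManagerLike], loader)
    .asScala.filter(_.canCreate(url)).toList
  if (candidates.size > 1) {
    throw new IllegalStateException(
      s"Multiple cluster managers registered for the url $url: $candidates")
  }
  candidates.headOption
}
```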
setupAndStartListenerBus¶
setupAndStartListenerBus(): Unit
setupAndStartListenerBus is an internal method that reads the configuration-properties.md#spark.extraListeners[spark.extraListeners] configuration property from the current SparkConf.md[SparkConf] to create and register SparkListenerInterface listeners.
It expects that the class name represents a SparkListenerInterface listener with one of the following constructors (in this order):
- a single-argument constructor that accepts SparkConf.md[SparkConf]
- a zero-argument constructor
setupAndStartListenerBus scheduler:LiveListenerBus.md#ListenerBus-addListener[registers every listener class].
You should see the following INFO message in the logs:
INFO Registered listener [className]
It scheduler:LiveListenerBus.md#start[starts LiveListenerBus] and records it in the internal _listenerBusStarted.
When no single-SparkConf or zero-argument constructor could be found for a class name in the configuration-properties.md#spark.extraListeners[spark.extraListeners] configuration property, a SparkException is thrown with the message:
[className] did not have a zero-argument constructor or a single-argument constructor that accepts SparkConf. Note: if the class is defined inside of another Scala class, then its constructors may accept an implicit parameter that references the enclosing class; in this case, you must define the listener as a top-level class in order to prevent this extra parameter from breaking Spark's ability to find a valid constructor.
Any exception while registering a SparkListenerInterface listener stops the SparkContext, and a SparkException is thrown with the source exception's message:
Exception when registering SparkListener
Tip
Set INFO logging level for the org.apache.spark.SparkContext logger to see the extra listeners being registered, e.g.:
Registered listener pl.japila.spark.CustomSparkListener
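For example, a listener with a single-SparkConf-argument constructor can be registered through spark.extraListeners (class and package names are arbitrary; the listener must be a top-level class, as the message above explains):
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

// A minimal SparkListener with the single-SparkConf-argument constructor
// that setupAndStartListenerBus looks for first.
class MyExtraListener(conf: SparkConf) extends SparkListener {
  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    println(s"Application started: ${event.appName}")
  }
}

// Register the listener by its fully-qualified class name.
val conf = new SparkConf()
  .setAppName("extra-listeners-demo")
  .setMaster("local[*]")
  .set("spark.extraListeners", classOf[MyExtraListener].getName)
val sc = new SparkContext(conf)
```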