
SparkSession — The Entry Point to Spark SQL

SparkSession is the entry point to Spark SQL. It is one of the first objects created in a Spark SQL application.

SparkSession is created using the SparkSession.builder method.

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
  .appName("My Spark Application")  // optional and will be autogenerated if not specified
  .master("local[*]")               // only for demo and testing purposes, use spark-submit instead
  .enableHiveSupport()              // self-explanatory, isn't it?
  .config("spark.sql.warehouse.dir", "target/spark-warehouse")
  .withExtensions { extensions =>
    extensions.injectResolutionRule { session =>
      ...
    }
    extensions.injectOptimizerRule { session =>
      ...
    }
  }
  .getOrCreate

SparkSession is a namespace of relational entities (e.g. databases, tables). A Spark SQL application could use many SparkSessions to keep the relational entities separate logically in metadata catalogs.

SparkSession in spark-shell

The spark object in spark-shell (the instance of SparkSession that is auto-created) has Hive support enabled.

To disable the pre-configured Hive support of the spark object, use the spark.sql.catalogImplementation internal configuration property with the in-memory value (which uses the InMemoryCatalog external catalog instead).

$ spark-shell --conf spark.sql.catalogImplementation=in-memory
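
A quick way to check which catalog implementation is in effect is the spark.sql.catalogImplementation property itself (the value below assumes spark-shell was started as above):

scala> spark.conf.get("spark.sql.catalogImplementation")
res0: String = in-memory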

Creating Instance

SparkSession takes the following to be created:

SparkSession is created when:

SessionState

sessionState: SessionState

sessionState is the current SessionState.

Internally, sessionState clones the optional parent SessionState (if given when the SparkSession was created) or creates a new SessionState using instantiateSessionState with the class name defined by the spark.sql.catalogImplementation configuration property:

  • in-memory (default) for org.apache.spark.sql.internal.SessionStateBuilder
  • hive for org.apache.spark.sql.hive.HiveSessionStateBuilder

Creating New SparkSession

newSession(): SparkSession

newSession creates a new SparkSession with an undefined parent SessionState, (re)using the following:

SparkSession.newSession and SparkSession.cloneSession

SparkSession.newSession uses no parent SessionState while SparkSession.cloneSession (re)uses SessionState.
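
A minimal sketch (assuming a running spark session) that shows the new session sharing the SparkContext while keeping its own session-scoped state:

val spark: SparkSession = ...
val other = spark.newSession()

// The underlying SparkContext (and SharedState) are reused
assert(other.sparkContext == spark.sparkContext)

// Session-scoped state (e.g. SQL configuration) is separate
// (assuming spark.sql.shuffle.partitions is not set to 8 globally)
spark.conf.set("spark.sql.shuffle.partitions", 8)
assert(other.conf.get("spark.sql.shuffle.partitions") != "8")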

Cloning SparkSession

cloneSession(): SparkSession

cloneSession...FIXME

cloneSession is used when:

  • AdaptiveSparkPlanHelper is requested to getOrCloneSessionWithAqeOff
  • StreamExecution (Spark Structured Streaming) is created

Creating SparkSession Using Builder Pattern

builder(): Builder

builder is an object method that creates a new Builder (that is then used to build a SparkSession using a so-called Fluent API).

import org.apache.spark.sql.SparkSession
val builder = SparkSession.builder
Fluent interface

Read about Fluent interface design pattern in Wikipedia, the free encyclopedia.

Spark Version

version: String

version returns the version of Apache Spark in use.

Internally, version uses spark.SPARK_VERSION value that is the version property in spark-version-info.properties properties file on CLASSPATH.
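
For example (the exact value depends on the Spark distribution in use):

println(spark.version)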

Creating Empty Dataset (Given Encoder)

emptyDataset[T: Encoder]: Dataset[T]

emptyDataset creates an empty Dataset (assuming that future records will be of type T).

scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]

scala> strings.printSchema
root
 |-- value: string (nullable = true)

emptyDataset creates a LocalRelation logical operator.
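
A quick (version-dependent) way to see this is to inspect the Dataset's logical plan:

val strings = spark.emptyDataset[String]
// Expected to print LocalRelation (exact plan rendering differs across Spark versions)
println(strings.queryExecution.logical.getClass.getSimpleName)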

Creating Dataset from Local Collections or RDDs

createDataset[T : Encoder](
  data: RDD[T]): Dataset[T]
createDataset[T : Encoder](
  data: Seq[T]): Dataset[T]

createDataset creates a Dataset from a local collection (Seq[T] or Java's List[T]) or a distributed RDD[T].

scala> val one = spark.createDataset(Seq(1))
one: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> one.show
+-----+
|value|
+-----+
|    1|
+-----+

createDataset creates one of the following logical operators:

  • LocalRelation for a local collection (Seq[T])
  • ExternalRDD for a distributed RDD[T]

implicits object

You may want to consider implicits object and toDS method instead.

val spark: SparkSession = ...
import spark.implicits._

scala> val one = Seq(1).toDS
one: org.apache.spark.sql.Dataset[Int] = [value: int]

Internally, createDataset first looks up the implicit ExpressionEncoder in scope to access the AttributeReferences (of the schema).

The expression encoder is then used to map elements (of the input Seq[T]) into a collection of InternalRows. With the references and rows, createDataset returns a Dataset with a LocalRelation logical query plan.
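
For completeness, a sketch of the RDD[T] variant (which is backed by an ExternalRDD logical operator rather than a LocalRelation):

import spark.implicits._

// createDataset from a distributed RDD[T]
val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3))
val ints = spark.createDataset(rdd)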

Executing SQL Queries (SQL Mode)

sql(
  sqlText: String): DataFrame
sql(
  sqlText: String,
  args: Map[String, Any]): DataFrame

sql creates a QueryPlanningTracker to measure parsing the given sqlText into a LogicalPlan (the parsing phase).

In the end, sql creates a DataFrame with the following:

  • This SparkSession
  • The LogicalPlan
  • The QueryPlanningTracker
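
A short example of the args variant (named parameter markers, available in recent Spark versions):

// :id is a named parameter marker substituted from the args map
val df = spark.sql(
  "SELECT * FROM VALUES (1, 'a'), (2, 'b') AS t(id, name) WHERE id = :id",
  Map("id" -> 1))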

sql Private Helper

sql(
  sqlText: String,
  args: Map[String, Any],
  tracker: QueryPlanningTracker): DataFrame

sql requests the given QueryPlanningTracker to measure parsing phase.

While being measured, sql requests the SessionState for the sqlParser to parse the given sqlText.

With non-empty args, sql creates a NameParameterizedQuery with the parsed logical plan and the args. sql converts the values to literals.

In the end, sql creates a DataFrame for the plan produced (and the QueryPlanningTracker).

Accessing UDFRegistration

udf: UDFRegistration

udf attribute is UDFRegistration (for registering user-defined functions for SQL-based queries).

val spark: SparkSession = ...
spark.udf.register("myUpper", (s: String) => s.toUpperCase)

val strs = ('a' to 'c').map(_.toString).toDS
strs.createOrReplaceTempView("strs")

scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
+-----+-----+
|value|UPPER|
+-----+-----+
|    a|    A|
|    b|    B|
|    c|    C|
+-----+-----+

Internally, it is simply an alias for SessionState.udfRegistration.

Loading Data From Table

table(
  multipartIdentifier: Seq[String]): DataFrame
table(
  tableName: String): DataFrame
table(
  tableIdent: TableIdentifier): DataFrame

table creates a DataFrame for the given table.

Note

baseRelationToDataFrame acts as a mechanism to plug the BaseRelation object hierarchy into the LogicalPlan object hierarchy that SparkSession uses to bridge them.

scala> spark.catalog.tableExists("t1")
res1: Boolean = true

// t1 exists in the catalog
// let's load it
val t1 = spark.table("t1")

Catalog

catalog: Catalog

catalog creates a CatalogImpl when first accessed.

lazy value

catalog is a Scala lazy value which is computed once when accessed and cached afterwards.
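
For example, a few common metadata queries:

spark.catalog.listDatabases().show()
spark.catalog.listTables().show()
spark.catalog.tableExists("t1")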

DataFrameReader

read: DataFrameReader

read gives a DataFrameReader to load data from external data sources into a DataFrame.

val spark: SparkSession = ... // create instance
val dfReader: DataFrameReader = spark.read
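
A hedged example (the format, options and path are illustrative only):

val people = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data/people.csv")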

Runtime Configuration

conf: RuntimeConfig

conf returns the current RuntimeConfig.

Internally, conf creates a RuntimeConfig (when requested the very first time and cached afterwards) with the SQLConf (of the SessionState).
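
For example:

// Session-scoped SQL configuration can be read and changed at runtime
spark.conf.set("spark.sql.shuffle.partitions", 8)
assert(spark.conf.get("spark.sql.shuffle.partitions") == "8")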

ExperimentalMethods

experimental: ExperimentalMethods

experimental is an extension point with ExperimentalMethods, a per-session collection of extra strategies and Rule[LogicalPlan]s.

experimental is used in SparkPlanner and SparkOptimizer.
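
A minimal sketch of registering an extra (no-op) optimizer rule through ExperimentalMethods:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A no-op rule, used only for illustration
object NoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

spark.experimental.extraOptimizations ++= Seq(NoopRule)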

Create DataFrame for BaseRelation

baseRelationToDataFrame(
  baseRelation: BaseRelation): DataFrame

Internally, baseRelationToDataFrame creates a DataFrame from the input BaseRelation wrapped inside LogicalRelation.

baseRelationToDataFrame is used when:

  • DataFrameReader is requested to load data from data source or JDBC table
  • TextInputCSVDataSource creates a base Dataset (of Strings)
  • TextInputJsonDataSource creates a base Dataset (of Strings)

Creating SessionState

instantiateSessionState(
  className: String,
  sparkSession: SparkSession): SessionState

instantiateSessionState loads the given className and uses it to create and build a BaseSessionStateBuilder.

instantiateSessionState may report an IllegalArgumentException while instantiating the class of a SessionState:

Error while instantiating '[className]'

instantiateSessionState is used when SparkSession is requested for SessionState (based on spark.sql.catalogImplementation configuration property).

sessionStateClassName

sessionStateClassName(
  conf: SparkConf): String

sessionStateClassName gives the fully-qualified class name of the SessionState builder per the spark.sql.catalogImplementation configuration property:

  • org.apache.spark.sql.hive.HiveSessionStateBuilder for hive
  • org.apache.spark.sql.internal.SessionStateBuilder for in-memory (default)

sessionStateClassName is used when SparkSession is requested for the SessionState (and one is not available yet).

Creating DataFrame From RDD Of Internal Binary Rows and Schema

internalCreateDataFrame(
  catalystRows: RDD[InternalRow],
  schema: StructType,
  isStreaming: Boolean = false): DataFrame

internalCreateDataFrame creates a DataFrame with LogicalRDD.

internalCreateDataFrame is used when:

ExecutionListenerManager

listenerManager: ExecutionListenerManager

listenerManager is the ExecutionListenerManager (of the SessionState).

SharedState

sharedState: SharedState

sharedState is the SharedState that is shared across SparkSessions of the same SparkContext.

Measuring Duration of Executing Code Block

time[T](f: => T): T

time executes a code block and prints out (to standard output) the time taken to execute it.
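
For example (the reported duration will obviously vary):

// Prints "Time taken: <n> ms" to standard output and returns the result of the block
val count = spark.time(spark.range(100000).count())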

Applying SparkSessionExtensions

applyExtensions(
  extensionConfClassNames: Seq[String],
  extensions: SparkSessionExtensions): SparkSessionExtensions

applyExtensions uses the given extensionConfClassNames as the names of the extension classes.

For every extension class, applyExtensions instantiates it (one by one) and applies it to the given SparkSessionExtensions.

Note

The given SparkSessionExtensions is mutated in-place.

In the end, applyExtensions returns the given SparkSessionExtensions.


In case of ClassCastException, ClassNotFoundException or NoClassDefFoundError, applyExtensions prints out the following WARN message to the logs:

Cannot use [extensionConfClassName] to configure session extensions.
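
For illustration, a hypothetical extension class of the shape applyExtensions expects (such classes are typically registered with the spark.sql.extensions configuration property):

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// MyExtensions is a made-up name; any SparkSessionExtensions => Unit function works
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectOptimizerRule { session =>
      new Rule[LogicalPlan] {
        override def apply(plan: LogicalPlan): LogicalPlan = plan  // no-op rule
      }
    }
  }
}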

applyExtensions is used when:

Default Parallelism of Leaf Nodes

leafNodeDefaultParallelism: Int

leafNodeDefaultParallelism is the value of spark.sql.leafNodeDefaultParallelism if defined or SparkContext.defaultParallelism (Spark Core).


leafNodeDefaultParallelism is used when:

  • SparkSession.range operator is used
  • RangeExec leaf physical operator is created
  • CommandResultExec physical operator is requested for the RDD[InternalRow]
  • LocalTableScanExec physical operator is requested for the RDD
  • FilePartition is requested for maxSplitBytes
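
A hedged sketch of the effect with SparkSession.range (assuming no explicit number of partitions is given):

// With the property set, leaf operators (e.g. Range) default to 4 partitions
spark.conf.set("spark.sql.leafNodeDefaultParallelism", 4)
val numParts = spark.range(10).rdd.getNumPartitions  // expected: 4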