SparkSession — The Entry Point to Spark SQL¶
SparkSession is the entry point to Spark SQL. It is one of the first objects created in a Spark SQL application.
SparkSession is created using the SparkSession.builder method.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
  .appName("My Spark Application") // optional and will be autogenerated if not specified
  .master("local[*]") // only for demo and testing purposes, use spark-submit instead
  .enableHiveSupport() // self-explanatory, isn't it?
  .config("spark.sql.warehouse.dir", "target/spark-warehouse")
  .withExtensions { extensions =>
    extensions.injectResolutionRule { session =>
      ...
    }
    extensions.injectOptimizerRule { session =>
      ...
    }
  }
  .getOrCreate
SparkSession is a namespace of relational entities (e.g. databases, tables). A Spark SQL application could use many SparkSessions to keep the relational entities separate logically in metadata catalogs.
SparkSession in spark-shell
The spark object in spark-shell (the instance of SparkSession that is auto-created) has Hive support enabled.
In order to disable the pre-configured Hive support in the spark object, set the spark.sql.catalogImplementation internal configuration property to in-memory (which uses the InMemoryCatalog external catalog instead).
$ spark-shell --conf spark.sql.catalogImplementation=in-memory
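A quick way to check which catalog implementation a session ended up with is to read the property back (a sketch; expect hive when Hive support is enabled, in-memory otherwise):

// read the catalog implementation back from the session configuration
scala> spark.conf.get("spark.sql.catalogImplementation")
res0: String = hive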
Creating Instance¶
SparkSession takes the following to be created:
- SparkContext
- Parent SessionState (if given)
- SparkSessionExtensions
SparkSession is created when:
- SparkSession.Builder is requested to getOrCreate
- Indirectly using newSession or cloneSession
StreamingQueryManager¶
streams: StreamingQueryManager
streams requests this SessionState for the StreamingQueryManager
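For example, to list the streaming queries that are currently active in this SparkSession (a sketch, assuming a spark session in scope):

// streaming queries started in this SparkSession
spark.streams.active.foreach { query =>
  println(s"${query.id}: ${query.status}")
}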
SessionState¶
sessionState: SessionState
sessionState is the current SessionState.
Internally, sessionState clones the parent SessionState (if given when this SparkSession was created) or creates a new SessionState using the builder class defined by the spark.sql.catalogImplementation configuration property:
- in-memory (default) for org.apache.spark.sql.internal.SessionStateBuilder
- hive for org.apache.spark.sql.hive.HiveSessionStateBuilder
Creating New SparkSession¶
newSession(): SparkSession
newSession creates a new SparkSession with an undefined parent SessionState, (re)using the current SparkContext, SharedState and SparkSessionExtensions.
SparkSession.newSession and SparkSession.cloneSession
SparkSession.newSession uses no parent SessionState while SparkSession.cloneSession (re)uses SessionState.
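The separation of relational entities per SparkSession can be observed with temporary views (a sketch, assuming a spark session in scope): they are registered per session, not shared.

val other = spark.newSession()
assert(other.sparkContext eq spark.sparkContext) // the SparkContext is shared

spark.range(1).createOrReplaceTempView("demo_view")
assert(spark.catalog.tableExists("demo_view"))   // visible in the original session
assert(!other.catalog.tableExists("demo_view"))  // ...but not in the new one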
Cloning SparkSession¶
cloneSession(): SparkSession
cloneSession...FIXME
cloneSession is used when:
- AdaptiveSparkPlanHelper is requested to getOrCloneSessionWithAqeOff
- StreamExecution (Spark Structured Streaming) is created
Creating SparkSession Using Builder Pattern¶
builder(): Builder
builder is an object method that creates a new Builder (that is then used to build a SparkSession using a so-called Fluent API).
import org.apache.spark.sql.SparkSession
val builder = SparkSession.builder
Fluent interface
Read about Fluent interface design pattern in Wikipedia, the free encyclopedia.
Spark Version¶
version: String
version returns the version of Apache Spark in use.
Internally, version uses the spark.SPARK_VERSION value, which is the version property in the spark-version-info.properties file on the CLASSPATH.
Creating Empty Dataset (Given Encoder)¶
emptyDataset[T: Encoder]: Dataset[T]
emptyDataset creates an empty Dataset (assuming that future records will be of type T).
scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]
scala> strings.printSchema
root
|-- value: string (nullable = true)
emptyDataset creates a LocalRelation logical operator.
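This can be verified by inspecting the Dataset's logical plan (a sketch, reusing the strings Dataset above):

scala> println(strings.queryExecution.logical.getClass.getSimpleName)
LocalRelation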
Creating Dataset from Local Collections or RDDs¶
createDataset[T : Encoder](
data: RDD[T]): Dataset[T]
createDataset[T : Encoder](
data: Seq[T]): Dataset[T]
createDataset creates a Dataset from a local Scala collection, i.e. Seq[T], Java's List[T], or a distributed RDD[T].
scala> val one = spark.createDataset(Seq(1))
one: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> one.show
+-----+
|value|
+-----+
| 1|
+-----+
createDataset creates logical operators:
- LocalRelation for the input data collection
- LogicalRDD for the input RDD[T]
implicits object
You may want to consider implicits object and toDS method instead.
val spark: SparkSession = ...
import spark.implicits._
scala> val one = Seq(1).toDS
one: org.apache.spark.sql.Dataset[Int] = [value: int]
Internally, createDataset first looks up the implicit ExpressionEncoder in scope to access the AttributeReferences (of the schema).
The expression encoder is then used to map elements (of the input Seq[T]) into a collection of InternalRows. With the references and rows, createDataset returns a Dataset with a LocalRelation logical query plan.
Executing SQL Queries (SQL Mode)¶
sql(
sqlText: String): DataFrame
sql(
sqlText: String,
args: Map[String, Any]): DataFrame
sql creates a QueryPlanningTracker to measure executing the following in the parsing phase:

- sql requests the SessionState for the ParserInterface to parse the given sqlText SQL statement (that gives a LogicalPlan)
In the end, sql creates a DataFrame with the following:
- This SparkSession
- The LogicalPlan
- The QueryPlanningTracker
sql Private Helper¶
sql(
sqlText: String,
args: Map[String, Any],
tracker: QueryPlanningTracker): DataFrame
sql requests the given QueryPlanningTracker to measure parsing phase.
While being measured, sql requests the SessionState for the sqlParser to parse the given sqlText.
With non-empty args, sql wraps the parsed logical plan in a NameParameterizedQuery with the args (converting the argument values to literals).
In the end, sql creates a DataFrame for the plan produced (and the QueryPlanningTracker).
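The args variant enables named-parameter (parameterized) SQL queries. A minimal sketch, assuming a spark session in scope:

// :threshold is a named parameter bound from the args map (and turned into a literal)
val df = spark.sql(
  "SELECT * FROM range(10) WHERE id > :threshold",
  Map("threshold" -> 5))
df.show()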
Accessing UDFRegistration¶
udf: UDFRegistration
udf attribute is UDFRegistration (for registering user-defined functions for SQL-based queries).
val spark: SparkSession = ...
import spark.implicits._

spark.udf.register("myUpper", (s: String) => s.toUpperCase)

val strs = ('a' to 'c').map(_.toString).toDS
strs.createOrReplaceTempView("strs")
scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
+-----+-----+
|value|UPPER|
+-----+-----+
| a| A|
| b| B|
| c| C|
+-----+-----+
Internally, it is simply an alias for SessionState.udfRegistration.
Loading Data From Table¶
table(
multipartIdentifier: Seq[String]): DataFrame
table(
tableName: String): DataFrame
table(
tableIdent: TableIdentifier): DataFrame
table creates a DataFrame for the input tableName table.
Note
baseRelationToDataFrame acts as a mechanism to plug the BaseRelation object hierarchy into the LogicalPlan object hierarchy; that is how SparkSession bridges the two.
scala> spark.catalog.tableExists("t1")
res1: Boolean = true
// t1 exists in the catalog
// let's load it
val t1 = spark.table("t1")
Catalog¶
catalog: Catalog
catalog creates a CatalogImpl when first accessed.
lazy value
catalog is a Scala lazy value which is computed once when accessed and cached afterwards.
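Catalog is the user-facing interface to the metastore, e.g. to list the databases and tables known to this session (a sketch, assuming a spark session in scope):

spark.catalog.listDatabases().show(truncate = false)
spark.catalog.listTables().show(truncate = false)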
DataFrameReader¶
read: DataFrameReader
read gives a DataFrameReader to load data from external data sources into a DataFrame.
val spark: SparkSession = ... // create instance
val dfReader: DataFrameReader = spark.read
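For example, to load a CSV file into a DataFrame (a sketch; the path is hypothetical):

val people = spark.read
  .option("header", "true")      // first line holds column names
  .option("inferSchema", "true") // infer column types
  .csv("people.csv")             // hypothetical path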
Runtime Configuration¶
conf: RuntimeConfig
conf returns the current RuntimeConfig.
Internally, conf creates a RuntimeConfig (when requested the very first time and cached afterwards) with the SQLConf (of the SessionState).
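RuntimeConfig can get and set Spark SQL configuration properties at runtime, e.g. (a sketch):

spark.conf.set("spark.sql.shuffle.partitions", "8")
assert(spark.conf.get("spark.sql.shuffle.partitions") == "8")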
ExperimentalMethods¶
experimental: ExperimentalMethods
experimental is an extension point with ExperimentalMethods, a per-session collection of extra strategies and Rule[LogicalPlan]s.
experimental is used in SparkPlanner and SparkOptimizer.
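For example, an extra optimizer rule can be registered through extraOptimizations (a minimal no-op sketch, assuming a spark session in scope):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// a rule that leaves the plan untouched, just to show the registration
object NoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

spark.experimental.extraOptimizations ++= Seq(NoopRule)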
Create DataFrame for BaseRelation¶
baseRelationToDataFrame(
baseRelation: BaseRelation): DataFrame
Internally, baseRelationToDataFrame creates a DataFrame from the input BaseRelation wrapped inside LogicalRelation.
baseRelationToDataFrame is used when:
- DataFrameReader is requested to load data from a data source or a JDBC table
- TextInputCSVDataSource creates a base Dataset (of Strings)
- TextInputJsonDataSource creates a base Dataset (of Strings)
Creating SessionState¶
instantiateSessionState(
className: String,
sparkSession: SparkSession): SessionState
instantiateSessionState finds the className that is then used to create and build a BaseSessionStateBuilder.
instantiateSessionState may report an IllegalArgumentException while instantiating the class of a SessionState:
Error while instantiating '[className]'
instantiateSessionState is used when SparkSession is requested for SessionState (based on spark.sql.catalogImplementation configuration property).
sessionStateClassName¶
sessionStateClassName(
conf: SparkConf): String
sessionStateClassName gives the name of the class of the SessionState per spark.sql.catalogImplementation, i.e.
- org.apache.spark.sql.hive.HiveSessionStateBuilder for hive
- org.apache.spark.sql.internal.SessionStateBuilder for in-memory
sessionStateClassName is used when SparkSession is requested for the SessionState (and one is not available yet).
Creating DataFrame From RDD Of Internal Binary Rows and Schema¶
internalCreateDataFrame(
catalystRows: RDD[InternalRow],
schema: StructType,
isStreaming: Boolean = false): DataFrame
internalCreateDataFrame creates a DataFrame with LogicalRDD.
internalCreateDataFrame is used when:
- DataFrameReader is requested to create a DataFrame from a Dataset of JSONs or CSVs
- SparkSession is requested to create a DataFrame from an RDD of rows
- InsertIntoDataSourceCommand logical command is executed
ExecutionListenerManager¶
listenerManager: ExecutionListenerManager
SharedState¶
sharedState: SharedState
Measuring Duration of Executing Code Block¶
time[T](f: => T): T
time executes a code block and prints out (to standard output) the time taken to execute it.
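For example (a sketch; the message printed is of the form Time taken: ... ms):

val howMany = spark.time {
  spark.range(1000000).count()
}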
Applying SparkSessionExtensions¶
applyExtensions(
extensionConfClassNames: Seq[String],
extensions: SparkSessionExtensions): SparkSessionExtensions
applyExtensions uses the given extensionConfClassNames as the names of the extension classes.
For every extension class, applyExtensions creates an instance and applies it to the given SparkSessionExtensions.
Note
The given SparkSessionExtensions is mutated in-place.
In the end, applyExtensions returns the given SparkSessionExtensions.
In case of ClassCastException, ClassNotFoundException or NoClassDefFoundError, applyExtensions prints out the following WARN message to the logs:
Cannot use [extensionConfClassName] to configure session extensions.
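The extension class names typically come from the spark.sql.extensions configuration property, and every class is expected to be a function from SparkSessionExtensions to Unit. A minimal sketch (the class name is hypothetical):

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// register with e.g. --conf spark.sql.extensions=com.example.MyExtensions
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectOptimizerRule { session =>
      new Rule[LogicalPlan] {
        override def apply(plan: LogicalPlan): LogicalPlan = plan // no-op rule
      }
    }
  }
}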
applyExtensions is used when:
- SparkSession.Builder is requested to get the active or create a new SparkSession instance
- SparkSession is created (from a SparkContext)
Default Parallelism of Leaf Nodes¶
leafNodeDefaultParallelism: Int
leafNodeDefaultParallelism is the value of spark.sql.leafNodeDefaultParallelism if defined or SparkContext.defaultParallelism (Spark Core).
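For example, the number of partitions of a SparkSession.range without an explicit number of partitions follows this value (a sketch; assumes the property can be changed at runtime in your session):

spark.conf.set("spark.sql.leafNodeDefaultParallelism", "4")
println(spark.range(100).rdd.getNumPartitions) // expected: 4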
leafNodeDefaultParallelism is used when:
- SparkSession.range operator is used
- RangeExec leaf physical operator is created
- CommandResultExec physical operator is requested for the RDD[InternalRow]
- LocalTableScanExec physical operator is requested for the RDD
- FilePartition is requested for maxSplitBytes