SparkSession — The Entry Point to Spark SQL¶
SparkSession is the entry point to Spark SQL. It is one of the first objects created in a Spark SQL application.

SparkSession is created using the SparkSession.builder method.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
  .appName("My Spark Application") // optional and will be autogenerated if not specified
  .master("local[*]")              // only for demo and testing purposes, use spark-submit instead
  .enableHiveSupport()             // self-explanatory, isn't it?
  .config("spark.sql.warehouse.dir", "target/spark-warehouse")
  .withExtensions { extensions =>
    extensions.injectResolutionRule { session =>
      ...
    }
    extensions.injectOptimizerRule { session =>
      ...
    }
  }
  .getOrCreate
SparkSession is a namespace of relational entities (e.g. databases, tables). A Spark SQL application can use multiple SparkSessions to keep their relational entities logically separate in metadata catalogs.
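As a minimal sketch (assuming an existing SparkSession available as spark, e.g. in spark-shell), session-scoped entities such as temporary views are not shared between sessions:

// Temporary views are registered in a session-scoped catalog,
// so a view created in one SparkSession is not visible in another.
val anotherSession = spark.newSession()

spark.range(1).createOrReplaceTempView("demo")

spark.catalog.tableExists("demo")          // true
anotherSession.catalog.tableExists("demo") // false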
SparkSession in spark-shell
The spark object in spark-shell (the instance of SparkSession that is auto-created) has Hive support enabled.

In order to disable the pre-configured Hive support in the spark object, use the spark.sql.catalogImplementation internal configuration property with the in-memory value (that uses InMemoryCatalog external catalog instead).
$ spark-shell --conf spark.sql.catalogImplementation=in-memory
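Once inside such a spark-shell session, the catalog implementation in use can be verified by reading the (static) configuration property back, e.g.:

// spark.sql.catalogImplementation is a static property and cannot be changed at runtime
scala> spark.conf.get("spark.sql.catalogImplementation")
res0: String = in-memory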
Creating Instance¶
SparkSession takes the following to be created:

- SparkContext
- Parent SessionState (if given)
- SparkSessionExtensions

SparkSession is created when:

- SparkSession.Builder is requested to getOrCreate
- Indirectly, using newSession or cloneSession
SessionState¶
sessionState: SessionState
sessionState is the current SessionState.

Internally, sessionState clones the parent SessionState (if given when the SparkSession was created) or creates a new SessionState using the builder class per the spark.sql.catalogImplementation configuration property:

- in-memory (default) for org.apache.spark.sql.internal.SessionStateBuilder
- hive for org.apache.spark.sql.hive.HiveSessionStateBuilder
Creating New SparkSession¶
newSession(): SparkSession
newSession creates a new SparkSession with an undefined parent SessionState, reusing the current SparkContext and SparkSessionExtensions.

SparkSession.newSession and SparkSession.cloneSession
SparkSession.newSession uses no parent SessionState, while SparkSession.cloneSession (re)uses the current SessionState.
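A short sketch of the difference (assuming an existing SparkSession available as spark, e.g. in spark-shell): the new session shares the underlying SparkContext, but its runtime configuration starts from scratch since the SessionState is not reused.

val fresh = spark.newSession()

// the SparkContext is shared between the two sessions
fresh.sparkContext eq spark.sparkContext          // true

// the SessionState is not, so runtime configuration changes do not leak across sessions
spark.conf.set("spark.sql.shuffle.partitions", "4")
spark.conf.get("spark.sql.shuffle.partitions")    // 4
fresh.conf.get("spark.sql.shuffle.partitions")    // 200 (the default), unless set via spark-submit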
Cloning SparkSession¶
cloneSession(): SparkSession
cloneSession...FIXME

cloneSession is used when:

- AdaptiveSparkPlanHelper is requested to getOrCloneSessionWithAqeOff
- StreamExecution (Spark Structured Streaming) is created
Creating SparkSession Using Builder Pattern¶
builder(): Builder
builder is an object method that creates a new Builder (that is then used to build a SparkSession
import org.apache.spark.sql.SparkSession
val builder = SparkSession.builder
Fluent interface
Read about Fluent interface design pattern in Wikipedia, the free encyclopedia.
Spark Version¶
version: String
version returns the version of Apache Spark in use.

Internally, version uses the spark.SPARK_VERSION value, which is the version property in the spark-version-info.properties file on the CLASSPATH.
Creating Empty Dataset (Given Encoder)¶
emptyDataset[T: Encoder]: Dataset[T]
emptyDataset creates an empty Dataset (assuming that future records will be of type T).
scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]
scala> strings.printSchema
root
|-- value: string (nullable = true)
emptyDataset creates a LocalRelation logical operator.
Creating Dataset from Local Collections or RDDs¶
createDataset[T : Encoder](
data: RDD[T]): Dataset[T]
createDataset[T : Encoder](
data: Seq[T]): Dataset[T]
createDataset creates a Dataset from a local Scala collection (Seq[T] or Java's List[T]) or a distributed RDD[T].
scala> val one = spark.createDataset(Seq(1))
one: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> one.show
+-----+
|value|
+-----+
| 1|
+-----+
createDataset creates logical operators:

- LocalRelation for the input data collection
- LogicalRDD for the input RDD[T]
implicits object
You may want to consider using the implicits object and the toDS method instead.
val spark: SparkSession = ...
import spark.implicits._
scala> val one = Seq(1).toDS
one: org.apache.spark.sql.Dataset[Int] = [value: int]
Internally, createDataset first looks up the implicit ExpressionEncoder in scope to access the AttributeReferences (of the schema).

The expression encoder is then used to map elements (of the input Seq[T]) into a collection of InternalRows. With the references and rows, createDataset returns a Dataset with a LocalRelation logical query plan.
Executing SQL Queries (SQL Mode)¶
sql(
sqlText: String): DataFrame
sql(
sqlText: String,
args: Map[String, Any]): DataFrame
sql creates a QueryPlanningTracker to measure executing the following in the parsing phase:

- sql requests the SessionState for the ParserInterface to parse the given sqlText SQL statement (that gives a LogicalPlan)

In the end, sql creates a DataFrame with the following:

- This SparkSession
- The LogicalPlan
- The QueryPlanningTracker
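For example (assuming an existing SparkSession available as spark):

scala> spark.sql("SELECT 1 AS id, 'one' AS name").show
+---+----+
| id|name|
+---+----+
|  1| one|
+---+----+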
sql Private Helper¶
sql(
sqlText: String,
args: Map[String, Any],
tracker: QueryPlanningTracker): DataFrame
sql requests the given QueryPlanningTracker to measure the parsing phase. While being measured, sql requests the SessionState for the sqlParser to parse the given sqlText.

With non-empty args, sql creates a NameParameterizedQuery with the parsed logical plan and the args (with the values converted to literals).

In the end, sql creates a DataFrame for the plan produced (and the QueryPlanningTracker).
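A quick sketch of the parameterized variant (named parameter markers use the :name syntax; this flavour of sql is only available in more recent Spark releases):

// :limit is bound to the corresponding value in the args map
val top = spark.sql(
  "SELECT * FROM VALUES (1), (2), (3) AS t(id) WHERE id <= :limit",
  args = Map("limit" -> 2))
top.show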
Accessing UDFRegistration¶
udf: UDFRegistration
The udf attribute is a UDFRegistration (for registering user-defined functions for SQL-based queries).

val spark: SparkSession = ...
import spark.implicits._

spark.udf.register("myUpper", (s: String) => s.toUpperCase)

val strs = ('a' to 'c').map(_.toString).toDS
strs.createOrReplaceTempView("strs")

scala> spark.sql("SELECT *, myUpper(value) UPPER FROM strs").show
+-----+-----+
|value|UPPER|
+-----+-----+
| a| A|
| b| B|
| c| C|
+-----+-----+
Internally, it is simply an alias for SessionState.udfRegistration.
Loading Data From Table¶
table(
multipartIdentifier: Seq[String]): DataFrame
table(
tableName: String): DataFrame
table(
tableIdent: TableIdentifier): DataFrame
table creates a DataFrame for the given tableName table.

Note
baseRelationToDataFrame acts as a mechanism to plug the BaseRelation object hierarchy into the LogicalPlan object hierarchy that SparkSession uses to bridge them.
scala> spark.catalog.tableExists("t1")
res1: Boolean = true
// t1 exists in the catalog
// let's load it
val t1 = spark.table("t1")
Catalog¶
catalog: Catalog
catalog creates a CatalogImpl when first accessed.

lazy value
catalog is a Scala lazy value, which is computed once when first accessed and cached afterwards.
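For example:

// list the databases and tables known to this session's catalog
spark.catalog.listDatabases().show(truncate = false)
spark.catalog.listTables().show(truncate = false)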
DataFrameReader¶
read: DataFrameReader
read gives a DataFrameReader to load data from external data sources into a DataFrame.
val spark: SparkSession = ... // create instance
val dfReader: DataFrameReader = spark.read
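A typical use chains a few options and a format-specific load, e.g. (the path below is just an illustration):

// loads a CSV file (with a header line) into a DataFrame; the path is hypothetical
val people = spark.read
  .option("header", true)
  .option("inferSchema", true)
  .csv("data/people.csv")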
Runtime Configuration¶
conf: RuntimeConfig
conf returns the current RuntimeConfig.

Internally, conf creates a RuntimeConfig (when requested the very first time and cached afterwards) with the SQLConf (of the SessionState).
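For example:

// runtime (non-static) configuration properties can be changed per session
spark.conf.set("spark.sql.shuffle.partitions", "8")
spark.conf.get("spark.sql.shuffle.partitions")  // 8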
ExperimentalMethods¶
experimental: ExperimentalMethods
experimental is an extension point with ExperimentalMethods, a per-session collection of extra strategies and Rule[LogicalPlan]s.

experimental is used in SparkPlanner and SparkOptimizer.
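A minimal (do-nothing) sketch of plugging an extra optimization rule in through experimental (assuming an existing SparkSession available as spark):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// a no-op rule, just to show where extra optimizations are registered
object NoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

spark.experimental.extraOptimizations ++= Seq(NoopRule)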
Create DataFrame for BaseRelation¶
baseRelationToDataFrame(
baseRelation: BaseRelation): DataFrame
Internally, baseRelationToDataFrame creates a DataFrame from the input BaseRelation wrapped inside a LogicalRelation.

baseRelationToDataFrame is used when:

- DataFrameReader is requested to load data from a data source or a JDBC table
- TextInputCSVDataSource creates a base Dataset (of Strings)
- TextInputJsonDataSource creates a base Dataset (of Strings)
Creating SessionState¶
instantiateSessionState(
className: String,
sparkSession: SparkSession): SessionState
instantiateSessionState finds the className that is then used to create and build a BaseSessionStateBuilder.

instantiateSessionState may report an IllegalArgumentException while instantiating the class of a SessionState:

Error while instantiating '[className]'

instantiateSessionState is used when SparkSession is requested for the SessionState (based on the spark.sql.catalogImplementation configuration property).
sessionStateClassName¶
sessionStateClassName(
conf: SparkConf): String
sessionStateClassName gives the name of the class of the SessionState per spark.sql.catalogImplementation, i.e.

- org.apache.spark.sql.hive.HiveSessionStateBuilder for hive
- org.apache.spark.sql.internal.SessionStateBuilder for in-memory

sessionStateClassName is used when SparkSession is requested for the SessionState (and one is not available yet).
Creating DataFrame From RDD Of Internal Binary Rows and Schema¶
internalCreateDataFrame(
catalystRows: RDD[InternalRow],
schema: StructType,
isStreaming: Boolean = false): DataFrame
internalCreateDataFrame creates a DataFrame with a LogicalRDD.

internalCreateDataFrame is used when:

- DataFrameReader is requested to create a DataFrame from a Dataset of JSONs or CSVs
- SparkSession is requested to create a DataFrame from an RDD of rows
- InsertIntoDataSourceCommand logical command is executed
ExecutionListenerManager¶
listenerManager: ExecutionListenerManager
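For example, a sketch of registering a QueryExecutionListener (that prints out how long every successful action took) with the listenerManager:

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// a simple listener that reports successful and failed query executions
val listener = new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"$funcName took ${durationNs / 1000 / 1000} ms")
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"$funcName failed: ${exception.getMessage}")
}

spark.listenerManager.register(listener)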
SharedState¶
sharedState: SharedState
Measuring Duration of Executing Code Block¶
time[T](f: => T): T
time executes a code block and prints out (to standard output) the time taken to execute it.
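For example:

// prints out something like: Time taken: 182 ms
val result = spark.time {
  spark.range(1000000).selectExpr("sum(id)").collect()
}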
Applying SparkSessionExtensions¶
applyExtensions(
extensionConfClassNames: Seq[String],
extensions: SparkSessionExtensions): SparkSessionExtensions
applyExtensions uses the given extensionConfClassNames as the names of the extension classes.

For every extension class, applyExtensions creates an instance, passing in the given SparkSessionExtensions.

Note
The given SparkSessionExtensions is mutated in-place.

In the end, applyExtensions returns the given SparkSessionExtensions.

In case of ClassCastException, ClassNotFoundException or NoClassDefFoundError, applyExtensions prints out the following WARN message to the logs:

Cannot use [extensionConfClassName] to configure session extensions.
applyExtensions is used when:

- SparkSession.Builder is requested to get active or create a new SparkSession instance
- SparkSession is created (from a SparkContext)
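The extension class names typically come from the spark.sql.extensions configuration property. A minimal sketch of such an extension class (reusing the no-op rule idea; the class name is just an illustration):

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// registered with: .config("spark.sql.extensions", classOf[MyExtensions].getName)
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectOptimizerRule { session =>
      new Rule[LogicalPlan] {
        override def apply(plan: LogicalPlan): LogicalPlan = plan // no-op
      }
    }
  }
}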
Default Parallelism of Leaf Nodes¶
leafNodeDefaultParallelism: Int
leafNodeDefaultParallelism is the value of spark.sql.leafNodeDefaultParallelism (if defined) or SparkContext.defaultParallelism (Spark Core).

leafNodeDefaultParallelism is used when:

- SparkSession.range operator is used
- RangeExec leaf physical operator is created
- CommandResultExec physical operator is requested for the RDD[InternalRow]
- LocalTableScanExec physical operator is requested for the RDD
- FilePartition is requested for maxSplitBytes
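A quick way to see the property in action (a sketch, assuming the configuration property can be changed at runtime):

// with no explicit numPartitions, range uses leafNodeDefaultParallelism
spark.conf.set("spark.sql.leafNodeDefaultParallelism", "5")
spark.range(100).rdd.getNumPartitions  // 5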