SessionCatalog — Session-Scoped Registry of Relational Entities¶
SessionCatalog is a catalog of relational entities in SparkSession (e.g. databases, tables, views, partitions, and functions).

SessionCatalog is a SQLConfHelper.
Creating Instance¶
SessionCatalog takes the following to be created:

- ExternalCatalog
- GlobalTempViewManager
- FunctionRegistry
- TableFunctionRegistry
- Configuration (Apache Hadoop)
- ParserInterface
- FunctionResourceLoader
- FunctionExpressionBuilder
- Cache Size (default: spark.sql.filesourceTableRelationCacheSize)
- Cache TTL (default: spark.sql.metadataCacheTTLSeconds)
- Default database name (default: spark.sql.catalog.spark_catalog.defaultDatabase)
SessionCatalog is created (and cached for later usage) when BaseSessionStateBuilder is requested for one.
TableFunctionRegistry¶
SessionCatalog is given a TableFunctionRegistry when created.

The TableFunctionRegistry is used in the following:
- dropTempFunction
- isRegisteredFunction
- listBuiltinAndTempFunctions
- listTemporaryFunctions
- lookupBuiltinOrTempTableFunction
- lookupPersistentFunction
- resolveBuiltinOrTempTableFunction
- resolvePersistentTableFunction
- reset
- unregisterFunction
Accessing SessionCatalog¶
SessionCatalog is available through SessionState (of the owning SparkSession).

```scala
val catalog = spark.sessionState.catalog
assert(catalog.isInstanceOf[org.apache.spark.sql.catalyst.catalog.SessionCatalog])
```
Default Database Name¶
SessionCatalog defines default as the name of the default database.
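The default database is also the session's initial current database. A quick spark-shell check (assuming a fresh session with no USE statement executed):

```scala
// A fresh session starts in the default database
assert(spark.sessionState.catalog.getCurrentDatabase == "default")
```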
ExternalCatalog¶
SessionCatalog creates an ExternalCatalog for the metadata of permanent entities (when first requested).

SessionCatalog is in fact a layer over ExternalCatalog (in a SparkSession) which allows for different metastores (i.e. in-memory or hive).
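The underlying metastore is exposed through SharedState. A minimal sketch of peeking at it (the databaseExists check is merely illustrative):

```scala
// The ExternalCatalog that backs this SessionCatalog (in-memory or hive)
val metastore = spark.sharedState.externalCatalog
assert(metastore.databaseExists("default"))
```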
Looking Up Function¶
```scala
lookupFunction(
  name: FunctionIdentifier,
  children: Seq[Expression]): Expression
```
lookupFunction looks up a function by name.

For a function with no database defined that exists in FunctionRegistry, lookupFunction requests FunctionRegistry to find the function (by its unqualified name, i.e. with no database).

If the name function has the database defined or does not exist in FunctionRegistry, lookupFunction uses the fully-qualified function name to check if the function exists in FunctionRegistry (by its fully-qualified name, i.e. with a database).
For other cases, lookupFunction requests ExternalCatalog to find the function and loads its resources. lookupFunction then creates a corresponding temporary function and looks up the function again.
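A minimal spark-shell sketch of the unqualified-name path (upper is a built-in function, so it is found in FunctionRegistry directly):

```scala
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.functions.lit

// A built-in function is resolved by its unqualified name
val expr = spark.sessionState.catalog.lookupFunction(
  FunctionIdentifier("upper"),
  children = Seq(lit("hello").expr))
assert(expr.getClass.getSimpleName == "Upper")
```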
lookupFunction is used when:

- ResolveFunctions logical resolution rule is executed (and resolves UnresolvedGenerator or UnresolvedFunction expressions)
- HiveSessionCatalog is requested to lookupFunction0
Looking Up Relation¶
```scala
lookupRelation(
  name: TableIdentifier): LogicalPlan
```
lookupRelation finds the name table in the catalogs (i.e. GlobalTempViewManager, ExternalCatalog or registry of temporary views) and gives a SubqueryAlias per table type.
Internally, lookupRelation looks up the name table using:

- GlobalTempViewManager, when the database name of the table matches the name of GlobalTempViewManager
    - Gives SubqueryAlias or reports a NoSuchTableException
- ExternalCatalog, when the database name of the table is specified explicitly or the registry of temporary views does not contain the table
    - Gives SubqueryAlias with View when the table is a view (aka temporary table)
    - Gives SubqueryAlias with UnresolvedCatalogRelation otherwise
- The registry of temporary views
    - Gives SubqueryAlias with the logical plan per the table as registered in the registry of temporary views
Note

lookupRelation considers default to be the name of the database if the name table does not specify the database explicitly.
Demo¶
```text
scala> :type spark.sessionState.catalog
org.apache.spark.sql.catalyst.catalog.SessionCatalog

import spark.sessionState.{catalog => c}
import org.apache.spark.sql.catalyst.TableIdentifier

// Global temp view
val db = spark.sharedState.globalTempViewManager.database
// Make the example reproducible (and so "replace")
spark.range(1).createOrReplaceGlobalTempView("gv1")
val gv1 = TableIdentifier(table = "gv1", database = Some(db))
val plan = c.lookupRelation(gv1)

scala> println(plan.numberedTreeString)
00 SubqueryAlias gv1
01 +- Range (0, 1, step=1, splits=Some(8))

val metastore = spark.sharedState.externalCatalog

// Regular table
val db = spark.catalog.currentDatabase
metastore.dropTable(db, table = "t1", ignoreIfNotExists = true, purge = true)
sql("CREATE TABLE t1 (id LONG) USING parquet")
val t1 = TableIdentifier(table = "t1", database = Some(db))
val plan = c.lookupRelation(t1)

scala> println(plan.numberedTreeString)
00 'SubqueryAlias t1
01 +- 'UnresolvedCatalogRelation `default`.`t1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe

// Regular view (not a temporary view!)
// Make the example reproducible
metastore.dropTable(db, table = "v1", ignoreIfNotExists = true, purge = true)
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
val v1 = TableIdentifier(table = "v1", database = Some(db))
import org.apache.spark.sql.types.StructType
val schema = new StructType().add($"id".long)
val storage = CatalogStorageFormat(locationUri = None, inputFormat = None, outputFormat = None, serde = None, compressed = false, properties = Map())
val tableDef = CatalogTable(
  identifier = v1,
  tableType = CatalogTableType.VIEW,
  storage,
  schema,
  viewText = Some("SELECT 1") /* Required or a RuntimeException is reported */)
metastore.createTable(tableDef, ignoreIfExists = false)
val plan = c.lookupRelation(v1)

scala> println(plan.numberedTreeString)
00 'SubqueryAlias v1
01 +- View (`default`.`v1`, [id#77L])
02    +- 'Project [unresolvedalias(1, None)]
03       +- OneRowRelation

// Temporary view
spark.range(1).createOrReplaceTempView("v2")
val v2 = TableIdentifier(table = "v2", database = None)
val plan = c.lookupRelation(v2)

scala> println(plan.numberedTreeString)
00 SubqueryAlias v2
01 +- Range (0, 1, step=1, splits=Some(8))
```
Retrieving Table Metadata¶
```scala
getTempViewOrPermanentTableMetadata(
  name: TableIdentifier): CatalogTable
```
getTempViewOrPermanentTableMetadata branches off based on the database (in the given TableIdentifier).

When a database name is not specified, getTempViewOrPermanentTableMetadata finds a local temporary view and creates a CatalogTable (with VIEW table type and an undefined storage) or retrieves the table metadata from an external catalog.

With the database name of the GlobalTempViewManager, getTempViewOrPermanentTableMetadata requests GlobalTempViewManager for the global view definition and creates a CatalogTable (with the name of GlobalTempViewManager in the table identifier, VIEW table type and an undefined storage) or reports a NoSuchTableException.

With a database name other than that of the GlobalTempViewManager, getTempViewOrPermanentTableMetadata simply retrieves the table metadata from an external catalog.
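A minimal spark-shell sketch of the no-database branch (the temporary view name tv is made up for this example):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTableType

// With no database, a matching local temporary view wins
spark.range(1).createOrReplaceTempView("tv")
val meta = spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("tv"))
assert(meta.tableType == CatalogTableType.VIEW)
```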
lookupFunctionInfo¶
```scala
lookupFunctionInfo(
  name: FunctionIdentifier): ExpressionInfo
```
lookupFunctionInfo ...FIXME

lookupFunctionInfo is used when:

- SparkGetFunctionsOperation (Spark Thrift Server) is requested to runInternal
- CatalogImpl is requested to makeFunction
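Even with the internals marked FIXME, the entry point itself is easy to poke at from spark-shell (upper is just an example of a built-in function):

```scala
import org.apache.spark.sql.catalyst.FunctionIdentifier

val info = spark.sessionState.catalog.lookupFunctionInfo(FunctionIdentifier("upper"))
println(info.getUsage)  // usage text of the built-in upper function
```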
lookupBuiltinOrTempFunction¶
```scala
lookupBuiltinOrTempFunction(
  name: String): Option[ExpressionInfo]
```
lookupBuiltinOrTempFunction ...FIXME

lookupBuiltinOrTempFunction is used when:

- ResolveFunctions logical analysis rule is requested to lookupBuiltinOrTempFunction
- SessionCatalog is requested to lookupFunctionInfo
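A quick spark-shell check of the found and not-found paths (no_such_function is a deliberately unregistered name):

```scala
val c = spark.sessionState.catalog
assert(c.lookupBuiltinOrTempFunction("abs").isDefined)  // built-in
assert(c.lookupBuiltinOrTempFunction("no_such_function").isEmpty)
```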
lookupBuiltinOrTempTableFunction¶
```scala
lookupBuiltinOrTempTableFunction(
  name: String): Option[ExpressionInfo]
```
lookupBuiltinOrTempTableFunction ...FIXME

lookupBuiltinOrTempTableFunction is used when:

- ResolveFunctions logical analysis rule is requested to lookupBuiltinOrTempTableFunction
- SessionCatalog is requested to lookupFunctionInfo
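range is a built-in table-valued function, so it should be visible here (a quick spark-shell check):

```scala
assert(spark.sessionState.catalog.lookupBuiltinOrTempTableFunction("range").isDefined)
```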
lookupPersistentFunction¶
```scala
lookupPersistentFunction(
  name: FunctionIdentifier): ExpressionInfo
```
lookupPersistentFunction ...FIXME

lookupPersistentFunction is used when:

- SessionCatalog is requested to lookupFunctionInfo
- V2SessionCatalog is requested to load a function
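Persistent functions live in the ExternalCatalog, so looking up one that was never created with CREATE FUNCTION reports an error. A sketch (no_such_fn is a made-up name; the error is assumed to surface as an AnalysisException):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.FunctionIdentifier

try spark.sessionState.catalog.lookupPersistentFunction(
  FunctionIdentifier("no_such_fn", Some("default")))
catch { case e: AnalysisException => println(e.getMessage) }
```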
resolveBuiltinOrTempTableFunction¶
```scala
resolveBuiltinOrTempTableFunction(
  name: String,
  arguments: Seq[Expression]): Option[LogicalPlan]
```
resolveBuiltinOrTempTableFunction delegates to resolveBuiltinOrTempFunctionInternal with the TableFunctionRegistry.

resolveBuiltinOrTempTableFunction is used when:

- ResolveFunctions logical analysis rule is executed (to resolve an UnresolvedTableValuedFunction logical operator)
- SessionCatalog is requested to lookupTableFunction
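Resolving the built-in range table function gives its logical plan directly (a minimal spark-shell sketch):

```scala
import org.apache.spark.sql.functions.lit

// range(3) as a table-valued function resolves to a logical plan
val plan = spark.sessionState.catalog.resolveBuiltinOrTempTableFunction("range", Seq(lit(3).expr))
assert(plan.isDefined)
```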
resolveBuiltinOrTempFunctionInternal¶
```scala
resolveBuiltinOrTempFunctionInternal[T](
  name: String,
  arguments: Seq[Expression],
  isBuiltin: FunctionIdentifier => Boolean,
  registry: FunctionRegistryBase[T]): Option[T]
```
resolveBuiltinOrTempFunctionInternal creates a FunctionIdentifier (for the given name).

resolveBuiltinOrTempFunctionInternal ...FIXME

Note

resolveBuiltinOrTempFunctionInternal is fairly simple, yet I got confused about what it actually does, so I marked it FIXME.
resolveBuiltinOrTempFunctionInternal is used when:

- SessionCatalog is requested to resolveBuiltinOrTempFunction, resolveBuiltinOrTempTableFunction
registerFunction¶
```scala
registerFunction(
  funcDefinition: CatalogFunction,
  overrideIfExists: Boolean,
  functionBuilder: Option[Seq[Expression] => Expression] = None): Unit

registerFunction[T](
  funcDefinition: CatalogFunction,
  overrideIfExists: Boolean,
  registry: FunctionRegistryBase[T],
  functionBuilder: Seq[Expression] => T): Unit
```
registerFunction ...FIXME

registerFunction is used when:

- CreateFunctionCommand is executed
- RefreshFunctionCommand is executed
- SessionCatalog is requested to resolvePersistentFunctionInternal
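A hedged sketch of the first variant with an explicit functionBuilder (the my_upper name and the reuse of Spark's own Upper expression are assumptions for illustration):

```scala
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogFunction
import org.apache.spark.sql.catalyst.expressions.{Expression, Upper}

// A made-up function definition with no extra resources (JARs, files)
val fn = CatalogFunction(
  identifier = FunctionIdentifier("my_upper"),
  className = classOf[Upper].getName,
  resources = Seq.empty)

spark.sessionState.catalog.registerFunction(
  fn,
  overrideIfExists = true,
  functionBuilder = Some((args: Seq[Expression]) => Upper(args.head)))

// The function should now be resolvable from the session's FunctionRegistry
assert(spark.sql("SELECT my_upper('hi')").head.getString(0) == "HI")
```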
Looking Up Table Metadata¶
```scala
getTableMetadata(
  name: TableIdentifier): CatalogTable
```
getTableMetadata retrieves the table metadata using getTableRawMetadata and replaces CharType and VarcharType field types with StringType in the table schema, recursively.

CharType and VarcharType Unsupported

The Spark SQL query engine does not support char/varchar types yet.
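A minimal spark-shell sketch of the replacement (the chars table name is made up; assumes a Spark version whose DDL accepts CHAR(n)):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types.StringType

sql("CREATE TABLE IF NOT EXISTS chars (c CHAR(5)) USING parquet")
val t = spark.sessionState.catalog.getTableMetadata(TableIdentifier("chars"))
// CHAR(5) surfaces as StringType in the table schema
assert(t.schema("c").dataType == StringType)
```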