SessionCatalog — Session-Scoped Registry of Relational Entities

SessionCatalog is a catalog of relational entities in SparkSession (e.g. databases, tables, views, partitions, and functions).

SessionCatalog is a SQLConfHelper.

Creating Instance

SessionCatalog takes the following to be created:

SessionCatalog and Spark SQL Services

SessionCatalog is created (and cached for later usage) when BaseSessionStateBuilder is requested for one.

TableFunctionRegistry

SessionCatalog is given a TableFunctionRegistry when created.

The TableFunctionRegistry is used in the following:

Accessing SessionCatalog

SessionCatalog is available through SessionState (of the owning SparkSession).

val catalog = spark.sessionState.catalog
assert(catalog.isInstanceOf[org.apache.spark.sql.catalyst.catalog.SessionCatalog])

Default Database Name

SessionCatalog defines default as the name of the default database.
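
A quick check (a sketch; assumes a fresh session where no USE statement has changed the current database):

assert(spark.catalog.currentDatabase == "default")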

ExternalCatalog

SessionCatalog creates an ExternalCatalog for the metadata of permanent entities (when first requested).

SessionCatalog is in fact a layer over ExternalCatalog (in a SparkSession) that allows for different metastores (i.e. in-memory or Hive).
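
Which metastore is in use is governed by the spark.sql.catalogImplementation internal configuration property ("in-memory" or "hive"). A quick sanity check from spark-shell (a sketch; the property is static and fixed at session creation):

val impl = spark.sparkContext.getConf.get("spark.sql.catalogImplementation", "in-memory")
assert(Seq("in-memory", "hive").contains(impl))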

Looking Up Function

lookupFunction(
  name: FunctionIdentifier,
  children: Seq[Expression]): Expression

lookupFunction looks up a function by name.

For a function with no database specified that exists in the FunctionRegistry, lookupFunction requests the FunctionRegistry to find the function by its unqualified name (i.e. with no database).

If the function has a database specified or does not exist in the FunctionRegistry, lookupFunction checks the FunctionRegistry again, this time by the fully-qualified function name (i.e. with a database).

For other cases, lookupFunction requests the ExternalCatalog to find the function and loads its resources. lookupFunction then creates a corresponding temporary function and looks up the function again.
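
A minimal sketch of the first case with the built-in upper function (the Literal is just a placeholder argument for the example):

import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.{Literal, Upper}

// upper is registered in FunctionRegistry, so the unqualified-name path applies
val expr = spark.sessionState.catalog.lookupFunction(
  FunctionIdentifier("upper"), Seq(Literal("hi")))
assert(expr.isInstanceOf[Upper])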

lookupFunction is used when:

Looking Up Relation

lookupRelation(
  name: TableIdentifier): LogicalPlan

lookupRelation finds the table by name in the catalogs (i.e. GlobalTempViewManager, ExternalCatalog or the registry of temporary views) and gives a SubqueryAlias per table type.

Internally, lookupRelation looks up the table using:

  1. GlobalTempViewManager when the database name of the table matches the name of GlobalTempViewManager

    • Gives SubqueryAlias or reports a NoSuchTableException
  2. ExternalCatalog when the database name of the table is specified explicitly or the registry of temporary views does not contain the table

    • Gives SubqueryAlias with View when the table is a view
    • Gives SubqueryAlias with UnresolvedCatalogRelation otherwise
  3. The registry of temporary views

Note

lookupRelation assumes the current database (default unless changed) when the table name does not specify a database explicitly.

Demo

scala> :type spark.sessionState.catalog
org.apache.spark.sql.catalyst.catalog.SessionCatalog

import spark.sessionState.{catalog => c}
import org.apache.spark.sql.catalyst.TableIdentifier

// Global temp view
val db = spark.sharedState.globalTempViewManager.database
// Make the example reproducible (and so "replace")
spark.range(1).createOrReplaceGlobalTempView("gv1")
val gv1 = TableIdentifier(table = "gv1", database = Some(db))
val plan = c.lookupRelation(gv1)
scala> println(plan.numberedTreeString)
00 SubqueryAlias gv1
01 +- Range (0, 1, step=1, splits=Some(8))

val metastore = spark.sharedState.externalCatalog

// Regular table
val db = spark.catalog.currentDatabase
metastore.dropTable(db, table = "t1", ignoreIfNotExists = true, purge = true)
sql("CREATE TABLE t1 (id LONG) USING parquet")
val t1 = TableIdentifier(table = "t1", database = Some(db))
val plan = c.lookupRelation(t1)
scala> println(plan.numberedTreeString)
00 'SubqueryAlias t1
01 +- 'UnresolvedCatalogRelation `default`.`t1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe

// Regular view (not temporary view!)
// Make the example reproducible
metastore.dropTable(db, table = "v1", ignoreIfNotExists = true, purge = true)
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
val v1 = TableIdentifier(table = "v1", database = Some(db))
import org.apache.spark.sql.types.StructType
val schema = new StructType().add($"id".long)
val storage = CatalogStorageFormat(locationUri = None, inputFormat = None, outputFormat = None, serde = None, compressed = false, properties = Map())
val tableDef = CatalogTable(
  identifier = v1,
  tableType = CatalogTableType.VIEW,
  storage = storage,
  schema = schema,
  viewText = Some("SELECT 1")) // viewText is required or a RuntimeException is reported
metastore.createTable(tableDef, ignoreIfExists = false)
val plan = c.lookupRelation(v1)
scala> println(plan.numberedTreeString)
00 'SubqueryAlias v1
01 +- View (`default`.`v1`, [id#77L])
02    +- 'Project [unresolvedalias(1, None)]
03       +- OneRowRelation

// Temporary view
spark.range(1).createOrReplaceTempView("v2")
val v2 = TableIdentifier(table = "v2", database = None)
val plan = c.lookupRelation(v2)
scala> println(plan.numberedTreeString)
00 SubqueryAlias v2
01 +- Range (0, 1, step=1, splits=Some(8))

Retrieving Table Metadata

getTempViewOrPermanentTableMetadata(
  name: TableIdentifier): CatalogTable

getTempViewOrPermanentTableMetadata branches off based on the database (in the given TableIdentifier).

When a database name is not specified, getTempViewOrPermanentTableMetadata first looks up a local temporary view by the table name and creates a CatalogTable (with VIEW table type and an undefined storage), or falls back to the table metadata in the ExternalCatalog.

When the database name is that of the GlobalTempViewManager, getTempViewOrPermanentTableMetadata requests the GlobalTempViewManager for the global view definition and creates a CatalogTable (with the name of the GlobalTempViewManager in the table identifier, VIEW table type and an undefined storage), or reports a NoSuchTableException.

For any other database name, getTempViewOrPermanentTableMetadata simply retrieves the table metadata from the ExternalCatalog.
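
A short sketch of the temporary-view case (the tv1 view name is made up for the example):

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTableType

spark.range(1).createOrReplaceTempView("tv1")
val meta = spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("tv1"))
// A local temporary view gives a CatalogTable with VIEW type and undefined storage
assert(meta.tableType == CatalogTableType.VIEW)
assert(meta.storage.locationUri.isEmpty)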

lookupFunctionInfo

lookupFunctionInfo(
  name: FunctionIdentifier): ExpressionInfo

lookupFunctionInfo...FIXME


lookupFunctionInfo is used when:

  • SparkGetFunctionsOperation (Spark Thrift Server) is requested to runInternal
  • CatalogImpl is requested to makeFunction

lookupBuiltinOrTempFunction

lookupBuiltinOrTempFunction(
  name: String): Option[ExpressionInfo]

lookupBuiltinOrTempFunction...FIXME


lookupBuiltinOrTempFunction is used when:

lookupBuiltinOrTempTableFunction

lookupBuiltinOrTempTableFunction(
  name: String): Option[ExpressionInfo]

lookupBuiltinOrTempTableFunction...FIXME


lookupBuiltinOrTempTableFunction is used when:

lookupPersistentFunction

lookupPersistentFunction(
  name: FunctionIdentifier): ExpressionInfo

lookupPersistentFunction...FIXME


lookupPersistentFunction is used when:

resolveBuiltinOrTempTableFunction

resolveBuiltinOrTempTableFunction(
  name: String,
  arguments: Seq[Expression]): Option[LogicalPlan]

resolveBuiltinOrTempTableFunction resolves the given function name using resolveBuiltinOrTempFunctionInternal with the TableFunctionRegistry.
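
A minimal sketch (assuming a Spark version where resolveBuiltinOrTempTableFunction is available): range is a built-in table function in TableFunctionRegistry, so resolving it by name gives a LogicalPlan.

import org.apache.spark.sql.catalyst.expressions.Literal

val plan = spark.sessionState.catalog.resolveBuiltinOrTempTableFunction("range", Seq(Literal(3L)))
assert(plan.isDefined)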


resolveBuiltinOrTempTableFunction is used when:

resolveBuiltinOrTempFunctionInternal

resolveBuiltinOrTempFunctionInternal[T](
  name: String,
  arguments: Seq[Expression],
  isBuiltin: FunctionIdentifier => Boolean,
  registry: FunctionRegistryBase[T]): Option[T]

resolveBuiltinOrTempFunctionInternal creates a FunctionIdentifier (for the given name).

resolveBuiltinOrTempFunctionInternal...FIXME

Note

resolveBuiltinOrTempFunctionInternal is fairly simple, yet I got confused about what it actually does, so I marked it FIXME.


resolveBuiltinOrTempFunctionInternal is used when:

registerFunction

registerFunction(
  funcDefinition: CatalogFunction,
  overrideIfExists: Boolean,
  functionBuilder: Option[Seq[Expression] => Expression] = None): Unit
registerFunction[T](
  funcDefinition: CatalogFunction,
  overrideIfExists: Boolean,
  registry: FunctionRegistryBase[T],
  functionBuilder: Seq[Expression] => T): Unit

registerFunction...FIXME


registerFunction is used when:

Looking Up Table Metadata

getTableMetadata(
  name: TableIdentifier): CatalogTable

getTableMetadata uses getTableRawMetadata and replaces CharType and VarcharType field types with StringType in the table schema, recursively.

CharType and VarcharType Unsupported

The Spark SQL query engine does not support char/varchar types yet.
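
A quick way to observe the char/varchar handling (a sketch; the vc1 table name is made up for the example):

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types.StringType

sql("CREATE TABLE vc1 (name VARCHAR(10)) USING parquet")
val meta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("vc1"))
// VarcharType(10) from the DDL surfaces as StringType in the table schema
assert(meta.schema("name").dataType == StringType)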

getRelation

getRelation(
  metadata: CatalogTable,
  options: CaseInsensitiveStringMap): LogicalPlan

getRelation creates a SubqueryAlias logical operator with a child logical operator based on the table type of the given CatalogTable metadata:

Table Type           Child Logical Operator
VIEW                 fromCatalogTable
EXTERNAL or MANAGED  UnresolvedCatalogRelation
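
A hedged sketch of calling getRelation directly, reusing the t1 table from the demo above (CaseInsensitiveStringMap.empty() stands in for no extra options):

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val meta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
val rel = spark.sessionState.catalog.getRelation(meta, CaseInsensitiveStringMap.empty())
// t1 is a regular table, so the child operator is an UnresolvedCatalogRelation
assert(rel.isInstanceOf[SubqueryAlias])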

getRelation is used when: