
FindDataSourceTable Logical Resolution Rule

FindDataSourceTable is a Catalyst rule (Rule[LogicalPlan]) that resolves UnresolvedCatalogRelation logical operators (of Spark and Hive tables) in a logical query plan.

FindDataSourceTable is used by Hive and Spark Analyzers as part of their extendedResolutionRules.
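
One way to check this in a running session is to list the extended resolution rules of the session's Analyzer. The following is a minimal sketch that assumes spark-shell (where spark is predefined). It relies on spark.sessionState, which is an unstable developer API, so the exact shape may differ between Spark versions.

```scala
// Assumes spark-shell, where `spark` (a SparkSession) is already defined.
// sessionState is an unstable developer API and may change between releases.
val rules = spark.sessionState.analyzer.extendedResolutionRules

// Print the name of every extended resolution rule registered with the Analyzer
val ruleNames = rules.map(_.ruleName)
ruleNames.foreach(println)

// Check whether FindDataSourceTable is among them
val hasFindDataSourceTable = ruleNames.exists(_.endsWith("FindDataSourceTable"))
println(hasFindDataSourceTable)
```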

Creating Instance

FindDataSourceTable takes the following to be created:

  • SparkSession

FindDataSourceTable is created when:

  • HiveSessionStateBuilder is requested for the Analyzer
  • BaseSessionStateBuilder is requested for the Analyzer

Execute Rule

Rule
apply(
  plan: LogicalPlan): LogicalPlan

apply is part of the Rule abstraction.

apply traverses the given LogicalPlan (from top to leaves) to resolve UnresolvedCatalogRelations of the following logical operators:

  1. InsertIntoStatement with a non-streaming UnresolvedCatalogRelation of Spark (DataSource) table
  2. InsertIntoStatement with a non-streaming UnresolvedCatalogRelation of a Hive table
  3. AppendData (that is not by name) with a DataSourceV2Relation of V1Table
  4. A non-streaming UnresolvedCatalogRelation of Spark (DataSource) table
  5. A non-streaming UnresolvedCatalogRelation of a Hive table
  6. A streaming UnresolvedCatalogRelation
  7. A StreamingRelationV2 (Spark Structured Streaming) over a streaming UnresolvedCatalogRelation

Streaming and Non-Streaming UnresolvedCatalogRelations

The difference between streaming and non-streaming UnresolvedCatalogRelations is the isStreaming flag that is disabled (false) by default.
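
The two flavors can be compared by reading the same table through the batch and the streaming reader. The following is a sketch only: it assumes spark-shell, Spark 3.1 or later (for DataStreamReader.table), and a throwaway table named t1 that it drops and recreates. The printed plans vary by Spark version, so no output is shown.

```scala
// Assumes spark-shell, where `spark` (a SparkSession) is already defined.
// The table name t1 is only an example; it is dropped and recreated here.
spark.sql("DROP TABLE IF EXISTS t1")
spark.sql("CREATE TABLE t1 (id LONG) USING parquet")

// Batch read: the relation is requested with isStreaming disabled (the default)
val batch = spark.read.table("t1")

// Streaming read: the relation is requested with isStreaming enabled
val streaming = spark.readStream.table("t1")

// Compare the analyzed logical plans of the two queries
println(batch.queryExecution.analyzed.numberedTreeString)
println(streaming.queryExecution.analyzed.numberedTreeString)

// Dataset.isStreaming reports whether the plan contains a streaming source
println(batch.isStreaming)
println(streaming.isStreaming)
```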

apply...FIXME

Create StreamingRelation

getStreamingRelation(
  table: CatalogTable,
  extraOptions: CaseInsensitiveStringMap): StreamingRelation

getStreamingRelation creates a StreamingRelation (Spark Structured Streaming) for a DataSource that is created with the following properties:

Property                Value
DataSource provider     The provider of the given CatalogTable
User-specified schema   The schema of the given CatalogTable
Options                 DataSource options based on the given extraOptions and the CatalogTable
CatalogTable            The given CatalogTable

getStreamingRelation is used when:

  • FindDataSourceTable is requested to resolve streaming UnresolvedCatalogRelations
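
The mapping in the table above can be modelled in plain Scala without any Spark dependency. This is a simplified sketch, not Spark's implementation: TableInfo and SourceSpec are hypothetical stand-ins for CatalogTable and DataSource, and the way options are combined (table options first, extra options winning on conflict) is an assumption made for illustration only.

```scala
// Hypothetical stand-in for Spark's CatalogTable (not a Spark class)
final case class TableInfo(
    provider: Option[String],
    schema: Seq[(String, String)],          // (column name, type name)
    storageProperties: Map[String, String],
    location: Option[String])

// Hypothetical stand-in for Spark's DataSource (not a Spark class)
final case class SourceSpec(
    className: String,
    userSpecifiedSchema: Option[Seq[(String, String)]],
    options: Map[String, String],
    catalogTable: Option[TableInfo])

object StreamingRelationSketch {
  def sourceSpecFor(table: TableInfo, extraOptions: Map[String, String]): SourceSpec = {
    // ASSUMPTION: table-level options are the storage properties plus the
    // location (as "path"), and the caller's extra options win on conflict.
    val tableOptions = table.storageProperties ++ table.location.map("path" -> _)
    SourceSpec(
      className = table.provider.getOrElse(sys.error("table has no provider")),
      userSpecifiedSchema = Some(table.schema), // schema comes from the table
      options = tableOptions ++ extraOptions,
      catalogTable = Some(table))
  }
}
```

For example, StreamingRelationSketch.sourceSpecFor(table, Map("maxFilesPerTrigger" -> "1")) yields a SourceSpec whose provider and schema come from the table, while the options contain both the table's path and the extra option.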

Demo

scala> :type spark
org.apache.spark.sql.SparkSession
// Example: InsertIntoTable with UnresolvedCatalogRelation
// Drop tables to make the example reproducible
val db = spark.catalog.currentDatabase
Seq("t1", "t2").foreach { t =>
  spark.sharedState.externalCatalog.dropTable(db, t, ignoreIfNotExists = true, purge = true)
}
// Create tables
sql("CREATE TABLE t1 (id LONG) USING parquet")
sql("CREATE TABLE t2 (id LONG) USING orc")
import org.apache.spark.sql.catalyst.dsl.plans._
val plan = table("t1").insertInto(tableName = "t2", overwrite = true)
scala> println(plan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- 'UnresolvedRelation `t1`
// Transform the logical plan with ResolveRelations logical rule first
// so UnresolvedRelations become UnresolvedCatalogRelations
import spark.sessionState.analyzer.ResolveRelations
val planWithUnresolvedCatalogRelations = ResolveRelations(plan)
scala> println(planWithUnresolvedCatalogRelations.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- 'SubqueryAlias t1
02    +- 'UnresolvedCatalogRelation `default`.`t1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
// Let's resolve UnresolvedCatalogRelations then
import org.apache.spark.sql.execution.datasources.FindDataSourceTable
val r = new FindDataSourceTable(spark)
val tablesResolvedPlan = r(planWithUnresolvedCatalogRelations)
// FIXME Why is t2 not resolved?!
scala> println(tablesResolvedPlan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- SubqueryAlias t1
02    +- Relation[id#10L] parquet