# FindDataSourceTable Logical Resolution Rule

`FindDataSourceTable` is a Catalyst rule (`Rule[LogicalPlan]`) to resolve `UnresolvedCatalogRelation` logical operators (of Spark and Hive tables) in a logical query plan.
`FindDataSourceTable` is used by the Hive and Spark Analyzers as part of their extendedResolutionRules.
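Catalyst resolution rules all share the `Rule[LogicalPlan]` contract: a function from a logical plan to a (possibly rewritten) logical plan. The following is a minimal sketch in plain Scala — the plan nodes and the rule body are simplified stand-ins for illustration, not Spark's actual classes:

```scala
// Simplified model of Catalyst's Rule[TreeType] contract (the real trait
// lives in org.apache.spark.sql.catalyst.rules). The plan nodes below are
// toy stand-ins, not Spark's LogicalPlan hierarchy.
sealed trait LogicalPlan
case class UnresolvedCatalogRelation(table: String) extends LogicalPlan
case class LogicalRelation(table: String) extends LogicalPlan

trait Rule[T] {
  def apply(plan: T): T
}

// A FindDataSourceTable-like rule: rewrite unresolved catalog relations
// into resolved relations and leave every other operator untouched.
object ResolveCatalogRelations extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan match {
    case UnresolvedCatalogRelation(t) => LogicalRelation(t)
    case other                        => other
  }
}

println(ResolveCatalogRelations(UnresolvedCatalogRelation("t1")))
// LogicalRelation(t1)
```

Because a rule is just a plan-to-plan function, it can be applied directly to a plan (as the Demo below does with the real `FindDataSourceTable`).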
## Creating Instance

`FindDataSourceTable` takes the following to be created:

* SparkSession
`FindDataSourceTable` is created when:

* `HiveSessionStateBuilder` is requested for the Analyzer
* `BaseSessionStateBuilder` is requested for the Analyzer
## Execute Rule

apply(
  plan: LogicalPlan): LogicalPlan

`apply` traverses the given `LogicalPlan` (from top to leaves) to resolve `UnresolvedCatalogRelation`s of the following logical operators:
- `InsertIntoStatement` with a non-streaming `UnresolvedCatalogRelation` of a Spark (DataSource) table
- `InsertIntoStatement` with a non-streaming `UnresolvedCatalogRelation` of a Hive table
- `AppendData` (that is not by name) with a `DataSourceV2Relation` of a `V1Table`
- A non-streaming `UnresolvedCatalogRelation` of a Spark (DataSource) table
- A non-streaming `UnresolvedCatalogRelation` of a Hive table
- A streaming `UnresolvedCatalogRelation`
- A `StreamingRelationV2` (Spark Structured Streaming) over a streaming `UnresolvedCatalogRelation`
**Streaming and Non-Streaming UnresolvedCatalogRelations**

The difference between streaming and non-streaming `UnresolvedCatalogRelation`s is the `isStreaming` flag, which is disabled (`false`) by default.
`apply`...FIXME
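The streaming/non-streaming dispatch above can be modeled with a tiny sketch (all names here are illustrative; Spark's real `UnresolvedCatalogRelation` carries a `CatalogTable` and an options map alongside the `isStreaming` flag):

```scala
// Toy model of the dispatch: the rule inspects the isStreaming flag of
// UnresolvedCatalogRelation (disabled by default) to decide whether to
// produce a batch or a streaming relation. Hypothetical, pared-down types.
case class UnresolvedCatalogRelation(table: String, isStreaming: Boolean = false)

sealed trait ResolvedRelation
case class HadoopFsRelation(table: String) extends ResolvedRelation   // batch path
case class StreamingRelation(table: String) extends ResolvedRelation  // streaming path

def resolve(rel: UnresolvedCatalogRelation): ResolvedRelation =
  if (rel.isStreaming) StreamingRelation(rel.table)
  else HadoopFsRelation(rel.table)

println(resolve(UnresolvedCatalogRelation("events")))
// HadoopFsRelation(events)
println(resolve(UnresolvedCatalogRelation("events", isStreaming = true)))
// StreamingRelation(events)
```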
## Create StreamingRelation

getStreamingRelation(
  table: CatalogTable,
  extraOptions: CaseInsensitiveStringMap): StreamingRelation

`getStreamingRelation` creates a `StreamingRelation` (Spark Structured Streaming) with a `DataSource` with the following:
| Property | Value |
|----------|-------|
| DataSource provider | The provider of the given `CatalogTable` |
| User-specified schema | The schema of the given `CatalogTable` |
| Options | DataSource options based on the given `extraOptions` and the `CatalogTable` |
| CatalogTable | The given `CatalogTable` |
`getStreamingRelation` is used when:

* `FindDataSourceTable` is requested to resolve streaming `UnresolvedCatalogRelation`s
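The property table above can be sketched in plain Scala. The case classes below are hypothetical, pared-down stand-ins for Spark's `CatalogTable` and `DataSource` and Spark Structured Streaming's `StreamingRelation`, just to show how the pieces are wired together:

```scala
// Pared-down stand-ins for the real Spark types (illustration only).
case class CatalogTable(
    provider: Option[String],
    schema: Seq[(String, String)],             // (column name, column type)
    storageProperties: Map[String, String])

case class DataSource(
    className: String,                         // DataSource provider
    userSpecifiedSchema: Seq[(String, String)],
    options: Map[String, String],
    catalogTable: Option[CatalogTable])

case class StreamingRelation(dataSource: DataSource)

def getStreamingRelation(
    table: CatalogTable,
    extraOptions: Map[String, String]): StreamingRelation = {
  val dataSource = DataSource(
    className = table.provider.get,                      // provider of the table
    userSpecifiedSchema = table.schema,                  // schema of the table
    options = table.storageProperties ++ extraOptions,   // table options merged with extras
    catalogTable = Some(table))                          // the table itself
  StreamingRelation(dataSource)
}

val t = CatalogTable(Some("parquet"), Seq("id" -> "long"), Map("path" -> "/tmp/t"))
println(getStreamingRelation(t, Map("maxFilesPerTrigger" -> "1")))
```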
## Demo
scala> :type spark
org.apache.spark.sql.SparkSession
// Example: InsertIntoTable with UnresolvedCatalogRelation
// Drop tables to make the example reproducible
val db = spark.catalog.currentDatabase
Seq("t1", "t2").foreach { t =>
spark.sharedState.externalCatalog.dropTable(db, t, ignoreIfNotExists = true, purge = true)
}
// Create tables
sql("CREATE TABLE t1 (id LONG) USING parquet")
sql("CREATE TABLE t2 (id LONG) USING orc")
import org.apache.spark.sql.catalyst.dsl.plans._
val plan = table("t1").insertInto(tableName = "t2", overwrite = true)
scala> println(plan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- 'UnresolvedRelation `t1`
// Transform the logical plan with ResolveRelations logical rule first
// so UnresolvedRelations become UnresolvedCatalogRelations
import spark.sessionState.analyzer.ResolveRelations
val planWithUnresolvedCatalogRelations = ResolveRelations(plan)
scala> println(planWithUnresolvedCatalogRelations.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- 'SubqueryAlias t1
02 +- 'UnresolvedCatalogRelation `default`.`t1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
// Let's resolve UnresolvedCatalogRelations then
import org.apache.spark.sql.execution.datasources.FindDataSourceTable
val r = new FindDataSourceTable(spark)
val tablesResolvedPlan = r(planWithUnresolvedCatalogRelations)
// FIXME Why is t2 not resolved?!
scala> println(tablesResolvedPlan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- SubqueryAlias t1
02 +- Relation[id#10L] parquet