FindDataSourceTable Logical Resolution Rule¶
FindDataSourceTable is a Catalyst rule to resolve UnresolvedCatalogRelation logical operators (of Spark and Hive tables) in a logical query plan (Rule[LogicalPlan]).
FindDataSourceTable is used by Hive and Spark Analyzers as part of their extendedResolutionRules.
Creating Instance¶
FindDataSourceTable takes the following to be created:
FindDataSourceTable is created when:
HiveSessionStateBuilderis requested for the AnalyzerBaseSessionStateBuilderis requested for the Analyzer
Execute Rule¶
apply traverses the given LogicalPlan (from top to leaves) to resolve UnresolvedCatalogRelations of the following logical operators:
- InsertIntoStatement with a non-streaming
UnresolvedCatalogRelationof Spark (DataSource) table - InsertIntoStatement with a non-streaming
UnresolvedCatalogRelationof a Hive table - AppendData (that is not by name) with a DataSourceV2Relation of V1Table
- A non-streaming
UnresolvedCatalogRelationof Spark (DataSource) table - A non-streaming
UnresolvedCatalogRelationof a Hive table - A streaming
UnresolvedCatalogRelation - A
StreamingRelationV2(Spark Structured Streaming) over a streamingUnresolvedCatalogRelation
Streaming and Non-Streaming UnresolvedCatalogRelations
The difference between streaming and non-streaming UnresolvedCatalogRelations is the isStreaming flag that is disabled (false) by default.
apply...FIXME
Create StreamingRelation¶
getStreamingRelation(
table: CatalogTable,
extraOptions: CaseInsensitiveStringMap): StreamingRelation
getStreamingRelation creates a StreamingRelation (Spark Structured Streaming) with a DataSource with the following:
| Property | Value |
|---|---|
| DataSource provider | The provider of the given CatalogTable |
| User-specified schema | The schema of the given CatalogTable |
| Options | DataSource options based on the given extraOptions and the CatalogTable |
| CatalogTable | The given CatalogTable |
getStreamingRelation is used when:
FindDataSourceTableis requested to resolve streamingUnresolvedCatalogRelations
Demo¶
scala> :type spark
org.apache.spark.sql.SparkSession
// Example: InsertIntoTable with UnresolvedCatalogRelation
// Drop tables to make the example reproducible
val db = spark.catalog.currentDatabase
Seq("t1", "t2").foreach { t =>
spark.sharedState.externalCatalog.dropTable(db, t, ignoreIfNotExists = true, purge = true)
}
// Create tables
sql("CREATE TABLE t1 (id LONG) USING parquet")
sql("CREATE TABLE t2 (id LONG) USING orc")
import org.apache.spark.sql.catalyst.dsl.plans._
val plan = table("t1").insertInto(tableName = "t2", overwrite = true)
scala> println(plan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- 'UnresolvedRelation `t1`
// Transform the logical plan with ResolveRelations logical rule first
// so UnresolvedRelations become UnresolvedCatalogRelations
import spark.sessionState.analyzer.ResolveRelations
val planWithUnresolvedCatalogRelations = ResolveRelations(plan)
scala> println(planWithUnresolvedCatalogRelations.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- 'SubqueryAlias t1
02 +- 'UnresolvedCatalogRelation `default`.`t1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
// Let's resolve UnresolvedCatalogRelations then
import org.apache.spark.sql.execution.datasources.FindDataSourceTable
val r = new FindDataSourceTable(spark)
val tablesResolvedPlan = r(planWithUnresolvedCatalogRelations)
// FIXME Why is t2 not resolved?!
scala> println(tablesResolvedPlan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- SubqueryAlias t1
02 +- Relation[id#10L] parquet