# LogicalRelation Leaf Logical Operator
LogicalRelation is a leaf logical operator that represents a BaseRelation in a logical query plan.
LogicalRelation is an ExposesMetadataColumns and can add extra metadata columns to the output columns.
LogicalRelation is a MultiInstanceRelation.
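For instance (a quick check to run in spark-shell; README.md is merely an example input), a file-based batch query shows up as a LogicalRelation leaf in the analyzed plan:

```scala
import org.apache.spark.sql.execution.datasources.LogicalRelation

// A file-based batch query is planned over a LogicalRelation leaf.
val df = spark.read.text("README.md")
df.queryExecution.analyzed.collect { case lr: LogicalRelation =>
  // the underlying BaseRelation (a HadoopFsRelation for file sources)
  println(lr.relation)
}
```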
## Creating Instance
LogicalRelation takes the following to be created:

- BaseRelation
- Output Schema (AttributeReferences)
- Optional CatalogTable
- isStreaming flag
LogicalRelation is created using the apply utility.
## Create LogicalRelation
```scala
apply(
  relation: BaseRelation,
  isStreaming: Boolean = false): LogicalRelation
apply(
  relation: BaseRelation,
  table: CatalogTable): LogicalRelation
```
apply wraps the given BaseRelation into a LogicalRelation (so it can be used in a logical query plan), with an optional CatalogTable and the isStreaming flag.
```scala
import org.apache.spark.sql.sources.BaseRelation

// A BaseRelation to wrap (left unimplemented in this snippet)
val baseRelation: BaseRelation = ???

// baseRelationToDataFrame wraps the BaseRelation into a LogicalRelation
val data = spark.baseRelationToDataFrame(baseRelation)
```
apply is used when:
- CreateTempViewUsing command is executed
- FallBackFileSourceV2 logical resolution rule is executed
- FileStreamSource (Spark Structured Streaming) is requested to getBatch
- HiveMetastoreCatalog is requested to convert a HiveTableRelation
- ResolveDataSource logical analysis rule is executed (to resolve a V1BatchSource)
- ResolveSQLOnFile and FindDataSourceTable logical evaluation rules are executed
- SparkSession is requested for a DataFrame for a BaseRelation
## Refresh (Files of HadoopFsRelation)
refresh requests the FileIndex (of the HadoopFsRelation) to refresh.
**HadoopFsRelation Supported Only**

refresh does the work for HadoopFsRelation relations only.
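In essence, refresh pattern-matches on the relation type. A condensed sketch of the behavior described above (not the verbatim sources):

```scala
// Sketch of refresh: only HadoopFsRelation carries a FileIndex
// (its location) to refresh; other relations are left untouched.
relation match {
  case hfs: HadoopFsRelation => hfs.location.refresh()
  case _ => // do nothing for non-file-based relations
}
```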
## Simple Text Representation
simpleString is made up of the output schema (truncated to maxFields) and the relation:
```text
Relation[[output]] [relation]
```
### Demo
```scala
val q = spark.read.text("README.md")
val logicalPlan = q.queryExecution.logical
```

```text
scala> println(logicalPlan.simpleString)
Relation[value#2] text
```
## Statistics
computeStats uses the optional CatalogTable. If available, computeStats requests the CatalogTable for the CatalogStatistics that, if available, is requested to toPlanStats (with the planStatsEnabled flag on when either spark.sql.cbo.enabled or spark.sql.cbo.planStats.enabled is enabled).

Otherwise, computeStats creates a Statistics with sizeInBytes only, set to the sizeInBytes of the BaseRelation.
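A condensed sketch of that logic (names mirror the description above; this is not the verbatim sources):

```scala
// Sketch of computeStats, per the description above.
// planStatsEnabled: CBO or plan-level statistics are enabled.
val planStatsEnabled = conf.cboEnabled || conf.planStatsEnabled
catalogTable
  .flatMap(_.stats.map(_.toPlanStats(output, planStatsEnabled)))
  .getOrElse(Statistics(sizeInBytes = relation.sizeInBytes))
```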
## Metadata Output Columns
```scala
metadataOutput: Seq[AttributeReference]
```

metadataOutput is part of the LogicalPlan abstraction.
metadataOutput checks whether this BaseRelation is a HadoopFsRelation. If so, metadataOutput requests the FileFormat (of the HadoopFsRelation) for the metadata columns.
Otherwise, metadataOutput returns no metadata columns (Nil).
**Lazy Value**

metadataOutput is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the Scala Language Specification.
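As a usage note (assuming Spark 3.3+, where file-based sources expose the hidden _metadata column), the metadata columns only show up once selected explicitly:

```scala
import spark.implicits._

// _metadata is a hidden file-source metadata column (Spark 3.3+);
// it is not part of the default output and must be selected explicitly.
spark.read.text("README.md")
  .select($"value", $"_metadata.file_path", $"_metadata.file_size")
  .show(truncate = false)
```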
## Add Metadata Columns to Output Columns
```scala
withMetadataColumns(): LogicalRelation
```

withMetadataColumns is part of the ExposesMetadataColumns abstraction.
withMetadataColumns creates a new LogicalRelation with the extra metadata columns (the metadata output columns not among the output columns yet) added to the output columns.

With no extra metadata columns, withMetadataColumns returns this LogicalRelation unchanged.
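A condensed sketch of that behavior (the de-duplication against the current output is inferred from the description above, not copied from the sources):

```scala
// Sketch of withMetadataColumns, per the description above.
val newMetadata = metadataOutput.filterNot(outputSet.contains)
if (newMetadata.nonEmpty) {
  // a new LogicalRelation with the metadata columns appended to the output
  LogicalRelation(relation, output ++ newMetadata, catalogTable, isStreaming)
} else {
  this // nothing to add
}
```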
## Demo
The following are two logically-equivalent batch queries described using different Spark APIs: Scala and SQL.
```scala
val format = "csv"
val path = "../datasets/people.csv"

val loadQuery = spark
  .read
  .format(format)
  .option("header", true)
  .load(path)
```
```text
scala> println(loadQuery.queryExecution.logical.numberedTreeString)
00 UnresolvedDataSource format: csv, isStreaming: false, paths: 1 provided
```
```scala
val selectQuery = sql(s"select * from `$format`.`$path`")
```
```text
scala> println(selectQuery.queryExecution.optimizedPlan.numberedTreeString)
00 Relation [_c0#75,_c1#76] csv
```
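Both queries are expected to plan over a LogicalRelation in the end, which can be verified by collecting the LogicalRelation leaves of the optimized plans (a sketch, in the same session as above):

```scala
import org.apache.spark.sql.execution.datasources.LogicalRelation

// Both the DataFrame API and the SQL query should optimize down to
// a LogicalRelation over a CSV-backed BaseRelation.
Seq(loadQuery, selectQuery)
  .flatMap(_.queryExecution.optimizedPlan.collect { case lr: LogicalRelation => lr })
  .foreach(lr => println(lr.relation))
```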