LocalRelation Leaf Logical Operator
LocalRelation is a leaf logical operator that represents a scan over a local collection, which allows queries with functions like collect or take to be executed locally on the driver, with no executors involved.
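For a quick check, Dataset.isLocal reports exactly this: whether collect and take can run without Spark executors. A minimal sketch (in spark-shell, where spark.implicits._ is already in scope):
// A Dataset built from a local collection is backed by a LocalRelation,
// so collect and take can run on the driver only.
val localDS = Seq(1, 2, 3).toDS
assert(localDS.isLocal)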
Creating Instance
LocalRelation takes the following to be created:
- Output Schema Attributes
- Data (InternalRows)
- isStreaming flag
When created, LocalRelation asserts that all the output attributes are resolved, and throws an IllegalArgumentException otherwise:
Unresolved attributes found when constructing LocalRelation.
LocalRelation can be created using apply, fromExternalRows, and fromProduct factory methods.
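A minimal sketch of both points above, using the apply factory method with a resolved attribute (giving an empty LocalRelation with that schema) and then an unresolved attribute to trigger the assertion:
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types.LongType

// apply with resolved attributes creates an empty LocalRelation
val empty = LocalRelation(AttributeReference("id", LongType)())
assert(empty.data.isEmpty)

// unresolved attributes are rejected at construction time
try LocalRelation(UnresolvedAttribute("id"))
catch { case e: IllegalArgumentException => println(e.getMessage) }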
isStreaming Flag
isStreaming: Boolean
isStreaming is part of the LogicalPlan abstraction.
isStreaming can be given when LocalRelation is created.
isStreaming is false by default.
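A minimal sketch of the default and of setting the flag explicitly at creation time:
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types.IntegerType

val out = Seq(AttributeReference("id", IntegerType)())
// batch (non-streaming) by default
assert(!LocalRelation(out, data = Seq.empty).isStreaming)
// marked as streaming explicitly
assert(LocalRelation(out, data = Seq.empty, isStreaming = true).isStreaming)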
MultiInstanceRelation
LocalRelation is a MultiInstanceRelation.
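As part of that contract, newInstance re-creates the output attributes with fresh expression IDs (while keeping the data), so the same relation can appear more than once in a query plan (e.g. in a self-join). A minimal sketch:
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types.IntegerType

val r1 = LocalRelation(AttributeReference("id", IntegerType)())
val r2 = r1.newInstance()
// same schema, different expression IDs
assert(r1.output.head.exprId != r2.output.head.exprId)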
Local Datasets
A Dataset is local when its analyzed logical plan is a LocalRelation.
val data = Seq(1, 3, 4, 7)
val nums = data.toDF
scala> :type nums
org.apache.spark.sql.DataFrame
val plan = nums.queryExecution.analyzed
scala> println(plan.numberedTreeString)
00 LocalRelation [value#1]
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
val relation = plan.collect { case r: LocalRelation => r }.head
assert(relation.isInstanceOf[LocalRelation])
val sql = relation.toSQL(inlineTableName = "demo")
assert(sql == "VALUES (1), (3), (4), (7) AS demo(value)")
val stats = relation.computeStats
scala> println(stats)
Statistics(sizeInBytes=48.0 B, hints=none)
Execution Planning
LocalRelation is planned (resolved) to a LocalTableScanExec leaf physical operator by the BasicOperators execution planning strategy.
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
assert(relation.isInstanceOf[LocalRelation])
scala> :type spark
org.apache.spark.sql.SparkSession
import spark.sessionState.planner.BasicOperators
val localScan = BasicOperators(relation).head
import org.apache.spark.sql.execution.LocalTableScanExec
assert(localScan.isInstanceOf[LocalTableScanExec])
Statistics
computeStats(): Statistics
computeStats is part of the LeafNode abstraction.
computeStats estimates the size of a single row (based on the data types of the output attributes) and multiplies it by the number of rows in the data to produce sizeInBytes.
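As a worked example against the Local Datasets demo above: the 48 bytes reported there are consistent with an estimated 12 bytes per row (the 4-byte default size of IntegerType plus what appears to be an 8-byte per-row overhead) multiplied by the 4 rows; treat the exact per-row breakdown as an assumption, since it may differ across Spark versions.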
SQL Representation
toSQL(
inlineTableName: String): String
toSQL generates a SQL statement of the format:
VALUES [data] AS [inlineTableName]([names])
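For example, in the Local Datasets demo above, toSQL over the four-row relation produces VALUES (1), (3), (4), (7) AS demo(value).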
Note
toSQL does not seem to be used.
Creating Non-Empty LocalRelation
fromProduct(
output: Seq[Attribute],
data: Seq[Product]): LocalRelation
fromProduct creates a LocalRelation with the given output attributes and the given data converted to InternalRows (using a Catalyst converter created from the schema of the given attributes).
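A minimal sketch with Scala tuples (which are Products):
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types.{IntegerType, StringType}

val rel = LocalRelation.fromProduct(
  output = Seq(
    AttributeReference("id", IntegerType)(),
    AttributeReference("name", StringType)()),
  data = Seq((1, "one"), (2, "two")))
assert(rel.data.length == 2)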
Demo
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType
val relation = LocalRelation.fromExternalRows(
  output = Seq(AttributeReference("id", IntegerType)()),
  data = Seq(Row(1)))
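A quick check of the result (a one-row relation with a single id column):
assert(relation.output.map(_.name) == Seq("id"))
assert(relation.data.length == 1)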