
LocalRelation Leaf Logical Operator

LocalRelation is a leaf logical operator that represents a scan over a local collection, and so allows functions like collect or take to be executed locally on the driver with no executors involved.
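
For example, a Dataset created from a local Scala collection is backed by a LocalRelation, so take (and collect) return without submitting a Spark job (nothing shows up in the web UI's Jobs tab). A minimal sketch, assuming a spark-shell session (so spark.implicits._ is in scope):

// The Dataset below is backed by a LocalRelation
val ds = Seq(1, 2, 3, 4).toDS

scala> ds.take(2)
res0: Array[Int] = Array(1, 2)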

Creating Instance

LocalRelation takes the following to be created:

Output attributes (Seq[Attribute])
Data (Seq[InternalRow], default: empty)
isStreaming flag (default: false)

When created, LocalRelation asserts that the output attributes are all resolved, or else throws an IllegalArgumentException with the message:

Unresolved attributes found when constructing LocalRelation.
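
A quick way to see the assertion (a minimal sketch; UnresolvedAttribute is an Attribute that is never resolved):

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import scala.util.Try

// UnresolvedAttribute is never resolved, so the constructor requirement fails
val attempt = Try(LocalRelation(Seq(UnresolvedAttribute("id"))))
assert(attempt.isFailure)
assert(attempt.failed.get.isInstanceOf[IllegalArgumentException])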

LocalRelation can be created using apply, fromExternalRows, and fromProduct factory methods.
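
For example, the apply factory creates an empty LocalRelation from output attributes alone (a sketch using the Catalyst DSL to build resolved AttributeReferences):

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// 'id.int and 'name.string create resolved AttributeReferences
val emptyRelation = LocalRelation('id.int, 'name.string)
assert(emptyRelation.data.isEmpty)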

isStreaming Flag

isStreaming: Boolean

isStreaming is part of the LogicalPlan abstraction.

isStreaming can be given when LocalRelation is created.

isStreaming is false by default.
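
A minimal sketch (using the Catalyst DSL for the attributes); isStreaming can only be given at construction time:

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// a batch (non-streaming) relation by default
assert(LocalRelation('id.int).isStreaming == false)

// a streaming LocalRelation with no data
val streamingRelation = LocalRelation(Seq('id.int), data = Nil, isStreaming = true)
assert(streamingRelation.isStreaming)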

MultiInstanceRelation

LocalRelation is a MultiInstanceRelation.
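
As a MultiInstanceRelation, LocalRelation can produce a new instance of itself with fresh expression IDs for the output attributes (so the same relation can appear more than once in a query plan, e.g. in a self-join). A minimal sketch:

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

val r1 = LocalRelation('id.int)
val r2 = r1.newInstance()

// same schema, but the output attribute gets a new ExprId
assert(r1.output.head.exprId != r2.output.head.exprId)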

Local Datasets

A Dataset is local (Dataset.isLocal) when the analyzed logical plan is a LocalRelation.

val data = Seq(1, 3, 4, 7)
val nums = data.toDF

scala> :type nums
org.apache.spark.sql.DataFrame

val plan = nums.queryExecution.analyzed
scala> println(plan.numberedTreeString)
00 LocalRelation [value#1]

import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
val relation = plan.collect { case r: LocalRelation => r }.head
assert(relation.isInstanceOf[LocalRelation])

val sql = relation.toSQL(inlineTableName = "demo")
assert(sql == "VALUES (1), (3), (4), (7) AS demo(value)")

val stats = relation.computeStats
scala> println(stats)
Statistics(sizeInBytes=48.0 B, hints=none)
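
Dataset.isLocal reports whether the analyzed logical plan is a LocalRelation, which is exactly the case for nums above:

assert(nums.isLocal)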

Execution Planning

LocalRelation is resolved to LocalTableScanExec leaf physical operator by BasicOperators execution planning strategy.

import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
assert(relation.isInstanceOf[LocalRelation])

scala> :type spark
org.apache.spark.sql.SparkSession

import spark.sessionState.planner.BasicOperators
val localScan = BasicOperators(relation).head

import org.apache.spark.sql.execution.LocalTableScanExec
assert(localScan.isInstanceOf[LocalTableScanExec])

Statistics

computeStats(): Statistics

computeStats is part of the LeafNode abstraction.

computeStats computes the size (in bytes) of a single row, as the sum of the default sizes of the data types of the output attributes, and multiplies it by the number of rows in the data (for the sizeInBytes statistic).
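
A sketch that checks the formula for a two-column relation (using the Catalyst DSL for the attributes; IntegerType has a default size of 4 bytes):

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.InternalRow

val rows = Seq(InternalRow(1, 10), InternalRow(2, 20), InternalRow(3, 30))
val relation = LocalRelation(Seq('a.int, 'b.int), rows)

// (4 + 4) bytes per row * 3 rows
val expected = relation.output.map(_.dataType.defaultSize).sum * rows.length
assert(relation.computeStats().sizeInBytes == expected)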

SQL Representation

toSQL(
  inlineTableName: String): String

toSQL generates a SQL statement of the format:

VALUES [data] AS [inlineTableName]([names])

Note

toSQL does not seem to be used.

Creating Non-Empty LocalRelation

fromProduct(
  output: Seq[Attribute],
  data: Seq[Product]): LocalRelation

fromProduct creates a LocalRelation with the given output attributes and the data converted to InternalRows (using a Catalyst converter from the schema of the given attributes).
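
A minimal sketch (using the Catalyst DSL for the attributes; every Scala tuple is a Product):

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

val relation = LocalRelation.fromProduct(
  output = Seq('id.int, 'name.string),
  data = Seq((1, "one"), (2, "two")))

// every Product became one InternalRow
assert(relation.data.length == 2)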

Demo

import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType
val relation = LocalRelation.fromExternalRows(
  output = Seq(AttributeReference("id", IntegerType)()),
  data = Seq(Row(1)))