Catalyst DSL
Catalyst DSL is a collection of Scala implicit conversions for constructing Catalyst data structures more easily.
The goal of Catalyst DSL is to make working with Spark SQL's building blocks easier (e.g. for testing or Spark SQL internals exploration).
ExpressionConversions
ExpressionConversions creates Catalyst expressions:
- Literals
- UnresolvedAttribute and AttributeReference
- ...
Type Conversions to Literal Expressions
ExpressionConversions adds implicit conversions of Scala and Java types (e.g. Boolean, Long, String, java.sql.Date, java.sql.Timestamp) and of the Spark SQL Decimal type to Literal expressions.
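The following minimal sketch shows these conversions being triggered by an expected type of Literal. It assumes spark-catalyst from the Spark 2.4 line (Scala 2.12) is on the classpath, for example in sbt console. The value names are illustrative only. Each resulting Literal carries the Catalyst data type inferred from the Scala or Java value.

```scala
// Assumes spark-catalyst (Spark 2.4.x, Scala 2.12) on the classpath, e.g. in sbt console
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.{DateType, Decimal, DecimalType, IntegerType, StringType}

// Each assignment is accepted only thanks to an implicit conversion in ExpressionConversions
val intLit: Literal = 42
val strLit: Literal = "hello"
val dateLit: Literal = java.sql.Date.valueOf("2020-01-01")
val decLit: Literal = Decimal(BigDecimal("12.5"))

// The Literal records the Catalyst type that matches the original value
println(intLit.dataType)
println(dateLit.dataType)
println(decLit.dataType)
```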
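Converting Symbols to UnresolvedAttribute and AttributeReference Expressions
ExpressionConversions adds conversions of Scala's Symbol to UnresolvedAttribute and AttributeReference expressions. A bare Symbol (e.g. 'id) becomes an UnresolvedAttribute. The type-named methods on a Symbol (e.g. 'id.int or 'hello.string) create an AttributeReference.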
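The two kinds of conversion differ in whether a data type is attached. Here is a minimal sketch, assuming spark-catalyst from the Spark 2.4 line (Scala 2.12) on the classpath. The attribute names are illustrative.

```scala
// Assumes spark-catalyst (Spark 2.4.x, Scala 2.12) on the classpath, e.g. in sbt console
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{IntegerType, StringType}

// A bare Symbol becomes an UnresolvedAttribute: a name only, no data type yet
val unresolved: UnresolvedAttribute = 'id

// Type-named methods on a Symbol create a typed (and nullable) AttributeReference
val idAttr: AttributeReference = 'id.int
val nameAttr: AttributeReference = 'name.string
```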
Converting $-Prefixed String Literals to UnresolvedAttribute Expressions
ExpressionConversions adds a conversion of $"col name" to an UnresolvedAttribute expression.
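The $ interpolator builds its string the same way Scala's s interpolator does, so variables can be spliced in. It then wraps the result in an UnresolvedAttribute. The following is a minimal sketch, assuming spark-catalyst from the Spark 2.4 line on the classpath and a session where spark-shell's own $ (from SQLImplicits) is not imported. The names colName and t are illustrative.

```scala
// Assumes spark-catalyst (Spark 2.4.x, Scala 2.12) on the classpath, e.g. in sbt console
// (not spark-shell, whose SQLImplicits also define a $ interpolator)
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute

val colName = "id"

// A plain column name
val plain: UnresolvedAttribute = $"id"

// Interpolation works as with s"...": this yields the dotted name t.id
val qualified: UnresolvedAttribute = $"t.$colName"
```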
at
```scala
at(ordinal: Int): BoundReference
```
ExpressionConversions adds the at method to AttributeReferences to create a BoundReference expression, that is, a reference to the value at the given ordinal of an input row.
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
val boundRef = 'hello.string.at(4)
scala> println(boundRef)
input[4, string, true]
```
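A BoundReference can be evaluated directly against an InternalRow, because it only reads the value at its ordinal. The following is a minimal sketch, assuming spark-catalyst from the Spark 2.4 line on the classpath. The row contents are made up for illustration. Note that Catalyst rows hold strings as UTF8String rather than java.lang.String.

```scala
// Assumes spark-catalyst (Spark 2.4.x, Scala 2.12) on the classpath, e.g. in sbt console
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.unsafe.types.UTF8String

// A string attribute bound to ordinal 1 of the input row
val boundRef = 'hello.string.at(1)

// An illustrative row: an Int at ordinal 0, a Catalyst string at ordinal 1
val row = InternalRow(42, UTF8String.fromString("world"))

// eval reads whatever sits at the bound ordinal
val value = boundRef.eval(row)
println(value)
```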
star
```scala
star(names: String*): Expression
```
ExpressionConversions provides aggregate and non-aggregate functions that create Catalyst expressions (e.g. sum, count, upper, star, callFunction, windowSpec, windowExpr).
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar

// star() with no names creates an UnresolvedStar (i.e. *)
val s = star()
assert(s.isInstanceOf[UnresolvedStar])

// With names, they become the star's target (qualifier)
val s = star("a", "b")
scala> println(s)
WrappedArray(a, b).*
```
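The aggregate and non-aggregate helpers return different kinds of expressions. Here is a minimal sketch, assuming spark-catalyst from the Spark 2.4 line on the classpath. The column names are illustrative. A non-aggregate function such as upper wraps its argument directly, while sum and count come back wrapped in an AggregateExpression.

```scala
// Assumes spark-catalyst (Spark 2.4.x, Scala 2.12) on the classpath, e.g. in sbt console
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Expression, Upper}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, Sum}

// Non-aggregate function: the expression itself
val upperName: Expression = upper('name)

// Aggregate functions: wrapped in an AggregateExpression
val total: Expression = sum('amount)
val rows: Expression = count('id)

// Unwrap to inspect which aggregate function was created
val totalFn = total.asInstanceOf[AggregateExpression].aggregateFunction
val rowsFn = rows.asInstanceOf[AggregateExpression].aggregateFunction
```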
function and distinctFunction
ExpressionConversions allows creating UnresolvedFunction expressions with the function and distinctFunction operators.
```scala
function(exprs: Expression*): UnresolvedFunction
distinctFunction(exprs: Expression*): UnresolvedFunction
```
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
// Works with Scala Symbols only (the Symbol is the function name)
val f = 'f.function()
scala> :type f
org.apache.spark.sql.catalyst.analysis.UnresolvedFunction
scala> f.isDistinct
res0: Boolean = false
val g = 'g.distinctFunction()
scala> g.isDistinct
res1: Boolean = true
```
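The function arguments become the children of the UnresolvedFunction, and the function name is only resolved later by the analyzer. The following is a minimal sketch, assuming spark-catalyst from the Spark 2.4 line on the classpath. The function and column names are illustrative.

```scala
// Assumes spark-catalyst (Spark 2.4.x, Scala 2.12) on the classpath, e.g. in sbt console
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction

// The Symbol names the function; the arguments (converted from Symbols) are its children
val concatCall: UnresolvedFunction = 'concat.function('first, 'last)

// distinctFunction sets isDistinct, as in count(DISTINCT id)
val countDistinct: UnresolvedFunction = 'count.distinctFunction('id)
```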
notNull and canBeNull
ExpressionConversions adds the canBeNull and notNull operators to create an AttributeReference with nullability turned on or off, respectively.
```scala
notNull: AttributeReference
canBeNull: AttributeReference
```
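Attributes created from a Symbol with a type method are nullable, so notNull and canBeNull are how you toggle that flag. Here is a minimal sketch, assuming spark-catalyst from the Spark 2.4 line on the classpath. The attribute name is illustrative. Both operators return a copy with the requested nullability and keep the same expression ID, so the result still refers to the same attribute.

```scala
// Assumes spark-catalyst (Spark 2.4.x, Scala 2.12) on the classpath, e.g. in sbt console
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.AttributeReference

// Typed attributes created through the DSL start out nullable
val id: AttributeReference = 'id.long

// notNull turns nullability off, canBeNull turns it back on
val requiredId = id.notNull
val optionalId = requiredId.canBeNull
```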
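ImplicitOperators
ImplicitOperators adds operators to Catalyst expressions for building complex expressions. Examples include arithmetic (+, -, *, /, %), comparison (<, <=, >, >=, ===, <=>), logical (&&, ||, unary !) and aliasing (as).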
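These operators follow Scala's usual operator precedence and build expression trees rather than computing values. Here is a minimal sketch, assuming spark-catalyst from the Spark 2.4 line on the classpath. The column names and constants are illustrative. Catalyst equality has to be written as ===, because Scala's == simply compares the two objects and returns a Boolean.

```scala
// Assumes spark-catalyst (Spark 2.4.x, Scala 2.12) on the classpath, e.g. in sbt console
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Add, Alias, And, Expression, Multiply}

// Arithmetic builds Multiply(Add('price, 1), 2)
val arith: Expression = ('price + 1) * 2

// Comparison and logic: (qty > 10) AND (status = 'open')
val predicate: Expression = 'qty > 10 && 'status === "open"

// as wraps an expression in an Alias, i.e. a named expression
val named = arith.as("adjusted")
println(predicate)
```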
Implicit Conversions for Logical Plans
```scala
import org.apache.spark.sql.catalyst.dsl.plans._
```
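DslLogicalPlan
DslLogicalPlan is an implicit class that adds logical operators (e.g. select, where, serialize, analyze) to a LogicalPlan.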
table
table creates an UnresolvedRelation logical operator. It is defined directly in the plans object rather than in DslLogicalPlan.
```scala
table(ref: String): LogicalPlan
table(db: String, ref: String): LogicalPlan
```
```scala
import org.apache.spark.sql.catalyst.dsl.plans._
val t1 = table("t1")
scala> println(t1.treeString)
'UnresolvedRelation `t1`
```
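The relation returned by table is a normal LogicalPlan, so DslLogicalPlan operators can be chained on top of it. The following is a minimal sketch, assuming spark-catalyst from the Spark 2.4 line on the classpath. The database, table and column names are hypothetical. The resulting plan stays unresolved because nothing has looked up the tables yet.

```scala
// Assumes spark-catalyst (Spark 2.4.x, Scala 2.12) on the classpath, e.g. in sbt console
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

// Both overloads: table name only, and database plus table name (names are hypothetical)
val t1: LogicalPlan = table("t1")
val t2: LogicalPlan = table("db1", "t2")

// Chain operators: Filter on top of Project on top of the relation
val plan: LogicalPlan = t2.select('id, 'name).where('id > 0)
println(plan.treeString)
```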
Package Object
Catalyst DSL is part of the org.apache.spark.sql.catalyst.dsl package object.
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
```
spark-shell
Some implicit conversions from the Catalyst DSL interfere with the implicit conversions from SQLImplicits, which spark-shell imports automatically (through spark.implicits._).
```scala
scala> 'hello.decimal
<console>:30: error: type mismatch;
found : Symbol
required: ?{def decimal: ?}
Note that implicit conversions are not applicable because they are ambiguous:
both method symbolToColumn in class SQLImplicits of type (s: Symbol)org.apache.spark.sql.ColumnName
and method DslSymbol in trait ExpressionConversions of type (sym: Symbol)org.apache.spark.sql.catalyst.dsl.expressions.DslSymbol
are possible conversion functions from Symbol to ?{def decimal: ?}
'hello.decimal
^
<console>:30: error: value decimal is not a member of Symbol
'hello.decimal
^
```
Use sbt console with the Spark libraries defined in build.sbt instead.
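Here is a minimal build.sbt sketch for such a console session. The Scala and Spark versions are assumptions, so pick the release you want to explore. The console / initialCommands setting pre-imports the DSL, so each session starts with it in scope. Because SQLImplicits is not imported automatically in sbt console, the ambiguity shown above does not arise there.

```scala
// build.sbt (sketch): the versions below are assumptions, not requirements
scalaVersion := "2.12.15"

// spark-sql pulls in spark-catalyst (and with it the Catalyst DSL) transitively
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.8"

// Pre-import the Catalyst DSL whenever `sbt console` starts
console / initialCommands := """
  import org.apache.spark.sql.catalyst.dsl.expressions._
  import org.apache.spark.sql.catalyst.dsl.plans._
"""
```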
You can also disable an implicit conversion using the trick described in "How can an implicit be unimported from the Scala repl?":
```scala
// HACK: Disable the symbolToColumn implicit conversion
// It is imported automatically in spark-shell (and makes demos impossible)
// implicit def symbolToColumn(s: Symbol): org.apache.spark.sql.ColumnName
// Defining a member with the same name shadows the imported implicit
trait ThatWasABadIdea
implicit def symbolToColumn(ack: ThatWasABadIdea) = ack

// HACK: Disable the $ string interpolator (StringToColumn in SQLImplicits)
// It is imported automatically in spark-shell (and makes demos impossible)
implicit class StringToColumn(val sc: StringContext) {}
```
Demo
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._

// ExpressionConversions
import org.apache.spark.sql.catalyst.expressions.Literal
scala> val trueLit: Literal = true
trueLit: org.apache.spark.sql.catalyst.expressions.Literal = true

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
scala> val name: UnresolvedAttribute = 'name
name: org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute = 'name

// NOTE: This conversion may not work, e.g. in spark-shell, where the
// StringToColumn implicit conversion from SQLImplicits is imported automatically
// (see :imports)
val id: UnresolvedAttribute = $"id"

import org.apache.spark.sql.catalyst.expressions.Expression
scala> val expr: Expression = sum('id)
expr: org.apache.spark.sql.catalyst.expressions.Expression = sum('id)

// implicit class DslSymbol
scala> 'hello.s
res2: String = hello
scala> 'hello.attr
res4: org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute = 'hello

// implicit class DslString
scala> "hello".expr
res0: org.apache.spark.sql.catalyst.expressions.Expression = hello
scala> "hello".attr
res1: org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute = 'hello
```
```scala
// logical plans
scala> val t1 = table("t1")
t1: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
'UnresolvedRelation `t1`

// Use === (not ==) to build an EqualTo expression.
// 'id % 2 == 0 is a plain Scala Boolean (false), which becomes Literal(false), i.e. 'Filter false
scala> val p = t1.select('*).serialize[String].where('id % 2 === 0)
p: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
'Filter (('id % 2) = 0)
+- 'SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#1]
   +- 'Project ['*]
      +- 'UnresolvedRelation `t1`

// NOTE: analyze fails here because SimpleAnalyzer's catalog is empty
// while the plan p references the table t1
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
scala> p.analyze
```