
ScalaUDF

ScalaUDF is an Expression that manages the lifecycle of a user-defined function (and hooks it into the Catalyst execution path).

ScalaUDF is a NonSQLExpression (i.e. has no representation in SQL).

ScalaUDF is a UserDefinedExpression.

Creating Instance

ScalaUDF takes the following to be created:

ScalaUDF is created when:

  • UDFRegistration is requested to register a UDF
  • BaseDynamicPartitionDataWriter is requested for partitionPathExpression
  • SparkUserDefinedFunction is requested to createScalaUDF
  • HandleNullInputsForUDF and ResolveEncodersInUDF logical analysis rules are executed
  • ImplicitTypeCasts type coercion rule is used (for its transform function) to cast the children of a ScalaUDF to the expected input types
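The SparkUserDefinedFunction and UDFRegistration paths can be observed from user code. The following is a minimal sketch, assuming a Spark 3.x spark-shell (or an application with spark-sql on the classpath) where Column.expr still gives the underlying Catalyst expression. plusOneFn and plusOne are made-up names for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.ScalaUDF
import org.apache.spark.sql.functions.{col, udf}

// Reuses the active session (e.g. in spark-shell) or starts a local one
val spark = SparkSession.builder().master("local[1]").getOrCreate()

// SparkUserDefinedFunction path: applying a udf to columns creates a ScalaUDF
val plusOneFn = udf { (x: Long) => x + 1 }
val fromUdf = plusOneFn(col("id")).expr

// UDFRegistration path: the registered function builder creates a ScalaUDF
// when a SQL query using the function is analyzed
spark.udf.register("plusOne", (x: Long) => x + 1)
val analyzed = spark.sql("SELECT plusOne(id) FROM range(3)").queryExecution.analyzed

// Collect all ScalaUDFs in the expressions of the analyzed (top-level Project) plan
val registered = analyzed.expressions.flatMap(_.collect { case u: ScalaUDF => u })
```

Note that the registered UDF carries the registration name as its udfName.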

deterministic

deterministic: Boolean

ScalaUDF is deterministic when all the following hold:

  1. udfDeterministic is enabled
  2. All the children are deterministic

deterministic is part of the Expression abstraction.
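The two conditions can be checked one at a time. A minimal sketch, assuming Spark 3.x and the standard functions.udf, UserDefinedFunction.asNondeterministic, lit and rand; twice is a hypothetical UDF used only for illustration.

```scala
import org.apache.spark.sql.catalyst.expressions.ScalaUDF
import org.apache.spark.sql.functions.{lit, rand, udf}

val twice = udf { (x: Double) => x * 2 }

// Both conditions hold: udfDeterministic is enabled and the literal child is deterministic
val onLiteral = twice(lit(1.0)).expr.asInstanceOf[ScalaUDF]

// asNondeterministic disables udfDeterministic
val flagged = twice.asNondeterministic()(lit(1.0)).expr.asInstanceOf[ScalaUDF]

// udfDeterministic is still enabled, but the child (rand) is nondeterministic
val onRand = twice(rand()).expr.asInstanceOf[ScalaUDF]
```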

Text Representation

toString: String

toString uses the name and the children for the text representation:

[name]([comma-separated children])

toString is part of the TreeNode abstraction.
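For example, a two-argument UDF applied to two (unresolved) columns. A sketch assuming Spark 3.x, where an unresolved column prints as 'colName; fullName, concatNames and the column names are made up for illustration.

```scala
import org.apache.spark.sql.functions.{col, udf}

// Named UDF: toString uses the udfName
val fullName = udf { (first: String, last: String) => s"$first $last" }.withName("fullName")
val named = fullName(col("first"), col("last")).expr

// Unnamed UDF: the name falls back to UDF
val concatNames = udf { (first: String, last: String) => s"$first $last" }
val unnamed = concatNames(col("first"), col("last")).expr

println(named)   // fullName('first, 'last)
println(unnamed) // UDF('first, 'last)
```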

Name

name: String

name is the udfName (if defined) or UDF.

name is part of the UserDefinedExpression abstraction.
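A quick way to see both cases, assuming Spark 3.x; strLen is a hypothetical UDF used only for illustration.

```scala
import org.apache.spark.sql.catalyst.expressions.ScalaUDF
import org.apache.spark.sql.functions.{col, udf}

val strLen = udf { s: String => s.length }

// No udfName defined, so name is UDF
val anonymous = strLen(col("s")).expr.asInstanceOf[ScalaUDF]

// withName sets the udfName that name then returns
val withUdfName = strLen.withName("strLen")(col("s")).expr.asInstanceOf[ScalaUDF]
```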

Code-Generated Expression Evaluation

doGenCode(
  ctx: CodegenContext,
  ev: ExprCode): ExprCode

doGenCode...FIXME

doGenCode is part of the Expression abstraction.
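While the details of doGenCode are still to be described, the Java code it contributes to a query can be inspected with the debug utilities in org.apache.spark.sql.execution.debug (codegenString returns the text that debugCodegen prints). A sketch assuming a Spark 3.x local session; plusOne is a hypothetical UDF, and range is used as the input so the query goes through whole-stage code generation.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug.codegenString
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder().master("local[1]").getOrCreate()

val plusOne = udf { (x: Long) => x + 1 }

// Range and Project (with the ScalaUDF) are compiled together into Java code
val q = spark.range(3).select(plusOne(col("id")).as("n"))

// Generated code of all whole-stage-codegen subtrees of the physical plan
val generated = codegenString(q.queryExecution.executedPlan)
println(generated)
```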

Interpreted Expression Evaluation

eval(
  input: InternalRow): Any

eval...FIXME

eval is part of the Expression abstraction.
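Interpreted evaluation can be exercised directly on a ScalaUDF whose child needs no input row. A minimal sketch, assuming Spark 3.x; double is a hypothetical UDF over an Int, applied to a literal so that eval can use its default input row.

```scala
import org.apache.spark.sql.catalyst.expressions.ScalaUDF
import org.apache.spark.sql.functions.{lit, udf}

val double = udf { (x: Int) => x * 2 }

// The literal child evaluates without any input row
val doubled = double(lit(21)).expr.asInstanceOf[ScalaUDF]

// Uses the default InternalRow of eval
val result = doubled.eval()
```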

Node Patterns

nodePatterns: Seq[TreePattern]

nodePatterns is SCALA_UDF.

nodePatterns is part of the TreeNode abstraction.
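The SCALA_UDF pattern ends up in the tree pattern bits of the ScalaUDF and all of its ancestors, which lets rules skip trees without a ScalaUDF using containsPattern. A sketch assuming Spark 3.2 or later (when tree patterns were introduced); hello is a hypothetical UDF.

```scala
import org.apache.spark.sql.catalyst.trees.TreePattern.SCALA_UDF
import org.apache.spark.sql.functions.{col, udf, upper}

val hello = udf { s: String => s"Hello $s" }

val withUDF = hello(col("name")).expr
// The pattern bit propagates up to parent expressions
val nested = upper(hello(col("name"))).expr
val withoutUDF = col("name").expr
```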

Analysis

Logical Analyzer uses the HandleNullInputsForUDF and ResolveEncodersInUDF logical analysis rules to analyze queries with ScalaUDF expressions.
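The effect of HandleNullInputsForUDF can be seen with a UDF over a primitive type. When the input is nullable, the analyzed plan wraps the ScalaUDF in an If expression that gives null for a null input (instead of calling the function). A sketch assuming a Spark 3.x local session; plusOne is a hypothetical UDF.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.If
import org.apache.spark.sql.functions.{lit, udf}

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Long is a primitive input type, so the function itself cannot accept null
val plusOne = udf { (x: Long) => x + 1 }

// A nullable input: a null literal cast to bigint
val q = spark.range(1).select(plusOne(lit(null).cast("long")).as("n"))

// Is there an If (added by HandleNullInputsForUDF) in the analyzed projection?
val wrapped = q.queryExecution.analyzed.expressions.exists(_.find(_.isInstanceOf[If]).isDefined)
val row = q.collect().head
```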

Demo

Let's define a zero-argument UDF.

val myUDF = udf { () => "Hello World" }
// "Execute" the UDF
// Attach it to an "execution environment", i.e. a Dataset
// by specifying zero columns to execute on (since the UDF is no-arg)
import org.apache.spark.sql.catalyst.expressions.ScalaUDF
val scalaUDF = myUDF().expr.asInstanceOf[ScalaUDF]

assert(scalaUDF.resolved)

Let's execute the UDF (as it would be executed on every row of a Dataset). We simulate that by relying on EmptyRow, the default InternalRow of eval.

scala> scalaUDF.eval()
res2: Any = Hello World

Let's define a UDF of one argument.

val lengthUDF = udf { s: String => s.length }.withName("lengthUDF")
val c = lengthUDF($"name")
scala> println(c.expr.treeString)
UDF:lengthUDF('name)
+- 'name
import org.apache.spark.sql.catalyst.expressions.ScalaUDF
assert(c.expr.isInstanceOf[ScalaUDF])

Let's define another UDF of one argument.

val hello = udf { s: String => s"Hello $s" }

// Binding the hello UDF to a column name
import org.apache.spark.sql.catalyst.expressions.ScalaUDF
val helloScalaUDF = hello($"name").expr.asInstanceOf[ScalaUDF]

assert(helloScalaUDF.resolved == false)
// Resolve helloScalaUDF, i.e. the only `name` column reference

scala> helloScalaUDF.children
res4: Seq[org.apache.spark.sql.catalyst.expressions.Expression] = ArrayBuffer('name)
// The column is free (i.e. not bound to a Dataset)
// Define a Dataset that becomes the rows for the UDF
val names = Seq("Jacek", "Agata").toDF("name")
scala> println(names.queryExecution.analyzed.numberedTreeString)
00 Project [value#1 AS name#3]
01 +- LocalRelation [value#1]

Resolve the references using the Dataset.

val plan = names.queryExecution.analyzed
val resolver = spark.sessionState.analyzer.resolver
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
val resolvedUDF = helloScalaUDF.transformUp { case UnresolvedAttribute(nameParts) =>
  // We're in a controlled environment (name is a column of plan),
  // so get is safe
  plan.resolve(nameParts, resolver).get
}
assert(resolvedUDF.resolved)
scala> println(resolvedUDF.numberedTreeString)
00 UDF(name#3)
01 +- name#3: string
import org.apache.spark.sql.catalyst.expressions.BindReferences
val attrs = names.queryExecution.sparkPlan.output
val boundUDF = BindReferences.bindReference(resolvedUDF, attrs)

// Create an internal binary row, i.e. InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
val stringEncoder = ExpressionEncoder[String]()
// ExpressionEncoder.toRow was removed in Spark 3.0 (use a serializer instead)
val row = stringEncoder.createSerializer()("world")

Yay! It works!

scala> boundUDF.eval(row)
res8: Any = Hello world

Just to show the regular execution path (i.e. how to execute a UDF in the context of a Dataset).

val q = names.select(hello($"name"))
scala> q.show
+-----------+
|  UDF(name)|
+-----------+
|Hello Jacek|
|Hello Agata|
+-----------+