UDFs are Blackbox — Don't Use Them Unless You've Got No Choice¶
User-Defined Functions (UDFs) are a blackbox to the Catalyst optimizer, so whatever happens in a UDF stays in the UDF (paraphrasing the former advertising slogan of Las Vegas, Nevada). With UDFs, your queries will likely be slower and less memory-efficient.
Example¶
Let's review an example with a UDF. The example upper-cases city names of exactly 6 characters, using the Dataset standard operators first and then a custom UDF to do the same transformation.
// Parquet filter pushdown has to be enabled for this demo (it is by default)
assert(spark.conf.get("spark.sql.parquet.filterPushdown").toBoolean)
You are going to use the following cities dataset that is backed by a Parquet file (as used in the Predicate Pushdown / Filter Pushdown for Parquet Data Source section). The reason for Parquet is that it is an external data source that supports the optimizations Spark SQL uses internally, like predicate pushdown.
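If you want a self-contained setup to follow along, the sketch below (with an assumed cities.parquet path and made-up sample rows) prepares such a dataset; the City case class only mirrors the id and name fields visible in the query plans that follow.
// a minimal sketch with made-up sample data; only the schema (id, name) comes from the plans below
case class City(id: Long, name: String)

import spark.implicits._

// write a tiny Parquet file to query
Seq(City(0, "Warsaw"), City(1, "Toronto"), City(2, "London")).toDS
  .write.mode("overwrite").parquet("cities.parquet")

// read it back as a strongly-typed Dataset[City]
val cities = spark.read.parquet("cities.parquet").as[City]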
// no optimization as filter uses an opaque Scala function
val cities6chars = cities.filter(_.name.length == 6).map(_.name.toUpperCase)
cities6chars.explain(true)
// or, more simply, when you are only interested in the PushedFilters attribute of the Parquet scan
scala> cities6chars.queryExecution.optimizedPlan
res33: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#248]
+- MapElements <function1>, class City, [StructField(id,LongType,false), StructField(name,StringType,true)], obj#247: java.lang.String
   +- Filter <function1>.apply
      +- DeserializeToObject newInstance(class City), obj#246: City
         +- Relation[id#236L,name#237] parquet
// no optimization for Dataset[City]
val cities6chars = cities.filter(_.name == "Warsaw").map(_.name.toUpperCase)
cities6chars.explain(true)
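Even with a plain equality check, the predicate is still a Scala closure applied to deserialized City objects, so Catalyst cannot push it down to the Parquet source. As a follow-up check (not part of the original example), you can inspect the physical plan the same way and look for an empty PushedFilters list:
// expect PushedFilters: [] since the typed filter is opaque to Catalyst
cities6chars.queryExecution.executedPlan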
// The filter predicate is pushed down fine for a Column-based query in the where operator
scala> cities.where('name === "Warsaw").queryExecution.executedPlan
res29: org.apache.spark.sql.execution.SparkPlan =
*Project [id#128L, name#129]
+- *Filter (isnotnull(name#129) && (name#129 = Warsaw))
   +- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Warsaw)], ReadSchema: struct<id:bigint,name:string>
// Let's define a UDF to do the filtering
val isWarsaw = udf { (s: String) => s == "Warsaw" }
// Use the UDF in where (replacing the Column-based query)
scala> cities.where(isWarsaw('name)).queryExecution.executedPlan
res33: org.apache.spark.sql.execution.SparkPlan =
*Filter UDF(name#129)
+- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,name:string>
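To make the comparison quicker to repeat, here is a small hypothetical helper (not part of the original example) that lists the PushedFilters entries of a query's physical plan:
// hypothetical helper: extract the PushedFilters entries from the executed plan
def pushedFilters(ds: org.apache.spark.sql.Dataset[_]): Seq[String] =
  ds.queryExecution.executedPlan.toString
    .split("\n")
    .filter(_.contains("PushedFilters"))
    .toSeq

pushedFilters(cities.where('name === "Warsaw")) // PushedFilters: [IsNotNull(name), EqualTo(name,Warsaw)]
pushedFilters(cities.where(isWarsaw('name)))    // PushedFilters: []
An empty list is the tell-tale sign that Catalyst treated the predicate as a blackbox.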