PushDownPredicate Logical Optimization¶
Danger
PushDownPredicate
no longer exists in Spark SQL and is being migrated to PushDownPredicates logical optimization.
Review Me¶
When you execute Dataset.md#where[where] or Dataset.md#filter[filter] operators right after loading a dataset, Spark SQL will try to push the where/filter predicate down to the data source using a corresponding SQL query with WHERE
clause (or whatever the proper language for the data source is).
This optimization is called filter pushdown or predicate pushdown and aims at pushing down the filtering to the "bare metal", i.e. a data source engine. That is to increase the performance of queries since the filtering is performed at the very low level rather than dealing with the entire dataset after it has been loaded to Spark's memory and perhaps causing memory issues.
PushDownPredicate
is also applied to structured queries with filters after projections or filtering on window partitions.
Pushing Filter Operator Down Using Projection¶
val dataset = spark.range(2)
scala> dataset.select('id as "_id").filter('_id === 0).explain(extended = true)
...
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===
!Filter (_id#14L = cast(0 as bigint)) Project [id#11L AS _id#14L]
!+- Project [id#11L AS _id#14L] +- Filter (id#11L = cast(0 as bigint))
+- Range (0, 2, step=1, splits=Some(8)) +- Range (0, 2, step=1, splits=Some(8))
...
== Parsed Logical Plan ==
'Filter ('_id = 0)
+- Project [id#11L AS _id#14L]
+- Range (0, 2, step=1, splits=Some(8))
== Analyzed Logical Plan ==
_id: bigint
Filter (_id#14L = cast(0 as bigint))
+- Project [id#11L AS _id#14L]
+- Range (0, 2, step=1, splits=Some(8))
== Optimized Logical Plan ==
Project [id#11L AS _id#14L]
+- Filter (id#11L = 0)
+- Range (0, 2, step=1, splits=Some(8))
== Physical Plan ==
*Project [id#11L AS _id#14L]
+- *Filter (id#11L = 0)
+- *Range (0, 2, step=1, splits=Some(8))
Optimizing Window Aggregate Operators¶
val dataset = spark.range(5).withColumn("group", 'id % 3)
scala> dataset.show
+---+-----+
| id|group|
+---+-----+
| 0| 0|
| 1| 1|
| 2| 2|
| 3| 0|
| 4| 1|
+---+-----+
import org.apache.spark.sql.expressions.Window
val groupW = Window.partitionBy('group).orderBy('id)
// Filter out group 2 after window
// No need to compute rank for group 2
// Push the filter down
val ranked = dataset.withColumn("rank", rank over groupW).filter('group !== 2)
scala> ranked.queryExecution.optimizedPlan
...
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===
!Filter NOT (group#35L = cast(2 as bigint)) Project [id#32L, group#35L, rank#203]
!+- Project [id#32L, group#35L, rank#203] +- Project [id#32L, group#35L, rank#203, rank#203]
! +- Project [id#32L, group#35L, rank#203, rank#203] +- Window [rank(id#32L) windowspecdefinition(group#35L, id#32L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#203], [group#35L], [id#32L ASC]
! +- Window [rank(id#32L) windowspecdefinition(group#35L, id#32L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#203], [group#35L], [id#32L ASC] +- Project [id#32L, group#35L]
! +- Project [id#32L, group#35L] +- Project [id#32L, (id#32L % cast(3 as bigint)) AS group#35L]
! +- Project [id#32L, (id#32L % cast(3 as bigint)) AS group#35L] +- Filter NOT ((id#32L % cast(3 as bigint)) = cast(2 as bigint))
+- Range (0, 5, step=1, splits=Some(8)) +- Range (0, 5, step=1, splits=Some(8))
...
res1: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Window [rank(id#32L) windowspecdefinition(group#35L, id#32L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#203], [group#35L], [id#32L ASC]
+- Project [id#32L, (id#32L % 3) AS group#35L]
+- Filter NOT ((id#32L % 3) = 2)
+- Range (0, 5, step=1, splits=Some(8))
JDBC Data Source¶
Given the following code:
// Start with the PostgreSQL driver on CLASSPATH
case class Project(id: Long, name: String, website: String)
// No optimizations for typed queries
// LOG: execute <unnamed>: SELECT "id","name","website" FROM projects
val df = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:sparkdb")
.option("dbtable", "projects")
.load()
.as[Project]
.filter(_.name.contains("Spark"))
// Only the following would end up with the pushdown
val df = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:sparkdb")
.option("dbtable", "projects")
.load()
.where("""name like "%Spark%"""")
PushDownPredicate
translates the above query to the following SQL query:
LOG: execute <unnamed>: SELECT "id","name","website" FROM projects WHERE (name LIKE '%Spark%')
Tip
Enable all
logs in PostgreSQL to see the above SELECT and other query statements.
log_statement = 'all'
Add log_statement = 'all'
to /usr/local/var/postgres/postgresql.conf
on Mac OS X with PostgreSQL installed using brew
.
Parquet Data Source¶
val spark: SparkSession = ...
import spark.implicits._
// paste it to REPL individually to make the following line work
case class City(id: Long, name: String)
import org.apache.spark.sql.SaveMode.Overwrite
Seq(
City(0, "Warsaw"),
City(1, "Toronto"),
City(2, "London"),
City(3, "Redmond"),
City(4, "Boston")).toDF.write.mode(Overwrite).parquet("cities.parquet")
val cities = spark.read.parquet("cities.parquet").as[City]
// Using DataFrame's Column-based query
scala> cities.where('name === "Warsaw").queryExecution.executedPlan
res21: org.apache.spark.sql.execution.SparkPlan =
*Project [id#128L, name#129]
+- *Filter (isnotnull(name#129) && (name#129 = Warsaw))
+- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Warsaw)], ReadSchema: struct<id:bigint,name:string>
// Using SQL query
scala> cities.where("""name = "Warsaw"""").queryExecution.executedPlan
res23: org.apache.spark.sql.execution.SparkPlan =
*Project [id#128L, name#129]
+- *Filter (isnotnull(name#129) && (name#129 = Warsaw))
+- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Warsaw)], ReadSchema: struct<id:bigint,name:string>
// Using Dataset's strongly type-safe filter
// Why does the following not push the filter down?
scala> cities.filter(_.name == "Warsaw").queryExecution.executedPlan
res24: org.apache.spark.sql.execution.SparkPlan =
*Filter <function1>.apply
+- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,name:string>