# PrunedFilteredScan — Relations with Column Pruning and Filter Pushdown

`PrunedFilteredScan` is an abstraction of `BaseRelation`s that support column pruning (eliminating unneeded columns) and filter pushdown (evaluating selected filter predicates at the data source).
## Contract

### Building Distributed Scan
```scala
buildScan(
  requiredColumns: Array[String],
  filters: Array[Filter]): RDD[Row]
```

Builds a distributed data scan (an `RDD[Row]`) with column pruning and filter pushdown applied.
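The `filters` argument carries predicates from the stable `org.apache.spark.sql.sources` `Filter` hierarchy (e.g. `EqualTo`, `GreaterThan`, `IsNotNull`, `And`). A minimal sketch of how an implementation might translate the filters it understands into a source-specific condition (the SQL-like target syntax and the `compileFilter` helper are made up for illustration):

```scala
import org.apache.spark.sql.sources._

// Translate supported filters into a hypothetical source-side condition;
// None means "not supported" and the filter is simply not pushed down
def compileFilter(filter: Filter): Option[String] = filter match {
  case EqualTo(attribute, value)     => Some(s"$attribute = '$value'")
  case GreaterThan(attribute, value) => Some(s"$attribute > '$value'")
  case IsNotNull(attribute)          => Some(s"$attribute IS NOT NULL")
  case And(left, right) =>
    for { l <- compileFilter(left); r <- compileFilter(right) }
      yield s"($l AND $r)"
  case _ => None
}
```

Filters a relation does not evaluate itself are still re-applied by Spark after the scan (see the `unhandledFilters` sketch below).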
!!! note
    `PrunedFilteredScan` is a "lighter" and stable version of the `CatalystScan` abstraction.
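The difference is visible in the `buildScan` signatures: `CatalystScan` works with Catalyst's internal `Expression` API (which may change between Spark releases), whereas `PrunedFilteredScan` relies on the public `Filter` API only.

```scala
// CatalystScan: Catalyst internals (unstable across releases)
def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]

// PrunedFilteredScan: the public, stable data source filter API
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
```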
See:

- `JDBCRelation`
- `DeltaCDFRelation` (Delta Lake)
Used when:

- `DataSourceStrategy` execution planning strategy is requested to plan a `LogicalRelation` over a `PrunedFilteredScan` (see the `unhandledFilters` sketch right after this list)
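While planning, `DataSourceStrategy` also asks the relation (via `BaseRelation.unhandledFilters`) which of the pushed filters it does not evaluate itself, so Spark can re-apply exactly those after the scan. By default all filters are reported as unhandled and Spark double-checks every predicate. A sketch of a relation that claims `EqualTo` and `IsNotNull` for itself (the relation and its empty scan are made up for illustration):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter, IsNotNull, PrunedFilteredScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

case class MyFilteringRelation(sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {
  override def schema: StructType = StructType(StructField("a", StringType) :: Nil)

  // Report back only the filters this relation does NOT handle
  // (the default implementation reports all filters as unhandled)
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot {
      case _: EqualTo | _: IsNotNull => true
      case _                         => false
    }

  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] =
    sqlContext.sparkContext.emptyRDD[Row] // actual scan elided in this sketch
}
```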
## Implementations

- `JDBCRelation`
- `DeltaCDFRelation` (Delta Lake)
## Example
```text
// Use :paste to define the MyBaseRelation case class
// BEGIN
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

case class MyBaseRelation(sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {
  override def schema: StructType = StructType(StructField("a", StringType) :: Nil)
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    println(s">>> [buildScan] requiredColumns = ${requiredColumns.mkString(",")}")
    println(s">>> [buildScan] filters = ${filters.mkString(",")}")
    // The rows have to match the schema (a single string column "a")
    sqlContext.sparkContext.parallelize((0 to 4).map(n => Row(n.toString)))
  }
}
// END

val scan = MyBaseRelation(spark.sqlContext)

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
val plan: LogicalPlan = LogicalRelation(scan)

scala> println(plan.numberedTreeString)
00 Relation[a#1] MyBaseRelation(org.apache.spark.sql.SQLContext@4a57ad67)

import org.apache.spark.sql.execution.datasources.DataSourceStrategy
val strategy = DataSourceStrategy(spark.sessionState.conf)

val sparkPlan = strategy(plan).head
// >>> [buildScan] requiredColumns = a
// >>> [buildScan] filters =

scala> println(sparkPlan.numberedTreeString)
00 Scan MyBaseRelation(org.apache.spark.sql.SQLContext@4a57ad67) [a#8] PushedFilters: [], ReadSchema: struct<a:string>
```
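To see a predicate actually arrive in `buildScan`, query the relation through the public API. The exact set of pushed filters depends on the Spark version and the optimizer, so treat the output below as indicative only:

```scala
import spark.implicits._

// Querying with a predicate pushes it down as a data source Filter
val q = spark.baseRelationToDataFrame(scan).where($"a" === "1")
q.show
// >>> [buildScan] requiredColumns = a
// >>> [buildScan] filters = IsNotNull(a),EqualTo(a,1)
```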