
DataSourceStrategy Execution Planning Strategy

DataSourceStrategy is an execution planning strategy (of SparkPlanner) that plans LogicalRelation logical operators as RowDataSourceScanExec physical operators (possibly under FilterExec and ProjectExec physical operators).

Executing Rule

apply(
  plan: LogicalPlan): Seq[SparkPlan]

apply plans the given LogicalPlan into a corresponding SparkPlan.

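Logical Operator | Description
--- | ---
LogicalRelation with a CatalystScan relation | pruneFilterProjectRaw (with a scanBuilder that requests the CatalystScan to buildScan with the requested columns and all the filter predicates, followed by toCatalystRDD)
LogicalRelation with a PrunedFilteredScan relation | pruneFilterProject (with a scanBuilder that requests the PrunedFilteredScan to buildScan with the names of the requested columns and the pushed-down filters, followed by toCatalystRDD)
LogicalRelation with a PrunedScan relation | pruneFilterProject (with a scanBuilder that requests the PrunedScan to buildScan with the names of the requested columns only, followed by toCatalystRDD)
LogicalRelation with a TableScan relation | A RowDataSourceScanExec (with all the output attributes of the LogicalRelation) over the rows of the TableScan's buildScan, converted using toCatalystRDD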
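The dispatch in the table above is a pattern match on the type of the BaseRelation, tried top to bottom. The following sketch models only that ordering. The traits are hypothetical stand-ins named after the ones in org.apache.spark.sql.sources, and planningPath is a made-up helper that returns a description instead of a SparkPlan.

```scala
// Hypothetical stand-ins for the scan traits in org.apache.spark.sql.sources (not Spark's API)
trait BaseRelation
trait TableScan extends BaseRelation
trait PrunedScan extends BaseRelation
trait PrunedFilteredScan extends BaseRelation
trait CatalystScan extends BaseRelation

// Which planning path apply would take for a LogicalRelation over the given relation.
// Cases are tried top to bottom (in the order of the table above), so a relation
// that mixes in several traits gets the first matching path.
def planningPath(relation: BaseRelation): String = relation match {
  case _: CatalystScan       => "pruneFilterProjectRaw"
  case _: PrunedFilteredScan => "pruneFilterProject (columns and filters)"
  case _: PrunedScan         => "pruneFilterProject (columns only)"
  case _: TableScan          => "RowDataSourceScanExec (full scan)"
  case _                     => "not planned here (Nil)"
}

// A relation that supports both full scans and column pruning
val relation = new TableScan with PrunedScan {}
println(planningPath(relation)) // prints pruneFilterProject (columns only)
```

Any other BaseRelation (for example, the HadoopFsRelation of file-based data sources) is left to other strategies, which is why the last case yields nothing.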

pruneFilterProject

pruneFilterProject(
  relation: LogicalRelation,
  projects: Seq[NamedExpression],
  filterPredicates: Seq[Expression],
  scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow]): SparkPlan

pruneFilterProject simply calls pruneFilterProjectRaw (with a scanBuilder that ignores the Seq[Expression] input argument and passes the pushed-down filters on as an Array[Filter]).

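pruneFilterProject is used when DataSourceStrategy execution planning strategy is executed (for LogicalRelation logical operators with a PrunedFilteredScan or a PrunedScan relation).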
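The difference between the two scanBuilder shapes boils down to a small function adapter. The sketch below uses hypothetical String aliases in place of Attribute, Expression, Filter and RDD[InternalRow]; adapt and describeScan are made-up names for illustration only.

```scala
// Hypothetical, simplified types: Spark uses Attribute, Expression, Filter and RDD[InternalRow]
type Attribute = String
type Expression = String
type Filter = String
type Rows = Seq[String]

// The scanBuilder shape that pruneFilterProjectRaw expects
type RawScanBuilder = (Seq[Attribute], Seq[Expression], Seq[Filter]) => Rows
// The simpler scanBuilder shape that pruneFilterProject accepts
type ScanBuilder = (Seq[Attribute], Array[Filter]) => Rows

// Adapts a ScanBuilder to a RawScanBuilder: the Catalyst expressions are ignored and
// the pushed-down filters are passed on as an array
def adapt(scanBuilder: ScanBuilder): RawScanBuilder =
  (requestedColumns, _, pushedFilters) => scanBuilder(requestedColumns, pushedFilters.toArray)

// A toy scan that only describes what it was asked to read
val describeScan: ScanBuilder =
  (columns, filters) => Seq(s"read ${columns.mkString(",")} where ${filters.mkString(" AND ")}")

val raw = adapt(describeScan)
println(raw(Seq("id", "name"), Seq("id > 1"), Seq("GreaterThan(id,1)")))
// prints List(read id,name where GreaterThan(id,1))
```

Keeping a single raw entry point means the filter selection and plan assembly live in one place, while data sources that cannot use Catalyst expressions never see them.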

Selecting Catalyst Expressions Convertible to Data Source Filter Predicates

selectFilters(
  relation: BaseRelation,
  predicates: Seq[Expression]): (Seq[Expression], Seq[Filter], Set[Filter])

selectFilters builds a map of Catalyst predicate expressions (from the input predicates) that can be translated to a data source filter predicate.

selectFilters then requests the input BaseRelation for unhandled filters (out of the convertible ones that selectFilters built the map with).

In the end, selectFilters returns a 3-element tuple with the following:

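  1. Catalyst predicate expressions that are either not convertible to data source filters or not handled by the input BaseRelation

  2. All converted data source filters

  3. Handled data source filters (the converted filters that the input BaseRelation can handle, i.e. all converted filters except the unhandled ones)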

selectFilters is used when DataSourceStrategy execution planning strategy is executed (and creates a RowDataSourceScanExec physical operator).
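The three-way split above can be sketched with plain strings standing in for Catalyst expressions and data source filters (a hypothetical simplification). The translate and unhandledFilters parameters play the roles of translateFilter and BaseRelation.unhandledFilters; the sample predicates and the "equality only" relation are made up.

```scala
// Hypothetical stand-ins: Catalyst predicates and data source filters as plain strings
type Expression = String
type Filter = String

// Sketch of selectFilters: translate what can be translated, ask the relation which
// translated filters it cannot handle, and return
// (predicates Spark must still evaluate, all translated filters, handled filters)
def selectFilters(
    predicates: Seq[Expression],
    translate: Expression => Option[Filter],
    unhandledFilters: Array[Filter] => Array[Filter]): (Seq[Expression], Seq[Filter], Set[Filter]) = {
  // Catalyst predicate -> translated data source filter (convertible predicates only)
  val translatedMap: Map[Expression, Filter] =
    predicates.flatMap(p => translate(p).map(f => p -> f)).toMap
  val pushedFilters = translatedMap.values.toSeq
  val nonConvertible = predicates.filterNot(translatedMap.contains)
  val unhandled = unhandledFilters(pushedFilters.toArray).toSet
  // Convertible predicates whose filters the relation cannot guarantee
  val unhandledPredicates = translatedMap.collect { case (p, f) if unhandled.contains(f) => p }
  val handledFilters = pushedFilters.toSet -- unhandled
  (nonConvertible ++ unhandledPredicates, pushedFilters, handledFilters)
}

val translate: Expression => Option[Filter] = {
  case "a = 1" => Some("EqualTo(a,1)")
  case "b > 2" => Some("GreaterThan(b,2)")
  case _       => None
}
// Pretend the relation can only guarantee equality filters
val onlyEquality: Array[Filter] => Array[Filter] = _.filterNot(_.startsWith("EqualTo"))

val (residual, pushed, handled) =
  selectFilters(Seq("a = 1", "b > 2", "upper(c) = 'X'"), translate, onlyEquality)
// residual: upper(c) = 'X' and b > 2; pushed: both filters; handled: EqualTo(a,1)
```

Note that a filter can be pushed down and still appear as a residual predicate: the relation may use it to skip data, but Spark re-evaluates it because the relation does not promise exact results.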

Translating Catalyst Expression into Data Source Filter Predicate

translateFilter(
  predicate: Expression,
  supportNestedPredicatePushdown: Boolean): Option[Filter]

translateFilter simply calls translateFilterWithMapping (with the input parameters and an undefined (None) translatedFilterToExpr).

translateFilter is used when:

  • FileSourceScanExec physical operator is requested for the pushedDownFilters
  • DataSourceStrategy execution planning strategy is requested to selectFilters
  • FileSourceStrategy execution planning strategy is executed
  • DataSourceV2Strategy execution planning strategy is executed
  • V2Writes logical optimization is requested to optimize a logical query

translateFilterWithMapping

translateFilterWithMapping(
  predicate: Expression,
  translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]],
  nestedPredicatePushdownEnabled: Boolean): Option[Filter]

translateFilterWithMapping translates the input Catalyst Expression to a Data Source Filter predicate.


translateFilterWithMapping branches off based on the given predicate expression:

  • For Ands, translateFilterWithMapping calls itself recursively with the left and right expressions and, only if both translate, creates an And filter

  • For Ors, translateFilterWithMapping calls itself recursively with the left and right expressions and, only if both translate, creates an Or filter

  • For Nots, translateFilterWithMapping calls itself recursively with the child expression and creates a Not filter

  • For all the other cases, translateFilterWithMapping calls translateLeafNodeFilter and, if successful, adds the filter and the predicate expression to the translatedFilterToExpr collection (if defined)
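The recursion above can be sketched over a tiny, hypothetical expression tree. Expr, the *Expr case classes and Trim are made-up stand-ins (not Spark's classes), the filter case classes are only named after those in org.apache.spark.sql.sources, and translateLeaf handles column = literal only, as a stand-in for translateLeafNodeFilter.

```scala
import scala.collection.mutable

// Hypothetical, simplified Catalyst-like predicates (not Spark's expression classes)
sealed trait Expr
case class Attr(name: String) extends Expr
case class Lit(value: Any) extends Expr
case class EqualToExpr(left: Expr, right: Expr) extends Expr
case class AndExpr(left: Expr, right: Expr) extends Expr
case class OrExpr(left: Expr, right: Expr) extends Expr
case class NotExpr(child: Expr) extends Expr
case class Trim(child: Expr) extends Expr // any expression with no filter equivalent

// Hypothetical, simplified data source filters (named after org.apache.spark.sql.sources)
sealed trait Filter
case class EqualTo(attribute: String, value: Any) extends Filter
case class And(left: Filter, right: Filter) extends Filter
case class Or(left: Filter, right: Filter) extends Filter
case class Not(child: Filter) extends Filter

// Leaf translation: only column = literal, in either operand order
def translateLeaf(e: Expr): Option[Filter] = e match {
  case EqualToExpr(Attr(n), Lit(v)) => Some(EqualTo(n, v))
  case EqualToExpr(Lit(v), Attr(n)) => Some(EqualTo(n, v))
  case _                            => None
}

def translateWithMapping(
    e: Expr,
    translatedFilterToExpr: Option[mutable.HashMap[Filter, Expr]]): Option[Filter] = e match {
  // Both legs must translate: pushing only one leg of a nested AND/OR could change results
  case AndExpr(l, r) =>
    for {
      lf <- translateWithMapping(l, translatedFilterToExpr)
      rf <- translateWithMapping(r, translatedFilterToExpr)
    } yield And(lf, rf)
  case OrExpr(l, r) =>
    for {
      lf <- translateWithMapping(l, translatedFilterToExpr)
      rf <- translateWithMapping(r, translatedFilterToExpr)
    } yield Or(lf, rf)
  case NotExpr(c) =>
    translateWithMapping(c, translatedFilterToExpr).map(Not(_))
  case leaf =>
    val filter = translateLeaf(leaf)
    // Remember which Catalyst predicate each translated leaf filter came from
    for (f <- filter; m <- translatedFilterToExpr) m(f) = leaf
    filter
}

val mapping = mutable.HashMap.empty[Filter, Expr]
val translated = translateWithMapping(
  AndExpr(EqualToExpr(Attr("a"), Lit(1)), EqualToExpr(Lit("x"), Attr("b"))), Some(mapping))
// translated: Some(And(EqualTo(a,1),EqualTo(b,x))); mapping holds both leaf filters
```

With a None mapping (as translateFilter passes), the same function simply translates without recording anything.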

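translateFilterWithMapping is used when DataSourceStrategy is requested to translateFilter and when PushDownUtils is requested to pushFilters (for Data Source V2 scans with SupportsPushDownFilters).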

translateLeafNodeFilter

translateLeafNodeFilter(
  predicate: Expression,
  pushableColumn: PushableColumnBase): Option[Filter]

translateLeafNodeFilter translates a given Catalyst Expression into the corresponding Filter predicate if possible. If not, translateLeafNodeFilter returns None.

Catalyst Expression | Filter Predicate
--- | ---
EqualTo (with a "pushable" column and a Literal, in either order) | EqualTo
EqualNullSafe (with a "pushable" column and a Literal, in either order) | EqualNullSafe
GreaterThan (with a "pushable" column and a Literal) | GreaterThan (or LessThan when the Literal is the left operand)
LessThan (with a "pushable" column and a Literal) | LessThan (or GreaterThan when the Literal is the left operand)
GreaterThanOrEqual (with a "pushable" column and a Literal) | GreaterThanOrEqual (or LessThanOrEqual when the Literal is the left operand)
LessThanOrEqual (with a "pushable" column and a Literal) | LessThanOrEqual (or GreaterThanOrEqual when the Literal is the left operand)
InSet (with a "pushable" column and a set of values) | In
In (with a "pushable" column and a list of Literal expressions) | In
IsNull (with a "pushable" column) | IsNull
IsNotNull (with a "pushable" column) | IsNotNull
StartsWith (with a "pushable" column and a string Literal) | StringStartsWith
EndsWith (with a "pushable" column and a string Literal) | StringEndsWith
Contains (with a "pushable" column and a string Literal) | StringContains
Literal (with true) | AlwaysTrue
Literal (with false) | AlwaysFalse

Note

The Catalyst expressions and their corresponding data source filter predicates have the same names in most cases but belong to different Scala packages (org.apache.spark.sql.catalyst.expressions and org.apache.spark.sql.sources, respectively).
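The "or LessThan" entries in the table come from operand order: a data source filter always names the column first, so a comparison with the Literal on the left has to be flipped. Below is a minimal sketch of that rule. Operand, Col, Lit, Comparison and translateComparison are hypothetical, and filters are rendered as strings rather than org.apache.spark.sql.sources objects.

```scala
// Hypothetical, simplified operands and comparisons (not Spark's expression classes)
sealed trait Operand
case class Col(name: String) extends Operand
case class Lit(value: Any) extends Operand
case class Comparison(op: String, left: Operand, right: Operand)

// Filter names when the column is the left operand
val direct = Map(
  "=" -> "EqualTo", "<=>" -> "EqualNullSafe",
  ">" -> "GreaterThan", "<" -> "LessThan",
  ">=" -> "GreaterThanOrEqual", "<=" -> "LessThanOrEqual")

// Filter names when the literal is the left operand: `5 < x` means `x > 5`
val flipped = Map(
  "=" -> "EqualTo", "<=>" -> "EqualNullSafe",
  ">" -> "LessThan", "<" -> "GreaterThan",
  ">=" -> "LessThanOrEqual", "<=" -> "GreaterThanOrEqual")

// Renders the data source filter as a string, e.g. GreaterThan(age,21)
def translateComparison(c: Comparison): Option[String] = c match {
  case Comparison(op, Col(n), Lit(v)) => direct.get(op).map(f => s"$f($n,$v)")
  case Comparison(op, Lit(v), Col(n)) => flipped.get(op).map(f => s"$f($n,$v)")
  case _                              => None // e.g. column-to-column comparisons
}

println(translateComparison(Comparison(">", Lit(21), Col("age")))) // prints Some(LessThan(age,21))
```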

RDD Conversion (Converting RDD of Rows to Catalyst RDD of InternalRows)

toCatalystRDD(
  relation: LogicalRelation,
  output: Seq[Attribute],
  rdd: RDD[Row]): RDD[InternalRow]
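toCatalystRDD(
  relation: LogicalRelation,
  rdd: RDD[Row]): RDD[InternalRow]

The two-argument toCatalystRDD uses the output attributes of the given LogicalRelation as the output.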

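toCatalystRDD branches off per the needConversion flag of the BaseRelation of the input LogicalRelation:

  • When enabled (true), toCatalystRDD converts the objects inside the Rows to Catalyst's internal types (producing an RDD[InternalRow])

  • Otherwise, toCatalystRDD casts the input RDD[Row] to an RDD[InternalRow] (using Scala's asInstanceOf operator)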

toCatalystRDD is used when DataSourceStrategy execution planning strategy is executed (for all kinds of BaseRelations).
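The needConversion branch can be sketched without Spark by modelling an RDD as a sequence of partition iterators. Everything here is a hypothetical stand-in: Row is just a Seq[Any], and Utf8 only mimics the fact that Spark's internal rows store strings as UTF8String rather than java.lang.String.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical model: Spark works with RDD[Row] and RDD[InternalRow]; here a partition is an
// Iterator of rows and a row is just a Seq of values
type Row = Seq[Any]
type Partition = Iterator[Row]

// Stand-in for an internal string type (Spark's internal rows use UTF8String)
final case class Utf8(bytes: Vector[Byte]) {
  override def toString: String = new String(bytes.toArray, StandardCharsets.UTF_8)
}

// Converts one external (Scala) value to its internal stand-in
def toInternalValue(v: Any): Any = v match {
  case s: String => Utf8(s.getBytes(StandardCharsets.UTF_8).toVector)
  case other     => other
}

// The needConversion branch: convert rows lazily partition by partition, or pass them through
def toCatalystPartitions(needConversion: Boolean, partitions: Seq[Partition]): Seq[Partition] =
  if (needConversion) partitions.map(_.map(_.map(toInternalValue)))
  else partitions // Spark: rdd.asInstanceOf[RDD[InternalRow]], no per-row work at all

val converted = toCatalystPartitions(needConversion = true, Seq(Iterator(Seq("a", 1)))).head.toList
```

The cast-only branch is free, but it is only safe because a relation that reports needConversion as false promises its rows already use the internal representation.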

Creating RowDataSourceScanExec Physical Operator for LogicalRelation

pruneFilterProjectRaw(
  relation: LogicalRelation,
  projects: Seq[NamedExpression],
  filterPredicates: Seq[Expression],
  scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow]): SparkPlan

pruneFilterProjectRaw plans the given LogicalRelation leaf logical operator as a RowDataSourceScanExec leaf physical operator (possibly as the child of FilterExec and ProjectExec unary physical operators).

Note

pruneFilterProjectRaw is almost like SparkPlanner.pruneFilterProject.

Internally, pruneFilterProjectRaw splits the input filterPredicates expressions (using selectFilters) to select the Catalyst expressions that can be converted to data source filter predicates (and handled by the underlying BaseRelation of the LogicalRelation).

pruneFilterProjectRaw combines all the expressions that either cannot be converted to data source filters or cannot be handled by the relation into a single filterCondition using And binary expressions. If non-empty, filterCondition is eventually used to create a FilterExec physical operator.

pruneFilterProjectRaw creates a RowDataSourceScanExec leaf physical operator.
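How the scan, the optional FilterExec and the optional ProjectExec fit together can be sketched with hypothetical plan nodes (Scan, FilterNode, ProjectNode and Residual are made up, not Spark's classes). The sketch assumes the projections are plain column references; pruneFilterProjectRaw itself also checks for that and, when a ProjectExec is needed, skips columns referenced only by handled pushed-down filters, which this sketch leaves out.

```scala
// Hypothetical, simplified physical operators (not Spark's classes)
sealed trait PlanNode
case class Scan(columns: Seq[String], pushedFilters: Seq[String]) extends PlanNode
case class FilterNode(condition: String, child: PlanNode) extends PlanNode
case class ProjectNode(columns: Seq[String], child: PlanNode) extends PlanNode

// A predicate Spark still has to evaluate itself, with the columns it references
case class Residual(sql: String, references: Set[String])

// Assumes projectColumns are plain column references (no computed expressions)
def assemble(
    projectColumns: Seq[String],
    pushedFilters: Seq[String],
    residuals: Seq[Residual]): PlanNode = {
  // Combine the residual predicates with AND (like reduceLeftOption over And)
  val filterCondition: Option[String] =
    residuals.map(_.sql).reduceLeftOption((l, r) => s"($l AND $r)")
  val filterColumns = residuals.flatMap(_.references).toSet

  def withFilter(scan: Scan): PlanNode =
    filterCondition.map(FilterNode(_, scan)).getOrElse(scan)

  if (filterColumns.subsetOf(projectColumns.toSet)) {
    // The projected columns are enough to evaluate the filter: no extra project
    withFilter(Scan(projectColumns, pushedFilters))
  } else {
    // Read the extra columns the filter needs, then project them away
    ProjectNode(projectColumns, withFilter(Scan((projectColumns ++ filterColumns).distinct, pushedFilters)))
  }
}
```

For example, projecting id with a residual length(name) > 3 yields a project over a filter over a scan of id and name, while the same query without residual predicates is just the scan.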

getPushedDownFilters

getPushedDownFilters(
  partitionColumns: Seq[Expression],
  normalizedFilters: Seq[Expression]): ExpressionSet

For an empty partitionColumns, getPushedDownFilters returns an empty ExpressionSet.

Otherwise, getPushedDownFilters...FIXME

getPushedDownFilters prints out the following INFO message to the logs:

Pruning directories with: [predicates]

getPushedDownFilters is used when executing execution planning strategies (e.g., FileSourceStrategy).

Demo

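import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// A minimal TableScan relation (so the optimized plan is a LogicalRelation)
class DemoRelation(val sqlContext: SQLContext) extends BaseRelation with TableScan {
  override def schema: StructType = StructType(StructField("id", LongType) :: Nil)
  override def buildScan(): RDD[Row] = sqlContext.sparkContext.range(0, 5).map(Row(_))
}
val plan = spark.baseRelationToDataFrame(new DemoRelation(spark.sqlContext)).queryExecution.optimizedPlan
// DataSourceStrategy is an object (a Strategy) in recent Spark 3.x versions
val sparkPlan = DataSourceStrategy(plan).head // a RowDataSourceScanExec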

Logging

Enable ALL logging level for org.apache.spark.sql.execution.datasources.DataSourceStrategy logger to see what happens inside.

Add the following line to conf/log4j2.properties:

logger.DataSourceStrategy.name = org.apache.spark.sql.execution.datasources.DataSourceStrategy
logger.DataSourceStrategy.level = all

Refer to Logging.