DataSourceStrategy Execution Planning Strategy¶
DataSourceStrategy is an execution planning strategy (of SparkPlanner) that plans LogicalRelation logical operators as RowDataSourceScanExec physical operators (possibly under FilterExec and ProjectExec physical operators).
Executing Rule¶
```scala
apply(
  plan: LogicalPlan): Seq[SparkPlan]
```

`apply` plans the given LogicalPlan into a corresponding SparkPlan, based on the scan relation of the LogicalRelation.
| Logical Operator | Description |
|------------------|-------------|
| LogicalRelation with a CatalystScan relation | Uses pruneFilterProjectRaw (with the raw Catalyst predicate expressions passed on to the relation) |
| LogicalRelation with a PrunedFilteredScan relation | Uses pruneFilterProject (with column pruning and filter pushdown) |
| LogicalRelation with a PrunedScan relation | Uses pruneFilterProject (with column pruning only) |
| LogicalRelation with a TableScan relation | Creates a RowDataSourceScanExec directly (a full table scan) |
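As a rough sketch (simplified and hypothetical, not Spark's actual code), the dispatch boils down to matching the scan-capability trait of the BaseRelation:

```scala
import org.apache.spark.sql.sources.{BaseRelation, CatalystScan, PrunedFilteredScan, PrunedScan, TableScan}

// Simplified, hypothetical sketch of the dispatch in apply:
// which code path a LogicalRelation takes per the scan trait of its BaseRelation
def scanPath(relation: BaseRelation): String = relation match {
  case _: CatalystScan       => "pruneFilterProjectRaw"   // raw Catalyst predicate expressions
  case _: PrunedFilteredScan => "pruneFilterProject"      // column pruning and filter pushdown
  case _: PrunedScan         => "pruneFilterProject"      // column pruning only
  case _: TableScan          => "RowDataSourceScanExec"   // full table scan
  case _                     => "not planned by DataSourceStrategy"
}
```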
pruneFilterProject¶
```scala
pruneFilterProject(
  relation: LogicalRelation,
  projects: Seq[NamedExpression],
  filterPredicates: Seq[Expression],
  scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow]): SparkPlan
```

`pruneFilterProject` delegates to pruneFilterProjectRaw (with a scanBuilder adapted to ignore the Seq[Expression] input argument).
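A minimal sketch of that adaptation (assuming the signatures above; not Spark's literal code): the simpler scan builder is wrapped to fit the raw variant's three-argument signature and simply drops the Catalyst predicates argument.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.sources.Filter

// Wrap a (columns, filters) scan builder into the signature that
// pruneFilterProjectRaw expects, ignoring the Seq[Expression] argument
def adapt(
    scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow])
    : (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow] =
  (requestedColumns, _, pushedFilters) =>
    scanBuilder(requestedColumns, pushedFilters.toArray)
```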
`pruneFilterProject` is used when:

- DataSourceStrategy execution planning strategy is executed (with LogicalRelations over a PrunedFilteredScan or a PrunedScan)
Selecting Catalyst Expressions Convertible to Data Source Filter Predicates¶
```scala
selectFilters(
  relation: BaseRelation,
  predicates: Seq[Expression]): (Seq[Expression], Seq[Filter], Set[Filter])
```

`selectFilters` builds a map of the Catalyst predicate expressions (from the input predicates) that can be translated to a data source filter predicate.

`selectFilters` then requests the input BaseRelation for unhandled filters (out of the convertible ones that the map was built with).

In the end, `selectFilters` returns a 3-element tuple with the following (see the sketch at the end of this section):
1. Inconvertible and unhandled Catalyst predicate expressions
2. All converted data source filters
3. Pushed-down data source filters (that the input BaseRelation can handle)
`selectFilters` is used when:

- DataSourceStrategy execution planning strategy is executed (and creates a RowDataSourceScanExec physical operator)
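A self-contained sketch of that three-way split (with simplified stand-in types, not Spark's actual Expression and Filter) may help: predicates are first partitioned by convertibility, and the relation then reports which converted filters it cannot handle.

```scala
// Self-contained sketch of the selectFilters three-way split
case class Expr(sql: String)
case class SrcFilter(repr: String)

// assume: translation succeeds only for equality predicates
def translate(e: Expr): Option[SrcFilter] =
  if (e.sql.contains("=")) Some(SrcFilter(e.sql)) else None

// assume: the relation reports filters on column b as unhandled
def unhandledFilters(filters: Seq[SrcFilter]): Seq[SrcFilter] =
  filters.filter(_.repr.startsWith("b"))

def selectFilters(predicates: Seq[Expr]): (Seq[Expr], Seq[SrcFilter], Set[SrcFilter]) = {
  val translated = predicates.map(e => e -> translate(e))
  val convertibleMap = translated.collect { case (e, Some(f)) => f -> e }.toMap
  val inconvertible = translated.collect { case (e, None) => e }
  val unhandled = unhandledFilters(convertibleMap.keys.toSeq).toSet
  val unrecognized = inconvertible ++ unhandled.toSeq.flatMap(convertibleMap.get)
  val pushedDown = convertibleMap.keySet -- unhandled
  (unrecognized, convertibleMap.keys.toSeq, pushedDown)
}

// a = 1 converts and is handled; b = 2 converts but is unhandled; c > 3 does not convert
println(selectFilters(Seq(Expr("a = 1"), Expr("b = 2"), Expr("c > 3"))))
// (List(Expr(c > 3), Expr(b = 2)),List(SrcFilter(a = 1), SrcFilter(b = 2)),Set(SrcFilter(a = 1)))
```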
Translating Catalyst Expression into Data Source Filter Predicate¶
```scala
translateFilter(
  predicate: Expression,
  supportNestedPredicatePushdown: Boolean): Option[Filter]
```

`translateFilter` delegates to translateFilterWithMapping (with the input parameters and an undefined (None) translatedFilterToExpr).
`translateFilter` is used when:

- FileSourceScanExec physical operator is requested for the pushedDownFilters
- DataSourceStrategy execution planning strategy is requested to selectFilters
- FileSourceStrategy execution planning strategy is executed
- DataSourceV2Strategy execution planning strategy is executed
- V2Writes logical optimization is requested to optimize a logical query
translateFilterWithMapping¶
```scala
translateFilterWithMapping(
  predicate: Expression,
  translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]],
  nestedPredicatePushdownEnabled: Boolean): Option[Filter]
```

`translateFilterWithMapping` translates the input Catalyst Expression to a data source Filter predicate.

`translateFilterWithMapping` branches off based on the given predicate expression (see the sketch after this list):
- For And expressions, `translateFilterWithMapping` calls itself recursively with the left and right expressions and creates an And filter
- For Or expressions, `translateFilterWithMapping` calls itself recursively with the left and right expressions and creates an Or filter
- For Not expressions, `translateFilterWithMapping` calls itself recursively with the child expression and creates a Not filter
- For all other cases, `translateFilterWithMapping` uses translateLeafNodeFilter and, if successful, adds the filter and the predicate expression to the translatedFilterToExpr collection
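A simplified sketch of that recursion (using Spark's Catalyst and data source types, but handling only one leaf case and ignoring the translatedFilterToExpr bookkeeping):

```scala
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal}
import org.apache.spark.sql.sources

// Simplified sketch of the recursive structure (not Spark's full implementation)
def translate(predicate: Expression): Option[sources.Filter] = predicate match {
  case expressions.And(left, right) =>
    // both legs must be convertible for an And filter to be created
    for { l <- translate(left); r <- translate(right) } yield sources.And(l, r)
  case expressions.Or(left, right) =>
    for { l <- translate(left); r <- translate(right) } yield sources.Or(l, r)
  case expressions.Not(child) =>
    translate(child).map(sources.Not(_))
  // a single leaf case for illustration; translateLeafNodeFilter handles many more
  // (and converts Catalyst's internal values to their external representation)
  case expressions.EqualTo(a: Attribute, Literal(value, _)) =>
    Some(sources.EqualTo(a.name, value))
  case _ => None
}
```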
`translateFilterWithMapping` is used when:

- DataSourceStrategy is requested to translateFilter
- PushDownUtils is requested to pushFilters
translateLeafNodeFilter¶
```scala
translateLeafNodeFilter(
  predicate: Expression,
  pushableColumn: PushableColumnBase): Option[Filter]
```

`translateLeafNodeFilter` translates a given Catalyst Expression into a corresponding Filter predicate if possible. If not, `translateLeafNodeFilter` returns None.
| Catalyst Expression | Filter Predicate |
|---------------------|------------------|
| EqualTo (with a "pushable" column and a Literal) | EqualTo |
| EqualNullSafe (with a "pushable" column and a Literal) | EqualNullSafe |
| GreaterThan (with a "pushable" column and a Literal) | GreaterThan or LessThan |
| LessThan (with a "pushable" column and a Literal) | LessThan or GreaterThan |
| GreaterThanOrEqual (with a "pushable" column and a Literal) | GreaterThanOrEqual or LessThanOrEqual |
| LessThanOrEqual (with a "pushable" column and a Literal) | LessThanOrEqual or GreaterThanOrEqual |
| InSet (with a "pushable" column and values) | In |
| In (with a "pushable" column and expressions) | In |
| IsNull (with a "pushable" column) | IsNull |
| IsNotNull (with a "pushable" column) | IsNotNull |
| StartsWith (with a "pushable" column and a string Literal) | StringStartsWith |
| EndsWith (with a "pushable" column and a string Literal) | StringEndsWith |
| Contains (with a "pushable" column and a string Literal) | StringContains |
| Literal (with true) | AlwaysTrue |
| Literal (with false) | AlwaysFalse |
Note
The Catalyst expressions and their corresponding data source filter predicates have the same names in most cases but belong to different Scala packages (org.apache.spark.sql.catalyst.expressions
and org.apache.spark.sql.sources
, respectively).
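For example, EqualTo exists in both packages: the Catalyst expression is a tree over child expressions, while the data source filter carries a column name and a value.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.IntegerType

// Catalyst predicate: id = 5 (a tree of Catalyst expressions)
val catalystPredicate = EqualTo(AttributeReference("id", IntegerType)(), Literal(5))

// The corresponding data source filter predicate (a column name and a value)
val sourceFilter = sources.EqualTo("id", 5)
```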
RDD Conversion (Converting RDD of Rows to Catalyst RDD of InternalRows)¶
```scala
toCatalystRDD(
  relation: LogicalRelation,
  output: Seq[Attribute],
  rdd: RDD[Row]): RDD[InternalRow]
toCatalystRDD(
  relation: LogicalRelation,
  rdd: RDD[Row]) // <1>
```

1. Calls the other toCatalystRDD with the output of the LogicalRelation
`toCatalystRDD` branches off per the needConversion flag of the BaseRelation of the input LogicalRelation (see the sketch after this list):

- When needConversion is enabled (true), toCatalystRDD converts (serializes) the rows of the input RDD[Row] into an RDD[InternalRow] (per the schema of the output attributes)
- Otherwise, toCatalystRDD casts the input RDD[Row] to an RDD[InternalRow] (using Scala's asInstanceOf operator)
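A minimal sketch of the branching, where toRow is a stand-in for the Row-to-InternalRow serializer that Spark builds from the schema of the output attributes:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.BaseRelation

// Minimal sketch; toRow stands in for the schema-derived serializer
def toCatalystRDD(
    relation: BaseRelation,
    rdd: RDD[Row],
    toRow: Row => InternalRow): RDD[InternalRow] =
  if (relation.needConversion) {
    // needConversion enabled: serialize every Row to a Catalyst InternalRow
    rdd.mapPartitions(_.map(toRow))
  } else {
    // needConversion disabled: the rows are InternalRows already, just cast
    rdd.asInstanceOf[RDD[InternalRow]]
  }
```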
toCatalystRDD
is used when DataSourceStrategy
execution planning strategy is executed (for all kinds of BaseRelations).
Creating RowDataSourceScanExec Physical Operator for LogicalRelation¶
```scala
pruneFilterProjectRaw(
  relation: LogicalRelation,
  projects: Seq[NamedExpression],
  filterPredicates: Seq[Expression],
  scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow]): SparkPlan
```

`pruneFilterProjectRaw` converts the given LogicalRelation leaf logical operator into a RowDataSourceScanExec leaf physical operator (possibly as a child of FilterExec and ProjectExec unary physical operators).
Note
pruneFilterProjectRaw
is almost like SparkPlanner.pruneFilterProject.
Internally, `pruneFilterProjectRaw` splits the input filterPredicates expressions to select the Catalyst expressions that can be converted to data source filter predicates (and handled by the underlying BaseRelation of the LogicalRelation).

`pruneFilterProjectRaw` combines all the expressions that are neither convertible to data source filters nor handled by the relation, using the And binary expression (creating the so-called filterCondition that, if defined, is eventually used to create a FilterExec physical operator).
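That combining step is essentially a reduceLeftOption over the And expression; for instance (with hypothetical leftover predicates):

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

// hypothetical predicates that could be neither converted nor handled by the relation
val id = AttributeReference("id", IntegerType)()
val leftovers: Seq[Expression] = Seq(GreaterThan(id, Literal(0)), LessThan(id, Literal(10)))

// the filterCondition that, if defined, becomes a FilterExec on top of the scan
val filterCondition: Option[Expression] = leftovers.reduceLeftOption(And)
```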
pruneFilterProjectRaw
creates a RowDataSourceScanExec leaf physical operator.
getPushedDownFilters¶
```scala
getPushedDownFilters(
  partitionColumns: Seq[Expression],
  normalizedFilters: Seq[Expression]): ExpressionSet
```

For an empty partitionColumns, `getPushedDownFilters` returns an empty ExpressionSet.

Otherwise, `getPushedDownFilters`...FIXME
`getPushedDownFilters` prints out the following INFO message to the logs:

```text
Pruning directories with: [predicates]
```
getPushedDownFilters
is used when executing the following execution planning strategies:
Demo¶
```scala
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
val strategy = DataSourceStrategy(spark.sessionState.conf)

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
val plan: LogicalPlan = ???

val sparkPlan = strategy(plan).head
```
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.datasources.DataSourceStrategy logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

```text
logger.DataSourceStrategy.name = org.apache.spark.sql.execution.datasources.DataSourceStrategy
logger.DataSourceStrategy.level = all
```
Refer to Logging.