DataSourceStrategy Execution Planning Strategy¶
DataSourceStrategy is an execution planning strategy (of SparkPlanner) that plans LogicalRelation logical operators as RowDataSourceScanExec physical operators (possibly under FilterExec and ProjectExec physical operators).
Executing Rule¶
```scala
apply(
  plan: LogicalPlan): Seq[SparkPlan]
```
apply plans the given LogicalPlan into a corresponding SparkPlan (or returns an empty collection when the strategy does not apply).
| Logical Operator | Description |
|---|---|
| LogicalRelation with a CatalystScan relation | Uses pruneFilterProjectRaw (requesting the CatalystScan to buildScan with the requested columns and the Catalyst predicates, followed by RDD conversion) |
| LogicalRelation with a PrunedFilteredScan relation | Uses pruneFilterProject (requesting the PrunedFilteredScan to buildScan with the requested column names and the pushed-down filters) |
| LogicalRelation with a PrunedScan relation | Uses pruneFilterProject (requesting the PrunedScan to buildScan with the requested column names only) |
| LogicalRelation with a TableScan relation | Creates a RowDataSourceScanExec directly (requesting the TableScan to buildScan, followed by RDD conversion) |
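For instance, a JDBC relation is a PrunedFilteredScan, so the strategy plans it with pruneFilterProject and the resulting scan advertises its pushed-down filters. A hypothetical spark-shell session (the JDBC URL and table name are made up, and the exact explain output varies across Spark versions):

```scala
// A JDBC relation (a PrunedFilteredScan) planned by DataSourceStrategy
val people = spark.read
  .format("jdbc")
  .option("url", "jdbc:h2:mem:demo") // hypothetical JDBC URL
  .option("dbtable", "people")
  .load()

people.where("id > 5").select("name").explain()
// The physical plan contains a RowDataSourceScanExec, e.g.
// *(1) Scan JDBCRelation(people) ... PushedFilters: [*IsNotNull(id), *GreaterThan(id,5)], ...
```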
pruneFilterProject¶
```scala
pruneFilterProject(
  relation: LogicalRelation,
  projects: Seq[NamedExpression],
  filterPredicates: Seq[Expression],
  scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow]): SparkPlan
```
pruneFilterProject executes pruneFilterProjectRaw (with a scanBuilder that ignores the Seq[Expression] input argument, as sketched below).
pruneFilterProject is used when:
- DataSourceStrategy execution planning strategy is executed (with LogicalRelations over a PrunedFilteredScan or a PrunedScan)
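A minimal sketch of the adaptation (based on the signatures above, not Spark's exact code): the two-argument scanBuilder given to pruneFilterProject is wrapped into the three-argument shape that pruneFilterProjectRaw expects, with the Seq[Expression] argument dropped:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.sources.Filter

// The two-argument scanBuilder given to pruneFilterProject...
def adapt(scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow])
    : (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow] =
  // ...becomes the three-argument one with the Catalyst predicates ignored
  (requestedColumns, _, pushedFilters) =>
    scanBuilder(requestedColumns, pushedFilters.toArray)
```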
Selecting Catalyst Expressions Convertible to Data Source Filter Predicates¶
```scala
selectFilters(
  relation: BaseRelation,
  predicates: Seq[Expression]): (Seq[Expression], Seq[Filter], Set[Filter])
```
selectFilters builds a map of Catalyst predicate expressions (from the input predicates) that can be translated to a data source filter predicate.
selectFilters then requests the input BaseRelation for unhandled filters (out of the convertible ones that selectFilters built the map with).
In the end, selectFilters returns a 3-element tuple with the following:
1. Inconvertible and unhandled Catalyst predicate expressions
1. All converted data source filters
1. Pushed-down data source filters (that the input BaseRelation can handle)
selectFilters is used when DataSourceStrategy execution planning strategy is executed (and creates a RowDataSourceScanExec physical operator).
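The overall flow can be sketched as follows. This is a simplification, not Spark's exact code, and it assumes it is compiled where the protected[sql] translateFilter (described below) is accessible, e.g. in a subpackage of org.apache.spark.sql:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.{BaseRelation, Filter}

def selectFiltersSketch(
    relation: BaseRelation,
    predicates: Seq[Expression]): (Seq[Expression], Seq[Filter], Set[Filter]) = {
  // 1. Predicates that can be translated to data source filters
  val translated: Seq[(Expression, Filter)] = predicates.flatMap { p =>
    DataSourceStrategy.translateFilter(p, supportNestedPredicatePushdown = true).map(p -> _)
  }
  val inconvertible = predicates.filterNot(translated.map(_._1).contains)
  // 2. Filters the relation cannot handle itself (Spark must re-evaluate them)
  val unhandled = relation.unhandledFilters(translated.map(_._2).toArray).toSet
  val unhandledPredicates = translated.collect { case (p, f) if unhandled(f) => p }
  // 3. Filters the relation can handle (pushed down)
  val pushed = translated.map(_._2).toSet -- unhandled
  (inconvertible ++ unhandledPredicates, translated.map(_._2), pushed)
}
```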
Translating Catalyst Expression into Data Source Filter Predicate¶
```scala
translateFilter(
  predicate: Expression,
  supportNestedPredicatePushdown: Boolean): Option[Filter]
```
translateFilter executes translateFilterWithMapping (with the input parameters and an undefined (None) translatedFilterToExpr).
translateFilter is used when:
- FileSourceScanExec physical operator is requested for the pushedDownFilters
- DataSourceStrategy execution planning strategy is requested to selectFilters
- FileSourceStrategy execution planning strategy is executed
- DataSourceV2Strategy execution planning strategy is executed
- V2Writes logical optimization is requested to optimize a logical query
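A small demo of the translation in action. Since translateFilter is protected[sql], the sketch assumes it is compiled in a (made-up) subpackage of org.apache.spark.sql:

```scala
package org.apache.spark.sql.demo // hypothetical; gives access to protected[sql] members

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.types.StringType

object TranslateFilterDemo extends App {
  val name = Symbol("name").string

  // A "pushable" column compared to a literal translates one-to-one
  println(DataSourceStrategy.translateFilter(
    name === "Jacek", supportNestedPredicatePushdown = true))
  // Some(EqualTo(name,Jacek))

  // A Cast around the column makes it non-pushable, so there is no translation
  println(DataSourceStrategy.translateFilter(
    Cast(name, StringType) === "Jacek", supportNestedPredicatePushdown = true))
  // None
}
```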
translateFilterWithMapping¶
```scala
translateFilterWithMapping(
  predicate: Expression,
  translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]],
  nestedPredicatePushdownEnabled: Boolean): Option[Filter]
```
translateFilterWithMapping translates the input Catalyst Expression to a Data Source Filter predicate.
translateFilterWithMapping branches off based on the given predicate expression:
- For Ands, translateFilterWithMapping executes itself recursively with the left and right expressions and creates an And filter
- For Ors, translateFilterWithMapping executes itself recursively with the left and right expressions and creates an Or filter
- For Nots, translateFilterWithMapping executes itself recursively with the child expression and creates a Not filter
- For all the other cases, translateFilterWithMapping executes translateLeafNodeFilter and, if successful, adds the filter and the predicate expression to the translatedFilterToExpr collection
translateFilterWithMapping is used when:
- DataSourceStrategy is requested to translateFilter
- PushDownUtils is requested to pushFilters
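The recursive branching can be sketched as follows (a simplification that leaves out the translatedFilterToExpr bookkeeping; translateLeaf is a hypothetical stand-in for translateLeafNodeFilter, described next):

```scala
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Not, Or}
import org.apache.spark.sql.sources

def translate(
    predicate: Expression,
    translateLeaf: Expression => Option[sources.Filter]): Option[sources.Filter] =
  predicate match {
    case And(left, right) =>
      // Both legs must translate, or the whole conjunction is untranslatable
      for {
        l <- translate(left, translateLeaf)
        r <- translate(right, translateLeaf)
      } yield sources.And(l, r)
    case Or(left, right) =>
      for {
        l <- translate(left, translateLeaf)
        r <- translate(right, translateLeaf)
      } yield sources.Or(l, r)
    case Not(child) =>
      translate(child, translateLeaf).map(sources.Not)
    case other =>
      translateLeaf(other)
  }
```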
translateLeafNodeFilter¶
```scala
translateLeafNodeFilter(
  predicate: Expression,
  pushableColumn: PushableColumnBase): Option[Filter]
```
translateLeafNodeFilter translates a given Catalyst Expression into a corresponding Filter predicate if possible. If not, translateLeafNodeFilter returns None.
| Catalyst Expression | Filter Predicate |
|---|---|
| EqualTo (with a "pushable" column and a Literal) | EqualTo |
| EqualNullSafe (with a "pushable" column and a Literal) | EqualNullSafe |
| GreaterThan (with a "pushable" column and a Literal) | GreaterThan or LessThan (flipped when the Literal is on the left) |
| LessThan (with a "pushable" column and a Literal) | LessThan or GreaterThan (flipped when the Literal is on the left) |
| GreaterThanOrEqual (with a "pushable" column and a Literal) | GreaterThanOrEqual or LessThanOrEqual (flipped when the Literal is on the left) |
| LessThanOrEqual (with a "pushable" column and a Literal) | LessThanOrEqual or GreaterThanOrEqual (flipped when the Literal is on the left) |
| InSet (with a "pushable" column and values) | In |
| In (with a "pushable" column and Literal expressions) | In |
| IsNull (with a "pushable" column) | IsNull |
| IsNotNull (with a "pushable" column) | IsNotNull |
| StartsWith (with a "pushable" column and a string Literal) | StringStartsWith |
| EndsWith (with a "pushable" column and a string Literal) | StringEndsWith |
| Contains (with a "pushable" column and a string Literal) | StringContains |
| Literal (with true) | AlwaysTrue |
| Literal (with false) | AlwaysFalse |
Note
The Catalyst expressions and their corresponding data source filter predicates have the same names in most cases but belong to different Scala packages (org.apache.spark.sql.catalyst.expressions and org.apache.spark.sql.sources, respectively).
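Continuing the hypothetical TranslateFilterDemo sketch from above (same org.apache.spark.sql subpackage assumption), the comparison rows can be illustrated: the resulting filter is flipped when the Literal sits on the left-hand side of the Catalyst expression:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.execution.datasources.DataSourceStrategy

val id = Symbol("id").long

// Column on the left: GreaterThan stays GreaterThan
DataSourceStrategy.translateFilter(id > 5L, supportNestedPredicatePushdown = true)
// Some(GreaterThan(id,5))

// Literal on the left: LessThan(5, id) means id > 5, so it is flipped to GreaterThan
DataSourceStrategy.translateFilter(Literal(5L) < id, supportNestedPredicatePushdown = true)
// Some(GreaterThan(id,5))
```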
RDD Conversion (Converting RDD of Rows to Catalyst RDD of InternalRows)¶
```scala
toCatalystRDD(
  relation: LogicalRelation,
  output: Seq[Attribute],
  rdd: RDD[Row]): RDD[InternalRow]

toCatalystRDD(
  relation: LogicalRelation,
  rdd: RDD[Row]) // <1>
```

1. Uses the output of the given LogicalRelation

toCatalystRDD branches off per the needConversion flag of the BaseRelation of the input LogicalRelation:

- With needConversion enabled (true), toCatalystRDD converts the Row objects of the input RDD[Row] into InternalRows
- Otherwise, toCatalystRDD casts the input RDD[Row] to an RDD[InternalRow] (using Java's asInstanceOf operator)
toCatalystRDD is used when DataSourceStrategy execution planning strategy is executed (for all kinds of BaseRelations).
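A minimal sketch of the branching (not Spark's exact code; it assumes Spark 3.x's RowEncoder-based serializer API):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType

def toCatalystRDDSketch(
    schema: StructType,
    needConversion: Boolean,
    rdd: RDD[Row]): RDD[InternalRow] =
  if (needConversion) {
    // Serialize every external Row into a Catalyst InternalRow
    rdd.mapPartitions { rows =>
      val toRow = RowEncoder(schema).createSerializer()
      rows.map(toRow(_))
    }
  } else {
    // The relation already produces InternalRows disguised as Rows
    rdd.asInstanceOf[RDD[InternalRow]]
  }
```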
Creating RowDataSourceScanExec Physical Operator for LogicalRelation¶
```scala
pruneFilterProjectRaw(
  relation: LogicalRelation,
  projects: Seq[NamedExpression],
  filterPredicates: Seq[Expression],
  scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow]): SparkPlan
```
pruneFilterProjectRaw converts the given LogicalRelation leaf logical operator into a RowDataSourceScanExec leaf physical operator (possibly as a child of FilterExec and ProjectExec unary physical operators).
Note
pruneFilterProjectRaw is almost like SparkPlanner.pruneFilterProject.
Internally, pruneFilterProjectRaw splits the input filterPredicates expressions to select the Catalyst expressions that can be converted to data source filter predicates (and handled by the underlying BaseRelation of the LogicalRelation).
pruneFilterProjectRaw combines all expressions that are neither convertible to data source filters nor handled by the relation using the And binary expression (creating a so-called filterCondition that, if defined, is eventually used to create a FilterExec physical operator).
pruneFilterProjectRaw creates a RowDataSourceScanExec leaf physical operator.
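The combining step can be illustrated with a one-liner (a sketch mirroring a reduceLeftOption over And):

```scala
import org.apache.spark.sql.catalyst.expressions.{And, Expression}

// Leftover predicates (neither convertible nor handled by the relation)
// are ANDed into an optional filterCondition for FilterExec
def filterCondition(leftoverPredicates: Seq[Expression]): Option[Expression] =
  leftoverPredicates.reduceLeftOption(And)
```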
getPushedDownFilters¶
```scala
getPushedDownFilters(
  partitionColumns: Seq[Expression],
  normalizedFilters: Seq[Expression]): ExpressionSet
```
For an empty partitionColumns, getPushedDownFilters returns an empty ExpressionSet.
Otherwise, getPushedDownFilters...FIXME
getPushedDownFilters prints out the following INFO message to the logs:
```text
Pruning directories with: [predicates]
```
getPushedDownFilters is used when the FileSourceStrategy and HiveTableScans execution planning strategies are executed.
Demo¶
```scala
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
val strategy = DataSourceStrategy(spark.sessionState.conf)

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
val plan: LogicalPlan = ???

val sparkPlan = strategy(plan).head
```
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.datasources.DataSourceStrategy logger to see what happens inside.
Add the following lines to conf/log4j2.properties:

```text
logger.DataSourceStrategy.name = org.apache.spark.sql.execution.datasources.DataSourceStrategy
logger.DataSourceStrategy.level = all
```
Refer to Logging.