PartitionPruning Logical Optimization
PartitionPruning is a logical optimization for Dynamic Partition Pruning.
PartitionPruning is a Rule[LogicalPlan] (a Catalyst Rule for logical operators).
PartitionPruning is part of the PartitionPruning batch of the SparkOptimizer.
Executing Rule
apply is a noop (does nothing and returns the given LogicalPlan) when executed with one of the following:
- Subquery operators that are correlated
- spark.sql.optimizer.dynamicPartitionPruning.enabled configuration property is disabled
Otherwise, when enabled, apply prunes the given LogicalPlan.
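The dispatch described above can be sketched with a toy model. This is only an illustration: Plan, Subquery, Relation, Pruned, Conf and applyRule are hypothetical stand-ins and not the Catalyst classes, and prune here merely marks the plan as visited.

```scala
object ApplySketch {
  // Hypothetical stand-ins for logical operators (not Spark classes).
  sealed trait Plan
  final case class Subquery(child: Plan, correlated: Boolean) extends Plan
  final case class Relation(name: String) extends Plan
  // Marker that shows the plan went through pruning.
  final case class Pruned(child: Plan) extends Plan

  // Hypothetical holder for spark.sql.optimizer.dynamicPartitionPruning.enabled.
  final case class Conf(dynamicPartitionPruningEnabled: Boolean)

  // Placeholder for the real pruning logic.
  def prune(plan: Plan): Plan = Pruned(plan)

  def applyRule(plan: Plan, conf: Conf): Plan = plan match {
    // Correlated subqueries are returned untouched.
    case s: Subquery if s.correlated => plan
    // The rule is a noop when the configuration property is disabled.
    case _ if !conf.dynamicPartitionPruningEnabled => plan
    // Otherwise the plan is pruned.
    case _ => prune(plan)
  }
}
```

Putting the two noop cases first keeps the pruning logic free of configuration checks.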
Pruning
prune(
  plan: LogicalPlan): LogicalPlan
prune transforms up all logical operators in the given logical query plan.
prune skips Join logical operators (leaves them unmodified) when either the left or the right child operator is a Filter with a DynamicPruningSubquery condition.
prune transforms Join operators with EqualTo join conditions.
FIXME More Work Needed
prune needs more love and would benefit from more insight on how it works.
getFilterableTableScan
getFilterableTableScan(
  a: Expression,
  plan: LogicalPlan): Option[LogicalPlan]
getFilterableTableScan uses findExpressionAndTrackLineageDown (to find a LeafNode with the output schema that includes all the Attribute references of the given Expression).
Leaf Nodes
getFilterableTableScan is only interested in the following leaf logical operators:
- DataSourceV2ScanRelation over SupportsRuntimeFiltering scans
- HiveTableRelation
- LogicalRelation over HadoopFsRelation
getFilterableTableScan...FIXME
LogicalRelation over (Partitioned) HadoopFsRelation
For LogicalRelation with (the relation that is) a partitioned HadoopFsRelation, getFilterableTableScan checks if the references (of the given Expression) are all among the partition columns.
If so, getFilterableTableScan returns the LogicalRelation with the partitioned HadoopFsRelation.
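A minimal sketch of the partition-column check, assuming column names can stand in for Attribute references. FsRelation is a hypothetical simplification of a LogicalRelation over a HadoopFsRelation, and the signature differs from the real method.

```scala
object FilterableScanSketch {
  // Hypothetical, simplified stand-in for a LogicalRelation over a HadoopFsRelation.
  final case class FsRelation(table: String, partitionColumns: Set[String])

  // `references` stands in for the Attribute references of the given Expression.
  def getFilterableTableScan(
      references: Set[String],
      rel: FsRelation): Option[FsRelation] = {
    // Only a partitioned relation qualifies, and only when every
    // reference is one of its partition columns.
    val partitioned = rel.partitionColumns.nonEmpty
    if (partitioned && references.subsetOf(rel.partitionColumns)) Some(rel)
    else None
  }
}
```

With a relation partitioned by date and country, an expression over date alone yields the relation, while an expression that also references a non-partition column yields None.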
hasPartitionPruningFilter
hasPartitionPruningFilter(
  plan: LogicalPlan): Boolean
Note
hasPartitionPruningFilter is hasSelectivePredicate with a streaming check to make sure it disregards streaming queries.
hasPartitionPruningFilter is true when all of the following hold true:
- The given LogicalPlan is not streaming
- hasSelectivePredicate
hasSelectivePredicate
hasSelectivePredicate(
  plan: LogicalPlan): Boolean
hasSelectivePredicate is true when there is a Filter logical operator with a likely-selective filter condition.
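How the two predicates relate can be sketched over a toy plan tree. The node types below are hypothetical, and the likelySelective flag is an assumption that replaces the actual inspection of the filter condition, which this page does not describe.

```scala
object SelectivePredicateSketch {
  // Hypothetical, simplified plan nodes (not Catalyst classes).
  sealed trait Plan {
    def children: Seq[Plan]
    // A plan is streaming if any node under it is a streaming source.
    def isStreaming: Boolean = children.exists(_.isStreaming)
  }
  final case class Scan(name: String, streaming: Boolean = false) extends Plan {
    def children: Seq[Plan] = Nil
    override def isStreaming: Boolean = streaming
  }
  // `likelySelective` is an assumed flag standing in for the check on the condition.
  final case class Filter(likelySelective: Boolean, child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }
  final case class Project(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }

  // True when some Filter in the plan has a likely-selective condition.
  def hasSelectivePredicate(plan: Plan): Boolean = plan match {
    case Filter(true, _) => true
    case other => other.children.exists(hasSelectivePredicate)
  }

  // hasSelectivePredicate plus the streaming check.
  def hasPartitionPruningFilter(plan: Plan): Boolean =
    !plan.isStreaming && hasSelectivePredicate(plan)
}
```

A streaming plan with a selective Filter satisfies hasSelectivePredicate but not hasPartitionPruningFilter.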
Inserting Predicate with DynamicPruningSubquery Expression
insertPredicate(
  pruningKey: Expression,
  pruningPlan: LogicalPlan,
  filteringKey: Expression,
  filteringPlan: LogicalPlan,
  joinKeys: Seq[Expression],
  partScan: LogicalPlan): LogicalPlan
With spark.sql.exchange.reuse enabled or pruningHasBenefit, insertPredicate creates (inserts into the given pruning plan) a Filter logical operator with a DynamicPruningSubquery expression (with the onlyInBroadcast flag enabled when spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is enabled or pruningHasBenefit is false).
Otherwise, insertPredicate returns the given pruningPlan logical query plan unchanged.
Configuration Properties
insertPredicate is configured using the following:
- spark.sql.exchange.reuse
- spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly
pruningHasBenefit
pruningHasBenefit(
  partExpr: Expression,
  partPlan: LogicalPlan,
  otherExpr: Expression,
  otherPlan: LogicalPlan): Boolean
Column Statistics
pruningHasBenefit uses Column Statistics (for the number of distinct values), if available and spark.sql.optimizer.dynamicPartitionPruning.useStats is enabled.
pruningHasBenefit computes a filtering ratio based on the columns (references) in the given partExpr and otherExpr expressions.
One Column Reference Only
pruningHasBenefit supports one column reference only in the given partExpr and otherExpr expressions.
With spark.sql.optimizer.dynamicPartitionPruning.useStats enabled, pruningHasBenefit uses the Distinct Count statistic (CBO stats) for each attribute (in the join condition).
The filtering ratio is 1 minus the ratio of the Distinct Count of rightAttr to the Distinct Count of leftAttr, unless any of the following holds:
- Distinct Counts are not available or leftAttr's Distinct Count is 0 or negative
- Distinct Count of leftAttr is the same as or lower than otherDistinctCount
- There is more than one attribute in the partExpr or otherExpr expressions
For such cases, the filtering ratio is spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio.
pruningHasBenefit calculates estimatePruningSideSize as the filtering ratio multiplied by the sizeInBytes statistic of the given partPlan.
pruningHasBenefit is true (pruning is considered beneficial) when estimatePruningSideSize is greater than calculatePlanOverhead of the given otherPlan logical plan.
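The arithmetic can be sketched in plain Scala. This sketch takes the filtering ratio to be 1 minus the distinct-count ratio and passes the statistics in directly. The parameter names (leftDistinct, rightDistinct, partSizeInBytes, overhead) are assumptions for illustration, and the single-attribute requirement is taken as already satisfied.

```scala
object PruningBenefitSketch {
  // Distinct counts are Options because column statistics may be unavailable.
  // fallbackFilterRatio stands for the value of
  // spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio.
  def filterRatio(
      leftDistinct: Option[BigInt],
      rightDistinct: Option[BigInt],
      fallbackFilterRatio: Double): Double =
    (leftDistinct, rightDistinct) match {
      // Statistics are usable only when the left count is positive
      // and strictly greater than the right count.
      case (Some(l), Some(r)) if l > 0 && l > r =>
        1.0 - r.toDouble / l.toDouble
      // Missing, non-positive or suspicious statistics use the fallback ratio.
      case _ => fallbackFilterRatio
    }

  // Pruning pays off when the estimated pruning-side size exceeds the
  // overhead of the other (filtering) plan.
  def pruningHasBenefit(
      ratio: Double,
      partSizeInBytes: BigInt,
      overhead: Double): Boolean = {
    val estimatePruningSideSize = ratio * partSizeInBytes.toDouble
    estimatePruningSideSize > overhead
  }
}
```

For example, with 100 distinct values on the partitioned side and 25 on the other side, the ratio is 0.75, so a 1000-byte partitioned plan gives an estimate of 750 bytes, which is beneficial against an overhead of 500.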