PartitionPruning Logical Optimization¶
PartitionPruning
is a logical optimization for Dynamic Partition Pruning.
PartitionPruning
is a Rule[LogicalPlan]
(a Catalyst Rule for logical operators).
PartitionPruning
is part of the PartitionPruning batch of the SparkOptimizer.
Executing Rule¶
apply
is a noop (does nothing and returns the given LogicalPlan) when executed with one of the following:
- Subquery operators that are correlated
- spark.sql.optimizer.dynamicPartitionPruning.enabled configuration property is disabled
Otherwise, when enabled, apply
prunes the given LogicalPlan.
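The guard conditions above can be sketched in plain Scala. This is a minimal model and not Spark's code: `Plan`, `Subquery`, `Relation` and `applyRule` are hypothetical stand-ins for the Catalyst types, and `dppEnabled` stands in for the spark.sql.optimizer.dynamicPartitionPruning.enabled configuration property.

```scala
object ApplyGuardSketch {
  // Hypothetical, heavily simplified stand-ins for Catalyst logical operators
  sealed trait Plan
  final case class Subquery(child: Plan, correlated: Boolean) extends Plan
  final case class Relation(name: String) extends Plan

  // Returns the plan untouched (noop) for correlated subqueries or when
  // dynamic partition pruning is disabled; otherwise delegates to `prune`.
  def applyRule(plan: Plan, dppEnabled: Boolean)(prune: Plan => Plan): Plan =
    plan match {
      case s: Subquery if s.correlated => plan
      case _ if !dppEnabled            => plan
      case _                           => prune(plan)
    }
}
```

Passing `prune` as a function keeps the sketch independent of how pruning itself is implemented.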
Pruning¶
prune(
plan: LogicalPlan): LogicalPlan
prune
transforms up all logical operators in the given logical query plan.
prune
skips Join logical operators (leaves them unmodified) when either the left or the right child operator is a Filter with a DynamicPruningSubquery condition.
prune
transforms Join operators with EqualTo join conditions.
FIXME More Work Needed
prune
needs more love and would benefit from more insight on how it works.
getFilterableTableScan¶
getFilterableTableScan(
a: Expression,
plan: LogicalPlan): Option[LogicalPlan]
getFilterableTableScan uses findExpressionAndTrackLineageDown (which finds a LeafNode whose output schema includes all the Attribute references of the given Expression).
Leaf Nodes
getFilterableTableScan
is only interested in the following leaf logical operators:
- DataSourceV2ScanRelation over SupportsRuntimeFiltering scans
- HiveTableRelation
- LogicalRelation over HadoopFsRelation
getFilterableTableScan
...FIXME
LogicalRelation over (Partitioned) HadoopFsRelation¶
For LogicalRelation with (the relation that is) a partitioned HadoopFsRelation, getFilterableTableScan
checks if the references (of the given Expression) are all among the partition columns.
If so, getFilterableTableScan returns the LogicalRelation with the partitioned HadoopFsRelation.
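The partition-column check for this case can be illustrated with a small, self-contained sketch. It is a model under stated assumptions: `PartitionedRelation` and `filterableScan` are hypothetical names, and column references are reduced to plain strings rather than Catalyst attributes.

```scala
object PartitionColumnCheckSketch {
  // Hypothetical stand-in for a LogicalRelation over a partitioned HadoopFsRelation
  final case class PartitionedRelation(table: String, partitionColumns: Set[String])

  // Returns the relation only when every column referenced by the expression
  // is a partition column; otherwise there is nothing to prune on.
  def filterableScan(
      references: Set[String],
      relation: PartitionedRelation): Option[PartitionedRelation] =
    if (references.subsetOf(relation.partitionColumns)) Some(relation) else None
}
```

For example, with a table partitioned by `year` and `month`, an expression over `year` alone qualifies, while one that also references a non-partition column such as `amount` does not.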
hasPartitionPruningFilter¶
hasPartitionPruningFilter(
plan: LogicalPlan): Boolean
Note
hasPartitionPruningFilter
is hasSelectivePredicate with a streaming check to make sure it disregards streaming queries.
hasPartitionPruningFilter
is true
when all of the following hold true:
- The given LogicalPlan is not streaming
- hasSelectivePredicate
hasSelectivePredicate¶
hasSelectivePredicate(
plan: LogicalPlan): Boolean
hasSelectivePredicate
is true
when there is a Filter
logical operator with a likely-selective filter condition.
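How hasPartitionPruningFilter builds on hasSelectivePredicate can be sketched as follows. This is a simplified model, not Spark's implementation: the `Plan`, `Scan` and `Filter` types are hypothetical, and the `likelySelective` flag is an assumption that replaces the actual inspection of a filter condition.

```scala
object SelectivePredicateSketch {
  // Hypothetical, simplified plan model
  sealed trait Plan {
    def isStreaming: Boolean
    def children: Seq[Plan]
  }
  final case class Scan(table: String, isStreaming: Boolean = false) extends Plan {
    def children: Seq[Plan] = Nil
  }
  // `likelySelective` is an assumed flag standing in for the check of the condition
  final case class Filter(likelySelective: Boolean, child: Plan) extends Plan {
    def isStreaming: Boolean = child.isStreaming
    def children: Seq[Plan] = Seq(child)
  }

  // True when any Filter in the plan has a likely-selective condition
  def hasSelectivePredicate(plan: Plan): Boolean = plan match {
    case Filter(selective, child) => selective || hasSelectivePredicate(child)
    case other                    => other.children.exists(hasSelectivePredicate)
  }

  // Same check, but streaming plans are always disregarded
  def hasPartitionPruningFilter(plan: Plan): Boolean =
    !plan.isStreaming && hasSelectivePredicate(plan)
}
```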
Inserting Predicate with DynamicPruningSubquery Expression¶
insertPredicate(
pruningKey: Expression,
pruningPlan: LogicalPlan,
filteringKey: Expression,
filteringPlan: LogicalPlan,
joinKeys: Seq[Expression],
partScan: LogicalPlan): LogicalPlan
With spark.sql.exchange.reuse enabled or pruningHasBenefit being true, insertPredicate creates (inserts into the given pruning plan) a Filter logical operator with a DynamicPruningSubquery expression (with the onlyInBroadcast flag enabled when spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is enabled or pruningHasBenefit is false).
Otherwise, insertPredicate
returns the given pruningPlan
logical query plan unchanged.
Configuration Properties
insertPredicate
is configured using the following:
pruningHasBenefit¶
pruningHasBenefit(
partExpr: Expression,
partPlan: LogicalPlan,
otherExpr: Expression,
otherPlan: LogicalPlan): Boolean
Column Statistics
pruningHasBenefit
uses Column Statistics (for the number of distinct values), if available and spark.sql.optimizer.dynamicPartitionPruning.useStats is enabled.
pruningHasBenefit
computes a filtering ratio based on the columns (references) in the given partExpr
and otherExpr
expressions.
One Column Reference Only
pruningHasBenefit
supports one column reference only in the given partExpr
and otherExpr
expressions.
With spark.sql.optimizer.dynamicPartitionPruning.useStats enabled, pruningHasBenefit
uses the Distinct Count statistic (CBO stats) for each attribute (in the join condition).
The filtering ratio is 1 minus the ratio of the Distinct Count of rightAttr to the Distinct Count of leftAttr, unless:
- Distinct Counts are not available, or leftAttr's Distinct Count is 0 or negative
- The Distinct Count of leftAttr is the same as or lower than otherDistinctCount (the Distinct Count of rightAttr)
- There is more than one attribute in the partExpr or otherExpr expressions
For such cases, the filtering ratio is spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio.
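The ratio computation can be worked through with a short sketch. It reads "remaining of 1" as 1 minus the ratio of the two distinct counts, which is an interpretation. The `filterRatio` helper and its parameter names are hypothetical, `fallback` stands in for spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio, and the single-attribute requirement is assumed to have been checked already.

```scala
object FilterRatioSketch {
  // partDistinct:  Distinct Count of leftAttr (pruning side), if known
  // otherDistinct: Distinct Count of rightAttr (filtering side), if known
  // fallback:      value used whenever the statistics cannot be trusted
  def filterRatio(
      partDistinct: Option[Long],
      otherDistinct: Option[Long],
      fallback: Double): Double =
    (partDistinct, otherDistinct) match {
      // Statistics are usable only when the pruning side has a positive
      // distinct count that is strictly higher than the filtering side's
      case (Some(p), Some(o)) if p > 0 && p > o => 1.0 - o.toDouble / p.toDouble
      case _                                    => fallback
    }
}
```

With 100 distinct values on the pruning side and 25 on the filtering side, the sketch yields 1 - 25/100 = 0.75, so roughly three quarters of the partitions are expected to be filtered out.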
pruningHasBenefit calculates estimatePruningSideSize as the filtering ratio multiplied by the sizeInBytes statistic of the given partPlan.
pruningHasBenefit is true when estimatePruningSideSize is greater than calculatePlanOverhead of the given otherPlan logical plan.
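The final decision can be sketched as plain arithmetic, assuming the estimate is the filtering ratio multiplied by the size of the pruning side. The `hasBenefit` helper is hypothetical, and `otherPlanOverhead` is an assumed input that stands in for the result of calculatePlanOverhead, which is not modeled here.

```scala
object PruningBenefitSketch {
  // filterRatio:       estimated fraction of the pruning side that can be skipped
  // partSizeInBytes:   sizeInBytes statistic of the pruning side (partPlan)
  // otherPlanOverhead: assumed cost of evaluating the filtering side (otherPlan)
  def hasBenefit(
      filterRatio: Double,
      partSizeInBytes: Long,
      otherPlanOverhead: Double): Boolean = {
    val estimatePruningSideSize = filterRatio * partSizeInBytes.toDouble
    // Pruning pays off only when the data skipped outweighs the extra work
    estimatePruningSideSize > otherPlanOverhead
  }
}
```

With a ratio of 0.75 and a 1000-byte pruning side the estimate is 750 bytes, which is beneficial against an overhead of 500 but not against an overhead of 800.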