
PartitionPruning Logical Optimization

PartitionPruning is a logical optimization for Dynamic Partition Pruning.

PartitionPruning is a Rule[LogicalPlan] (a Catalyst Rule for logical operators).

PartitionPruning is part of the PartitionPruning batch of the SparkOptimizer.

Executing Rule

Rule
apply(
  plan: LogicalPlan): LogicalPlan

apply is part of the Rule abstraction.

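apply is a noop (does nothing and returns the given LogicalPlan) when executed with one of the following:

  1. A correlated Subquery logical operator
  2. spark.sql.optimizer.dynamicPartitionPruning.enabled configuration property disabled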

Otherwise, when enabled, apply prunes the given LogicalPlan.
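The dispatch in apply can be sketched in Scala as follows. This is a minimal model under stated assumptions, not Spark's code: Plan, Relation, Subquery, Pruned, prune and applyRule are hypothetical stand-ins, and the two noop cases (a correlated Subquery, and spark.sql.optimizer.dynamicPartitionPruning.enabled turned off, modeled by the dppEnabled flag) follow the Spark 3.x source as we understand it.

```scala
// Hypothetical, simplified stand-ins for Catalyst logical operators.
object ApplySketch {
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Subquery(child: Plan, correlated: Boolean) extends Plan
  final case class Pruned(child: Plan) extends Plan // marks that prune ran

  // Placeholder for the real prune step.
  def prune(plan: Plan): Plan = Pruned(plan)

  // Plays the role of apply; dppEnabled models
  // spark.sql.optimizer.dynamicPartitionPruning.enabled.
  def applyRule(plan: Plan, dppEnabled: Boolean): Plan = plan match {
    case s: Subquery if s.correlated => plan // correlated subqueries are left alone
    case _ if !dppEnabled            => plan // the optimization is turned off
    case _                           => prune(plan)
  }
}
```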

Pruning

prune(
  plan: LogicalPlan): LogicalPlan

prune transforms all logical operators in the given logical query plan bottom-up (transformUp).

prune skips (leaves unmodified) Join logical operators whose left or right child operator is a Filter with a DynamicPruningSubquery condition.

prune transforms Join operators with EqualTo join conditions.
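A minimal sketch of which Join operators prune looks at and which predicates it considers, assuming a toy expression tree. Expr, Attr, EqualTo, And, Scan, Filter, Join, splitConjuncts and candidateEqualities are hypothetical names, and DynamicPruningSubquery is modeled as a bare marker object. In Spark 3.x the rule then goes on to check that the two sides of each EqualTo come from different join sides and to look for a filterable table scan, which the sketch leaves out.

```scala
object PruneSketch {
  // Hypothetical, heavily simplified expression and plan trees.
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  case object DynamicPruningSubquery extends Expr // placeholder for the real expression

  sealed trait Plan
  final case class Scan(name: String) extends Plan
  final case class Filter(condition: Expr, child: Plan) extends Plan
  final case class Join(left: Plan, right: Plan, condition: Option[Expr]) extends Plan

  // Split a conjunction into its predicates: a AND b AND c => Seq(a, b, c).
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // True for a Filter whose condition is a DynamicPruningSubquery (already pruned).
  private def isDppFilter(p: Plan): Boolean = p match {
    case Filter(DynamicPruningSubquery, _) => true
    case _                                 => false
  }

  // The EqualTo predicates prune would look at, or Nil for a Join it skips.
  def candidateEqualities(join: Join): Seq[EqualTo] =
    if (isDppFilter(join.left) || isDppFilter(join.right)) Nil
    else join.condition.toList.flatMap(splitConjuncts).collect { case e: EqualTo => e }
}
```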

FIXME More Work Needed

prune needs more love and would benefit from more insight on how it works.

getFilterableTableScan

getFilterableTableScan(
  a: Expression,
  plan: LogicalPlan): Option[LogicalPlan]

getFilterableTableScan uses findExpressionAndTrackLineageDown to find a LeafNode whose output schema includes all the Attribute references of the given Expression.
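The lineage walk can be sketched as a recursive descent, assuming a toy tree where every node lists its output attribute names. Plan, Leaf, Node and findLeaf are hypothetical names, and the sketch ignores the alias tracking through Project that the real findExpressionAndTrackLineageDown also performs.

```scala
object LineageSketch {
  // Hypothetical plan tree in which every node exposes its output attribute names.
  sealed trait Plan { def output: Set[String]; def children: Seq[Plan] }
  final case class Leaf(name: String, output: Set[String]) extends Plan {
    def children: Seq[Plan] = Nil
  }
  final case class Node(children: Seq[Plan]) extends Plan {
    def output: Set[String] = children.flatMap(_.output).toSet
  }

  // Descend only into children that still produce every referenced attribute
  // and return the first leaf whose output covers all of them.
  def findLeaf(references: Set[String], plan: Plan): Option[Leaf] = plan match {
    case l: Leaf => if (references.subsetOf(l.output)) Some(l) else None
    case n: Node =>
      n.children
        .filter(child => references.subsetOf(child.output))
        .flatMap(child => findLeaf(references, child))
        .headOption
  }
}
```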

Leaf Nodes

getFilterableTableScan is only interested in the following leaf logical operators:

getFilterableTableScan...FIXME

LogicalRelation over (Partitioned) HadoopFsRelation

For a LogicalRelation over a partitioned HadoopFsRelation, getFilterableTableScan checks whether all the references of the given Expression are partition columns.

If so, getFilterableTableScan returns the LogicalRelation with the partitioned HadoopFsRelation.
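The partition-column check boils down to a subset test. A minimal sketch, assuming a hypothetical FsRelation case class that only records a relation name and its partition column names; filterableScan is likewise a made-up name.

```scala
object PartitionCheckSketch {
  // Hypothetical stand-in for a LogicalRelation over a HadoopFsRelation.
  final case class FsRelation(name: String, partitionColumns: Set[String])

  // Returns the relation only if it is partitioned and every referenced
  // column is one of its partition columns.
  def filterableScan(references: Set[String], rel: FsRelation): Option[FsRelation] =
    if (rel.partitionColumns.nonEmpty && references.subsetOf(rel.partitionColumns)) Some(rel)
    else None
}
```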

hasPartitionPruningFilter

hasPartitionPruningFilter(
  plan: LogicalPlan): Boolean

Note

hasPartitionPruningFilter is hasSelectivePredicate with a streaming check to make sure it disregards streaming queries.

hasPartitionPruningFilter is true when all of the following hold:

  1. The given LogicalPlan is not streaming
  2. hasSelectivePredicate holds for the given LogicalPlan
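A sketch of the two conditions, assuming a hypothetical plan model in which a Filter only carries a selective flag (standing in for the likely-selective check) and a Scan carries a streaming flag. As with Catalyst's LogicalPlan.isStreaming, a plan counts as streaming when any of its inputs is streaming.

```scala
object PartitionPruningFilterSketch {
  // Hypothetical plan model; `selective` stands in for "condition is likely selective".
  sealed trait Plan
  final case class Scan(name: String, streaming: Boolean) extends Plan
  final case class Filter(selective: Boolean, child: Plan) extends Plan
  final case class Join(left: Plan, right: Plan) extends Plan

  // A plan is streaming if any of its sources is streaming.
  def isStreaming(plan: Plan): Boolean = plan match {
    case Scan(_, s)   => s
    case Filter(_, c) => isStreaming(c)
    case Join(l, r)   => isStreaming(l) || isStreaming(r)
  }

  // True if any Filter in the plan is (flagged as) selective.
  def hasSelectivePredicate(plan: Plan): Boolean = plan match {
    case Scan(_, _)     => false
    case Filter(sel, c) => sel || hasSelectivePredicate(c)
    case Join(l, r)     => hasSelectivePredicate(l) || hasSelectivePredicate(r)
  }

  // Both conditions must hold: a batch (non-streaming) plan with a selective Filter.
  def hasPartitionPruningFilter(plan: Plan): Boolean =
    !isStreaming(plan) && hasSelectivePredicate(plan)
}
```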

hasSelectivePredicate

hasSelectivePredicate(
  plan: LogicalPlan): Boolean

hasSelectivePredicate is true when the given LogicalPlan contains a Filter logical operator with a likely-selective filter condition.
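What "likely-selective" means can be sketched with a small predicate classifier. This is a simplified model loosely based on Catalyst's isLikelySelective heuristic: comparisons, IN and LIKE count as selective, NOT keeps the verdict of its child, AND needs one selective side, OR needs both, and anything else (IsNotNull here) does not. All case classes and the exact rule set are assumptions for illustration.

```scala
object SelectivitySketch {
  // Hypothetical, simplified predicate and plan model.
  sealed trait Pred
  final case class Not(p: Pred) extends Pred
  final case class And(l: Pred, r: Pred) extends Pred
  final case class Or(l: Pred, r: Pred) extends Pred
  final case class Comparison(op: String, column: String, value: Any) extends Pred
  final case class In(column: String, values: Seq[Any]) extends Pred
  final case class Like(column: String, pattern: String) extends Pred
  final case class IsNotNull(column: String) extends Pred

  sealed trait Plan
  final case class Scan(name: String) extends Plan
  final case class Filter(condition: Pred, child: Plan) extends Plan
  final case class Project(child: Plan) extends Plan

  // Simplified heuristic modeled loosely on Catalyst's isLikelySelective.
  def isLikelySelective(p: Pred): Boolean = p match {
    case Not(inner) => isLikelySelective(inner)
    case And(l, r)  => isLikelySelective(l) || isLikelySelective(r)
    case Or(l, r)   => isLikelySelective(l) && isLikelySelective(r)
    case _: Comparison | _: In | _: Like => true
    case _          => false // e.g. IsNotNull filters out little
  }

  // True if any Filter in the plan has a likely-selective condition.
  def hasSelectivePredicate(plan: Plan): Boolean = plan match {
    case Filter(cond, child) => isLikelySelective(cond) || hasSelectivePredicate(child)
    case Project(child)      => hasSelectivePredicate(child)
    case Scan(_)             => false
  }
}
```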

Inserting Predicate with DynamicPruningSubquery Expression

insertPredicate(
  pruningKey: Expression,
  pruningPlan: LogicalPlan,
  filteringKey: Expression,
  filteringPlan: LogicalPlan,
  joinKeys: Seq[Expression],
  partScan: LogicalPlan): LogicalPlan

When either spark.sql.exchange.reuse is enabled or pruningHasBenefit holds, insertPredicate creates a Filter logical operator with a DynamicPruningSubquery expression on top of the given pruningPlan. The onlyInBroadcast flag of the DynamicPruningSubquery is enabled when either spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is enabled or pruningHasBenefit does not hold.

Otherwise, insertPredicate returns the given pruningPlan logical query plan unchanged.
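The decision logic can be sketched as follows, assuming the condition found in the Spark 3.x source: a DynamicPruningSubquery is inserted when exchange reuse is enabled or pruning has a benefit, and its onlyInBroadcast flag is set when reuseBroadcastOnly is enabled or pruning has no benefit. Inserted and the parameter names are hypothetical. pruningHasBenefit is passed by name and cached in a lazy val, so the benefit estimate runs at most once and only when it can change the outcome.

```scala
object InsertPredicateSketch {
  // Hypothetical result: the onlyInBroadcast flag of the DynamicPruningSubquery that
  // would be inserted, or None when pruningPlan is returned unchanged.
  final case class Inserted(onlyInBroadcast: Boolean)

  def insertPredicate(
      reuseEnabled: Boolean,        // spark.sql.exchange.reuse
      reuseBroadcastOnly: Boolean,  // spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly
      pruningHasBenefit: => Boolean // by-name, evaluated lazily
  ): Option[Inserted] = {
    lazy val hasBenefit = pruningHasBenefit
    if (reuseEnabled || hasBenefit) Some(Inserted(reuseBroadcastOnly || !hasBenefit))
    else None
  }
}
```

With reuse enabled and reuseBroadcastOnly set, both conditions short-circuit and the estimate is never computed.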

Configuration Properties

insertPredicate is configured using the following:

  1. spark.sql.exchange.reuse
  2. spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly

pruningHasBenefit

pruningHasBenefit(
  partExpr: Expression,
  partPlan: LogicalPlan,
  otherExpr: Expression,
  otherPlan: LogicalPlan): Boolean

Column Statistics

pruningHasBenefit uses Column Statistics (for the number of distinct values), if available and spark.sql.optimizer.dynamicPartitionPruning.useStats is enabled.

pruningHasBenefit computes a filtering ratio based on the columns (references) in the given partExpr and otherExpr expressions.

One Column Reference Only

pruningHasBenefit supports only one column reference in each of the given partExpr and otherExpr expressions.

With spark.sql.optimizer.dynamicPartitionPruning.useStats enabled, pruningHasBenefit uses the Distinct Count statistic (CBO stats) for each attribute (in the join condition).

The filtering ratio is 1 minus the ratio of the Distinct Count of rightAttr (the column in otherExpr) to the Distinct Count of leftAttr (the column in partExpr), unless:

  1. The Distinct Counts are not available, or leftAttr's Distinct Count is 0 or negative
  2. The Distinct Count of leftAttr is equal to or lower than the Distinct Count of rightAttr
  3. There is more than one attribute in the partExpr or otherExpr expression

For such cases, the filtering ratio is spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio.
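The filtering ratio rules can be sketched as a pure function. The per-column distinct counts are passed in as hypothetical Map[String, Long] lookups (a missing key means no statistic), and fallbackRatio stands for spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio. For example, 100 distinct values of leftAttr against 25 of rightAttr give 1 - 25/100 = 0.75.

```scala
object FilterRatioSketch {
  // partStats / otherStats: hypothetical per-column distinct counts (CBO column stats)
  // of partPlan and otherPlan; a missing key means "no statistic available".
  def filterRatio(
      partRefs: Seq[String],
      otherRefs: Seq[String],
      useStats: Boolean,            // spark.sql.optimizer.dynamicPartitionPruning.useStats
      partStats: Map[String, Long],
      otherStats: Map[String, Long],
      fallbackRatio: Double         // spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio
  ): Double = (partRefs, otherRefs) match {
    case (Seq(leftAttr), Seq(rightAttr)) if useStats =>
      (partStats.get(leftAttr), otherStats.get(rightAttr)) match {
        // Both counts known, leftAttr's positive and strictly larger than rightAttr's.
        case (Some(part), Some(other)) if part > 0 && part > other =>
          1 - other.toDouble / part.toDouble
        case _ => fallbackRatio // missing stats or a likely estimation error
      }
    case _ => fallbackRatio // not exactly one attribute per side, or useStats disabled
  }
}
```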

pruningHasBenefit calculates estimatePruningSideSize as the filtering ratio multiplied by the sizeInBytes statistic of the given partPlan.

pruningHasBenefit is true when estimatePruningSideSize is greater than the calculatePlanOverhead of the given otherPlan logical plan.
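Put together, the benefit check compares the estimated pruned bytes with the overhead of the other side. A minimal sketch, assuming the overhead is supplied as a number (otherPlanOverhead) rather than computed by calculatePlanOverhead; estimatePruningSideSize reuses the name from the text, and this two-function pruningHasBenefit is a simplified, hypothetical signature.

```scala
object PruningBenefitSketch {
  // The share of partPlan's bytes expected to be pruned away.
  def estimatePruningSideSize(filterRatio: Double, partPlanSizeInBytes: BigInt): Double =
    filterRatio * partPlanSizeInBytes.toDouble

  // otherPlanOverhead stands in for calculatePlanOverhead(otherPlan), taken as an input here.
  def pruningHasBenefit(filterRatio: Double, partPlanSizeInBytes: BigInt, otherPlanOverhead: Double): Boolean =
    estimatePruningSideSize(filterRatio, partPlanSizeInBytes) > otherPlanOverhead
}
```

With a filtering ratio of 0.75 and a 1000-byte partPlan, the estimate is 750 bytes, so pruning pays off against an overhead of 500 but not against one of 750 (the comparison is strict).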