InjectRuntimeFilter Logical Optimization

InjectRuntimeFilter is a logical optimization (i.e., a Rule of LogicalPlan).

InjectRuntimeFilter is part of the InjectRuntimeFilter fixed-point batch of rules.

Runtime Filter

A runtime filter is either a BloomFilter (when spark.sql.optimizer.runtime.bloomFilter.enabled is enabled) or an InSubquery filter.

Noop

InjectRuntimeFilter is a noop (and returns the query plan unchanged) in the following cases:

  1. The query plan to optimize is a correlated Subquery (to be rewritten into a join later)
  2. Neither spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled nor spark.sql.optimizer.runtime.bloomFilter.enabled is enabled

Executing Rule

Rule
apply(
  plan: LogicalPlan): LogicalPlan

apply is part of the Rule abstraction.

apply calls tryInjectRuntimeFilter with the given logical plan (to produce a new logical plan).

With runtimeFilterSemiJoinReductionEnabled enabled and the new logical plan different from the initial one, apply executes the RewritePredicateSubquery logical optimization on the new logical plan. Otherwise, apply returns the new logical plan.
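The control flow of apply (the two noop cases followed by the conditional rewrite) can be sketched in plain Scala. This is a minimal model and not Spark code: Conf, Plan, tryInject and rewritePredicateSubquery are hypothetical stand-ins for the configuration flags, a LogicalPlan, tryInjectRuntimeFilter and RewritePredicateSubquery, respectively.

```scala
object ApplySketch {
  // Hypothetical stand-ins for the two configuration flags and a logical plan.
  final case class Conf(semiJoinReductionEnabled: Boolean, bloomFilterEnabled: Boolean)
  final case class Plan(description: String, isCorrelatedSubquery: Boolean = false)

  def apply(
      plan: Plan,
      conf: Conf,
      tryInject: Plan => Plan,
      rewritePredicateSubquery: Plan => Plan): Plan = {
    if (plan.isCorrelatedSubquery) {
      plan // noop: correlated subquery
    } else if (!conf.semiJoinReductionEnabled && !conf.bloomFilterEnabled) {
      plan // noop: both features disabled
    } else {
      val newPlan = tryInject(plan)
      // Rewrite only when semi-join reduction is on and a filter was actually injected.
      if (conf.semiJoinReductionEnabled && newPlan != plan) rewritePredicateSubquery(newPlan)
      else newPlan
    }
  }
}
```

With both flags disabled, the plan comes back untouched. With only the Bloom filter flag enabled, the injected plan is returned without the extra rewrite.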

tryInjectRuntimeFilter

tryInjectRuntimeFilter(
  plan: LogicalPlan): LogicalPlan

tryInjectRuntimeFilter transforms the given LogicalPlan with regards to equi-joins.

For every equi-join, tryInjectRuntimeFilter tries to inject a runtime filter on the left side first and, only if the left side was left unchanged, on the right side. A filter is injected when all the following requirements are met:

  1. Neither join side already has a DynamicPruningSubquery filter (hasDynamicPruningSubquery)
  2. Neither join side already has a runtime filter (hasRuntimeFilter)
  3. The left and right keys (pair-wise) are simple expressions (isSimpleExpression)
  4. The side to filter can be pruned (canPruneLeft or canPruneRight)
  5. Filtering is expected to pay off (filteringHasBenefit)

tryInjectRuntimeFilter tries to inject up to spark.sql.optimizer.runtimeFilter.number.threshold filters.
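The left-first, right-as-fallback order and the filter-count threshold can be illustrated with a simplified model. The sketch below is not Spark code: KeyPair and its boolean fields are hypothetical summaries of the checks above, and it assumes that an eligible side always ends up with a filter (in Spark, injection itself can still leave the plan unchanged).

```scala
object TryInjectSketch {
  // Hypothetical summary of the checks for one pair of join keys.
  final case class KeyPair(
      name: String,
      alreadyFiltered: Boolean, // a DPP or runtime filter is already present
      simpleKeys: Boolean,      // both keys are simple expressions
      leftEligible: Boolean,    // left side can be pruned and filtering has benefit
      rightEligible: Boolean)   // right side can be pruned and filtering has benefit

  sealed trait Decision
  case object InjectLeft extends Decision
  case object InjectRight extends Decision
  case object Skip extends Decision

  def decide(pairs: Seq[KeyPair], threshold: Int): Seq[(String, Decision)] = {
    var injected = 0 // number of filters injected so far
    pairs.map { p =>
      val decision: Decision =
        if (injected >= threshold || p.alreadyFiltered || !p.simpleKeys) Skip
        else if (p.leftEligible) InjectLeft   // left side first
        else if (p.rightEligible) InjectRight // right side only as a fallback
        else Skip
      if (decision != Skip) injected += 1
      p.name -> decision
    }
  }
}
```

Once the counter reaches the threshold, all remaining key pairs are skipped regardless of their eligibility.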

Injecting Filter Operator

injectFilter(
  filterApplicationSideExp: Expression,
  filterApplicationSidePlan: LogicalPlan,
  filterCreationSideExp: Expression,
  filterCreationSidePlan: LogicalPlan): LogicalPlan

With spark.sql.optimizer.runtime.bloomFilter.enabled enabled, injectFilter injects a filter using BloomFilter (injectBloomFilter).

Otherwise, injectFilter injects a filter using InSubquery (injectInSubqueryFilter).

Injecting BloomFilter

injectBloomFilter(
  filterApplicationSideExp: Expression,
  filterApplicationSidePlan: LogicalPlan,
  filterCreationSideExp: Expression,
  filterCreationSidePlan: LogicalPlan): LogicalPlan

Note

injectBloomFilter returns the given filterApplicationSidePlan logical plan unchanged when the size of the given filterCreationSidePlan logical plan is above spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold.

injectBloomFilter creates a BloomFilterAggregate expression with an XxHash64 child expression (over the given filterCreationSideExp expression) and, if available, the row count statistic of the given filterCreationSidePlan logical plan.

injectBloomFilter creates an Alias expression named bloomFilter for the BloomFilterAggregate (converted to an AggregateExpression).

injectBloomFilter creates an Aggregate logical operator with the following:

  • No Grouping Expressions
  • Aliased BloomFilterAggregate
  • The given filterCreationSidePlan logical plan

injectBloomFilter executes the following logical optimizations on the Aggregate logical operator:

  1. ColumnPruning
  2. ConstantFolding

injectBloomFilter creates a ScalarSubquery expression with the Aggregate logical operator (bloomFilterSubquery).

injectBloomFilter creates a BloomFilterMightContain expression.

Property               Value
bloomFilterExpression  The ScalarSubquery
valueExpression        An XxHash64 expression with the given filterApplicationSideExp

In the end, injectBloomFilter creates a Filter logical operator with the BloomFilterMightContain expression and the given filterApplicationSidePlan logical plan.
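The build-then-probe idea behind the injected operators can be shown with a toy Bloom filter in plain Scala. This is only a sketch under stated assumptions: ToyBloomFilter, build and prune are hypothetical names, join keys are Long values, and MurmurHash3 with java.util.BitSet stands in for the XxHash64 expression and Spark's own Bloom filter implementation.

```scala
import java.util.BitSet
import scala.util.hashing.MurmurHash3

object BloomFilterSketch {
  // Toy Bloom filter (an illustration only, not Spark's implementation).
  final class ToyBloomFilter(numBits: Int, numHashes: Int) {
    private val bits = new BitSet(numBits)

    // One bit position per hash function (the seed selects the function).
    private def positions(value: Long): Seq[Int] =
      (0 until numHashes).map { seed =>
        java.lang.Math.floorMod(MurmurHash3.stringHash(value.toString, seed), numBits)
      }

    def put(value: Long): Unit = positions(value).foreach(i => bits.set(i))
    def mightContain(value: Long): Boolean = positions(value).forall(i => bits.get(i))
  }

  // Creation side: aggregate all keys into a single filter
  // (the role played by the Aggregate with BloomFilterAggregate).
  def build(creationSideKeys: Seq[Long], numBits: Int = 1024, numHashes: Int = 3): ToyBloomFilter = {
    val bf = new ToyBloomFilter(numBits, numHashes)
    creationSideKeys.foreach(k => bf.put(k))
    bf
  }

  // Application side: keep only the rows whose key might be in the filter
  // (the role played by the Filter with BloomFilterMightContain).
  def prune(applicationSideKeys: Seq[Long], bf: ToyBloomFilter): Seq[Long] =
    applicationSideKeys.filter(k => bf.mightContain(k))
}
```

A Bloom filter never drops a key that was put into it, so every matching row survives pruning. It may let some non-matching rows through (false positives), and the join removes those later.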

injectInSubqueryFilter

injectInSubqueryFilter(
  filterApplicationSideExp: Expression,
  filterApplicationSidePlan: LogicalPlan,
  filterCreationSideExp: Expression,
  filterCreationSidePlan: LogicalPlan): LogicalPlan

The same DataTypes

injectInSubqueryFilter requires that the DataTypes of the given filterApplicationSideExp and filterCreationSideExp are the same.

injectInSubqueryFilter creates an Aggregate logical operator with the following:

Property                Value
Grouping Expressions    The given filterCreationSideExp expression
Aggregate Expressions   An Alias expression for the filterCreationSideExp expression (possibly wrapped using mayWrapWithHash)
Child Logical Operator  The given filterCreationSidePlan logical plan

injectInSubqueryFilter executes ColumnPruning logical optimization on the Aggregate logical operator.

Unless the Aggregate logical operator canBroadcastBySize, injectInSubqueryFilter returns the given filterApplicationSidePlan logical plan (and basically throws away all the work so far).

Note

injectInSubqueryFilter skips the InSubquery filter if the size of the Aggregate is beyond broadcast join threshold and the semi-join will be a shuffle join, which is not worthwhile.

injectInSubqueryFilter creates an InSubquery expression.

In the end, injectInSubqueryFilter creates a Filter logical operator with the InSubquery expression and the given filterApplicationSidePlan logical plan.

Note

injectInSubqueryFilter is used when InjectRuntimeFilter is requested to injectFilter with the spark.sql.optimizer.runtime.bloomFilter.enabled configuration property disabled (and spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled enabled).
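The effect of the InSubquery filter, including the size check, can be illustrated with plain Scala collections. This is a minimal sketch and not Spark code: keys are assumed to be Long values, the size estimate of 8 bytes per distinct key is hypothetical (Spark relies on plan statistics), and broadcastThresholdBytes is an illustrative parameter.

```scala
object InSubquerySketch {
  def injectInSubqueryFilter(
      applicationSideKeys: Seq[Long],
      creationSideKeys: Seq[Long],
      broadcastThresholdBytes: Long): Seq[Long] = {
    // Aggregate grouped by the creation-side key: one entry per distinct key.
    val distinctKeys = creationSideKeys.toSet
    // Hypothetical size estimate: 8 bytes per distinct Long key.
    val estimatedSizeBytes = distinctKeys.size * 8L
    if (estimatedSizeBytes > broadcastThresholdBytes) {
      applicationSideKeys // too large to broadcast: application side unchanged
    } else {
      applicationSideKeys.filter(distinctKeys.contains) // key IN (distinct creation-side keys)
    }
  }
}
```

For example, with application-side keys 1 to 4 and creation-side keys 2, 2, 4, 9, a generous threshold keeps only keys 2 and 4, while a threshold below the estimated size returns all four keys unchanged.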

isSimpleExpression

isSimpleExpression(
  e: Expression): Boolean

isSimpleExpression holds (is true) when the given Expression does not contain any of the following tree patterns:

  • PYTHON_UDF
  • SCALA_UDF
  • INVOKE
  • JSON_TO_STRUCT
  • LIKE_FAMLIY
  • REGEXP_EXTRACT_FAMILY
  • REGEXP_REPLACE
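The check amounts to a recursive search of the expression tree for any of the listed patterns. The following sketch models it with a toy tree: Expr and containsAnyPattern here are hypothetical simplifications and not the Catalyst classes, the pattern names are copied verbatim from the list above, and ADD, CAST, ATTRIBUTE and LITERAL are made-up tags used only for the example.

```scala
object SimpleExpressionSketch {
  // Toy expression node tagged with a single pattern name.
  final case class Expr(pattern: String, children: Seq[Expr] = Nil)

  // Pattern names exactly as listed in the text.
  val expensivePatterns: Set[String] = Set(
    "PYTHON_UDF", "SCALA_UDF", "INVOKE", "JSON_TO_STRUCT",
    "LIKE_FAMLIY", "REGEXP_EXTRACT_FAMILY", "REGEXP_REPLACE")

  // True if the node itself or any descendant carries one of the patterns.
  def containsAnyPattern(e: Expr, patterns: Set[String]): Boolean =
    patterns.contains(e.pattern) || e.children.exists(c => containsAnyPattern(c, patterns))

  def isSimpleExpression(e: Expr): Boolean = !containsAnyPattern(e, expensivePatterns)
}
```

An expression such as a cast over a Scala UDF is not simple because the pattern is found in a child node, not only at the root.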

hasDynamicPruningSubquery

hasDynamicPruningSubquery(
  left: LogicalPlan,
  right: LogicalPlan,
  leftKey: Expression,
  rightKey: Expression): Boolean

hasDynamicPruningSubquery checks if there is a Filter logical operator with a DynamicPruningSubquery expression on the left or right side (of a join).

hasRuntimeFilter

hasRuntimeFilter(
  left: LogicalPlan,
  right: LogicalPlan,
  leftKey: Expression,
  rightKey: Expression): Boolean

hasRuntimeFilter checks whether there is already a runtime filter on the left or right side (of a join): a Bloom filter (hasBloomFilter, with spark.sql.optimizer.runtime.bloomFilter.enabled enabled) or an InSubquery filter (hasInSubquery).

hasBloomFilter

hasBloomFilter(
  left: LogicalPlan,
  right: LogicalPlan,
  leftKey: Expression,
  rightKey: Expression): Boolean

hasBloomFilter checks whether findBloomFilterWithExp finds a Bloom filter on the left side (for the left key) or on the right side (for the right key) of a join.

findBloomFilterWithExp

findBloomFilterWithExp(
  plan: LogicalPlan,
  key: Expression): Boolean

findBloomFilterWithExp tries to find a Filter logical operator with a BloomFilterMightContain expression (and XxHash64) among the nodes of the given LogicalPlan.
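The search can be pictured as a recursive walk over the plan tree. The sketch below uses a toy plan model and is not the Catalyst API: Scan, Project and Filter are hypothetical node types, and a Bloom filter probe on a key is reduced to an optional key name on the Filter node.

```scala
object FindBloomFilterSketch {
  sealed trait Plan { def children: Seq[Plan] }
  final case class Scan(table: String) extends Plan {
    val children: Seq[Plan] = Nil
  }
  final case class Project(child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
  }
  // bloomFilterKey is Some(key) when the filter condition is a Bloom filter probe on that key.
  final case class Filter(bloomFilterKey: Option[String], child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
  }

  // True if any node of the plan is a Filter probing a Bloom filter with the given key.
  def findBloomFilterWithExp(plan: Plan, key: String): Boolean = plan match {
    case Filter(Some(k), _) if k == key => true
    case other => other.children.exists(c => findBloomFilterWithExp(c, key))
  }

  // Either side of the join already has a Bloom filter on its own key.
  def hasBloomFilter(left: Plan, right: Plan, leftKey: String, rightKey: String): Boolean =
    findBloomFilterWithExp(left, leftKey) || findBloomFilterWithExp(right, rightKey)
}
```

A Filter buried under other operators (a Project in this model) is still found, which is what prevents the same key from being filtered twice.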