InjectRuntimeFilter Logical Optimization
InjectRuntimeFilter is a logical optimization (i.e., a Rule of LogicalPlan).
InjectRuntimeFilter is part of the InjectRuntimeFilter fixed-point batch of rules.
Runtime Filter
A runtime filter is either a BloomFilter (when spark.sql.optimizer.runtime.bloomFilter.enabled is enabled) or an InSubquery filter.
Noop
InjectRuntimeFilter is a noop (and does nothing) in the following cases:
- The query plan to optimize is a correlated Subquery (to be rewritten into a join later)
- Neither spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled nor spark.sql.optimizer.runtime.bloomFilter.enabled is enabled
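For the rule to do any work, at least one of the two configuration properties above has to be enabled. The following is a minimal sketch of doing that on a session, assuming Spark SQL is on the classpath; the `local[*]` master and the application name are illustrative choices, not something the rule requires.

```scala
import org.apache.spark.sql.SparkSession

object EnableRuntimeFilters {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session is good enough for experimenting.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("runtime-filter-demo")
      .getOrCreate()

    // Enable either property (or both) so that InjectRuntimeFilter is not a noop.
    spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
    spark.conf.set("spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled", "true")

    spark.stop()
  }
}
```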
Executing Rule
apply runs tryInjectRuntimeFilter on the given logical plan (which gives a new logical plan).
When runtimeFilterSemiJoinReductionEnabled is enabled and the new logical plan differs from the initial one, apply executes the RewritePredicateSubquery logical optimization on the new logical plan. Otherwise, apply returns the new logical plan as is.
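The control flow of apply can be sketched without any Spark types. In the sketch below, `applySketch` and its two function parameters are hypothetical stand-ins (for tryInjectRuntimeFilter and RewritePredicateSubquery), and `P` is any plan-like type with structural equality; it is not the rule's actual code.

```scala
object ApplySketch {
  // Hypothetical stand-in for the rule's apply: P is any plan type with ==.
  def applySketch[P](
      plan: P,
      semiJoinReductionEnabled: Boolean,
      tryInjectRuntimeFilter: P => P,
      rewritePredicateSubquery: P => P): P = {
    val newPlan = tryInjectRuntimeFilter(plan)
    // Rewrite only when something was injected and semi-join reduction is on.
    if (semiJoinReductionEnabled && newPlan != plan) rewritePredicateSubquery(newPlan)
    else newPlan
  }
}
```

With plans modelled as strings, a plan that tryInjectRuntimeFilter leaves untouched is returned unchanged regardless of the flag.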
tryInjectRuntimeFilter
tryInjectRuntimeFilter(
plan: LogicalPlan): LogicalPlan
tryInjectRuntimeFilter transforms the equi-joins in the given LogicalPlan.
For every equi-join, tryInjectRuntimeFilter injects a runtime filter (on the left side first, and on the right side only if the left side did not qualify) when all the following requirements are met:
- A join side has no DynamicPruningSubquery filter already
- A join side has no RuntimeFilter
- The left and right keys (pair-wise) are simple expressions
- canPruneLeft or canPruneRight
- filteringHasBenefit
tryInjectRuntimeFilter tries to inject up to spark.sql.optimizer.runtimeFilter.number.threshold filters.
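The left-then-right order and the filter-count threshold can be illustrated with a toy model. Everything in this sketch is hypothetical: `EquiJoin` collapses the requirements listed above into boolean flags, and `chooseSides` only reports which side would get a filter. The real rule works on logical plans and join keys.

```scala
object InjectionOrderSketch {
  // Hypothetical summary of one equi-join; the flags stand in for the checks above.
  final case class EquiJoin(
      name: String,
      alreadyFiltered: Boolean, // a DynamicPruningSubquery or runtime filter exists
      simpleKeys: Boolean,      // left and right keys are simple expressions
      leftQualifies: Boolean,   // left side can be pruned and filtering has benefit
      rightQualifies: Boolean)  // right side can be pruned and filtering has benefit

  def chooseSides(joins: Seq[EquiJoin], numberThreshold: Int): Seq[(String, String)] = {
    var injected = 0
    joins.flatMap { j =>
      val eligible = injected < numberThreshold && !j.alreadyFiltered && j.simpleKeys
      // Try the left side first; fall back to the right side.
      val side =
        if (eligible && j.leftQualifies) Some("left")
        else if (eligible && j.rightQualifies) Some("right")
        else None
      side.foreach(_ => injected += 1)
      side.map(s => j.name -> s)
    }
  }
}
```

Once the counter reaches the threshold, later joins are left alone even if they would otherwise qualify.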
Injecting Filter Operator
injectFilter(
filterApplicationSideExp: Expression,
filterApplicationSidePlan: LogicalPlan,
filterCreationSideExp: Expression,
filterCreationSidePlan: LogicalPlan): LogicalPlan
With spark.sql.optimizer.runtime.bloomFilter.enabled enabled, injectFilter injects a BloomFilter-based filter (using injectBloomFilter).
Otherwise, injectFilter injects an InSubquery-based filter (using injectInSubqueryFilter).
Injecting BloomFilter
injectBloomFilter(
filterApplicationSideExp: Expression,
filterApplicationSidePlan: LogicalPlan,
filterCreationSideExp: Expression,
filterCreationSidePlan: LogicalPlan): LogicalPlan
Note
injectBloomFilter returns the given filterApplicationSidePlan logical plan unchanged when the size of the given filterCreationSidePlan logical plan is above spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold.
injectBloomFilter creates a BloomFilterAggregate expression with an XxHash64 child expression (over the given filterCreationSideExp expression). If the row count statistic of the given filterCreationSidePlan logical plan is available, it is passed to the BloomFilterAggregate as well.
injectBloomFilter creates an Alias expression (named bloomFilter) for the BloomFilterAggregate converted to an AggregateExpression.
injectBloomFilter creates an Aggregate logical operator with the following:
- No Grouping Expressions
- Aliased BloomFilterAggregate
- The given filterCreationSidePlan logical plan
injectBloomFilter executes the following logical optimization on the Aggregate logical operator:
injectBloomFilter creates a ScalarSubquery expression with the Aggregate logical operator (bloomFilterSubquery).
injectBloomFilter creates a BloomFilterMightContain expression.
| Property | Value |
|---|---|
| bloomFilterExpression | The ScalarSubquery |
| valueExpression | An XxHash64 expression with the given filterApplicationSideExp |
In the end, injectBloomFilter creates a Filter logical operator with the BloomFilterMightContain expression and the given filterApplicationSidePlan logical plan.
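The effect of the injected operators can be illustrated with a toy Bloom filter: the creation side is aggregated into one filter, and the application side keeps only the rows whose key might be in it. This is a sketch under loud assumptions: `ToyBloomFilter`, `buildFilter` and `applyFilter` are hypothetical names, the hash is Scala's MurmurHash3 rather than XxHash64, and the sizing (1024 bits, 3 hashes) is arbitrary.

```scala
import java.util.BitSet
import scala.util.hashing.MurmurHash3

object BloomFilterSketch {
  // Toy Bloom filter; neither Spark's implementation nor XxHash64 is used here.
  final class ToyBloomFilter(numBits: Int, numHashes: Int) {
    private val bits = new BitSet(numBits)

    // One bit position per hash function, derived by varying the seed.
    private def positions(key: Long): Seq[Int] =
      (0 until numHashes).map { seed =>
        Math.floorMod(MurmurHash3.stringHash(key.toString, seed), numBits)
      }

    def put(key: Long): Unit = positions(key).foreach(i => bits.set(i))
    def mightContain(key: Long): Boolean = positions(key).forall(i => bits.get(i))
  }

  // "Aggregate" step: fold all creation-side keys into a single filter.
  def buildFilter(creationSideKeys: Seq[Long]): ToyBloomFilter = {
    val bloom = new ToyBloomFilter(numBits = 1024, numHashes = 3)
    creationSideKeys.foreach(k => bloom.put(k))
    bloom
  }

  // "Filter" step: keep application-side keys that might exist on the creation side.
  def applyFilter(applicationSideKeys: Seq[Long], bloom: ToyBloomFilter): Seq[Long] =
    applicationSideKeys.filter(k => bloom.mightContain(k))
}
```

A Bloom filter never drops a key that was put into it, so the join result stays correct; it may let through some keys that were never put (false positives), which the join itself then discards.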
injectInSubqueryFilter
injectInSubqueryFilter(
filterApplicationSideExp: Expression,
filterApplicationSidePlan: LogicalPlan,
filterCreationSideExp: Expression,
filterCreationSidePlan: LogicalPlan): LogicalPlan
The same DataTypes
injectInSubqueryFilter requires that the DataTypes of the given filterApplicationSideExp and filterCreationSideExp are the same.
injectInSubqueryFilter creates an Aggregate logical operator with the following:
| Property | Value |
|---|---|
| Grouping Expressions | The given filterCreationSideExp expression |
| Aggregate Expressions | An Alias expression for the filterCreationSideExp expression (possibly wrapped using mayWrapWithHash) |
| Child Logical Operator | The given filterCreationSidePlan logical plan |
injectInSubqueryFilter executes ColumnPruning logical optimization on the Aggregate logical operator.
Unless the Aggregate logical operator canBroadcastBySize, injectInSubqueryFilter returns the given filterApplicationSidePlan logical plan (and basically throws away all the work so far).
Note
injectInSubqueryFilter skips the InSubquery filter if the size of the Aggregate is beyond the broadcast join threshold, since the semi-join would then be a shuffle join, which is not worthwhile.
injectInSubqueryFilter creates an InSubquery expression with the following:
- The given filterApplicationSideExp (possibly wrapped using mayWrapWithHash)
- ListQuery expression with the Aggregate
In the end, injectInSubqueryFilter creates a Filter logical operator with the InSubquery expression and the given filterApplicationSidePlan logical plan.
Note
injectInSubqueryFilter is used when InjectRuntimeFilter is requested to injectFilter with the spark.sql.optimizer.runtime.bloomFilter.enabled configuration property disabled (while spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled is enabled).
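The steps of injectInSubqueryFilter can be modelled on plain collections. In this sketch `injectInSubquerySketch` and `maxBroadcastKeys` are hypothetical: the broadcast-size check is approximated by a count of distinct keys, whereas the rule itself compares plan size against the broadcast join threshold.

```scala
object InSubquerySketch {
  def injectInSubquerySketch(
      applicationSideKeys: Seq[Long],
      creationSideKeys: Seq[Long],
      maxBroadcastKeys: Int): Seq[Long] = {
    // Grouping by the key leaves one row per distinct creation-side key.
    val distinctKeys = creationSideKeys.toSet
    if (distinctKeys.size > maxBroadcastKeys) {
      // Too large to broadcast: return the application side unchanged.
      applicationSideKeys
    } else {
      // Equivalent of: key IN (SELECT key FROM creationSide GROUP BY key)
      applicationSideKeys.filter(k => distinctKeys.contains(k))
    }
  }
}
```

Unlike the Bloom filter variant, this filter is exact, which is why its cost depends so directly on how many distinct keys the creation side produces.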
isSimpleExpression
isSimpleExpression(
e: Expression): Boolean
isSimpleExpression holds (is true) for an Expression that does not contain any of the following patterns:
- PYTHON_UDF
- SCALA_UDF
- INVOKE
- JSON_TO_STRUCT
- LIKE_FAMLIY
- REGEXP_EXTRACT_FAMILY
- REGEXP_REPLACE
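The check amounts to "no node anywhere in the expression tree matches one of these patterns". A minimal sketch with a hypothetical `Expr` type, where patterns are plain strings (ATTRIBUTE and CAST in the usage below are illustrative labels only):

```scala
object SimpleExpressionSketch {
  // Hypothetical mini expression tree: each node carries the patterns it matches.
  final case class Expr(patterns: Set[String], children: Seq[Expr] = Nil) {
    // True if this node or any descendant matches one of the wanted patterns.
    def containsAnyPattern(wanted: Set[String]): Boolean =
      patterns.exists(p => wanted.contains(p)) ||
        children.exists(_.containsAnyPattern(wanted))
  }

  // Pattern names copied from the list above.
  val excludedPatterns: Set[String] = Set(
    "PYTHON_UDF", "SCALA_UDF", "INVOKE", "JSON_TO_STRUCT",
    "LIKE_FAMLIY", "REGEXP_EXTRACT_FAMILY", "REGEXP_REPLACE")

  def isSimpleExpression(e: Expr): Boolean = !e.containsAnyPattern(excludedPatterns)
}
```

Because the search covers descendants, wrapping a UDF call in an otherwise harmless expression does not make the whole expression simple.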
hasDynamicPruningSubquery
hasDynamicPruningSubquery(
left: LogicalPlan,
right: LogicalPlan,
leftKey: Expression,
rightKey: Expression): Boolean
hasDynamicPruningSubquery checks if there is a Filter logical operator with a DynamicPruningSubquery expression on the left or right side (of a join).
hasRuntimeFilter
hasRuntimeFilter(
left: LogicalPlan,
right: LogicalPlan,
leftKey: Expression,
rightKey: Expression): Boolean
hasRuntimeFilter checks whether there is already a runtime filter on the left or right side (of a join), using hasBloomFilter (with spark.sql.optimizer.runtime.bloomFilter.enabled enabled) or hasInSubquery.
hasBloomFilter
hasBloomFilter(
left: LogicalPlan,
right: LogicalPlan,
leftKey: Expression,
rightKey: Expression): Boolean
hasBloomFilter uses findBloomFilterWithExp to check whether there is a bloom filter on the left or right side (of a join).
findBloomFilterWithExp
findBloomFilterWithExp(
plan: LogicalPlan,
key: Expression): Boolean
findBloomFilterWithExp tries to find a Filter logical operator with a BloomFilterMightContain expression (and XxHash64) among the nodes of the given LogicalPlan.
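The search can be pictured as a recursive walk over the plan tree. The sketch below uses a hypothetical three-node plan model: `BloomFilterOn` stands in for a Filter operator whose condition is a BloomFilterMightContain over the hashed key, and keys are plain strings rather than expressions.

```scala
object FindBloomFilterSketch {
  sealed trait Plan { def children: Seq[Plan] }
  final case class Scan(table: String) extends Plan {
    def children: Seq[Plan] = Nil
  }
  final case class Project(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }
  // Stands in for a Filter with a BloomFilterMightContain on the (hashed) key.
  final case class BloomFilterOn(key: String, child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }

  // True if any node of the plan is a bloom filter on the given key.
  def findBloomFilterWithKey(plan: Plan, key: String): Boolean = plan match {
    case BloomFilterOn(k, _) if k == key => true
    case other => other.children.exists(findBloomFilterWithKey(_, key))
  }

  // Either side of the join already carrying a filter on its key is enough.
  def hasBloomFilter(left: Plan, right: Plan, leftKey: String, rightKey: String): Boolean =
    findBloomFilterWithKey(left, leftKey) || findBloomFilterWithKey(right, rightKey)
}
```

A bloom filter on a different key does not count, so the walk continues below it.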