InjectRuntimeFilter Logical Optimization
InjectRuntimeFilter is a logical optimization (i.e., a Rule of LogicalPlan).
InjectRuntimeFilter is part of the InjectRuntimeFilter fixed-point batch of rules.
Runtime Filter
A runtime filter can be either a BloomFilter (with spark.sql.optimizer.runtime.bloomFilter.enabled enabled) or an InSubquery filter.
Noop
InjectRuntimeFilter is a noop (does nothing) in the following cases:
- The query plan to optimize is a correlated Subquery (to be rewritten into a join later)
- Neither spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled nor spark.sql.optimizer.runtime.bloomFilter.enabled is enabled
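The two noop cases boil down to a single predicate. The following is a minimal sketch under simplifying assumptions. A plan is reduced to a flag saying whether it is a correlated Subquery. RuntimeFilterConf and NoopCheck are hypothetical names, not Spark classes.

```scala
// Hypothetical stand-in for the two configuration properties (the real rule reads them from SQLConf)
final case class RuntimeFilterConf(
    semiJoinReductionEnabled: Boolean, // spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled
    bloomFilterEnabled: Boolean)       // spark.sql.optimizer.runtime.bloomFilter.enabled

object NoopCheck {
  // true when InjectRuntimeFilter should leave the plan untouched
  def isNoop(isCorrelatedSubquery: Boolean, conf: RuntimeFilterConf): Boolean =
    isCorrelatedSubquery ||
      (!conf.semiJoinReductionEnabled && !conf.bloomFilterEnabled)
}
```

For example, with only the Bloom filter enabled and a plan that is not a correlated subquery, isNoop returns false and the rule goes on to inject filters.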
Executing Rule
Unless it is a noop, apply executes tryInjectRuntimeFilter on the given logical plan.
With runtimeFilterSemiJoinReductionEnabled (spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled) enabled and the new logical plan different from the initial one, apply executes the RewritePredicateSubquery logical optimization on the new logical plan. Otherwise, apply returns the new logical plan.
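The final step of apply can be sketched as follows. This sketch assumes plans are modeled as plain strings compared with ==. tryInjectRuntimeFilter and RewritePredicateSubquery are passed in as functions. ApplySketch and applyRule are hypothetical names.

```scala
// Hypothetical model of apply's final step. Plans are plain strings here; the real rule
// works on LogicalPlan and compares the plans with fastEquals.
object ApplySketch {
  def applyRule(
      plan: String,
      tryInjectRuntimeFilter: String => String,   // stands in for tryInjectRuntimeFilter
      rewritePredicateSubquery: String => String, // stands in for RewritePredicateSubquery
      semiJoinReductionEnabled: Boolean): String = {
    val newPlan = tryInjectRuntimeFilter(plan)
    // Rewrite only when semi-join reduction is on and the plan actually changed
    if (semiJoinReductionEnabled && newPlan != plan) rewritePredicateSubquery(newPlan)
    else newPlan
  }
}
```

RewritePredicateSubquery is the optimization that rewrites IN and EXISTS predicate subqueries into semi and anti joins. It therefore only matters when something was injected.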
tryInjectRuntimeFilter
tryInjectRuntimeFilter(
plan: LogicalPlan): LogicalPlan
tryInjectRuntimeFilter transforms the equi-joins in the given LogicalPlan.
For every equi-join (and every pair of its join keys), tryInjectRuntimeFilter injects a runtime filter when all of the following requirements are met. It tries the left side first, and the right side only if nothing could be injected on the left:
- Neither join side already has a DynamicPruningSubquery filter (hasDynamicPruningSubquery)
- Neither join side already has a runtime filter (hasRuntimeFilter)
- The left and right keys (pair-wise) are simple expressions (isSimpleExpression)
- canPruneLeft or canPruneRight (the join type allows pruning that side)
- filteringHasBenefit
tryInjectRuntimeFilter injects up to spark.sql.optimizer.runtimeFilter.number.threshold filters in total.
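The per-key-pair loop can be sketched as below. The sketch assumes every check listed above has already been evaluated to a boolean, and that an attempted injection always succeeds. In the rule, injectFilter may still return a side unchanged, which is why the right side is tried only when the left side did not actually change. The sketch models a single join. KeyPair, Injected, OnLeft, OnRight and TryInjectSketch are hypothetical names.

```scala
// Hypothetical, simplified model of the per-key-pair loop in tryInjectRuntimeFilter
final case class KeyPair(
    name: String,
    hasDynamicPruningSubquery: Boolean,
    hasRuntimeFilter: Boolean,
    keysAreSimple: Boolean,
    leftHasBenefit: Boolean,  // canPruneLeft && filteringHasBenefit for the left side
    rightHasBenefit: Boolean) // canPruneRight && filteringHasBenefit for the right side

sealed trait Injected
final case class OnLeft(key: String) extends Injected
final case class OnRight(key: String) extends Injected

object TryInjectSketch {
  def plan(keyPairs: Seq[KeyPair], numFilterThreshold: Int): Seq[Injected] = {
    var filterCounter = 0
    val injected = Seq.newBuilder[Injected]
    keyPairs.foreach { kp =>
      if (filterCounter < numFilterThreshold &&
          !kp.hasDynamicPruningSubquery &&
          !kp.hasRuntimeFilter &&
          kp.keysAreSimple) {
        // Left side first; fall back to the right side only if the left side got nothing
        if (kp.leftHasBenefit) {
          injected += OnLeft(kp.name)
          filterCounter += 1
        } else if (kp.rightHasBenefit) {
          injected += OnRight(kp.name)
          filterCounter += 1
        }
      }
    }
    injected.result()
  }
}
```

The counter lives outside the loop, so the threshold caps the total number of injected filters rather than the number per key pair.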
Injecting Filter Operator
injectFilter(
filterApplicationSideExp: Expression,
filterApplicationSidePlan: LogicalPlan,
filterCreationSideExp: Expression,
filterCreationSidePlan: LogicalPlan): LogicalPlan
With spark.sql.optimizer.runtime.bloomFilter.enabled enabled, injectFilter injects a BloomFilter (injectBloomFilter).
Otherwise, injectFilter injects an InSubquery filter (injectInSubqueryFilter).
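The choice injectFilter makes fits in a few lines. This is a hypothetical sketch; RuntimeFilterKind, BloomFilterKind, InSubqueryKind and InjectFilterSketch are illustrative names, not Spark's. The require reflects that InjectRuntimeFilter is a noop when both properties are disabled, so this point is never reached in that case.

```scala
// Hypothetical sketch of the two kinds of runtime filter injectFilter can pick
sealed trait RuntimeFilterKind
case object BloomFilterKind extends RuntimeFilterKind // injectBloomFilter
case object InSubqueryKind extends RuntimeFilterKind  // injectInSubqueryFilter

object InjectFilterSketch {
  def chooseFilter(bloomFilterEnabled: Boolean, semiJoinReductionEnabled: Boolean): RuntimeFilterKind = {
    // Unreachable with both flags off, since the rule is then a noop
    require(bloomFilterEnabled || semiJoinReductionEnabled, "no runtime filter enabled")
    if (bloomFilterEnabled) BloomFilterKind else InSubqueryKind
  }
}
```

With both properties enabled, the Bloom filter takes precedence.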
Injecting BloomFilter
injectBloomFilter(
filterApplicationSideExp: Expression,
filterApplicationSidePlan: LogicalPlan,
filterCreationSideExp: Expression,
filterCreationSidePlan: LogicalPlan): LogicalPlan
Note
injectBloomFilter returns the given filterApplicationSidePlan logical plan unchanged when the size of the given filterCreationSidePlan logical plan is above spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold.
injectBloomFilter creates a BloomFilterAggregate expression with an XxHash64 child expression (of the given filterCreationSideExp expression). If available, it also passes the row count statistic of the given filterCreationSidePlan logical plan as the estimated number of items.
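The row count matters because a Bloom filter is normally sized from the number of items it is expected to hold. The sketch below uses the textbook sizing formulas, m = -n ln p / (ln 2)^2 bits and k = (m / n) ln 2 hash functions. It does not reproduce Spark's own defaults or caps, and BloomSizing is a hypothetical name.

```scala
// Textbook Bloom filter sizing (not Spark's exact parameters)
object BloomSizing {
  // Number of bits m for n expected items at false-positive probability p
  def optimalNumBits(n: Long, p: Double): Long = {
    require(n > 0 && p > 0.0 && p < 1.0)
    math.ceil(-n * math.log(p) / (math.log(2) * math.log(2))).toLong
  }

  // Number of hash functions k that minimizes the false-positive rate for m bits and n items
  def optimalNumHashes(n: Long, m: Long): Int =
    math.max(1, math.round(m.toDouble / n * math.log(2)).toInt)
}
```

With 1,000 expected rows and a 3% false-positive probability, this gives roughly 7,300 bits and 5 hash functions. Without a row count, the filter presumably has to fall back to a default size that may be far from the actual number of keys.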
injectBloomFilter creates an Alias expression named bloomFilter for the BloomFilterAggregate converted to an AggregateExpression.
injectBloomFilter creates an Aggregate logical operator with the following:
- No grouping expressions
- The aliased BloomFilterAggregate as the aggregate expression
- The given filterCreationSidePlan logical plan as the child
injectBloomFilter executes the ColumnPruning and then the ConstantFolding logical optimizations on the Aggregate logical operator.
injectBloomFilter creates a ScalarSubquery expression (bloomFilterSubquery) with the Aggregate logical operator.
injectBloomFilter creates a BloomFilterMightContain expression with the following:
Property | Value |
---|---|
bloomFilterExpression | The ScalarSubquery (bloomFilterSubquery) |
valueExpression | An XxHash64 expression of the given filterApplicationSideExp |
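BloomFilterMightContain can only drop a row when the Bloom filter says the value is definitely absent. The toy filter below illustrates that property over Long values, which play the role of the XxHash64 hashes in the table above. It is a hypothetical ToyBloomFilter class using double hashing and a SplitMix64-style mixer, not Spark's BloomFilter implementation. Every value put into the filter is reported as possibly present; other values are usually, but not always, rejected.

```scala
import scala.collection.mutable

// Toy Bloom filter over Long values; not Spark's BloomFilter implementation
final class ToyBloomFilter(numBits: Int, numHashes: Int) {
  require(numBits > 0 && numHashes > 0)
  private val bits = new mutable.BitSet(numBits)

  // SplitMix64 finalizer used as a cheap 64-bit mixer
  private def mix(x: Long): Long = {
    var z = x + 0x9E3779B97F4A7C15L
    z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L
    z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL
    z ^ (z >>> 31)
  }

  // Double hashing: bit positions h1 + i * h2 (mod numBits) for i in 0 until numHashes
  private def positions(value: Long): Seq[Int] = {
    val h1 = mix(value)
    val h2 = mix(h1)
    (0 until numHashes).map(i => java.lang.Math.floorMod(h1 + i * h2, numBits.toLong).toInt)
  }

  def put(value: Long): Unit = positions(value).foreach(bits += _)

  // false means "definitely absent"; true means "possibly present"
  def mightContain(value: Long): Boolean = positions(value).forall(bits.contains)
}
```

A false positive only lets an extra row through to the join, which discards it anyway. A false negative, which would lose a matching row, cannot happen.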
In the end, injectBloomFilter creates a Filter logical operator with the BloomFilterMightContain expression (as the condition) and the given filterApplicationSidePlan logical plan (as the child).
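The steps of injectBloomFilter can be put together over stripped-down plan and expression trees. In this sketch, all case classes are hypothetical toys that only borrow the Catalyst names. The logical optimizations run on the Aggregate are not modeled. The creation side is a Scan carrying its own size and row count statistics.

```scala
// Hypothetical, stripped-down trees (not the Catalyst classes of the same names)
sealed trait Expr
final case class Col(name: String) extends Expr
final case class XxHash64(child: Expr) extends Expr
final case class BloomFilterAggregate(child: Expr, estimatedNumItems: Option[Long]) extends Expr
final case class ScalarSubquery(plan: Plan) extends Expr
final case class BloomFilterMightContain(bloomFilter: Expr, value: Expr) extends Expr

sealed trait Plan
final case class Scan(table: String, sizeInBytes: Long, rowCount: Option[Long]) extends Plan
final case class Aggregate(groupBy: Seq[Expr], aggregates: Seq[(Expr, String)], child: Plan) extends Plan
final case class Filter(condition: Expr, child: Plan) extends Plan

object InjectBloomFilterSketch {
  def injectBloomFilter(
      applicationSideExp: Expr,
      applicationSidePlan: Plan,
      creationSideExp: Expr,
      creationSidePlan: Scan,
      creationSideThreshold: Long): Plan =
    if (creationSidePlan.sizeInBytes > creationSideThreshold) {
      // Creation side too big: keep the application side unchanged
      applicationSidePlan
    } else {
      // The row count (if known) becomes the estimated number of items
      val bloomFilterAgg = BloomFilterAggregate(XxHash64(creationSideExp), creationSidePlan.rowCount)
      // Global aggregate: no grouping, a single aggregate aliased bloomFilter
      val aggregate = Aggregate(Nil, Seq(bloomFilterAgg -> "bloomFilter"), creationSidePlan)
      val condition = BloomFilterMightContain(ScalarSubquery(aggregate), XxHash64(applicationSideExp))
      Filter(condition, applicationSidePlan)
    }
}
```

The Bloom filter is built once from the (small) creation side by the scalar subquery. Each row of the application side is then probed with the hash of its own key.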
injectInSubqueryFilter
injectInSubqueryFilter(
filterApplicationSideExp: Expression,
filterApplicationSidePlan: LogicalPlan,
filterCreationSideExp: Expression,
filterCreationSidePlan: LogicalPlan): LogicalPlan
The same DataTypes
injectInSubqueryFilter requires that the DataTypes of the given filterApplicationSideExp and filterCreationSideExp expressions are the same.
injectInSubqueryFilter creates an Aggregate logical operator with the following:
Property | Value |
---|---|
Grouping Expressions | The given filterCreationSideExp expression |
Aggregate Expressions | An Alias expression for the filterCreationSideExp expression (possibly wrapped using mayWrapWithHash) |
Child Logical Operator | The given filterCreationSidePlan logical plan |
injectInSubqueryFilter executes the ColumnPruning logical optimization on the Aggregate logical operator.
Unless the Aggregate logical operator canBroadcastBySize, injectInSubqueryFilter returns the given filterApplicationSidePlan logical plan unchanged (and basically throws away all the work done so far).
Note
injectInSubqueryFilter skips the InSubquery filter if the size of the Aggregate is beyond the broadcast join threshold. The semi-join would then be a shuffle join, which is not worthwhile.
injectInSubqueryFilter creates an InSubquery expression with the following:
- The given filterApplicationSideExp (possibly wrapped using mayWrapWithHash)
- A ListQuery expression with the Aggregate
In the end, injectInSubqueryFilter creates a Filter logical operator with the InSubquery expression (as the condition) and the given filterApplicationSidePlan logical plan (as the child).
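The decisions injectInSubqueryFilter makes can be sketched over a toy plan. In this model, data types are plain strings and the Aggregate's size is passed in as an estimate. The broadcast check is reduced to a comparison against a threshold. mayWrapWithHash and ColumnPruning are left out. Key, Relation, GroupBy, InSubqueryFilter and InjectInSubquerySketch are hypothetical names.

```scala
// Hypothetical toy model (not Catalyst classes): a join key with its data type name
final case class Key(name: String, dataType: String)

sealed trait Plan
final case class Relation(name: String) extends Plan
// Stands in for the Aggregate that groups by (and outputs) the creation-side key
final case class GroupBy(key: Key, child: Plan, sizeInBytes: Long) extends Plan
// Stands in for Filter(InSubquery(value, ListQuery(aggregate)), child)
final case class InSubqueryFilter(value: Key, subquery: GroupBy, child: Plan) extends Plan

object InjectInSubquerySketch {
  def injectInSubqueryFilter(
      applicationSideExp: Key,
      applicationSidePlan: Plan,
      creationSideExp: Key,
      creationSidePlan: Plan,
      estimatedAggregateSize: Long, // assumed size estimate of the Aggregate
      broadcastThreshold: Long): Plan = {
    require(applicationSideExp.dataType == creationSideExp.dataType,
      "join keys must have the same data type")
    val aggregate = GroupBy(creationSideExp, creationSidePlan, estimatedAggregateSize)
    // Not broadcastable: the semi join would be a shuffle join, so keep the plan unchanged
    if (aggregate.sizeInBytes > broadcastThreshold) applicationSidePlan
    else InSubqueryFilter(applicationSideExp, aggregate, applicationSidePlan)
  }
}
```

Grouping by the key deduplicates the creation-side values. This keeps the IN list (and the later semi join) as small as the number of distinct keys.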
Note
injectInSubqueryFilter is used when InjectRuntimeFilter is requested to injectFilter with the spark.sql.optimizer.runtime.bloomFilter.enabled configuration property disabled (and spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled enabled).
isSimpleExpression
isSimpleExpression(
e: Expression): Boolean
isSimpleExpression holds for an Expression that does not contain any of the following tree patterns:
- PYTHON_UDF
- SCALA_UDF
- INVOKE
- JSON_TO_STRUCT
- LIKE_FAMLIY
- REGEXP_EXTRACT_FAMILY
- REGEXP_REPLACE
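The check looks at the whole expression tree, not just its root. The toy model below is a minimal sketch: each node carries at most one pattern tag, and Expr, containsAny and SimpleExpressionSketch are hypothetical names, not Catalyst's TreeNode API.

```scala
// Hypothetical expression tree: each node may carry one tree-pattern tag (e.g. "SCALA_UDF")
final case class Expr(pattern: Option[String], children: Seq[Expr] = Nil) {
  // true if this node or any descendant carries one of the given patterns
  def containsAny(patterns: Set[String]): Boolean =
    pattern.exists(patterns.contains) || children.exists(_.containsAny(patterns))
}

object SimpleExpressionSketch {
  // Pattern names kept verbatim as listed above
  val disallowed: Set[String] = Set(
    "PYTHON_UDF", "SCALA_UDF", "INVOKE", "JSON_TO_STRUCT",
    "LIKE_FAMLIY", "REGEXP_EXTRACT_FAMILY", "REGEXP_REPLACE")

  def isSimpleExpression(e: Expr): Boolean = !e.containsAny(disallowed)
}
```

A Scala UDF nested inside an arithmetic expression still makes the join key non-simple. Such expressions are presumably too expensive to evaluate again in a runtime filter.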
hasDynamicPruningSubquery
hasDynamicPruningSubquery(
left: LogicalPlan,
right: LogicalPlan,
leftKey: Expression,
rightKey: Expression): Boolean
hasDynamicPruningSubquery checks if there is a Filter logical operator with a DynamicPruningSubquery expression on the left or right side (of a join).
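A minimal sketch of this check rests on two assumptions not spelled out above. First, the DynamicPruningSubquery's pruning key is compared with the corresponding join key. Second, only the Filter operators stacked directly on top of each join side are inspected. Plan, Scan, DppFilter, OtherFilter and DppSketch are hypothetical names.

```scala
// Hypothetical toy plans; DppFilter stands in for Filter(DynamicPruningSubquery(pruningKey, ...), child)
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class DppFilter(pruningKey: String, child: Plan) extends Plan
final case class OtherFilter(child: Plan) extends Plan

object DppSketch {
  // Walks down the DPP filters stacked directly on top of one join side
  private def hasDpp(side: Plan, key: String): Boolean = side match {
    case DppFilter(pruningKey, child) => pruningKey == key || hasDpp(child, key)
    case _ => false
  }

  def hasDynamicPruningSubquery(left: Plan, right: Plan, leftKey: String, rightKey: String): Boolean =
    hasDpp(left, leftKey) || hasDpp(right, rightKey)
}
```

When a side is already pruned by dynamic partition pruning on the same key, a runtime filter on that key would most likely add little.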
hasRuntimeFilter
hasRuntimeFilter(
left: LogicalPlan,
right: LogicalPlan,
leftKey: Expression,
rightKey: Expression): Boolean
With spark.sql.optimizer.runtime.bloomFilter.enabled enabled, hasRuntimeFilter checks if there is a Bloom filter (hasBloomFilter) on the left or right side (of a join). Otherwise, hasRuntimeFilter checks for an InSubquery filter (hasInSubquery) instead.
hasBloomFilter
hasBloomFilter(
left: LogicalPlan,
right: LogicalPlan,
leftKey: Expression,
rightKey: Expression): Boolean
hasBloomFilter checks if findBloomFilterWithExp finds a Bloom filter on the left or right side (of a join).
findBloomFilterWithExp
findBloomFilterWithExp(
plan: LogicalPlan,
key: Expression): Boolean
findBloomFilterWithExp tries to find a Filter logical operator with a BloomFilterMightContain expression (over an XxHash64 of the given key expression) among the nodes of the given LogicalPlan.
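hasBloomFilter and findBloomFilterWithExp can be sketched as a search over every node of a toy plan tree. The sketch assumes the matching BloomFilterMightContain must hash exactly the given key. It also assumes AND-ed filter conditions are split into conjuncts before the check. Expr, Plan, MightContain and BloomFilterLookupSketch are hypothetical, and the Bloom filter subquery itself is left out.

```scala
// Hypothetical toy trees (not Catalyst classes)
sealed trait Expr
final case class Col(name: String) extends Expr
final case class XxHash64(child: Expr) extends Expr
final case class MightContain(value: Expr) extends Expr // BloomFilterMightContain without its subquery
final case class And(left: Expr, right: Expr) extends Expr

sealed trait Plan { def children: Seq[Plan] }
final case class Scan(table: String) extends Plan { def children: Seq[Plan] = Nil }
final case class Filter(condition: Expr, child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
final case class Join(left: Plan, right: Plan) extends Plan { def children: Seq[Plan] = Seq(left, right) }

object BloomFilterLookupSketch {
  private def conjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other => Seq(other)
  }

  // Pre-order search over all nodes of the plan
  private def exists(plan: Plan)(p: Plan => Boolean): Boolean =
    p(plan) || plan.children.exists(c => exists(c)(p))

  def findBloomFilterWithExp(plan: Plan, key: Expr): Boolean = exists(plan) {
    case Filter(condition, _) =>
      conjuncts(condition).exists {
        case MightContain(XxHash64(value)) => value == key
        case _ => false
      }
    case _ => false
  }

  def hasBloomFilter(left: Plan, right: Plan, leftKey: Expr, rightKey: Expr): Boolean =
    findBloomFilterWithExp(left, leftKey) || findBloomFilterWithExp(right, rightKey)
}
```

Unlike the DynamicPruningSubquery check, this search descends into every node of a join side, so a Bloom filter injected further down still counts.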