Skip to content

FilterExec Unary Physical Operator

FilterExec is a unary physical operator that represents Filter and TypedFilter unary logical operators at execution time.

FilterExec supports Java code generation (aka codegen) as follows:

  • <> is an empty AttributeSet (to defer evaluation of attribute expressions until they are actually used, i.e. in the generated Java source code for consume path)

  • Uses whatever the <> physical operator uses for the input RDDs

  • Generates a Java source code for the <> and <> paths in whole-stage code generation

FilterExec is <> when:

[[inputRDDs]] [[outputOrdering]] [[outputPartitioning]] FilterExec uses whatever the <> physical operator uses for the input RDDs, the outputOrdering and the outputPartitioning.

FilterExec uses PredicateHelper.

[[internal-registries]] .FilterExec's Internal Properties (e.g. Registries, Counters and Flags) [cols="1,2",options="header",width="100%"] |=== | Name | Description

| notNullAttributes | [[notNullAttributes]] FIXME

Used when...FIXME

| notNullPreds | [[notNullPreds]] FIXME

Used when...FIXME

| otherPreds | [[otherPreds]] FIXME

Used when...FIXME |===

=== [[creating-instance]] Creating FilterExec Instance

FilterExec takes the following when created:

  • [[condition]] <> for the filter condition
  • [[child]] Child <>

FilterExec initializes the <>.

=== [[isNullIntolerant]] isNullIntolerant Internal Method

[source, scala]

isNullIntolerant(expr: Expression): Boolean

isNullIntolerant...FIXME

NOTE: isNullIntolerant is used when...FIXME

=== [[doConsume]] Generating Java Source Code for Consume Path in Whole-Stage Code Generation -- doConsume Method

[source, scala]

doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String

doConsume creates a new metric term for the <> metric.

doConsume...FIXME

In the end, doConsume uses consume and FIXME to generate a Java source code (as a plain text) inside a do {...} while(false); code block.

doConsume is part of the CodegenSupport abstraction.

==== [[doConsume-genPredicate]] genPredicate Internal Method

[source, scala]

genPredicate(c: Expression, in: Seq[ExprCode], attrs: Seq[Attribute]): String

NOTE: genPredicate is an internal method of <>.

genPredicate...FIXME

=== [[doExecute]] Executing Physical Operator (Generating RDD[InternalRow]) -- doExecute Method

[source, scala]

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan abstraction.

doExecute executes the <> physical operator and creates a new MapPartitionsRDD that does the filtering.

[source, scala]

// DEMO Show the RDD lineage with the new MapPartitionsRDD after FilterExec

Internally, doExecute takes the <> metric.

In the end, doExecute requests the <> physical operator to <> (that triggers physical query planning and generates an RDD[InternalRow]) and transforms it by executing the following function on internal rows per partition with index (using RDD.mapPartitionsWithIndexInternal that creates another RDD):

. Creates a partition filter as a new <> (for the <> expression and the <> of the <> physical operator)

. Requests the generated partition filter Predicate to initialize (with 0 partition index)

. Filters out elements from the partition iterator (Iterator[InternalRow]) by requesting the generated partition filter Predicate to evaluate for every InternalRow .. Increments the <> metric for positive evaluations (i.e. that returned true)

NOTE: doExecute (by RDD.mapPartitionsWithIndexInternal) adds a new MapPartitionsRDD to the RDD lineage. Use RDD.toDebugString to see the additional MapPartitionsRDD.

Performance Metrics

Key Name (in web UI) Description
numOutputRows number of output rows Number of output rows

FilterExec in web UI (Details for Query)