
CodegenSupport Physical Operators

CodegenSupport is an extension of the SparkPlan abstraction for physical operators that support Whole-Stage Java Code Generation.

Contract

Generating Java Source Code for Consume Path

doConsume(
  ctx: CodegenContext,
  input: Seq[ExprCode],
  row: ExprCode): String

Generates Java source code (as text) for this physical operator for the consume execution path in Whole-Stage Java Code Generation.

UnsupportedOperationException

doConsume throws an UnsupportedOperationException by default.

Used when:

  • CodegenSupport (a child physical operator) is requested to consume

Generating Java Source Code for Produce Path

doProduce(
  ctx: CodegenContext): String

Generates Java source code (as text) for the physical operator to process the rows from the input RDDs for the whole-stage-codegen "produce" path.

Used when:

  • CodegenSupport is requested to produce

Input RDDs

inputRDDs(): Seq[RDD[InternalRow]]

Input RDDs of the physical operator

Important

Whole-Stage Java Code Generation supports up to two input RDDs.

Used when:

Implementations

Final Methods

Final methods are used to generate the Java source code in different phases of Whole-Stage Java Code Generation.
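The way the final methods and the contract methods call each other can be pictured with a toy model. The sketch below is plain Scala (no Spark needed) under stated assumptions: ToyCodegen, ToyRange, ToyFilter and ToySink are hypothetical names, the generated "Java" is just strings, and only the shape of produce (assign the parent, add a PRODUCE marker, call doProduce) and consume (add a CONSUME marker, call doConsume of the parent) is imitated. Run it as a Scala script or with :paste in the Scala REPL.

```scala
// Toy, hypothetical model of the produce/consume protocol (plain strings, not Spark's API)
trait ToyCodegen {
  def name: String

  // Mirrors the parent internal variable: null until produce is called
  protected var parent: ToyCodegen = null

  // Contract: code for the produce path
  protected def doProduce(): String

  // Contract: code for the consume path (unsupported unless overridden)
  def doConsume(input: String): String =
    throw new UnsupportedOperationException(s"$name does not support doConsume")

  // Final method: remember the parent, add a PRODUCE marker and delegate to doProduce
  final def produce(parent: ToyCodegen): String = {
    this.parent = parent
    s"// PRODUCE: $name\n" + doProduce()
  }

  // Final method: add a CONSUME marker and let the parent consume the variable
  final def consume(outputVar: String): String =
    s"// CONSUME: ${parent.name}\n" + parent.doConsume(outputVar)
}

// Leaf operator: owns the data-producing loop and pushes every row to its parent
class ToyRange(n: Int) extends ToyCodegen {
  val name = s"Range($n)"
  protected def doProduce(): String =
    s"for (long id = 0; id < ${n}L; id++) {\n" + consume("id") + "}\n"
}

// Non-leaf operator: lets its child drive the loop and adds its own code on consume
class ToyFilter(cond: String, child: ToyCodegen) extends ToyCodegen {
  val name = s"Filter($cond)"
  protected def doProduce(): String = child.produce(this)
  override def doConsume(input: String): String =
    s"if (!($cond)) continue;\n" + consume(input)
}

// Root of the toy stage (plays the role of the whole-stage operator): appends every row
class ToySink(child: ToyCodegen) extends ToyCodegen {
  val name = "Sink"
  protected def doProduce(): String = child.produce(this)
  override def doConsume(input: String): String = s"append($input);\n"
  def generate(): String = doProduce()
}

val generated = new ToySink(new ToyFilter("id % 2 == 0", new ToyRange(3))).generate()
println(generated)
```

Printing generated shows the PRODUCE markers top-down and every CONSUME marker naming the parent that consumes the row, much like the markers in the debugCodegen output of this page.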

Generating Java Source Code for Consume Code Path

consume(
  ctx: CodegenContext,
  outputVars: Seq[ExprCode],
  row: String = null): String

consume generates Java source code for consuming the generated columns or a row of the physical operator.

consume creates the ExprCodes for the input variables (inputVars).

consume creates a row variable.

consume sets the following in the CodegenContext:

consume evaluates the required variables (using the output, the inputVars and the usedInputs of the parent physical operator) and creates the so-called evaluated code.

consume creates a so-called consumeFunc by constructDoConsumeFunction when all of the following are met:

1. spark.sql.codegen.splitConsumeFuncByOperator internal configuration property is enabled

2. usedInputs of the parent physical operator contains all the output attributes

3. paramLength is correct (FIXME)

Otherwise, consume requests the parent physical operator to doConsume.

In the end, consume returns the plain Java source code with the comment CONSUME: [parent]:

[evaluated]
[consumeFunc]
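The choice of consumeFunc (constructDoConsumeFunction or the parent's doConsume) boils down to three boolean checks. The sketch below is plain Scala with hypothetical names (ToyAttr, MaxParamSlots, paramLength, consumeStrategy). The slot count is an assumption modeled on the JVM rule that a method descriptor may use at most 255 parameter slots (this counts as one, long and double as two); the sketch further assumes one extra boolean slot per nullable column and one slot for the row variable.

```scala
// Hypothetical stand-in for an output attribute (name, Java type of its value, nullability)
final case class ToyAttr(name: String, javaType: String, nullable: Boolean)

// Assumption: the JVM caps a method descriptor at 255 parameter slots (including `this`)
val MaxParamSlots = 255

// Assumed slot count: `this`, each value (long/double take 2 slots), an isNull flag per
// nullable column, and the row variable if there is one
def paramLength(output: Seq[ToyAttr], hasRow: Boolean): Int =
  1 + output.map { a =>
    (if (a.javaType == "long" || a.javaType == "double") 2 else 1) + (if (a.nullable) 1 else 0)
  }.sum + (if (hasRow) 1 else 0)

// Which code path consume would take under the three conditions
def consumeStrategy(
    splitConsumeFuncByOperator: Boolean, // value of spark.sql.codegen.splitConsumeFuncByOperator
    output: Seq[ToyAttr],
    parentUsedInputs: Set[String],       // names of the attributes the parent uses
    hasRow: Boolean): String = {
  val requireAllOutput = output.forall(a => parentUsedInputs.contains(a.name))
  val validParamLength = paramLength(output, hasRow) <= MaxParamSlots
  if (splitConsumeFuncByOperator && requireAllOutput && validParamLength) "constructDoConsumeFunction"
  else "parent.doConsume"
}
```

Splitting only pays off when every output column is materialized anyway, which is why a parent that skips some columns (see usedInputs) keeps the inline doConsume path.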

Tip

Enable spark.sql.codegen.comments Spark SQL property to have CONSUME markers in the generated Java source code.

// ./bin/spark-shell --conf spark.sql.codegen.comments=true
import org.apache.spark.sql.execution.debug._
val q = Seq((0 to 4).toList).toDF.
  select(explode($"value") as "id").
  join(spark.range(1), "id")
scala> q.debugCodegen
Found 2 WholeStageCodegen subtrees.
...
== Subtree 2 / 2 ==
*Project [id#6]
+- *BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
   :- Generate explode(value#1), false, false, [id#6]
   :  +- LocalTableScan [value#1]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 1, step=1, splits=8)
...
/* 066 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 067 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 068 */       // CONSUME: BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
/* 069 */       // input[0, int, false]
/* 070 */       int inputadapter_value = inputadapter_row.getInt(0);
...
/* 079 */       // find matches from HashedRelation
/* 080 */       UnsafeRow bhj_matched = bhj_isNull ? null: (UnsafeRow)bhj_relation.getValue(bhj_value);
/* 081 */       if (bhj_matched != null) {
/* 082 */         {
/* 083 */           bhj_numOutputRows.add(1);
/* 084 */
/* 085 */           // CONSUME: Project [id#6]
/* 086 */           // CONSUME: WholeStageCodegen
/* 087 */           project_rowWriter.write(0, inputadapter_value);
/* 088 */           append(project_result);
/* 089 */
/* 090 */         }
/* 091 */       }
/* 092 */       if (shouldStop()) return;
...

consume is used when:

Data-Producing Loop Condition

limitNotReachedCond: String

limitNotReachedCond is used as a loop condition by ColumnarToRowExec, SortExec, InputRDDCodegen and HashAggregateExec physical operators (when requested to doProduce).

limitNotReachedCond requests the parent physical operator for the limit-not-reached checks.

limitNotReachedCond returns an empty string when there are no limit-not-reached checks, or concatenates them with &&.
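A minimal sketch of the string handling, in plain Scala with hypothetical helper names (limitNotReachedCond here takes the parent's checks as an argument, and producingLoop is made up). It assumes the result is used as a prefix of another loop condition, which is why the sketch ends a non-empty result with a trailing &&.

```scala
// Hypothetical sketch: no checks give an empty prefix, otherwise the checks are joined with &&
// (assumption: the trailing && lets the result prefix another loop condition)
def limitNotReachedCond(parentChecks: Seq[String]): String =
  if (parentChecks.isEmpty) "" else parentChecks.mkString("", " && ", " && ")

// Illustrative data-producing loop of a leaf operator (Java source as a string)
def producingLoop(parentChecks: Seq[String], input: String): String =
  s"while (${limitNotReachedCond(parentChecks)}$input.hasNext()) { /* consume a row */ }"
```

With no checks the loop condition is just the input's hasNext(); with checks such as count_a < 10 and count_b < 5 (made-up variable names) the loop stops as soon as any downstream limit is reached.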

Generating Java Source Code for Produce Code Path

produce(
  ctx: CodegenContext,
  parent: CodegenSupport): String

produce generates Java source code for the whole-stage-codegen "produce" code path.

produce prepares the physical operator for query execution and then generates Java source code with the result of doProduce.

produce annotates the code block with PRODUCE markers (that are simple descriptions of the physical operators in a structured query).

produce is used when:

spark.sql.codegen.comments Property

Enable spark.sql.codegen.comments Spark SQL property for PRODUCE markers in the generated Java source code.

// ./bin/spark-shell --conf spark.sql.codegen.comments=true
import org.apache.spark.sql.execution.debug._
val q = Seq((0 to 4).toList).toDF.
  select(explode($"value") as "id").
  join(spark.range(1), "id")
scala> q.debugCodegen
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 ==
*Range (0, 1, step=1, splits=8)
...
/* 080 */   protected void processNext() throws java.io.IOException {
/* 081 */     // PRODUCE: Range (0, 1, step=1, splits=8)
/* 082 */     // initialize Range
/* 083 */     if (!range_initRange) {
...
== Subtree 2 / 2 ==
*Project [id#6]
+- *BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
   :- Generate explode(value#1), false, false, [id#6]
   :  +- LocalTableScan [value#1]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 1, step=1, splits=8)
...
/* 062 */   protected void processNext() throws java.io.IOException {
/* 063 */     // PRODUCE: Project [id#6]
/* 064 */     // PRODUCE: BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
/* 065 */     // PRODUCE: InputAdapter
/* 066 */     while (inputadapter_input.hasNext() && !stopEarly()) {
...

supportCodegen Flag

supportCodegen: Boolean

supportCodegen flag allows physical operators (that support Whole-Stage Java Code Generation) to disable Java code generation temporarily under certain conditions.

supportCodegen is enabled (true) by default.

supportCodegen can be disabled (false) in the following physical operators:

The supportCodegen flag is used to select between the InputAdapter and WholeStageCodegenExec physical operators when the CollapseCodegenStages physical optimization is executed (to check whether a physical operator meets the requirements of whole-stage Java code generation or not).
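To picture how supportCodegen drives the choice, here is a toy model in plain Scala. Plan, Op, WholeStageCodegen, InputAdapter, insertWholeStageCodegen, insertInputAdapter and render are hypothetical stand-ins, not Spark classes, and the rule is simplified: a codegen stage starts at the topmost operator with supportCodegen enabled, and inside a stage an InputAdapter cuts the stage off at any operator that does not support codegen. Use :paste in the Scala REPL (the trait is sealed).

```scala
sealed trait Plan
final case class Op(name: String, supportCodegen: Boolean, children: Seq[Plan] = Nil) extends Plan
final case class WholeStageCodegen(child: Plan) extends Plan
final case class InputAdapter(child: Plan) extends Plan

// Outside a stage: start a new stage at the first operator that supports codegen
def insertWholeStageCodegen(plan: Plan): Plan = plan match {
  case op: Op if op.supportCodegen => WholeStageCodegen(insertInputAdapter(op))
  case op: Op => op.copy(children = op.children.map(insertWholeStageCodegen))
  case other => other
}

// Inside a stage: keep codegen operators, wrap the others in an InputAdapter
def insertInputAdapter(plan: Plan): Plan = plan match {
  case op: Op if op.supportCodegen => op.copy(children = op.children.map(insertInputAdapter))
  case op: Op => InputAdapter(insertWholeStageCodegen(op))
  case other => other
}

// Compact textual form of a plan
def render(plan: Plan): String = plan match {
  case WholeStageCodegen(c) => s"WholeStageCodegen(${render(c)})"
  case InputAdapter(c) => s"InputAdapter(${render(c)})"
  case Op(name, _, cs) if cs.isEmpty => name
  case Op(name, _, cs) => s"$name(${cs.map(render).mkString(", ")})"
}
```

For Project (codegen) over Generate (no codegen) over LocalTableScan (no codegen), render(insertWholeStageCodegen(plan)) gives WholeStageCodegen(Project(InputAdapter(Generate(LocalTableScan)))), which resembles how Project is starred while Generate and LocalTableScan are not in the debugCodegen output of this page.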

prepareRowVar Internal Method

prepareRowVar(
  ctx: CodegenContext,
  row: String,
  colVars: Seq[ExprCode]): ExprCode

prepareRowVar...FIXME

prepareRowVar is used when CodegenSupport is requested to consume (and constructDoConsumeFunction with spark.sql.codegen.splitConsumeFuncByOperator enabled).

constructDoConsumeFunction Internal Method

constructDoConsumeFunction(
  ctx: CodegenContext,
  inputVars: Seq[ExprCode],
  row: String): String

constructDoConsumeFunction...FIXME

constructDoConsumeFunction is used when CodegenSupport is requested to consume.

Used Input Attributes

usedInputs: AttributeSet

By default, usedInputs returns the references of the physical operator (the attributes referenced by its expressions).

Note

Physical operators can mark it as empty to defer evaluation of attribute expressions until they are actually used (in the generated Java source code for consume path).
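A rough sketch of why an empty usedInputs defers evaluation, assuming the consume path emits code upfront only for the input variables whose attributes appear in the parent's usedInputs. ToyVar and evaluateRequired are hypothetical names, attributes are plain strings, and the generated Java is illustrative.

```scala
// Hypothetical stand-in for ExprCode: an attribute plus the Java code that computes its value
final case class ToyVar(attribute: String, code: String)

// Emit code upfront only for the variables whose attributes the parent lists in usedInputs.
// The other variables stay unevaluated, so their code can be emitted later where it is used.
def evaluateRequired(inputVars: Seq[ToyVar], usedInputs: Set[String]): String =
  inputVars.filter(v => usedInputs.contains(v.attribute)).map(_.code).mkString("\n")

val inputVars = Seq(
  ToyVar("id", "long id = row.getLong(0);"),
  ToyVar("name", "UTF8String name = row.getUTF8String(1);"))
```

With an empty usedInputs nothing is evaluated upfront, so a column that a filter later drops never has its value read.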


usedInputs is used when:

parent Internal Variable Property

parent: CodegenSupport

parent is a physical operator that supports whole-stage Java code generation.

parent starts empty (null by default) and is assigned a physical operator only when CodegenSupport is requested to generate Java source code for the produce code path (produce). The physical operator is passed in as the parent input argument of produce.

limitNotReachedChecks

limitNotReachedChecks: Seq[String]

limitNotReachedChecks is a sequence of checks that evaluate to true while the downstream Limit operators have not yet received enough records to reach their limits.

limitNotReachedChecks requests the parent physical operator for limitNotReachedChecks.


limitNotReachedChecks is used when:

  • RangeExec physical operator is requested to doProduce
  • BaseLimitExec physical operator is requested to limitNotReachedChecks
  • CodegenSupport physical operator is requested to limitNotReachedCond
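The way checks accumulate along the parent chain can be sketched with a toy model in plain Scala. ToyOp, ToyLimit and the limit_count_N variable names are hypothetical; the sketch only assumes what is described here: an operator asks its parent for limitNotReachedChecks, a limit operator adds its own check, and the root of the stage contributes none.

```scala
// Hypothetical operator that only knows its parent (None for the root of the stage)
class ToyOp(val name: String, val parent: Option[ToyOp]) {
  // Default: ask the parent for its checks; the root contributes none
  def limitNotReachedChecks: Seq[String] =
    parent.map(_.limitNotReachedChecks).getOrElse(Nil)
}

// Hypothetical limit operator: prepends its own check to the ones from further downstream
class ToyLimit(limit: Int, parentOp: Option[ToyOp]) extends ToyOp(s"Limit($limit)", parentOp) {
  private val countVar = s"limit_count_$limit" // made-up variable name for the rows seen so far
  override def limitNotReachedChecks: Seq[String] =
    s"$countVar < $limit" +: super.limitNotReachedChecks
}

val stage = new ToyOp("WholeStageCodegen", None)
val outerLimit = new ToyLimit(10, Some(stage))
val project = new ToyOp("Project", Some(outerLimit))
val innerLimit = new ToyLimit(5, Some(project))
val range = new ToyOp("Range", Some(innerLimit))
```

Here range.limitNotReachedChecks collects the checks of both limits above it, so the leaf operator's loop can stop as soon as either limit is reached.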

canCheckLimitNotReached

canCheckLimitNotReached: Boolean

canCheckLimitNotReached is true when the physical operator has no children (is a leaf operator).


canCheckLimitNotReached is used when:

Variable Name Prefix

variablePrefix: String

variablePrefix is the prefix of the names of the variables in the Java source code generated for this physical operator.

Physical Operator        Prefix
HashAggregateExec        agg
BroadcastHashJoinExec    bhj
ShuffledHashJoinExec     shj
SortMergeJoinExec        smj
RDDScanExec              rdd
DataSourceScanExec       scan
InMemoryTableScanExec    memoryScan
WholeStageCodegenExec    wholestagecodegen
others                   Lower-case node name

variablePrefix is used when:

  • CodegenSupport is requested to generate the Java source code for produce and consume code paths
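The prefix table can be read as a lookup with a fallback. The sketch below is plain Scala with hypothetical helpers (knownPrefixes, variablePrefix, prefixed). It looks prefixes up by exact class name, a simplification of matching on operator types, and prefixed ignores any de-duplication suffixes that real generated names may get.

```scala
import java.util.Locale

// Prefixes from the table; the keys are operator class names
val knownPrefixes: Map[String, String] = Map(
  "HashAggregateExec" -> "agg",
  "BroadcastHashJoinExec" -> "bhj",
  "ShuffledHashJoinExec" -> "shj",
  "SortMergeJoinExec" -> "smj",
  "RDDScanExec" -> "rdd",
  "DataSourceScanExec" -> "scan",
  "InMemoryTableScanExec" -> "memoryScan",
  "WholeStageCodegenExec" -> "wholestagecodegen")

// Hypothetical helper: any other operator falls back to its lower-case node name
def variablePrefix(operatorClass: String, nodeName: String): String =
  knownPrefixes.getOrElse(operatorClass, nodeName.toLowerCase(Locale.ROOT))

// Simplified: a generated variable name is the prefix, an underscore and the base name
def prefixed(prefix: String, name: String): String = s"${prefix}_$name"
```

This reproduces names seen in the generated code of this page, e.g. bhj_matched, project_rowWriter and inputadapter_row.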

needCopyResult Flag

needCopyResult: Boolean

needCopyResult controls whether the WholeStageCodegenExec physical operator should copy the result when requested for the Java source code for the consume path.

needCopyResult...FIXME

Demo

val q = spark.range(1)

import org.apache.spark.sql.execution.debug._
scala> q.debugCodegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Range (0, 1, step=1, splits=8)

Generated code:
...

// The above is equivalent to the following method chain
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Range (0, 1, step=1, splits=8)

Generated code:
...