CodegenSupport Physical Operators¶
CodegenSupport is an extension of the SparkPlan abstraction for physical operators that support Whole-Stage Java Code Generation.
Contract¶
Generating Java Source Code for Consume Path¶
doConsume(
ctx: CodegenContext,
input: Seq[ExprCode],
row: ExprCode): String
Generates Java source code (as text) for this physical operator for the consume execution path in Whole-Stage Java Code Generation
UnsupportedOperationException
doConsume throws an UnsupportedOperationException by default.
Used when:
- This physical operator is requested to generate the Java source code for the consume code path (Java code that consumes the generated columns or a row from a physical operator)
Generating Java Source Code for Produce Path¶
doProduce(
ctx: CodegenContext): String
Generates Java source code (as text) for the physical operator to process the rows from the input RDDs for the whole-stage-codegen "produce" path.
Used when:
- This physical operator is requested to generate the Java source code for "produce" code path
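The interplay of the two contract methods can be illustrated with a minimal, Spark-free sketch. All the names below (ToyCodegen, ToyScan, ToyFilter, ToySink) are hypothetical and only mimic the shape of doProduce and doConsume. The real methods take a CodegenContext and ExprCodes, and the parent is registered by the final produce method rather than by doProduce itself.

```scala
object ContractSketch {
  // Hypothetical stand-in for CodegenSupport (no CodegenContext, no ExprCode).
  trait ToyCodegen {
    // "Produce" path: generates the code that drives the loop over input rows.
    def doProduce(parent: ToyCodegen): String
    // "Consume" path: generates the code that handles a single row.
    // Throws by default, like doConsume described above.
    def doConsume(rowVar: String): String =
      throw new UnsupportedOperationException
  }

  // Leaf operator: owns the loop and hands every row over to its parent.
  final class ToyScan extends ToyCodegen {
    def doProduce(parent: ToyCodegen): String = {
      val body = parent.doConsume("row")
      s"while (input.hasNext()) { long row = input.next(); $body }"
    }
  }

  // Unary operator: delegates produce to the child and filters rows on consume.
  final class ToyFilter(child: ToyCodegen, condition: String) extends ToyCodegen {
    private var parent: ToyCodegen = null
    def doProduce(parent: ToyCodegen): String = {
      this.parent = parent
      child.doProduce(this)
    }
    override def doConsume(rowVar: String): String = {
      val body = parent.doConsume(rowVar)
      s"if ($rowVar $condition) { $body }"
    }
  }

  // Root of the stage: only consumes (appends rows to the result).
  final class ToySink(child: ToyCodegen) extends ToyCodegen {
    def doProduce(parent: ToyCodegen): String = child.doProduce(this)
    override def doConsume(rowVar: String): String = s"append($rowVar);"
  }

  // Generates the code of a three-operator toy stage.
  def generate(): String = {
    val plan = new ToySink(new ToyFilter(new ToyScan, "> 1"))
    plan.doProduce(null)
  }
}
```

Calling ContractSketch.generate() walks down the tree on the produce path and back up on the consume path, so the loop of the leaf operator ends up wrapping the code of all the operators above it.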
Input RDDs¶
inputRDDs(): Seq[RDD[InternalRow]]
Input RDDs of the physical operator
Important
Whole-Stage Java Code Generation supports up to two input RDDs.
Used when:
- WholeStageCodegenExec unary physical operator is executed
Implementations¶
- BroadcastHashJoinExec
- ColumnarToRowExec
- DebugExec
- ExpandExec
- FilterExec
- GenerateExec
- ProjectExec
- RangeExec
- SerializeFromObjectExec
- SortMergeJoinExec
- WholeStageCodegenExec
- others
Final Methods¶
Final methods are used to generate the Java source code in different phases of Whole-Stage Java Code Generation.
Generating Java Source Code for Consume Code Path¶
consume(
ctx: CodegenContext,
outputVars: Seq[ExprCode],
row: String = null): String
consume generates Java source code for consuming generated columns or a row from the physical operator
consume creates the ExprCodes for the input variables (inputVars).
- If outputVars is defined, consume makes sure that their number is exactly the length of the output attributes and copies them. In other words, inputVars is exactly outputVars.
- If outputVars is not defined, consume makes sure that row is defined. consume sets currentVars of the CodegenContext to null while INPUT_ROW to the row. For every output attribute, consume creates a BoundReference and requests it to generate code for expression evaluation.
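The two cases above can be sketched without Spark as follows. ToyExprCode is a hypothetical, simplified stand-in for ExprCode, and the generated getLong accessor is only an assumption that stands in for what a BoundReference would generate for a given data type.

```scala
object InputVarsSketch {
  // Hypothetical stand-in for ExprCode: the Java code and the variable holding the value.
  final case class ToyExprCode(code: String, value: String)

  def inputVars(
      output: Seq[String],
      outputVars: Seq[ToyExprCode],
      row: String): Seq[ToyExprCode] =
    if (outputVars != null) {
      // Columns are already available as variables: check the count and copy them.
      assert(outputVars.length == output.length)
      outputVars.map(_.copy())
    } else {
      // Only a row is available: read every output attribute from it by ordinal.
      assert(row != null, "outputVars and row cannot both be null.")
      output.zipWithIndex.map { case (attr, ordinal) =>
        ToyExprCode(s"long $attr = $row.getLong($ordinal);", attr)
      }
    }
}
```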
consume creates a row variable.
consume sets the following in the CodegenContext:
- currentVars as the inputVars
- INPUT_ROW as null
- freshNamePrefix as the variablePrefix of the parent physical operator
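consume evaluates the required variables (based on the output attributes, the inputVars and the usedInputs of the parent physical operator) and keeps the generated code as evaluated.
consume creates a so-called consumeFunc by constructDoConsumeFunction when all of the following hold:
- spark.sql.codegen.splitConsumeFuncByOperator internal configuration property is enabled
- The usedInputs of the parent physical operator contain all the output attributes
- paramLength is correct (FIXME)
Otherwise, consume requests the parent physical operator to doConsume.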
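The choice of consumeFunc can be sketched as a plain predicate over three flags. The parameter names are hypothetical. In particular, requireAllOutput (all the output attributes are among the used inputs of the parent) is an assumption about the second condition, and validParamLength stands in for the paramLength check.

```scala
object ConsumeFuncSketch {
  // Returns the name of the code path used to create consumeFunc.
  def chooseConsumeFunc(
      splitConsumeFuncByOperator: Boolean,
      requireAllOutput: Boolean,
      validParamLength: Boolean): String =
    if (splitConsumeFuncByOperator && requireAllOutput && validParamLength) {
      // The consume logic goes into a separate Java method.
      "constructDoConsumeFunction"
    } else {
      // The consume logic is inlined by the parent operator.
      "parent.doConsume"
    }
}
```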
In the end, consume gives the plain Java source code with the comment CONSUME: [parent]:
[evaluated]
[consumeFunc]
Tip
Enable spark.sql.codegen.comments Spark SQL property to have CONSUME markers in the generated Java source code.
// ./bin/spark-shell --conf spark.sql.codegen.comments=true
import org.apache.spark.sql.execution.debug._
val q = Seq((0 to 4).toList).toDF.
select(explode('value) as "id").
join(spark.range(1), "id")
scala> q.debugCodegen
Found 2 WholeStageCodegen subtrees.
...
== Subtree 2 / 2 ==
*Project [id#6]
+- *BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
:- Generate explode(value#1), false, false, [id#6]
: +- LocalTableScan [value#1]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 1, step=1, splits=8)
...
/* 066 */ while (inputadapter_input.hasNext() && !stopEarly()) {
/* 067 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 068 */ // CONSUME: BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
/* 069 */ // input[0, int, false]
/* 070 */ int inputadapter_value = inputadapter_row.getInt(0);
...
/* 079 */ // find matches from HashedRelation
/* 080 */ UnsafeRow bhj_matched = bhj_isNull ? null: (UnsafeRow)bhj_relation.getValue(bhj_value);
/* 081 */ if (bhj_matched != null) {
/* 082 */ {
/* 083 */ bhj_numOutputRows.add(1);
/* 084 */
/* 085 */ // CONSUME: Project [id#6]
/* 086 */ // CONSUME: WholeStageCodegen
/* 087 */ project_rowWriter.write(0, inputadapter_value);
/* 088 */ append(project_result);
/* 089 */
/* 090 */ }
/* 091 */ }
/* 092 */ if (shouldStop()) return;
...
consume is used when:
- BroadcastHashJoinExec, BaseLimitExec, DeserializeToObjectExec, ExpandExec, GenerateExec, ProjectExec, SampleExec, SerializeFromObjectExec, MapElementsExec, DebugExec physical operators are requested to generate the Java source code for the "consume" path in whole-stage code generation
- HashAggregateExec, InputAdapter, RowDataSourceScanExec, RangeExec, SortExec, SortMergeJoinExec physical operators are requested to generate the Java source code for the "produce" path in whole-stage code generation
Data-Producing Loop Condition¶
limitNotReachedCond: String
limitNotReachedCond is used as a loop condition by ColumnarToRowExec, SortExec, InputRDDCodegen and HashAggregateExec physical operators (when requested to doProduce).
limitNotReachedCond requests the parent physical operator for the limit-not-reached checks.
limitNotReachedCond returns an empty string for no limit-not-reached checks or concatenates them with &&.
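A minimal sketch of the concatenation, assuming the checks are already collected from the parent. The trailing && is an assumption made here so that the result can be placed directly in front of another loop condition.

```scala
object LimitCondSketch {
  // Joins the limit-not-reached checks into a single Java boolean prefix.
  def limitNotReachedCond(checks: Seq[String]): String =
    if (checks.isEmpty) "" else checks.mkString("", " && ", " &&")

  // Hypothetical usage in a data-producing loop.
  def loopHeader(checks: Seq[String]): String = {
    val cond = limitNotReachedCond(checks)
    s"while ($cond input.hasNext())"
  }
}
```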
Generating Java Source Code for Produce Code Path¶
produce(
ctx: CodegenContext,
parent: CodegenSupport): String
produce generates Java source code for whole-stage-codegen "produce" code path.
produce prepares a physical operator for query execution and then generates Java source code with the result of doProduce.
produce annotates the code block with PRODUCE markers (that are simple descriptions of the physical operators in a structured query).
produce is used when:
- (most importantly) WholeStageCodegenExec physical operator is requested to generate the Java source code for a subtree
- A physical operator (with CodegenSupport) is requested to generate Java source code for the produce path in whole-stage Java code generation, which usually looks as follows:

    protected override def doProduce(ctx: CodegenContext): String = {
      child.asInstanceOf[CodegenSupport].produce(ctx, this)
    }
spark.sql.codegen.comments Property
Enable spark.sql.codegen.comments Spark SQL property for PRODUCE markers in the generated Java source code.
// ./bin/spark-shell --conf spark.sql.codegen.comments=true
import org.apache.spark.sql.execution.debug._
val q = Seq((0 to 4).toList).toDF.
select(explode('value) as "id").
join(spark.range(1), "id")
scala> q.debugCodegen
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 ==
*Range (0, 1, step=1, splits=8)
...
/* 080 */ protected void processNext() throws java.io.IOException {
/* 081 */ // PRODUCE: Range (0, 1, step=1, splits=8)
/* 082 */ // initialize Range
/* 083 */ if (!range_initRange) {
...
== Subtree 2 / 2 ==
*Project [id#6]
+- *BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
:- Generate explode(value#1), false, false, [id#6]
: +- LocalTableScan [value#1]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 1, step=1, splits=8)
...
/* 062 */ protected void processNext() throws java.io.IOException {
/* 063 */ // PRODUCE: Project [id#6]
/* 064 */ // PRODUCE: BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
/* 065 */ // PRODUCE: InputAdapter
/* 066 */ while (inputadapter_input.hasNext() && !stopEarly()) {
...
supportCodegen Flag¶
supportCodegen: Boolean
supportCodegen flag allows physical operators (that support Whole-Stage Java Code Generation) to disable Java code generation temporarily under certain conditions.
supportCodegen is enabled (true) by default.
supportCodegen can be disabled (false) in the following physical operators:
- AggregateCodegenSupport
- BroadcastNestedLoopJoinExec
- GenerateExec (based on supportCodegen of a Generator expression)
- ShuffledHashJoinExec (for all join types except FullOuter unless spark.sql.codegen.join.fullOuterShuffledHashJoin.enabled is enabled)
- SortAggregateExec
- SortMergeJoinExec
supportCodegen flag is used to select between InputAdapter or WholeStageCodegenExec physical operators when CollapseCodegenStages physical optimization is executed (and checks whether a physical operator meets the requirements of whole-stage Java code generation or not).
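How the flag drives the choice between the two wrappers can be sketched on a toy plan tree. Op is a hypothetical stand-in for a physical operator, and the sketch looks at supportCodegen only, while the real rule checks further requirements.

```scala
object CollapseSketch {
  // Hypothetical physical operator: a name, the supportCodegen flag and children.
  final case class Op(name: String, supportCodegen: Boolean, children: Seq[Op] = Nil)

  // Outside a codegen stage: start a stage at an operator that supports codegen.
  def insertStage(op: Op): String =
    if (op.supportCodegen) s"WholeStageCodegen(${insertInputAdapter(op)})"
    else wrap(op.name, op.children.map(insertStage))

  // Inside a codegen stage: cut the stage at an operator that does not support codegen.
  def insertInputAdapter(op: Op): String =
    if (op.supportCodegen) wrap(op.name, op.children.map(insertInputAdapter))
    else s"InputAdapter(${insertStage(op)})"

  // Renders an operator with its (already rendered) children.
  private def wrap(name: String, children: Seq[String]): String =
    if (children.isEmpty) name else children.mkString(s"$name(", ", ", ")")
}
```

For a plan with a non-codegen operator in the middle, the sketch produces two stages separated by an InputAdapter, which matches the "Found 2 WholeStageCodegen subtrees" kind of output shown by debugCodegen.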
prepareRowVar Internal Method¶
prepareRowVar(
ctx: CodegenContext,
row: String,
colVars: Seq[ExprCode]): ExprCode
prepareRowVar...FIXME
prepareRowVar is used when CodegenSupport is requested to consume (and constructDoConsumeFunction with spark.sql.codegen.splitConsumeFuncByOperator enabled).
constructDoConsumeFunction Internal Method¶
constructDoConsumeFunction(
ctx: CodegenContext,
inputVars: Seq[ExprCode],
row: String): String
constructDoConsumeFunction...FIXME
constructDoConsumeFunction is used when CodegenSupport is requested to consume.
Used Input Attributes¶
usedInputs: AttributeSet
usedInputs returns the expression references.
Note
Physical operators can mark it as empty to defer evaluation of attribute expressions until they are actually used (in the generated Java source code for consume path).
usedInputs is used when:
- CodegenSupport is requested to generate Java source code for the consume path
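The deferred evaluation can be sketched as follows. ToyVar and evaluateRequired are hypothetical names: a variable is evaluated up front only when its attribute is among the used inputs, and its code is cleared afterwards so that it is not emitted twice.

```scala
object UsedInputsSketch {
  // Hypothetical variable: the attribute it represents and the (not yet emitted) Java code.
  final class ToyVar(val attribute: String, var code: String)

  // Emits the code of the variables that the parent declared as used inputs.
  def evaluateRequired(vars: Seq[ToyVar], usedInputs: Set[String]): String = {
    val evaluated = new StringBuilder
    vars.foreach { v =>
      if (v.code.nonEmpty && usedInputs.contains(v.attribute)) {
        evaluated.append(v.code.trim).append("\n")
        v.code = "" // already evaluated
      }
    }
    evaluated.toString
  }
}
```

With an empty set of used inputs nothing is evaluated up front, and every variable keeps its code until an expression actually needs it.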
parent Internal Variable Property¶
parent: CodegenSupport
parent is a physical operator that supports whole-stage Java code generation.
parent starts empty (defaults to null) and is assigned a physical operator (with CodegenSupport) only when this physical operator is requested to generate Java source code for the produce code path. The physical operator is passed in as an input argument of produce.
limitNotReachedChecks¶
limitNotReachedChecks: Seq[String]
limitNotReachedChecks is a sequence of checks that evaluate to true as long as the downstream Limit operators have not yet received enough records to reach their limits.
limitNotReachedChecks requests the parent physical operator for limitNotReachedChecks.
limitNotReachedChecks is used when:
- RangeExec physical operator is requested to doProduce
- BaseLimitExec physical operator is requested to limitNotReachedChecks
- CodegenSupport physical operator is requested to limitNotReachedCond
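The delegation to the parent can be sketched with a toy operator chain. ToyOp, Passthrough and Limit are hypothetical names, and the shape of the check that a limit operator contributes (count term < limit) is an assumption.

```scala
object LimitChecksSketch {
  trait ToyOp {
    def parent: Option[ToyOp]
    // Default behaviour: ask the parent (no parent means no checks).
    def limitNotReachedChecks: Seq[String] =
      parent.map(_.limitNotReachedChecks).getOrElse(Nil)
  }

  // An operator that contributes no check of its own.
  final case class Passthrough(parent: Option[ToyOp]) extends ToyOp

  // A limit operator prepends its own check to the checks of its parent.
  final case class Limit(countTerm: String, limit: Int, parent: Option[ToyOp]) extends ToyOp {
    override def limitNotReachedChecks: Seq[String] =
      s"$countTerm < $limit" +: super.limitNotReachedChecks
  }
}
```

A leaf operator under two limits therefore sees both checks, with the closest limit first.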
canCheckLimitNotReached¶
canCheckLimitNotReached: Boolean
canCheckLimitNotReached is true when there are no children.
canCheckLimitNotReached is used when:
- CodegenSupport physical operator is requested to limitNotReachedCond
Variable Name Prefix¶
variablePrefix: String
variablePrefix is the prefix of the variable names of this physical operator.
| Physical Operator | Prefix |
|---|---|
| HashAggregateExec | agg |
| BroadcastHashJoinExec | bhj |
| ShuffledHashJoinExec | shj |
| SortMergeJoinExec | smj |
| RDDScanExec | rdd |
| DataSourceScanExec | scan |
| InMemoryTableScanExec | memoryScan |
| WholeStageCodegenExec | wholestagecodegen |
| others | Lower-case node name |
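The table can be read as a simple lookup with a fallback. The sketch below matches on operator class names given as strings, which is a simplification made for this example, and the two-parameter signature is hypothetical.

```scala
import java.util.Locale

object VariablePrefixSketch {
  // Maps an operator (by class name) to the prefix of its generated variable names.
  def variablePrefix(operatorClass: String, nodeName: String): String =
    operatorClass match {
      case "HashAggregateExec"     => "agg"
      case "BroadcastHashJoinExec" => "bhj"
      case "ShuffledHashJoinExec"  => "shj"
      case "SortMergeJoinExec"     => "smj"
      case "RDDScanExec"           => "rdd"
      case "DataSourceScanExec"    => "scan"
      case "InMemoryTableScanExec" => "memoryScan"
      case "WholeStageCodegenExec" => "wholestagecodegen"
      case _                       => nodeName.toLowerCase(Locale.ROOT)
    }
}
```

This is why the generated code in the examples on this page uses variable names such as bhj_matched, project_rowWriter and range_initRange.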
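variablePrefix is used when:
- CodegenSupport is requested to generate Java source code for the produce and consume code paths (to set the freshNamePrefix of the CodegenContext)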
needCopyResult Flag¶
needCopyResult: Boolean
needCopyResult controls whether WholeStageCodegenExec physical operator should copy result when requested for the Java source code for consume path.
needCopyResult...FIXME
Demo¶
val q = spark.range(1)
import org.apache.spark.sql.execution.debug._
scala> q.debugCodegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Range (0, 1, step=1, splits=8)
Generated code:
...
// The above is equivalent to the following method chain
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Range (0, 1, step=1, splits=8)
Generated code:
...