CodegenSupport Physical Operators¶
CodegenSupport is an extension of the SparkPlan abstraction for physical operators that support Whole-Stage Java Code Generation.
Contract¶
Generating Java Source Code for Consume Path¶
doConsume(
ctx: CodegenContext,
input: Seq[ExprCode],
row: ExprCode): String
Generates Java source code (as text) for this physical operator for the consume execution path in Whole-Stage Java Code Generation
UnsupportedOperationException
doConsume throws an UnsupportedOperationException by default.
Used when:
- This physical operator is requested to generate the Java source code for the consume code path (Java code that consumes the generated columns or a row from a physical operator)
Generating Java Source Code for Produce Path¶
doProduce(
ctx: CodegenContext): String
Generates Java source code (as text) for the physical operator to process the rows from the input RDDs for the whole-stage-codegen "produce" path.
Used when:
- This physical operator is requested to generate the Java source code for the "produce" code path
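The two contract methods call into each other across an operator chain: the produce path is delegated down to a child, and the produced rows are handed back up through the parent's consume path. The following is a deliberately simplified, Spark-free model of that hand-off. ToyCodegen, ToyScan and ToyProject are hypothetical names, and the generated text is pseudo-Java rather than what Spark actually emits.

```scala
object ContractSketch {
  // Hypothetical stand-in for CodegenSupport; not Spark's trait.
  trait ToyCodegen {
    var parent: ToyCodegen = null

    // Produce path: every operator has to say how rows are produced.
    def doProduce(): String

    // Consume path: unsupported unless an operator overrides it.
    def doConsume(input: Seq[String]): String =
      throw new UnsupportedOperationException("doConsume is not supported")

    // Remembers who asked for the rows, then generates the produce code.
    final def produce(parent: ToyCodegen): String = {
      this.parent = parent
      doProduce()
    }

    // Hands the produced columns over to the parent's consume path.
    final def consume(outputVars: Seq[String]): String =
      parent.doConsume(outputVars)
  }

  // Leaf operator: emits a loop and lets the parent consume every row.
  class ToyScan extends ToyCodegen {
    def doProduce(): String =
      s"while (input.hasNext()) { ${consume(Seq("value"))} }"
  }

  // Unary operator: delegates production to the child, overrides consumption.
  class ToyProject(child: ToyCodegen) extends ToyCodegen {
    def doProduce(): String = child.produce(this)
    override def doConsume(input: Seq[String]): String =
      s"append(${input.mkString(", ")});"
  }

  val scan = new ToyScan
  val generated: String = new ToyProject(scan).doProduce()
}
```

In this model ContractSketch.generated is a single loop with the projection inlined into its body, which is the point of fusing operators into one stage. ToyScan keeps the default doConsume since nothing ever produces rows into a leaf.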
Input RDDs¶
inputRDDs(): Seq[RDD[InternalRow]]
Input RDDs of the physical operator
Important
Whole-Stage Java Code Generation supports up to two input RDDs.
Used when:
- WholeStageCodegenExec unary physical operator is executed
Implementations¶
- BroadcastHashJoinExec
- ColumnarToRowExec
- DebugExec
- ExpandExec
- FilterExec
- GenerateExec
- ProjectExec
- RangeExec
- SerializeFromObjectExec
- SortMergeJoinExec
- WholeStageCodegenExec
- others
Final Methods¶
Final methods are used to generate the Java source code in different phases of Whole-Stage Java Code Generation.
Generating Java Source Code for Consume Code Path¶
consume(
ctx: CodegenContext,
outputVars: Seq[ExprCode],
row: String = null): String
consume generates Java source code for consuming generated columns or a row from the physical operator.
consume creates the ExprCodes for the input variables (inputVars).
- If outputVars is defined, consume makes sure that their number is exactly the length of the output attributes and copies them. In other words, inputVars is exactly outputVars.
- If outputVars is not defined, consume makes sure that row is defined. consume sets currentVars of the CodegenContext to null and INPUT_ROW to the row. For every output attribute, consume creates a BoundReference and requests it to generate code for expression evaluation.
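The two cases above can be sketched with plain strings in place of Spark's types. ToyExprCode and resolveInputVars are hypothetical names, and the row accessor text (row.get(i)) is pseudo-Java that only stands in for whatever a BoundReference would generate.

```scala
object InputVarsSketch {
  // Hypothetical, simplified stand-in for ExprCode: a Java expression as text.
  final case class ToyExprCode(value: String)

  def resolveInputVars(
      output: Seq[String],          // names of the output attributes
      outputVars: Seq[ToyExprCode], // null when the operator only has a row
      row: String): Seq[ToyExprCode] = {
    if (outputVars != null) {
      // Case 1: the columns are already available as variables; copy them.
      require(outputVars.length == output.length, "one variable per output attribute")
      outputVars.map(_.copy())
    } else {
      // Case 2: only a row is available; read every column by its ordinal.
      require(row != null, "outputVars and row cannot both be undefined")
      output.indices.map(i => ToyExprCode(s"$row.get($i)"))
    }
  }
}
```

For example, resolving two output attributes from a row named r gives the expressions r.get(0) and r.get(1), while passing outputVars returns equal copies of them.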
consume creates a row variable.
consume sets the following in the CodegenContext:
- currentVars as the inputVars
- INPUT_ROW as null
- freshNamePrefix as the variablePrefix of the parent physical operator
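consume evaluates the required variables for the output attributes, inputVars and the usedInputs of the parent physical operator (that gives evaluated).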
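consume creates a so-called consumeFunc by constructDoConsumeFunction when all of the following hold:
- spark.sql.codegen.splitConsumeFuncByOperator internal configuration property is enabled
- usedInputs of the parent physical operator contains all the output attributes
- paramLength is correct (FIXME)
Otherwise, consume requests the parent physical operator to doConsume.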
In the end, consume gives the plain Java source code with the comment CONSUME: [parent]:
[evaluated]
[consumeFunc]
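The choice between a dedicated consume function and a direct call to the parent's doConsume can be sketched as a single boolean decision. This is a minimal sketch under stated assumptions: chooseConsumeFunc is a hypothetical helper, the "parent uses all output attributes" test is an assumption about the second condition, and the parameter-length check is assumed to be the JVM limit of 255 parameter slots per method.

```scala
object ConsumeFuncSketch {
  // Assumption: a JVM method accepts at most 255 parameter slots.
  val MaxParamLength = 255

  // Hypothetical helper; returns the name of the branch this sketch would take.
  def chooseConsumeFunc(
      splitByOperator: Boolean,      // spark.sql.codegen.splitConsumeFuncByOperator
      output: Seq[String],           // output attributes of this operator
      parentUsedInputs: Set[String], // attributes the parent says it uses
      paramLength: Int): String = {
    // Assumption: splitting only pays off when the parent needs every column.
    val requireAllOutput = output.forall(parentUsedInputs.contains)
    if (splitByOperator && requireAllOutput && paramLength <= MaxParamLength)
      "constructDoConsumeFunction"
    else
      "parent.doConsume"
  }
}
```

Turning the configuration property off, leaving an output attribute unused by the parent, or exceeding the parameter limit each make the sketch fall back to parent.doConsume.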
Tip
Enable spark.sql.codegen.comments Spark SQL property to have CONSUME markers in the generated Java source code.
// ./bin/spark-shell --conf spark.sql.codegen.comments=true
import org.apache.spark.sql.execution.debug._
val q = Seq((0 to 4).toList).toDF.
select(explode('value) as "id").
join(spark.range(1), "id")
scala> q.debugCodegen
Found 2 WholeStageCodegen subtrees.
...
== Subtree 2 / 2 ==
*Project [id#6]
+- *BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
:- Generate explode(value#1), false, false, [id#6]
: +- LocalTableScan [value#1]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 1, step=1, splits=8)
...
/* 066 */ while (inputadapter_input.hasNext() && !stopEarly()) {
/* 067 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 068 */ // CONSUME: BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
/* 069 */ // input[0, int, false]
/* 070 */ int inputadapter_value = inputadapter_row.getInt(0);
...
/* 079 */ // find matches from HashedRelation
/* 080 */ UnsafeRow bhj_matched = bhj_isNull ? null: (UnsafeRow)bhj_relation.getValue(bhj_value);
/* 081 */ if (bhj_matched != null) {
/* 082 */ {
/* 083 */ bhj_numOutputRows.add(1);
/* 084 */
/* 085 */ // CONSUME: Project [id#6]
/* 086 */ // CONSUME: WholeStageCodegen
/* 087 */ project_rowWriter.write(0, inputadapter_value);
/* 088 */ append(project_result);
/* 089 */
/* 090 */ }
/* 091 */ }
/* 092 */ if (shouldStop()) return;
...
consume is used when:
- BroadcastHashJoinExec, BaseLimitExec, DeserializeToObjectExec, ExpandExec, GenerateExec, ProjectExec, SampleExec, SerializeFromObjectExec, MapElementsExec, DebugExec physical operators are requested to generate the Java source code for the "consume" path in whole-stage code generation
- HashAggregateExec, InputAdapter, RowDataSourceScanExec, RangeExec, SortExec, SortMergeJoinExec physical operators are requested to generate the Java source code for the "produce" path in whole-stage code generation
Data-Producing Loop Condition¶
limitNotReachedCond: String
limitNotReachedCond is used as a loop condition by ColumnarToRowExec, SortExec, InputRDDCodegen and HashAggregateExec physical operators (when requested to doProduce).
limitNotReachedCond requests the parent physical operator for the limit-not-reached checks.
limitNotReachedCond returns an empty string when there are no limit-not-reached checks. Otherwise, it concatenates them with &&.
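As a rough illustration of how such a condition could be spliced into a data-producing loop, the sketch below builds the condition text from a list of checks. Both helpers are hypothetical, and the trailing && (so that the operator's own loop condition can follow directly) is an assumption of this sketch.

```scala
object LimitCondSketch {
  // Hypothetical helper: turns the parent's checks into a loop-condition prefix.
  def limitNotReachedCond(checks: Seq[String]): String =
    if (checks.isEmpty) "" else checks.mkString("", " && ", " &&")

  // Pseudo-Java loop header that puts the prefix in front of its own condition.
  def loopHeader(checks: Seq[String]): String = {
    val cond = s"${limitNotReachedCond(checks)} input.hasNext()".trim
    s"while ($cond)"
  }
}
```

With no checks the loop header is just the operator's own condition. With checks, the loop stops producing rows as soon as any downstream limit is reached.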
Generating Java Source Code for Produce Code Path¶
produce(
ctx: CodegenContext,
parent: CodegenSupport): String
produce generates Java source code for the whole-stage-codegen "produce" code path.
produce prepares a physical operator for query execution and then generates Java source code with the result of doProduce.
produce annotates the code block with PRODUCE markers (that are simple descriptions of the physical operators in a structured query).
produce is used when:
- (most importantly) WholeStageCodegenExec physical operator is requested to generate the Java source code for a subtree
- A physical operator (with CodegenSupport) is requested to generate Java source code for the produce path in whole-stage Java code generation that usually looks as follows:
protected override def doProduce(ctx: CodegenContext): String = {
  child.asInstanceOf[CodegenSupport].produce(ctx, this)
}
spark.sql.codegen.comments Property
Enable spark.sql.codegen.comments Spark SQL property for PRODUCE markers in the generated Java source code.
// ./bin/spark-shell --conf spark.sql.codegen.comments=true
import org.apache.spark.sql.execution.debug._
val q = Seq((0 to 4).toList).toDF.
select(explode('value) as "id").
join(spark.range(1), "id")
scala> q.debugCodegen
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 ==
*Range (0, 1, step=1, splits=8)
...
/* 080 */ protected void processNext() throws java.io.IOException {
/* 081 */ // PRODUCE: Range (0, 1, step=1, splits=8)
/* 082 */ // initialize Range
/* 083 */ if (!range_initRange) {
...
== Subtree 2 / 2 ==
*Project [id#6]
+- *BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
:- Generate explode(value#1), false, false, [id#6]
: +- LocalTableScan [value#1]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 1, step=1, splits=8)
...
/* 062 */ protected void processNext() throws java.io.IOException {
/* 063 */ // PRODUCE: Project [id#6]
/* 064 */ // PRODUCE: BroadcastHashJoin [cast(id#6 as bigint)], [id#9L], Inner, BuildRight
/* 065 */ // PRODUCE: InputAdapter
/* 066 */ while (inputadapter_input.hasNext() && !stopEarly()) {
...
supportCodegen Flag¶
supportCodegen: Boolean
supportCodegen flag allows physical operators (that support Whole-Stage Java Code Generation) to disable Java code generation temporarily under certain conditions.
supportCodegen is enabled (true) by default.
supportCodegen can be disabled (false) in the following physical operators:
- AggregateCodegenSupport
- BroadcastNestedLoopJoinExec
- GenerateExec (based on supportCodegen of a Generator expression)
- ShuffledHashJoinExec (for the FullOuter join type, unless spark.sql.codegen.join.fullOuterShuffledHashJoin.enabled is enabled)
- SortAggregateExec
- SortMergeJoinExec
supportCodegen flag is used to select between InputAdapter and WholeStageCodegenExec physical operators when CollapseCodegenStages physical optimization is executed (and checks whether a physical operator meets the requirements of whole-stage Java code generation or not).
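The selection can be pictured as two mutually recursive walks over the operator tree: one for operators outside a codegen stage and one for operators inside it. The sketch below is a toy model only. Op and both walk functions are hypothetical, and the real optimization considers more than a single boolean flag.

```scala
object SupportCodegenSketch {
  // Hypothetical operator tree node; not a Spark class.
  final case class Op(name: String, supportCodegen: Boolean, children: Seq[Op] = Nil)

  // Inside a stage: keep fusing while operators support codegen,
  // otherwise cut the stage with an InputAdapter.
  private def insideStage(op: Op): String =
    if (op.supportCodegen) render(op, insideStage)
    else s"InputAdapter(${outsideStage(op)})"

  // Outside a stage: start a new stage at the first operator that supports codegen.
  def outsideStage(op: Op): String =
    if (op.supportCodegen) s"WholeStageCodegen(${render(op, insideStage)})"
    else render(op, outsideStage)

  // Prints an operator with its children rendered by the given walk.
  private def render(op: Op, walk: Op => String): String =
    if (op.children.isEmpty) op.name
    else s"${op.name}(${op.children.map(walk).mkString(", ")})"
}
```

An operator that reports supportCodegen as false in the middle of a plan therefore splits it into two stages, with an InputAdapter marking the boundary.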
prepareRowVar Internal Method¶
prepareRowVar(
ctx: CodegenContext,
row: String,
colVars: Seq[ExprCode]): ExprCode
prepareRowVar ...FIXME
prepareRowVar is used when CodegenSupport is requested to consume (and constructDoConsumeFunction with spark.sql.codegen.splitConsumeFuncByOperator enabled).
constructDoConsumeFunction Internal Method¶
constructDoConsumeFunction(
ctx: CodegenContext,
inputVars: Seq[ExprCode],
row: String): String
constructDoConsumeFunction ...FIXME
constructDoConsumeFunction is used when CodegenSupport is requested to consume.
Used Input Attributes¶
usedInputs: AttributeSet
usedInputs returns the expression references.
Note
Physical operators can mark it as empty to defer evaluation of attribute expressions until they are actually used (in the generated Java source code for consume path).
usedInputs is used when:
- CodegenSupport is requested to generate Java source code for the consume path
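To see why an empty usedInputs defers evaluation, consider a simplified model in which every input variable carries the code that computes it, and only the variables an operator declares as used are emitted up front. ToyVar and evaluateRequired are hypothetical names for this sketch and the code strings are pseudo-Java.

```scala
object UsedInputsSketch {
  // Hypothetical stand-in for ExprCode: a variable name and its pending code.
  final case class ToyVar(name: String, code: String)

  // Returns the code to emit now, plus the variables with that code cleared
  // (so that it is not emitted a second time later on).
  def evaluateRequired(vars: Seq[ToyVar], required: Set[String]): (String, Seq[ToyVar]) = {
    def due(v: ToyVar): Boolean = required.contains(v.name) && v.code.nonEmpty
    val emitted = vars.filter(due).map(_.code).mkString("\n")
    val remaining = vars.map(v => if (due(v)) v.copy(code = "") else v)
    (emitted, remaining)
  }

  val vars = Seq(
    ToyVar("a", "int a = row.getInt(0);"),
    ToyVar("b", "int b = row.getInt(1);"))

  // Operator that uses every input: everything is evaluated up front.
  val eager = evaluateRequired(vars, Set("a", "b"))
  // Operator with empty usedInputs: nothing is evaluated until it is needed.
  val deferred = evaluateRequired(vars, Set.empty)
}
```

In the deferred case the pending code stays attached to the variables, so whichever operator eventually needs a column is the one that pays for computing it.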
parent Internal Variable Property¶
parent: CodegenSupport
parent is a physical operator that supports whole-stage Java code generation.
parent starts empty (defaults to null) and is assigned a physical operator (with CodegenSupport) only when CodegenSupport is requested to generate Java source code for the produce code path. The physical operator is passed in as an input argument for the produce code path.
limitNotReachedChecks¶
limitNotReachedChecks: Seq[String]
limitNotReachedChecks is a sequence of checks that evaluate to true if the downstream Limit operators have not yet received enough records to reach the limit.
limitNotReachedChecks requests the parent physical operator for limitNotReachedChecks.
limitNotReachedChecks is used when:
- RangeExec physical operator is requested to doProduce
- BaseLimitExec physical operator is requested to limitNotReachedChecks
- CodegenSupport physical operator is requested to limitNotReachedCond
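Because every operator forwards the request to its parent, a data-producing operator at the bottom of a stage ends up with the checks of all limits above it. The following toy model shows that delegation. ToyOp and ToyLimit are hypothetical, and the shape of the check (a counter compared with the limit) is an assumption about what a limit operator contributes.

```scala
object LimitChecksSketch {
  // Hypothetical stand-in for a CodegenSupport operator.
  class ToyOp(val parent: ToyOp) {
    // Default: forward the question to the parent, if there is one.
    def limitNotReachedChecks: Seq[String] =
      if (parent == null) Nil else parent.limitNotReachedChecks
  }

  // A limit-like operator puts its own counter check in front of the parent's.
  class ToyLimit(limit: Int, counter: String, p: ToyOp) extends ToyOp(p) {
    override def limitNotReachedChecks: Seq[String] =
      s"$counter < $limit" +: super.limitNotReachedChecks
  }

  // root <- outer limit <- inner limit <- project <- range (the leaf)
  val root = new ToyOp(null)
  val outer = new ToyLimit(5, "count_1", root)
  val inner = new ToyLimit(10, "count_0", outer)
  val project = new ToyOp(inner)
  val range = new ToyOp(project)
}
```

Asking the leaf (range) for its checks walks the whole chain and collects one check per limit, nearest limit first.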
canCheckLimitNotReached¶
canCheckLimitNotReached: Boolean
canCheckLimitNotReached is true when there are no children.
canCheckLimitNotReached is used when:
- CodegenSupport physical operator is requested to limitNotReachedCond
Variable Name Prefix¶
variablePrefix: String
variablePrefix is the prefix of the variable names of this physical operator.
Physical Operator | Prefix |
---|---|
HashAggregateExec | agg |
BroadcastHashJoinExec | bhj |
ShuffledHashJoinExec | shj |
SortMergeJoinExec | smj |
RDDScanExec | rdd |
DataSourceScanExec | scan |
InMemoryTableScanExec | memoryScan |
WholeStageCodegenExec | wholestagecodegen |
others | Lower-case node name |
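The table reads as a lookup with a fallback, which can be sketched as follows. The lookup is keyed by operator name purely for illustration (an assumption of this sketch), and nodeName is assumed to be the operator's display name, for example Project for ProjectExec.

```scala
object VariablePrefixSketch {
  import java.util.Locale

  // Prefixes from the table above, keyed by operator name for this sketch only.
  private val known = Map(
    "HashAggregateExec" -> "agg",
    "BroadcastHashJoinExec" -> "bhj",
    "ShuffledHashJoinExec" -> "shj",
    "SortMergeJoinExec" -> "smj",
    "RDDScanExec" -> "rdd",
    "DataSourceScanExec" -> "scan",
    "InMemoryTableScanExec" -> "memoryScan",
    "WholeStageCodegenExec" -> "wholestagecodegen")

  // Falls back to the lower-case node name for every other operator.
  def variablePrefix(operator: String, nodeName: String): String =
    known.getOrElse(operator, nodeName.toLowerCase(Locale.ROOT))
}
```

The fallback matches the variable names visible in the generated code shown on this page, such as project_rowWriter, inputadapter_input and range_initRange.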
variablePrefix
is used when:
needCopyResult Flag¶
needCopyResult: Boolean
needCopyResult controls whether WholeStageCodegenExec physical operator should copy the result when requested for the Java source code for the consume path.
needCopyResult ...FIXME
Demo¶
val q = spark.range(1)
import org.apache.spark.sql.execution.debug._
scala> q.debugCodegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Range (0, 1, step=1, splits=8)
Generated code:
...
// The above is equivalent to the following method chain
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Range (0, 1, step=1, splits=8)
Generated code:
...