WholeStageCodegenExec Physical Operator¶
WholeStageCodegenExec is a unary physical operator that (alongside InputAdapter) lays the foundation for Whole-Stage Java Code Generation of a codegened execution pipeline of a structured query.
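For example, a query with an exchange in the middle is split into more than one codegen stage; InputAdapter (not printed in the tree) bridges the non-codegen Exchange into the downstream stage. A spark-shell sketch; the exact plan output (e.g. whether the star prefix carries the codegen stage id) depends on the Spark version:
val plan = spark.range(9).groupBy("id").count().queryExecution.executedPlan
println(plan.numberedTreeString)
// 00 *(2) HashAggregate(keys=[id#0L], functions=[count(1)])
// 01 +- Exchange hashpartitioning(id#0L, 200)
// 02    +- *(1) HashAggregate(keys=[id#0L], functions=[partial_count(1)])
// 03       +- *(1) Range (0, 9, step=1, splits=8)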
Creating Instance¶
WholeStageCodegenExec takes the following to be created:
- Child SparkPlan (a physical subquery tree)
- Codegen Stage Id
WholeStageCodegenExec is created when:
- CollapseCodegenStages physical optimization is executed (with spark.sql.codegen.wholeStage configuration property enabled)
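The property can be toggled at runtime to see the effect (a minimal spark-shell sketch; the star prefix marks WholeStageCodegenExec):
spark.conf.set("spark.sql.codegen.wholeStage", false)
println(spark.range(3).queryExecution.executedPlan.numberedTreeString)
// 00 Range (0, 3, step=1, splits=8)
spark.conf.set("spark.sql.codegen.wholeStage", true)
println(spark.range(3).queryExecution.executedPlan.numberedTreeString)
// 00 *Range (0, 3, step=1, splits=8)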
CodegenSupport¶
WholeStageCodegenExec is a CodegenSupport and, when executed, triggers code generation for the whole child physical plan subtree (stage) of a structured query.
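That can be observed directly (a minimal sketch in spark-shell):
import org.apache.spark.sql.execution.{CodegenSupport, WholeStageCodegenExec}
val wsce = spark.range(1).queryExecution.executedPlan.asInstanceOf[WholeStageCodegenExec]
// WholeStageCodegenExec is itself a CodegenSupport
assert(wsce.isInstanceOf[CodegenSupport])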
Performance Metrics¶
duration¶
How long (in ms) the whole-stage codegen pipeline was running (i.e. the elapsed time from when the underlying BufferedRowIterator was created until all the internal rows were consumed).
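The metric can be read programmatically once the pipeline has run (a sketch; the pipelineTime metric key is an assumption based on Spark internals, and the metric shows up as "duration" in the web UI):
import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsce = spark.range(9).queryExecution.executedPlan.asInstanceOf[WholeStageCodegenExec]
wsce.execute().count()  // run the pipeline so the metric is recorded
println(wsce.metrics("pipelineTime").value)  // elapsed time in ms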
Executing Physical Operator¶
doExecute(): RDD[InternalRow]
doExecute is part of the SparkPlan abstraction.
doExecute generates the Java source code for the whole child physical plan subtree using doCodeGen (that gives a CodegenContext and the Java source code).
doExecute then tries to compile the code and, if compilation fails and spark.sql.codegen.fallback is enabled, falls back to requesting the child physical operator to execute (skipping whole-stage codegen for this part of the query).
In case the maximum bytecode size of a single Java function generated by whole-stage codegen is above the spark.sql.codegen.hugeMethodLimit threshold, doExecute prints out the following INFO message and requests the child physical operator to execute (skipping whole-stage codegen for this part of the query):
Found too long generated codes and JIT optimization might not work:
the bytecode size ([maxMethodCodeSize]) is above the limit [hugeMethodLimit],
and the whole-stage codegen was disabled for this plan (id=[codegenStageId]).
To avoid this, you can raise the limit `spark.sql.codegen.hugeMethodLimit`:
[treeString]
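Both escape hatches are regular configuration properties and can be adjusted at runtime (a sketch showing the default values):
// Fall back to the non-codegen path when compilation fails (default: true)
spark.conf.set("spark.sql.codegen.fallback", true)
// Deactivate whole-stage codegen for plans whose largest generated method
// exceeds this bytecode size (default: 65535)
spark.conf.set("spark.sql.codegen.hugeMethodLimit", 65535)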
doExecute requests the CodegenContext for the references.
doExecute requests the child physical operator (that is supposed to be a CodegenSupport) for the inputRDDs. Up to two input RDDs are supported.
doExecute throws an AssertionError when the number of input RDDs is more than 2:
Up to two input RDDs can be supported
One Input RDD¶
For one input RDD, doExecute uses the RDD.mapPartitionsWithIndex operation.
For every partition of the input RDD, doExecute does the following (see the sketch after this list):
- Compiles the code (the compiled code is cached, so after the first time this is almost a no-op)
- Requests the compiled code to generate (with the references) to produce a BufferedRowIterator
- Requests the BufferedRowIterator to initialize
- Creates an Iterator[InternalRow] that tracks the rows until the last one is consumed, when the duration metric can be recorded
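The flow can be paraphrased with the following simplified sketch (not the actual Spark source; cleanedSource, references and durationMs stand for the generated source code, the CodegenContext references and the duration metric, while CodeGenerator.compile, GeneratedClass and BufferedRowIterator are Spark-internal APIs):
inputRDD.mapPartitionsWithIndex { (index, iter) =>
  // Compile (cached, so effectively a one-time cost per executor)
  val (clazz, _) = CodeGenerator.compile(cleanedSource)
  // Generate a BufferedRowIterator with the references and initialize it
  val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
  buffer.init(index, Array(iter))
  // Track the rows to record the duration metric once all are consumed
  new Iterator[InternalRow] {
    override def hasNext: Boolean = {
      val more = buffer.hasNext
      if (!more) durationMs += buffer.durationMs()
      more
    }
    override def next(): InternalRow = buffer.next()
  }
}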
Two Input RDDs¶
For two input RDDs, doExecute ...FIXME
Demo¶
val q = spark.range(9)
// we need executedPlan with WholeStageCodegenExec physical operator "injected"
val plan = q.queryExecution.executedPlan
assert(plan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
// Note the star prefix of Range that marks WholeStageCodegenExec
scala> println(plan.numberedTreeString)
00 *Range (0, 9, step=1, splits=8)
// As a matter of fact, there are two physical operators in play here
// i.e. WholeStageCodegenExec with Range as the child
scala> plan.foreach { op => println(op.getClass.getName) }
org.apache.spark.sql.execution.WholeStageCodegenExec
org.apache.spark.sql.execution.RangeExec
// Let's access the parent WholeStageCodegenExec
import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsce = plan.asInstanceOf[WholeStageCodegenExec]
// Trigger code generation of the entire query plan tree
val (ctx, code) = wsce.doCodeGen
// CodeFormatter can pretty-print the code
import org.apache.spark.sql.catalyst.expressions.codegen.CodeFormatter
println(CodeFormatter.format(code))
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private boolean range_initRange_0;
/* 010 */ private long range_nextIndex_0;
/* 011 */ private TaskContext range_taskContext_0;
/* 012 */ private InputMetrics range_inputMetrics_0;
/* 013 */ private long range_batchEnd_0;
/* 014 */ private long range_numElementsTodo_0;
/* 015 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 016 */
/* 017 */ public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 018 */ this.references = references;
/* 019 */ }
/* 020 */
/* 021 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 022 */ partitionIndex = index;
/* 023 */ this.inputs = inputs;
/* 024 */
...
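// Demo: what doConsume generates for a single row
// (the ExprCode below stands for hypothetical generated Java snippets)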
import org.apache.spark.sql.catalyst.expressions.codegen._
val ctx = new CodegenContext()
val code: Block = CodeBlock(codeParts = Seq("valid_java_code"), blockInputs = Seq.empty)
// code will be evaluated to produce a value (that can be null)
val isNull: ExprValue = FalseLiteral
val value: ExprValue = new LiteralValue(value = "valid_java_code_for_literal_value", javaType = classOf[String])
val exprCode = ExprCode(code, isNull, value)
val consumeCode = wsce.doConsume(ctx, input = Seq.empty, row = exprCode)
println(consumeCode)
valid_java_code
append(valid_java_code_for_literal_value);
Logging¶
Enable ALL logging level for the org.apache.spark.sql.execution.WholeStageCodegenExec logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.WholeStageCodegenExec.name = org.apache.spark.sql.execution.WholeStageCodegenExec
logger.WholeStageCodegenExec.level = all
Refer to Logging.