WholeStageCodegenExec Physical Operator

WholeStageCodegenExec is a unary physical operator that (alongside InputAdapter) lays the foundation for the Whole-Stage Java Code Generation for a Codegened Execution Pipeline of a structured query.

Creating Instance

WholeStageCodegenExec takes the following to be created:

  • Child SparkPlan (a physical subquery tree)
  • Codegen Stage Id

WholeStageCodegenExec is created when the CollapseCodegenStages physical optimization is executed (and inserts WholeStageCodegenExec operators into a physical query plan).
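
For illustration, the operator can also be created directly (a minimal sketch; the child is assumed to be a physical operator that supports codegen, e.g. Range):

import org.apache.spark.sql.execution.WholeStageCodegenExec

// A physical subquery tree to wrap (sparkPlan has no WholeStageCodegenExec injected yet)
val child = spark.range(9).queryExecution.sparkPlan
// codegenStageId goes to the second (curried) argument list
val wsce = WholeStageCodegenExec(child)(codegenStageId = 1)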

CodegenSupport

WholeStageCodegenExec is a CodegenSupport and, when executed, triggers code generation for the whole child physical plan subtree (stage) of a structured query.
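
To inspect the Java source code generated for every codegen stage of a query, you can use the debug package (a quick sketch using the public API):

import org.apache.spark.sql.execution.debug._
// Prints the codegen subtrees and their generated Java source code
spark.range(9).debugCodegen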

Performance Metrics

duration

How long (in ms) the whole-stage codegen pipeline has been running, i.e. the elapsed time from when the underlying BufferedRowIterator was created until all the internal rows were consumed.
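
The metric is registered under the pipelineTime key (duration is the name shown in the web UI), so it can also be read programmatically once a query has finished. A minimal sketch, assuming the pipelineTime key:

import org.apache.spark.sql.execution.WholeStageCodegenExec
val q = spark.range(9)
q.collect() // consume all rows so the metric gets recorded
val wsce = q.queryExecution.executedPlan
  .collectFirst { case op: WholeStageCodegenExec => op }
  .get
println(wsce.metrics("pipelineTime").value) // elapsed time in ms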

Figure: WholeStageCodegenExec in web UI (Details for Query)

Executing Physical Operator

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan abstraction.


doExecute executes doCodeGen first (which gives a CodegenContext and the Java source code).

doExecute tries to compile the code and, if compilation fails and spark.sql.codegen.fallback is enabled, falls back to requesting the child physical operator to execute (skipping codegen for this part of the query).
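
spark.sql.codegen.fallback is an internal configuration property that is enabled by default. It can be toggled per session, e.g. to surface compilation errors instead of silently falling back:

// Disable the codegen fallback (compilation errors will then fail the query)
spark.conf.set("spark.sql.codegen.fallback", false)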

If the maximum bytecode size of a single compiled Java function (generated by whole-stage codegen) is above the spark.sql.codegen.hugeMethodLimit threshold, doExecute prints out the following INFO message and requests the child physical operator to execute instead (skipping codegen for this part of the query):

Found too long generated codes and JIT optimization might not work:
the bytecode size ([maxMethodCodeSize]) is above the limit [hugeMethodLimit],
and the whole-stage codegen was disabled for this plan (id=[codegenStageId]).
To avoid this, you can raise the limit `spark.sql.codegen.hugeMethodLimit`:
[treeString]
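
As the message suggests, the threshold can be adjusted per session (the default is 65535, the largest bytecode size a single JVM method can have):

// Example only: lowering the threshold to the HotSpot JIT's 8000-bytecode limit
// makes Spark fall back to interpreted execution for large generated methods
spark.conf.set("spark.sql.codegen.hugeMethodLimit", 8000)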

doExecute requests the CodegenContext for the references.

doExecute requests the child physical operator (that is supposed to be a CodegenSupport) for the inputRDDs.

Up to two input RDDs are supported

doExecute throws an AssertionError when the number of input RDDs is more than 2:

Up to two input RDDs can be supported

One Input RDD

For one input RDD, doExecute uses RDD.mapPartitionsWithIndex operation.

For every partition of the input RDD, doExecute does the following (see the sketch after this list):

  1. Compiles the code (the compiled code is cached after the first compilation, so subsequent calls are almost a noop)
  2. Requests the compiled code to generate (with the references) to produce a BufferedRowIterator
  3. Requests the BufferedRowIterator to initialize
  4. Creates an Iterator[InternalRow] to track the rows until the last is consumed and the duration metric can be recorded
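
The last step can be pictured with a plain-Scala sketch (illustration only, not the Spark source; all names are made up):

// Wraps an iterator and reports the elapsed time (in ms) once the underlying
// rows have all been consumed, mirroring how the duration metric is recorded
def timedIterator[T](underlying: Iterator[T])(record: Long => Unit): Iterator[T] =
  new Iterator[T] {
    private val startNs = System.nanoTime()
    private var reported = false
    def hasNext: Boolean = {
      val more = underlying.hasNext
      if (!more && !reported) {
        reported = true
        record((System.nanoTime() - startNs) / (1000 * 1000))
      }
      more
    }
    def next(): T = underlying.next()
  }

timedIterator(Iterator(1, 2, 3))(ms => println(s"pipeline took $ms ms")).foreach(println)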

Two Input RDDs

For two input RDDs, doExecute...FIXME
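
Until then, note that the two inputs need to be combined partition-wise before the generated code can consume them (an assumption based on the per-partition processing above). RDD.zipPartitions is the public API for that pattern (a generic illustration, not the actual Spark code):

// Pairs up partitions of two RDDs with the same number of partitions
val left = spark.sparkContext.parallelize(1 to 4, numSlices = 2)
val right = spark.sparkContext.parallelize(5 to 8, numSlices = 2)
val zipped = left.zipPartitions(right) { (l, r) => Iterator((l.toSeq, r.toSeq)) }
zipped.collect.foreach(println)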

Demo

val q = spark.range(9)

// we need executedPlan with WholeStageCodegenExec physical operator "injected"
val plan = q.queryExecution.executedPlan
assert(plan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
// Note the star prefix of Range that marks WholeStageCodegenExec
scala> println(plan.numberedTreeString)
00 *Range (0, 9, step=1, splits=8)
// As a matter of fact, there are two physical operators in play here
// i.e. WholeStageCodegenExec with Range as the child
scala> plan.foreach { op => println(op.getClass.getName) }
org.apache.spark.sql.execution.WholeStageCodegenExec
org.apache.spark.sql.execution.RangeExec
// Let's access the parent WholeStageCodegenExec
import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsce = plan.asInstanceOf[WholeStageCodegenExec]
// Trigger code generation of the entire query plan tree
val (ctx, code) = wsce.doCodeGen
// CodeFormatter can pretty-print the code
import org.apache.spark.sql.catalyst.expressions.codegen.CodeFormatter
println(CodeFormatter.format(code))
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean range_initRange_0;
/* 010 */   private long range_nextIndex_0;
/* 011 */   private TaskContext range_taskContext_0;
/* 012 */   private InputMetrics range_inputMetrics_0;
/* 013 */   private long range_batchEnd_0;
/* 014 */   private long range_numElementsTodo_0;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 016 */
/* 017 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 018 */     this.references = references;
/* 019 */   }
/* 020 */
/* 021 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 022 */     partitionIndex = index;
/* 023 */     this.inputs = inputs;
/* 024 */
...
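
// Demo: doConsume (generating the Java code to consume input rows)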
import org.apache.spark.sql.catalyst.expressions.codegen._

val ctx = new CodegenContext()

val code: Block = CodeBlock(codeParts = Seq("valid_java_code"), blockInputs = Seq.empty)

// code will be evaluated to produce a value (that can be null)
val isNull: ExprValue = FalseLiteral
val value: ExprValue = new LiteralValue(value = "valid_java_code_for_literal_value", javaType = classOf[String])
val exprCode = ExprCode(code, isNull, value)

val consumeCode = wsce.doConsume(ctx, input = Seq.empty, row = exprCode)
println(consumeCode)
valid_java_code
append(valid_java_code_for_literal_value);

Logging

Enable ALL logging level for org.apache.spark.sql.execution.WholeStageCodegenExec logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

logger.WholeStageCodegenExec.name = org.apache.spark.sql.execution.WholeStageCodegenExec
logger.WholeStageCodegenExec.level = all

Refer to Logging.