Whole-Stage Java Code Generation
Whole-Stage Java Code Generation (Whole-Stage CodeGen) is a physical query optimization in Spark SQL that fuses multiple physical operators (as a subtree of plans that support code generation) together into a single Java function.
Whole-Stage Java Code Generation improves the execution performance of a query by collapsing a query tree into a single optimized function that eliminates virtual function calls and leverages CPU registers for intermediate data.
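To make the "single function" idea concrete, here is a toy model in plain Scala. No Spark is involved, and RangeOp, FilterOp and fusedRangeFilter are hypothetical names. It contrasts an iterator-per-operator pipeline, where every row passes through the virtual hasNext()/next() calls of each operator, with a hand-fused loop of the kind whole-stage codegen aims to generate for spark.range(10).where($"id" === 4). It only sketches the shape of the generated code; Spark's actual output is Java.

```scala
// Toy model (not Spark code): iterator-per-operator vs. a single fused loop.

// Volcano style: each operator is an Iterator, so every row pays a virtual
// hasNext()/next() call at every operator boundary.
trait Operator extends Iterator[Long]

final class RangeOp(start: Long, end: Long) extends Operator {
  private var current = start
  def hasNext: Boolean = current < end
  def next(): Long = { val v = current; current += 1; v }
}

final class FilterOp(child: Operator, pred: Long => Boolean) extends Operator {
  private var buffered: Option[Long] = None
  def hasNext: Boolean = {
    // Pull rows from the child until one passes the predicate.
    while (buffered.isEmpty && child.hasNext) {
      val v = child.next()
      if (pred(v)) buffered = Some(v)
    }
    buffered.isDefined
  }
  def next(): Long = {
    if (!hasNext) throw new NoSuchElementException("next on empty FilterOp")
    val v = buffered.get
    buffered = None
    v
  }
}

// Fused style: Range and Filter collapsed into one loop; the current row is a
// local variable, a natural candidate for a CPU register.
def fusedRangeFilter(start: Long, end: Long, target: Long): Vector[Long] = {
  val out = Vector.newBuilder[Long]
  var id = start
  while (id < end) {
    if (id == target) out += id
    id += 1
  }
  out.result()
}

val volcano = new FilterOp(new RangeOp(0, 10), _ == 4).toVector
val fused = fusedRangeFilter(0, 10, 4)
```

Both versions return the same rows; the fused one simply has no per-operator call boundary left inside the loop.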
Note
Whole-Stage Code Generation is used by some modern massively parallel processing (MPP) databases to achieve better query execution performance.
See Efficiently Compiling Efficient Query Plans for Modern Hardware (PDF).
Columnar Execution
The Whole-Stage Code Generation framework is row-based.
The input RDDs of the physical operators of a whole-stage pipeline are all RDD[InternalRow]s. The output RDD of a whole-stage pipeline is also an RDD[InternalRow]. However, the input to a whole-stage codegen stage can be columnar (RDD[ColumnarBatch]).
A physical operator that supports columnar execution cannot also support whole-stage codegen.
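The following sketch assumes Spark 3.0 or later (where ColumnarToRowExec exists) and the default vectorized Parquet reader. It shows a columnar input feeding a row-based stage: the Parquet scan produces ColumnarBatches, and ColumnarToRowExec, fused into the whole-stage codegen stage, turns them into rows. The temporary path is illustrative.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.{ColumnarToRowExec, WholeStageCodegenExec}

// In spark-shell, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Write a tiny Parquet table to a fresh temporary directory.
val path = Files.createTempDirectory("wscg-columnar").resolve("t").toString
spark.range(5).write.parquet(path)

// The vectorized Parquet scan is columnar; ColumnarToRowExec converts its
// ColumnarBatches to InternalRows as the root of a whole-stage codegen stage.
val plan = spark.read.parquet(path).queryExecution.executedPlan
println(plan.treeString)

val columnarInputInStage = plan
  .collect { case w: WholeStageCodegenExec => w }
  .exists(_.child.isInstanceOf[ColumnarToRowExec])
```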
CodegenSupport Physical Operators
Physical operators that support code generation extend CodegenSupport (and keep supportCodegen flag enabled).
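A quick way to see which operators of a plan are CodegenSupport operators is to walk the children of each WholeStageCodegenExec and print their supportCodegen flag. This is a sketch for spark-shell or a local SparkSession, assuming Spark 3.x:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.{CodegenSupport, WholeStageCodegenExec}

// In spark-shell, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val q = spark.range(10).where($"id" === 4)
val stages = q.queryExecution.executedPlan.collect { case w: WholeStageCodegenExec => w }

// (operator name, supportCodegen) for every CodegenSupport operator inside the stages
val fused: Seq[(String, Boolean)] = stages.flatMap { stage =>
  stage.child.collect { case op: CodegenSupport => op.nodeName -> op.supportCodegen }
}
fused.foreach(println)
```

For this query the stage should contain Filter and Range, both reporting true.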
ObjectType
Whole-Stage Java Code Generation does not support (skips) physical operators that produce a domain object (the DataType of the output expression is ObjectType) as domain objects cannot be written into an UnsafeRow.
AggregateCodegenSupport
For aggregation, Whole-Stage Code Generation is supported by AggregateCodegenSupport physical operators (HashAggregateExec and SortAggregateExec), but only when there are no ImperativeAggregates (supportCodegen).
In other words, Whole-Stage Code Generation is only used for aggregation with DeclarativeAggregate and TypedAggregateExpression expressions.
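The sketch below contrasts a declarative aggregate (sum) with collect_list, a TypedImperativeAggregate that Spark plans with ObjectHashAggregateExec, an operator without codegen support. It assumes Spark 3.x and turns adaptive query execution off so that executedPlan shows the final operators without running the query.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec}
import org.apache.spark.sql.functions.{collect_list, sum}

// In spark-shell, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._
// Without AQE, executedPlan exposes the final physical operators directly.
spark.conf.set("spark.sql.adaptive.enabled", "false")

// sum is a DeclarativeAggregate: planned as HashAggregateExec with codegen support.
val declarativePlan = spark.range(10).agg(sum($"id")).queryExecution.executedPlan
val hashAggs = declarativePlan.collect { case h: HashAggregateExec => h }

// collect_list is a TypedImperativeAggregate: planned as ObjectHashAggregateExec.
val imperativePlan = spark.range(10).agg(collect_list($"id")).queryExecution.executedPlan
val objectAggs = imperativePlan.collect { case o: ObjectHashAggregateExec => o }
val objectAggFused = imperativePlan
  .collect { case w: WholeStageCodegenExec => w }
  .exists(_.child.find(_.isInstanceOf[ObjectHashAggregateExec]).isDefined)

println(declarativePlan.treeString)
println(imperativePlan.treeString)

// Restore the session default.
spark.conf.unset("spark.sql.adaptive.enabled")
```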
Fast Driver-Local Collect/Take Paths
The following physical operators cannot be the root of WholeStageCodegen so that they can support the fast driver-local collect/take paths:
- LocalTableScanExec
- CommandResultExec
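For example, a DataFrame built from a local Seq is planned as a bare LocalTableScanExec, with no WholeStageCodegenExec on top. This sketch assumes Spark 3.x and the default optimizer rules that turn such a query into a LocalRelation:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.{LocalTableScanExec, WholeStageCodegenExec}

// In spark-shell, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val local = Seq(1, 2, 3).toDF("n")
val plan = local.queryExecution.executedPlan
println(plan.treeString)  // prints the physical plan tree

val rootIsLocalScan = plan.isInstanceOf[LocalTableScanExec]
val hasStage = plan.collect { case w: WholeStageCodegenExec => w }.nonEmpty
```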
WholeStageCodegenExec Physical Operator
See WholeStageCodegenExec physical operator.
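Each WholeStageCodegenExec carries a codegenStageId, which is the number shown as *(n) in plan output and used in generated class names such as GeneratedIteratorForCodegenStage1. A sketch, assuming Spark 3.x:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.WholeStageCodegenExec

// In spark-shell, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val q = spark.range(10).where($"id" === 4)

// codegenStageId is defined in the second parameter list of WholeStageCodegenExec.
val stageIds = q.queryExecution.executedPlan.collect {
  case w: WholeStageCodegenExec => w.codegenStageId
}
```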
Janino
Janino is used to compile Java source code into Java classes at runtime.
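The following sketch compiles and runs a tiny Java class with Janino directly. Janino ships with Spark, so it is on the spark-shell classpath. Spark itself uses Janino's ClassBodyEvaluator inside its CodeGenerator; this example uses SimpleCompiler, which compiles a complete compilation unit, only because it keeps the code short. The Adder class is hypothetical.

```scala
import org.codehaus.janino.SimpleCompiler

// Java source of a complete compilation unit, compiled in memory at runtime.
val source =
  """public class Adder {
    |  public static long add(long a, long b) { return a + b; }
    |}""".stripMargin

val compiler = new SimpleCompiler()
compiler.cook(source)

// Load the freshly compiled class and call its static method reflectively.
val adderClass = compiler.getClassLoader.loadClass("Adder")
val addMethod = adderClass.getMethod("add", classOf[Long], classOf[Long])
val sum = addMethod.invoke(null, Long.box(40L), Long.box(2L))
```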
Debugging Query Execution
The Debugging Query Execution facility lets you dive deep into whole-stage code generation.
val q = spark.range(10).where($"id" === 4)
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*(1) Filter (id#3L = 4)
+- *(1) Range (0, 10, step=1, splits=8)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
...
import org.apache.spark.sql.execution.debug._
val q = spark.range(10).where($"id" === 4)
scala> q.debugCodegen()
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*(1) Filter (id#0L = 4)
+- *(1) Range (0, 10, step=1, splits=8)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
...
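The same report can also be captured as a String, which is handy in tests or notebooks. This is a sketch assuming Spark 3.x, where the debug package object provides codegenString(plan: SparkPlan):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug.codegenString

// In spark-shell, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val q = spark.range(10).where($"id" === 4)

// The report that debugCodegen prints, returned as a String instead.
val report = codegenString(q.queryExecution.executedPlan)
println(report.linesIterator.take(5).mkString("\n"))
```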
CollapseCodegenStages Physical Preparation Rule
During query execution planning, the CollapseCodegenStages physical preparation rule finds the physical query plans that support codegen and collapses them together into a WholeStageCodegenExec (possibly with InputAdapter in-between for physical operators with no support for Java code generation).
CollapseCodegenStages is part of the sequence of physical preparation rules (QueryExecution.preparations) that are applied in order to the physical plan before execution.
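To watch CollapseCodegenStages split a query into stages, put an operator without codegen support in the middle. In the sketch below (Spark 3.x assumed, adaptive query execution disabled so that executedPlan is the final plan), the shuffle introduced by repartition is such an operator. The query should therefore end up with two WholeStageCodegenExec stages and an InputAdapter wrapping the exchange.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.{InputAdapter, WholeStageCodegenExec}

// In spark-shell, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
// Without AQE, executedPlan is the final physical plan without running the query.
spark.conf.set("spark.sql.adaptive.enabled", "false")

// The shuffle exchange has no codegen support, so it separates two stages.
val q = spark.range(10).repartition(2).selectExpr("id + 1 AS x")
val plan = q.queryExecution.executedPlan
println(plan.treeString)

val stageIds = plan.collect { case w: WholeStageCodegenExec => w.codegenStageId }
val adapters = plan.collect { case a: InputAdapter => a }

// Restore the session default.
spark.conf.unset("spark.sql.adaptive.enabled")
```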
debugCodegen
The debugCodegen and QueryExecution.debug.codegen methods give access to the generated Java source code of a structured query.
As of Spark 3.0.0, debugCodegen also prints Java bytecode statistics of the generated classes (as compiled by Janino).
import org.apache.spark.sql.execution.debug._
val q = "SELECT sum(v) FROM VALUES(1) t(v)"
scala> sql(q).debugCodegen
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 (maxMethodCodeSize:124; maxConstantPoolSize:130(0.20% used); numInnerClasses:0) ==
...
== Subtree 2 / 2 (maxMethodCodeSize:139; maxConstantPoolSize:137(0.21% used); numInnerClasses:0) ==
spark.sql.codegen.wholeStage
Whole-Stage Code Generation is controlled by the spark.sql.codegen.wholeStage internal configuration property.
Whole-Stage Code Generation is enabled by default.
assert(spark.sessionState.conf.wholeStageEnabled)
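Because the property is read when the physical plan is prepared, turning it off makes CollapseCodegenStages leave the plan unfused. This is a sketch assuming Spark 3.x; it restores the default with spark.conf.unset afterwards.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.WholeStageCodegenExec

// In spark-shell, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Plans a fresh query each time, so the current configuration is picked up.
def stageCount(): Int =
  spark.range(10).where($"id" === 4).queryExecution.executedPlan
    .collect { case w: WholeStageCodegenExec => w }
    .size

val withCodegen = stageCount()
spark.conf.set("spark.sql.codegen.wholeStage", "false")
val withoutCodegen = stageCount()
spark.conf.unset("spark.sql.codegen.wholeStage")  // back to the default
```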
Code Generation Paths
Code generation paths were coined in this commit.
Tip
Learn more in SPARK-12795 Whole stage codegen.
Non-Whole-Stage-Codegen Path
Produce Path
Whole-stage-codegen "produce" path
A physical operator with CodegenSupport can generate Java source code to process the rows from input RDDs.
Consume Path
Whole-stage-codegen "consume" path
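The two paths can be illustrated with a toy string-based generator built from the hypothetical classes ToyOp, ToyRange, ToyFilter and ToySink (not Spark's API). In Spark, the corresponding methods are produce/doProduce and consume/doConsume of CodegenSupport, which work on a CodegenContext. In this sketch, produce walks down to the leaf, whose produce emits the loop. Inside that loop, the parent's consume emits its own per-row code and, in turn, calls its parent's consume.

```scala
// Toy model (hypothetical, not Spark's API) of the produce/consume paths.
trait ToyOp {
  var parent: Option[ToyOp] = None
  // Produce path: emit the code that generates rows.
  def produce(): String
  // Consume path: emit the code that handles one row held in variable `row`.
  def consume(row: String): String
}

final class ToyRange(n: Long) extends ToyOp {
  // The leaf emits the loop and asks its parent to consume each row.
  def produce(): String =
    s"for (long id = 0L; id < ${n}L; id++) {\n${parent.get.consume("id")}\n}"
  def consume(row: String): String = sys.error("a leaf has no child to consume from")
}

final class ToyFilter(child: ToyOp, predicate: String => String) extends ToyOp {
  child.parent = Some(this)
  def produce(): String = child.produce()
  def consume(row: String): String =
    s"  if (${predicate(row)}) {\n${parent.get.consume(row)}\n  }"
}

// Root of the pipeline: appends every surviving row to the output.
final class ToySink(child: ToyOp) extends ToyOp {
  child.parent = Some(this)
  def produce(): String = child.produce()
  def consume(row: String): String = s"    append($row);"
}

val javaSource =
  new ToySink(new ToyFilter(new ToyRange(10), r => s"$r == 4L")).produce()
println(javaSource)
```

Printing javaSource shows a single loop with the filter check and the append nested inside it, which is the fused shape of the Range and Filter pipeline used throughout this page.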
BenchmarkWholeStageCodegen
The BenchmarkWholeStageCodegen class provides a benchmark to measure whole-stage codegen performance.
You can execute it using the following command:
build/sbt 'sql/testOnly *BenchmarkWholeStageCodegen'
Note
You need to un-ignore the tests in BenchmarkWholeStageCodegen by replacing ignore with test.
$ build/sbt 'sql/testOnly *BenchmarkWholeStageCodegen'
...
Running benchmark: range/limit/sum
Running case: range/limit/sum codegen=false
22:55:23.028 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Running case: range/limit/sum codegen=true
Java HotSpot(TM) 64-Bit Server VM 1.8.0_77-b03 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
range/limit/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
range/limit/sum codegen=false 376 / 433 1394.5 0.7 1.0X
range/limit/sum codegen=true 332 / 388 1581.3 0.6 1.1X
[info] - range/limit/sum (10 seconds, 74 milliseconds)
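For a rough comparison without building Spark from source, a similar measurement can be sketched in spark-shell with SparkSession.time, which prints the elapsed time of a block. This is a single unwarmed run per setting (JIT warm-up and caching affect it), so treat the numbers as indicative only. The row count is arbitrary.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

val n = 10L * 1000 * 1000

def sumRange(wholeStage: Boolean): Long = {
  spark.conf.set("spark.sql.codegen.wholeStage", wholeStage.toString)
  // spark.time prints the wall-clock time of the block and returns its result.
  spark.time(spark.range(n).selectExpr("sum(id)").head().getLong(0))
}

val withoutCodegen = sumRange(wholeStage = false)
val withCodegen = sumRange(wholeStage = true)
spark.conf.unset("spark.sql.codegen.wholeStage")  // back to the default
```

Both runs compute the same sum; only the printed timings should differ.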