# GenerateExec Unary Physical Operator

`GenerateExec` is a unary physical operator to manage execution of a Generator expression.

`GenerateExec` represents the Generate unary logical operator at execution time.

When executed, `GenerateExec` executes (aka _evaluates_) the Generator expression on every row in an RDD partition.

`GenerateExec` supports Java code generation (aka _codegen_) only when the Generator does.
## Creating Instance

`GenerateExec` takes the following to be created:

- Generator Expression
- Required child attributes
- `outer` flag
- Generator Output Attributes
- Child Physical Operator

`GenerateExec` is created when:

- BasicOperators execution planning strategy is executed (to plan a Generate logical operator)
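
For reference, a sketch of the constructor per the list above (paraphrased, not verbatim from the Spark sources; the real operator also extends UnaryExecNode with CodegenSupport and implements doExecute and friends):

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Generator}
import org.apache.spark.sql.execution.SparkPlan

// A sketch only: parameter names follow the list above.
case class GenerateExec(
    generator: Generator,                // Generator expression
    requiredChildOutput: Seq[Attribute], // required child attributes
    outer: Boolean,                      // outer flag
    generatorOutput: Seq[Attribute],     // Generator output attributes
    child: SparkPlan)                    // child physical operator
```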
## Required Child Attributes

`GenerateExec` is given the required child attributes when created.

The required child attributes are by default the requiredChildOutput of the Generate logical operator being planned.
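
A sketch of the relevant case clause in the BasicOperators strategy (paraphrased, not verbatim; using qualifiedGeneratorOutput as the generator output attributes is an assumption based on recent Spark versions):

```scala
// Paraphrased: how BasicOperators wires a Generate logical operator
// into a GenerateExec physical operator.
case g @ logical.Generate(generator, _, outer, _, _, child) =>
  execution.GenerateExec(
    generator,
    g.requiredChildOutput,      // the required child attributes
    outer,
    g.qualifiedGeneratorOutput, // the generator output attributes
    planLater(child)) :: Nil
```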
## Performance Metrics

| Key | Name (in web UI) | Description |
|-----|------------------|-------------|
| numOutputRows | number of output rows | Number of output rows |
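
As a usage sketch, the metric can also be read programmatically via `SparkPlan.metrics` (assuming `ge` is the `GenerateExec` from the Demo below and an action has already executed the query):

```scala
// Read the numOutputRows metric of a GenerateExec operator.
val numOutputRows = ge.metrics("numOutputRows")
println(s"number of output rows: ${numOutputRows.value}")
```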
## Executing Operator

doExecute requests the child physical operator to execute (that creates an `RDD[InternalRow]`).

doExecute uses the `RDD.mapPartitionsWithIndexInternal` operator to process each partition of the RDD.

For every partition, doExecute requests the boundGenerator to initialize (with the partition index) if Nondeterministic.

doExecute then requests the boundGenerator to eval every row in the current partition (every input row can give zero or more output rows).

After all rows in a partition have been processed, doExecute requests the boundGenerator to terminate.

In the end, doExecute converts the rows to UnsafeRows and increments the numOutputRows metric.
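
A condensed sketch of that flow (paraphrased, not the verbatim Spark implementation; among other things, the real doExecute also handles the outer flag and joins the generator output with the required child attributes):

```scala
// Inside GenerateExec; child, boundGenerator, output and longMetric
// are members of the operator.
child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
  // initialize a non-deterministic generator with the partition index
  boundGenerator match {
    case n: Nondeterministic => n.initialize(index)
    case _ =>
  }
  // evaluate the generator on every input row; Iterator.++ is lazy in
  // its right operand, so terminate() runs once the partition is done
  val rows = iter.flatMap(row => boundGenerator.eval(row)) ++
    boundGenerator.terminate()
  // convert to UnsafeRows and count the output rows
  val numOutputRows = longMetric("numOutputRows")
  val proj = UnsafeProjection.create(output, output)
  rows.map { row => numOutputRows += 1; proj(row) }
}
```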
## supportCodegen

supportCodegen is simply the supportCodegen flag of the Generator expression.
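
In other words (a one-line sketch, paraphrased):

```scala
// Codegen support is delegated to the generator expression.
override def supportCodegen: Boolean = generator.supportCodegen
```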
## Demo

```scala
val nums = Seq((0 to 4).toArray).toDF("nums")
val q = nums.withColumn("explode", explode($"nums"))

scala> q.explain
== Physical Plan ==
*(1) Generate explode(nums#4), [nums#4], false, [explode#7]
+- *(1) LocalTableScan [nums#4]

val sparkPlan = q.queryExecution.executedPlan

import org.apache.spark.sql.execution.GenerateExec
val ge = sparkPlan.collect { case ge: GenerateExec => ge }.head

scala> :type ge
org.apache.spark.sql.execution.GenerateExec

val rdd = ge.execute

scala> rdd.toDebugString
res1: String =
(1) MapPartitionsRDD[2] at execute at <console>:1 []
 |  MapPartitionsRDD[1] at execute at <console>:1 []
 |  ParallelCollectionRDD[0] at execute at <console>:1 []
```
## Code Generation

Turn the `spark.sql.codegen.comments` configuration property on to see comments in the generated code.

```text
./bin/spark-shell --conf spark.sql.codegen.comments=true
```
```scala
val q = spark.range(1)
  .selectExpr("inline(array(struct(1, 'a'), struct(2, 'b')))")

scala> q.explain
== Physical Plan ==
*(1) Generate inline([[1,a],[2,b]]), false, [col1#12, col2#13]
+- *(1) Project
   +- *(1) Range (0, 1, step=1, splits=16)

val sparkPlan = q.queryExecution.executedPlan

import org.apache.spark.sql.execution.GenerateExec
val ge = sparkPlan.collect { case ge: GenerateExec => ge }.head
```

FIXME

```scala
import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsce = ge.child.asInstanceOf[WholeStageCodegenExec]
val (_, code) = wsce.doCodeGen

import org.apache.spark.sql.catalyst.expressions.codegen.CodeFormatter
val formattedCode = CodeFormatter.format(code)

scala> println(formattedCode)
```

```java
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
* Codegend pipeline for
* Project
* +- Range (0, 1, step=1, splits=8)
*/
/* 006 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private org.apache.spark.sql.execution.metric.SQLMetric range_numOutputRows;
/* 010 */ private boolean range_initRange;
/* 011 */ private long range_number;
/* 012 */ private TaskContext range_taskContext;
/* 013 */ private InputMetrics range_inputMetrics;
/* 014 */ private long range_batchEnd;
/* 015 */ private long range_numElementsTodo;
/* 016 */ private scala.collection.Iterator range_input;
/* 017 */ private UnsafeRow range_result;
/* 018 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder;
/* 019 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter;
/* 020 */
/* 021 */ public GeneratedIterator(Object[] references) {
/* 022 */ this.references = references;
/* 023 */ }
/* 024 */
/* 025 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 026 */ partitionIndex = index;
/* 027 */ this.inputs = inputs;
/* 028 */ range_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 029 */ range_initRange = false;
/* 030 */ range_number = 0L;
/* 031 */ range_taskContext = TaskContext.get();
/* 032 */ range_inputMetrics = range_taskContext.taskMetrics().inputMetrics();
/* 033 */ range_batchEnd = 0;
/* 034 */ range_numElementsTodo = 0L;
/* 035 */ range_input = inputs[0];
/* 036 */ range_result = new UnsafeRow(1);
/* 037 */ range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0);
/* 038 */ range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1);
/* 039 */
/* 040 */ }
/* 041 */
/* 042 */ private void initRange(int idx) {
/* 043 */ java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 044 */ java.math.BigInteger numSlice = java.math.BigInteger.valueOf(8L);
/* 045 */ java.math.BigInteger numElement = java.math.BigInteger.valueOf(1L);
/* 046 */ java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 047 */ java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 048 */ long partitionEnd;
/* 049 */
/* 050 */ java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 051 */ if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 052 */ range_number = Long.MAX_VALUE;
/* 053 */ } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 054 */ range_number = Long.MIN_VALUE;
/* 055 */ } else {
/* 056 */ range_number = st.longValue();
/* 057 */ }
/* 058 */ range_batchEnd = range_number;
/* 059 */
/* 060 */ java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 061 */ .multiply(step).add(start);
/* 062 */ if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 063 */ partitionEnd = Long.MAX_VALUE;
/* 064 */ } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 065 */ partitionEnd = Long.MIN_VALUE;
/* 066 */ } else {
/* 067 */ partitionEnd = end.longValue();
/* 068 */ }
/* 069 */
/* 070 */ java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 071 */ java.math.BigInteger.valueOf(range_number));
/* 072 */ range_numElementsTodo = startToEnd.divide(step).longValue();
/* 073 */ if (range_numElementsTodo < 0) {
/* 074 */ range_numElementsTodo = 0;
/* 075 */ } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 076 */ range_numElementsTodo++;
/* 077 */ }
/* 078 */ }
/* 079 */
/* 080 */ protected void processNext() throws java.io.IOException {
/* 081 */ // PRODUCE: Project
/* 082 */ // PRODUCE: Range (0, 1, step=1, splits=8)
/* 083 */ // initialize Range
/* 084 */ if (!range_initRange) {
/* 085 */ range_initRange = true;
/* 086 */ initRange(partitionIndex);
/* 087 */ }
/* 088 */
/* 089 */ while (true) {
/* 090 */ long range_range = range_batchEnd - range_number;
/* 091 */ if (range_range != 0L) {
/* 092 */ int range_localEnd = (int)(range_range / 1L);
/* 093 */ for (int range_localIdx = 0; range_localIdx < range_localEnd; range_localIdx++) {
/* 094 */ long range_value = ((long)range_localIdx * 1L) + range_number;
/* 095 */
/* 096 */ // CONSUME: Project
/* 097 */ // CONSUME: WholeStageCodegen
/* 098 */ append(unsafeRow);
/* 099 */
/* 100 */ if (shouldStop()) { range_number = range_value + 1L; return; }
/* 101 */ }
/* 102 */ range_number = range_batchEnd;
/* 103 */ }
/* 104 */
/* 105 */ range_taskContext.killTaskIfInterrupted();
/* 106 */
/* 107 */ long range_nextBatchTodo;
/* 108 */ if (range_numElementsTodo > 1000L) {
/* 109 */ range_nextBatchTodo = 1000L;
/* 110 */ range_numElementsTodo -= 1000L;
/* 111 */ } else {
/* 112 */ range_nextBatchTodo = range_numElementsTodo;
/* 113 */ range_numElementsTodo = 0;
/* 114 */ if (range_nextBatchTodo == 0) break;
/* 115 */ }
/* 116 */ range_numOutputRows.add(range_nextBatchTodo);
/* 117 */ range_inputMetrics.incRecordsRead(range_nextBatchTodo);
/* 118 */
/* 119 */ range_batchEnd += range_nextBatchTodo * 1L;
/* 120 */ }
/* 121 */ }
/* 122 */
/* 123 */ }
```