AggregateCodegenSupport Physical Operators
AggregateCodegenSupport is an extension of the BaseAggregateExec abstraction for aggregate physical operators that support Whole-Stage Java Code Generation (with produce and consume code execution paths).
AggregateCodegenSupport is a BlockingOperatorWithCodegen.
Contract
doConsumeWithKeys
doConsumeWithKeys(
  ctx: CodegenContext,
  input: Seq[ExprCode]): String
See:
- HashAggregateExec
- SortAggregateExec
Used when:
- AggregateCodegenSupport is requested to doConsume (with grouping keys specified)
doProduceWithKeys
doProduceWithKeys(
  ctx: CodegenContext): String
See:
- HashAggregateExec
- SortAggregateExec
Used when:
- AggregateCodegenSupport is requested to doProduce (with grouping keys specified)
needHashTable
needHashTable: Boolean
Whether this aggregate operator needs to build a hash table.
| Aggregate Physical Operator | needHashTable |
|---|---|
| HashAggregateExec | ✅ |
| SortAggregateExec | ❌ |
Used when:
- AggregateCodegenSupport is requested to doProduceWithoutKeys
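The contract can be pictured as a trait with three abstract members. The following is a simplified, self-contained model and not Spark's code: `CodegenCtx`, `Expr`, `AggregateCodegenModel` and `ModelOperator` are hypothetical stand-ins (for CodegenContext, ExprCode and the operators), and the "generated code" is only a placeholder string. It shows how each implementation answers needHashTable: HashAggregateExec needs a hash table, SortAggregateExec does not.

```scala
// Hypothetical stand-ins for Spark's CodegenContext and ExprCode (not Spark classes).
final case class CodegenCtx(name: String)
final case class Expr(code: String)

// Simplified model of the AggregateCodegenSupport contract.
trait AggregateCodegenModel {
  def needHashTable: Boolean
  def doProduceWithKeys(ctx: CodegenCtx): String
  def doConsumeWithKeys(ctx: CodegenCtx, input: Seq[Expr]): String
}

// Hypothetical model operator: the returned strings are placeholders, not real generated Java code.
final class ModelOperator(val name: String, val needHashTable: Boolean) extends AggregateCodegenModel {
  def doProduceWithKeys(ctx: CodegenCtx): String =
    s"/* $name (${ctx.name}): produce */"
  def doConsumeWithKeys(ctx: CodegenCtx, input: Seq[Expr]): String =
    s"/* $name (${ctx.name}): consume ${input.map(_.code).mkString(", ")} */"
}

// needHashTable values as in the table above.
val hashAggregate = new ModelOperator("HashAggregateExec", needHashTable = true)
val sortAggregate = new ModelOperator("SortAggregateExec", needHashTable = false)
```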
Implementations
- HashAggregateExec
- SortAggregateExec
supportCodegen
supportCodegen is enabled (true) when all the following hold:
- All aggregate buffer attributes are mutable
- No ImperativeAggregates among the AggregateFunctions (of the AggregateExpressions)
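The two conditions above can be expressed as a single predicate. This is a minimal sketch over hypothetical types (`AggFunction`, `Declarative`, `Imperative`, `BufferAttr`), not Spark's classes; "mutable" is reduced to a boolean flag per buffer attribute.

```scala
// Hypothetical, simplified model of an aggregate function kind (not Spark classes).
sealed trait AggFunction
final case class Declarative(name: String) extends AggFunction
final case class Imperative(name: String) extends AggFunction

// Hypothetical aggregation buffer attribute; `mutable` stands for "can be updated in place".
final case class BufferAttr(name: String, mutable: Boolean)

// supportCodegen holds only when both conditions hold.
def supportCodegen(bufferAttrs: Seq[BufferAttr], functions: Seq[AggFunction]): Boolean =
  bufferAttrs.forall(_.mutable) &&
    !functions.exists(_.isInstanceOf[Imperative])
```

With no aggregate functions at all (as in the demo query further down), both conditions hold vacuously and the predicate is true.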
SortAggregateExec
SortAggregateExec physical operator overrides supportCodegen.
Generating Java Source Code for Produce Path
CodegenSupport
doProduce(
  ctx: CodegenContext): String
doProduce is part of the CodegenSupport abstraction.
With no grouping keys, doProduce delegates to doProduceWithoutKeys. Otherwise, doProduce delegates to doProduceWithKeys.
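The dispatch depends only on whether the operator has grouping expressions. A hypothetical sketch (not Spark code), with grouping expressions reduced to their SQL text:

```scala
// Hypothetical model of the code path doProduce delegates to.
sealed trait ProducePath
case object ProduceWithoutKeys extends ProducePath
case object ProduceWithKeys extends ProducePath

// groupingExpressions stands in for the operator's grouping expressions.
def producePath(groupingExpressions: Seq[String]): ProducePath =
  if (groupingExpressions.isEmpty) ProduceWithoutKeys else ProduceWithKeys
```

For example, `spark.range(4).groupBy()` has no grouping expressions, so the model picks `ProduceWithoutKeys`.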
doProduceWithoutKeys
doProduceWithoutKeys(
  ctx: CodegenContext): String
doProduceWithoutKeys takes the DeclarativeAggregates of the AggregateExpressions and uses their initial value expressions to initialize the empty aggregation buffers.
doProduceWithoutKeys...FIXME
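Conceptually, an aggregation without grouping keys keeps a single aggregation buffer: it starts from the aggregate's initial value, is updated for every input row, and is evaluated once after all input has been consumed (which is why the operator is blocking). The sketch below is a plain Scala model under that assumption, not the Java code Spark generates; `DeclAgg` and `aggregateWithoutKeys` are hypothetical names.

```scala
// Hypothetical, simplified model of a declarative aggregate (not Spark's
// DeclarativeAggregate API): initial buffer value, per-value update, final evaluation.
final case class DeclAgg[B, R](initialValue: B, update: (B, Long) => B, evaluate: B => R)

// One buffer, initialized from the initial value, updated for every input value and
// evaluated once after the whole input is consumed. Exactly one result, even for no input.
def aggregateWithoutKeys[B, R](agg: DeclAgg[B, R], input: Iterator[Long]): R = {
  var buffer = agg.initialValue
  input.foreach(v => buffer = agg.update(buffer, v))
  agg.evaluate(buffer)
}

// COUNT-like: starts at 0, so no input gives 0.
val countAgg = DeclAgg[Long, Long](0L, (b, _) => b + 1, identity)
// SUM-like: starts "null" (None here), so no input gives None, as SUM gives NULL in SQL.
val sumAgg = DeclAgg[Option[Long], Option[Long]](None, (b, v) => Some(b.getOrElse(0L) + v), identity)
```

Over the values of `spark.range(4)` (0 to 3), the model gives a count of 4 and a sum of `Some(6)`.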
Demo
The following query uses neither grouping keys nor aggregate functions.
import org.apache.spark.sql.functions.lit
val q = spark.range(4).groupBy().agg(lit(1))
scala> q.explain
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=174]
      +- HashAggregate(keys=[], functions=[])
         +- Project
            +- Range (0, 4, step=1, splits=16)
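import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
// Find the AdaptiveSparkPlanExec at the root of the physical plan
val aqe = q.queryExecution.executedPlan.collectFirst { case asp: AdaptiveSparkPlanExec => asp }.get
// Before execution, AQE has not finalized the physical plan yet
assert(aqe.isFinalPlan == false)
// Execute the query so AQE finalizes the physical plan
aqe.execute()
assert(aqe.isFinalPlan == true)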
scala> println(q.queryExecution.explainString(mode = org.apache.spark.sql.execution.CodegenMode))
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 (maxMethodCodeSize:282; maxConstantPoolSize:193(0.29% used); numInnerClasses:0) ==
*(1) HashAggregate(keys=[], functions=[], output=[])
+- *(1) Project
   +- *(1) Range (0, 4, step=1, splits=16)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean hashAgg_initAgg_0;
/* 010 */   private boolean range_initRange_0;
/* 011 */   private long range_nextIndex_0;
/* 012 */   private TaskContext range_taskContext_0;
/* 013 */   private InputMetrics range_inputMetrics_0;
/* 014 */   private long range_batchEnd_0;
/* 015 */   private long range_numElementsTodo_0;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
...
Generating Java Source Code for Consume Path
CodegenSupport
doConsume(
  ctx: CodegenContext,
  input: Seq[ExprCode],
  row: ExprCode): String
doConsume is part of the CodegenSupport abstraction.
With no grouping keys, doConsume delegates to doConsumeWithoutKeys. Otherwise, doConsume delegates to doConsumeWithKeys.
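What the two consume paths do for every input row can be modeled in plain Scala. This is a conceptual sketch only (Spark generates Java code for this and does not run anything like it); `InputRow`, `consumeWithoutKeys`, `consumeWithKeys` and `consume` are hypothetical names, and the aggregate is a running sum for simplicity.

```scala
import scala.collection.mutable

// Hypothetical input row: a grouping key and a value to aggregate.
final case class InputRow(key: String, value: Long)

// Without grouping keys: every row updates the single aggregation buffer.
def consumeWithoutKeys(rows: Seq[InputRow]): Long = {
  var buffer = 0L
  rows.foreach(r => buffer += r.value)
  buffer
}

// With grouping keys: every row first finds (or creates) the buffer for its key.
// A hash map plays that role here, as a hash table does for a hash-based aggregate.
def consumeWithKeys(rows: Seq[InputRow]): Map[String, Long] = {
  val buffers = mutable.HashMap.empty[String, Long]
  rows.foreach(r => buffers(r.key) = buffers.getOrElse(r.key, 0L) + r.value)
  buffers.toMap
}

// The path is chosen by whether there are grouping keys.
def consume(hasGroupingKeys: Boolean, rows: Seq[InputRow]): Either[Long, Map[String, Long]] =
  if (hasGroupingKeys) Right(consumeWithKeys(rows)) else Left(consumeWithoutKeys(rows))
```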
doConsumeWithoutKeys
doConsumeWithoutKeys(
  ctx: CodegenContext,
  input: Seq[ExprCode]): String
doConsumeWithoutKeys...FIXME