AggregateCodegenSupport Physical Operators¶

AggregateCodegenSupport is an extension of the BaseAggregateExec abstraction for aggregate physical operators that support Whole-Stage Java Code Generation (with produce and consume code execution paths).

AggregateCodegenSupport is a BlockingOperatorWithCodegen.
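For orientation, the trait's overall shape can be sketched as follows. Only the parents and the contract members described on this page are used; the trait name and other details are illustrative, not Spark's exact source.

```scala
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.execution.BlockingOperatorWithCodegen
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec

// A sketch of the trait's shape (not Spark's exact source): a blocking,
// codegen-capable aggregate operator with extra hooks for the keyed case.
trait AggregateCodegenSupportSketch
  extends BaseAggregateExec
  with BlockingOperatorWithCodegen {

  protected def doProduceWithKeys(ctx: CodegenContext): String

  protected def doConsumeWithKeys(ctx: CodegenContext, input: Seq[ExprCode]): String

  protected def needHashTable: Boolean
}
```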
Contract¶
doConsumeWithKeys¶

```scala
doConsumeWithKeys(
  ctx: CodegenContext,
  input: Seq[ExprCode]): String
```
Used when:

- AggregateCodegenSupport is requested to doConsume
doProduceWithKeys¶

```scala
doProduceWithKeys(
  ctx: CodegenContext): String
```
Used when:

- AggregateCodegenSupport is requested to doProduce (with grouping keys specified)
needHashTable¶

```scala
needHashTable: Boolean
```
Whether this aggregate operator needs to build a hash table
| Aggregate Physical Operator | needHashTable |
|---|---|
| HashAggregateExec | ✅ |
| SortAggregateExec | ❌ |
Used when:

- AggregateCodegenSupport is requested to doProduceWithoutKeys
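To illustrate the table, the flag can simply be hard-coded by each implementation. A sketch with hypothetical stand-in names (the exact modifiers in Spark's source may differ):

```scala
// Sketch: HashAggregateExec keeps a hash map of aggregation buffers per
// grouping key, so it needs a hash table; SortAggregateExec consumes rows
// already sorted by the grouping keys, so it does not.
trait HashAggregateExecSketch {
  protected def needHashTable: Boolean = true
}

trait SortAggregateExecSketch {
  protected def needHashTable: Boolean = false
}
```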
Implementations¶

- HashAggregateExec
- SortAggregateExec
supportCodegen¶

supportCodegen is enabled (true) when all the following hold:

- All aggregate buffer attributes are mutable
- No ImperativeAggregates among the AggregateFunctions (of the AggregateExpressions)

The SortAggregateExec physical operator can change supportCodegen.
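A minimal sketch of that check, assuming Spark's UnsafeRow.isMutable helper and the aggregateBufferAttributes and aggregateExpressions members of the aggregate operator (the exact code in Spark may differ):

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate

// Sketch: codegen is supported only when every aggregation buffer slot is
// mutable in an UnsafeRow and no aggregate function is an ImperativeAggregate.
override def supportCodegen: Boolean = {
  val mutableBuffers =
    aggregateBufferAttributes.forall(a => UnsafeRow.isMutable(a.dataType))
  val noImperativeAggregates =
    !aggregateExpressions.exists(_.aggregateFunction.isInstanceOf[ImperativeAggregate])
  mutableBuffers && noImperativeAggregates
}
```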
Generating Java Source Code for Produce Path¶

```scala
doProduce(
  ctx: CodegenContext): String
```

doProduce is part of the CodegenSupport abstraction.

With no grouping keys, doProduce uses doProduceWithoutKeys. Otherwise, doProduce uses doProduceWithKeys.
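A sketch of that branching (groupingExpressions comes from BaseAggregateExec; the exact code in Spark may differ):

```scala
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext

// Sketch: the produce path branches on whether there are grouping keys.
protected override def doProduce(ctx: CodegenContext): String = {
  if (groupingExpressions.isEmpty) {
    doProduceWithoutKeys(ctx)   // a single, global aggregation buffer
  } else {
    doProduceWithKeys(ctx)      // one aggregation buffer per grouping key
  }
}
```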
doProduceWithoutKeys¶

```scala
doProduceWithoutKeys(
  ctx: CodegenContext): String
```

doProduceWithoutKeys takes the DeclarativeAggregates of the AggregateExpressions for the expressions to initialize empty aggregation buffers.

doProduceWithoutKeys ...FIXME
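That first step can be sketched as follows, assuming the aggregateExpressions member and Spark's DeclarativeAggregate.initialValues; the code generation that follows is omitted:

```scala
import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate

// Sketch: collect the declarative aggregate functions and the expressions
// that produce their empty (initial) aggregation buffer values.
val functions = aggregateExpressions
  .map(_.aggregateFunction)
  .collect { case d: DeclarativeAggregate => d }
val initExpr = functions.flatMap(_.initialValues)
```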
Demo¶
Not only does the following query use no grouping keys, but it also uses no aggregate functions.
```scala
val q = spark.range(4).groupBy().agg(lit(1))
```

```text
scala> q.explain
warning: 1 deprecation (since 2.13.3); for details, enable `:setting -deprecation` or `:replay -deprecation`
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=174]
      +- HashAggregate(keys=[], functions=[])
         +- Project
            +- Range (0, 4, step=1, splits=16)
```
```scala
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
val aqe = q.queryExecution.executedPlan.collectFirst { case asp: AdaptiveSparkPlanExec => asp }.get
assert(aqe.isFinalPlan == false)
aqe.execute()
assert(aqe.isFinalPlan == true)
```
```text
scala> println(q.queryExecution.explainString(mode = org.apache.spark.sql.execution.CodegenMode))
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 (maxMethodCodeSize:282; maxConstantPoolSize:193(0.29% used); numInnerClasses:0) ==
*(1) HashAggregate(keys=[], functions=[], output=[])
+- *(1) Project
   +- *(1) Range (0, 4, step=1, splits=16)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private boolean hashAgg_initAgg_0;
/* 010 */ private boolean range_initRange_0;
/* 011 */ private long range_nextIndex_0;
/* 012 */ private TaskContext range_taskContext_0;
/* 013 */ private InputMetrics range_inputMetrics_0;
/* 014 */ private long range_batchEnd_0;
/* 015 */ private long range_numElementsTodo_0;
/* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
...
```
Generating Java Source Code for Consume Path¶

```scala
doConsume(
  ctx: CodegenContext,
  input: Seq[ExprCode],
  row: ExprCode): String
```

doConsume is part of the CodegenSupport abstraction.

With no grouping keys, doConsume uses doConsumeWithoutKeys. Otherwise, doConsume uses doConsumeWithKeys.
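The branching mirrors the produce path (a sketch; the row argument is not needed for the branching itself, and the exact code in Spark may differ):

```scala
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}

// Sketch: the consume path also branches on the presence of grouping keys.
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
  if (groupingExpressions.isEmpty) {
    doConsumeWithoutKeys(ctx, input)
  } else {
    doConsumeWithKeys(ctx, input)
  }
}
```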
doConsumeWithoutKeys¶

```scala
doConsumeWithoutKeys(
  ctx: CodegenContext,
  input: Seq[ExprCode]): String
```

doConsumeWithoutKeys ...FIXME