
AggregateCodegenSupport Physical Operators

AggregateCodegenSupport is an extension of the BaseAggregateExec abstraction for aggregate physical operators that support Whole-Stage Java Code Generation (with produce and consume code execution paths).

AggregateCodegenSupport is a BlockingOperatorWithCodegen.

Contract

doConsumeWithKeys

doConsumeWithKeys(
  ctx: CodegenContext,
  input: Seq[ExprCode]): String

See:

  • HashAggregateExec
  • SortAggregateExec

Used when:

  • AggregateCodegenSupport is requested to doConsume

doProduceWithKeys

doProduceWithKeys(
  ctx: CodegenContext): String

See:

  • HashAggregateExec
  • SortAggregateExec

Used when:

  • AggregateCodegenSupport is requested to doProduce

needHashTable

needHashTable: Boolean

Whether this aggregate operator needs to build a hash table.

Aggregate Physical Operator   needHashTable
HashAggregateExec             ✅
SortAggregateExec             ✗

Used when:
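The difference the table captures can be illustrated with a standalone sketch (plain Scala, hypothetical helper names, no Spark code): hash-based aggregation keeps a hash table with one buffer per distinct group, while sort-based aggregation assumes key-sorted input and only ever holds the current group's buffer.

```scala
// Hash-based aggregation (needs a hash table): one buffer per distinct key.
def hashAggregate(rows: Seq[(String, Long)]): Map[String, Long] = {
  val table = scala.collection.mutable.Map.empty[String, Long] // the hash table
  for ((key, value) <- rows)
    table(key) = table.getOrElse(key, 0L) + value
  table.toMap
}

// Sort-based aggregation (no hash table): rows are processed in key order, so
// only the buffer of the current run of equal keys has to be kept.
def sortAggregate(rows: Seq[(String, Long)]): Map[String, Long] =
  rows.sortBy(_._1)
    .foldLeft(List.empty[(String, Long)]) {
      case ((k, sum) :: rest, (key, value)) if k == key =>
        (k, sum + value) :: rest // same group: update the current buffer
      case (acc, (key, value)) =>
        (key, value) :: acc // new group: start a fresh buffer
    }
    .toMap
```

Both compute the same result; the difference is purely in how much state is held while aggregating.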

Implementations

  • HashAggregateExec
  • SortAggregateExec

supportCodegen

supportCodegen: Boolean

supportCodegen is part of the CodegenSupport abstraction.

supportCodegen is enabled (true) when none of the aggregate functions (of the AggregateExpressions) is an ImperativeAggregate.
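Assuming the check boils down to rejecting imperative aggregate functions (expression-based, declarative functions are codegen-friendly; imperative ones are not), a simplified standalone sketch with stand-in types (not Spark's):

```scala
// Stand-in types: Declarative mimics DeclarativeAggregate functions,
// Imperative mimics ImperativeAggregate functions.
sealed trait AggregateFunction
final case class Declarative(name: String) extends AggregateFunction
final case class Imperative(name: String) extends AggregateFunction

// Codegen is supported only when no aggregate function is imperative.
def supportCodegen(functions: Seq[AggregateFunction]): Boolean =
  !functions.exists(_.isInstanceOf[Imperative])
```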

SortAggregateExec

SortAggregateExec physical operator can change supportCodegen.

Generating Java Source Code for Produce Path

doProduce(
  ctx: CodegenContext): String

doProduce is part of the CodegenSupport abstraction.

With no grouping keys, doProduce executes doProduceWithoutKeys. Otherwise, doProduce executes doProduceWithKeys.
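The dispatch can be sketched standalone (hypothetical string results stand in for the generated Java source; the real methods take a CodegenContext):

```scala
// Stand-ins for the real code-generating methods.
def doProduceWithoutKeys(): String = "// one global aggregation buffer"
def doProduceWithKeys(): String = "// per-group aggregation buffers"

// The produce path picks a strategy based on the grouping keys.
def doProduce(groupingExpressions: Seq[String]): String =
  if (groupingExpressions.isEmpty) doProduceWithoutKeys()
  else doProduceWithKeys()
```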

doProduceWithoutKeys

doProduceWithoutKeys(
  ctx: CodegenContext): String

doProduceWithoutKeys takes the DeclarativeAggregate functions of the AggregateExpressions and uses their initialValues expressions to initialize empty aggregation buffers.
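What the generated code amounts to for a keyless aggregation can be sketched in plain Scala (SumCount is a hypothetical buffer type standing in for an aggregation buffer holding a sum and a count):

```scala
// A single global aggregation buffer, e.g. for sum and count.
final case class SumCount(sum: Long, count: Long)

// Analogous to building the empty buffer from initialValues.
val empty = SumCount(0L, 0L)

// Analogous to the per-row update expressions.
def update(buffer: SumCount, value: Long): SumCount =
  SumCount(buffer.sum + value, buffer.count + 1)

// With no grouping keys, every input row updates the same buffer.
def aggregate(rows: Seq[Long]): SumCount = rows.foldLeft(empty)(update)
```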

doProduceWithoutKeys...FIXME

Demo

The following query uses neither grouping keys nor aggregate functions.

val q = spark.range(4).groupBy().agg(lit(1))
scala> q.explain
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=174]
      +- HashAggregate(keys=[], functions=[])
         +- Project
            +- Range (0, 4, step=1, splits=16)
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
val aqe = q.queryExecution.executedPlan.collectFirst { case asp: AdaptiveSparkPlanExec => asp }.get
assert(aqe.isFinalPlan == false)
aqe.execute()
assert(aqe.isFinalPlan == true)
scala> println(q.queryExecution.explainString(mode = org.apache.spark.sql.execution.CodegenMode))
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 (maxMethodCodeSize:282; maxConstantPoolSize:193(0.29% used); numInnerClasses:0) ==
*(1) HashAggregate(keys=[], functions=[], output=[])
+- *(1) Project
   +- *(1) Range (0, 4, step=1, splits=16)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean hashAgg_initAgg_0;
/* 010 */   private boolean range_initRange_0;
/* 011 */   private long range_nextIndex_0;
/* 012 */   private TaskContext range_taskContext_0;
/* 013 */   private InputMetrics range_inputMetrics_0;
/* 014 */   private long range_batchEnd_0;
/* 015 */   private long range_numElementsTodo_0;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
...

Generating Java Source Code for Consume Path

doConsume(
  ctx: CodegenContext,
  input: Seq[ExprCode],
  row: ExprCode): String

doConsume is part of the CodegenSupport abstraction.

With no grouping keys, doConsume executes doConsumeWithoutKeys. Otherwise, doConsume executes doConsumeWithKeys.

doConsumeWithoutKeys

doConsumeWithoutKeys(
  ctx: CodegenContext,
  input: Seq[ExprCode]): String

doConsumeWithoutKeys...FIXME