AggUtils Utility

AggUtils is a utility for the Aggregation execution planning strategy.

planAggregateWithoutDistinct

planAggregateWithoutDistinct(
  groupingExpressions: Seq[NamedExpression],
  aggregateExpressions: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  child: SparkPlan): Seq[SparkPlan]

planAggregateWithoutDistinct is a two-step physical operator generator.

planAggregateWithoutDistinct first creates an aggregate physical operator with aggregateExpressions in Partial mode (for partial aggregations).

Note

For the partial aggregation "stage", requiredChildDistributionExpressions of the aggregate physical operator is empty.

In the end, planAggregateWithoutDistinct creates another aggregate physical operator (of the same type as before), but with aggregateExpressions now in Final mode (for final aggregations). This second operator becomes the parent of the first (partial) aggregate operator.

Note

For the final aggregation "stage", requiredChildDistributionExpressions of the parent aggregate physical operator are the Attributes of the groupingExpressions.
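
The partial-then-final idea can be illustrated with a toy model in plain Scala. This is only a sketch and not Spark code: the object name TwoPhaseAggregationSketch, the Row type alias, and the partialSum and finalSum helpers are all hypothetical. Each inner sequence stands for a partition, the partial step aggregates within a partition, and the final step merges partial results per grouping key.

```scala
// Toy model of two-step (partial + final) aggregation. NOT Spark code.
// All names here are hypothetical and exist for illustration only.
object TwoPhaseAggregationSketch {
  // A row is a (grouping key, value) pair; a partition is a sequence of rows.
  type Row = (String, Long)

  // "Partial" step: aggregate within a single partition.
  // No particular distribution of the input rows is required.
  def partialSum(partition: Seq[Row]): Map[String, Long] =
    partition.groupBy(_._1).map { case (key, rows) => key -> rows.map(_._2).sum }

  // "Final" step: merge the partial results that share a grouping key.
  // This is the step that needs rows brought together by grouping key.
  def finalSum(partials: Seq[Map[String, Long]]): Map[String, Long] =
    partials.flatten.groupBy(_._1).map { case (key, kvs) => key -> kvs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Seq("a" -> 1L, "b" -> 2L, "a" -> 3L),
      Seq("b" -> 4L, "a" -> 5L))
    val result = finalSum(partitions.map(partialSum))
    println(result.toSeq.sorted.mkString(", "))
  }
}
```

The final step is the only one that depends on how rows are distributed across partitions, which mirrors why only the parent (final) operator carries non-empty requiredChildDistributionExpressions.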

planAggregateWithOneDistinct

planAggregateWithOneDistinct(
  groupingExpressions: Seq[NamedExpression],
  functionsWithDistinct: Seq[AggregateExpression],
  functionsWithoutDistinct: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  child: SparkPlan): Seq[SparkPlan]

planAggregateWithOneDistinct...FIXME

Creating Physical Operator for Aggregation

createAggregate(
  requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
  groupingExpressions: Seq[NamedExpression] = Nil,
  aggregateExpressions: Seq[AggregateExpression] = Nil,
  aggregateAttributes: Seq[Attribute] = Nil,
  initialInputBufferOffset: Int = 0,
  resultExpressions: Seq[NamedExpression] = Nil,
  child: SparkPlan): SparkPlan

createAggregate creates one of the following physical operators based on the given AggregateExpressions (in the following order):

  1. HashAggregateExec when all the aggBufferAttributes (of the AggregateFunctions of the given AggregateExpressions) are supported

  2. ObjectHashAggregateExec when the following all hold:

  3. SortAggregateExec otherwise (as the fallback)
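
The selection order above can be sketched as a simple decision function. This is a minimal model under stated assumptions, not the AggUtils implementation: the case objects only borrow the operator names as labels, and the two Boolean parameters are hypothetical stand-ins for the actual checks. In particular, objectHashConditionsHold summarizes conditions that this page does not list.

```scala
// Toy model of the operator selection order. NOT the Spark implementation.
object CreateAggregateSketch {
  // Labels that mirror the names of the three aggregate physical operators.
  sealed trait AggregateOperator
  case object HashAggregateExec extends AggregateOperator
  case object ObjectHashAggregateExec extends AggregateOperator
  case object SortAggregateExec extends AggregateOperator

  // Both parameters are hypothetical stand-ins for the real checks:
  // - aggBufferAttributesSupported: all aggBufferAttributes are supported
  // - objectHashConditionsHold: the (unlisted) ObjectHashAggregateExec conditions all hold
  def chooseOperator(
      aggBufferAttributesSupported: Boolean,
      objectHashConditionsHold: Boolean): AggregateOperator =
    if (aggBufferAttributesSupported) HashAggregateExec
    else if (objectHashConditionsHold) ObjectHashAggregateExec
    else SortAggregateExec // fallback when neither of the above applies
}
```

Because the checks are evaluated in order, the hash-based operator wins whenever its condition holds, regardless of the second flag.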


createAggregate is used when:

Planning Execution of Streaming Aggregation

planStreamingAggregation(
  groupingExpressions: Seq[NamedExpression],
  functionsWithoutDistinct: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  stateFormatVersion: Int,
  child: SparkPlan): Seq[SparkPlan]

planStreamingAggregation...FIXME


planStreamingAggregation is used when:

  • StatefulAggregationStrategy (Spark Structured Streaming) execution planning strategy is requested to plan a logical plan of a streaming aggregation (a streaming query with Aggregate operator)

Creating Streaming Aggregate Physical Operator

createStreamingAggregate(
  requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
  groupingExpressions: Seq[NamedExpression] = Nil,
  aggregateExpressions: Seq[AggregateExpression] = Nil,
  aggregateAttributes: Seq[Attribute] = Nil,
  initialInputBufferOffset: Int = 0,
  resultExpressions: Seq[NamedExpression] = Nil,
  child: SparkPlan): SparkPlan

createStreamingAggregate creates an aggregate physical operator (with the isStreaming flag enabled).

Note

createStreamingAggregate is exactly createAggregate with the isStreaming flag enabled.
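
The relationship described in the note can be pictured as plain delegation. The following is a hedged sketch and not the Spark source: AggregateOp is a hypothetical stand-in for an aggregate physical operator, the signatures are reduced to a single groupingKeys argument, and passing isStreaming as a defaulted parameter is an assumption made for illustration.

```scala
// Toy model of delegation with a flag. NOT the Spark source.
object StreamingAggregateSketch {
  // Hypothetical stand-in for an aggregate physical operator.
  final case class AggregateOp(groupingKeys: Seq[String], isStreaming: Boolean)

  // Simplified signature; the isStreaming parameter is an assumption of this sketch.
  def createAggregate(groupingKeys: Seq[String], isStreaming: Boolean = false): AggregateOp =
    AggregateOp(groupingKeys, isStreaming)

  // Same operator construction, only with the flag turned on.
  def createStreamingAggregate(groupingKeys: Seq[String]): AggregateOp =
    createAggregate(groupingKeys, isStreaming = true)
}
```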


createStreamingAggregate is used when: