AggUtils Utility
AggUtils is a utility for the Aggregation execution planning strategy.
planAggregateWithoutDistinct

```scala
planAggregateWithoutDistinct(
  groupingExpressions: Seq[NamedExpression],
  aggregateExpressions: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  child: SparkPlan): Seq[SparkPlan]
```
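planAggregateWithoutDistinct is a two-step physical operator generator: it plans an aggregation as two aggregate physical operators, one for partial and one for final aggregation.

planAggregateWithoutDistinct first creates an aggregate physical operator with the aggregateExpressions in Partial mode (for partial aggregation).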
Note
requiredChildDistributionExpressions for the aggregate physical operator of the partial aggregation "stage" is undefined (None).
In the end, planAggregateWithoutDistinct creates another aggregate physical operator (of the same type as before), but with the aggregateExpressions now in Final mode (for final aggregation). This aggregate physical operator becomes the parent of the first one.
Note
requiredChildDistributionExpressions for the parent aggregate physical operator of the final aggregation "stage" are the Attributes of the groupingExpressions.
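The two-step plan described above can be sketched as a simplified, Spark-free Scala model. All the types here (AggregateMode, AggExpr, Plan, Scan, AggregateOp and TwoStageAggregation) are hypothetical stand-ins rather than Spark's classes, and expressions are reduced to plain strings. The sketch only mirrors the structure: a Partial operator with no required child distribution, under a Final operator that requires its child to be distributed by the grouping attributes.

```scala
// Simplified, hypothetical model of the plan built by planAggregateWithoutDistinct.
// None of these types are Spark classes; expressions are plain strings.
sealed trait AggregateMode
case object Partial extends AggregateMode
case object Final extends AggregateMode

// An aggregate function name paired with the mode it runs in
final case class AggExpr(function: String, mode: AggregateMode)

sealed trait Plan
final case class Scan(table: String) extends Plan
final case class AggregateOp(
    requiredChildDistributionExpressions: Option[Seq[String]],
    groupingExpressions: Seq[String],
    aggregateExpressions: Seq[AggExpr],
    child: Plan) extends Plan

object TwoStageAggregation {
  def planWithoutDistinct(
      groupingAttributes: Seq[String],
      functions: Seq[String],
      child: Plan): Seq[Plan] = {
    // Step 1: partial aggregation over the child, no required child distribution
    val partialAggregate = AggregateOp(
      requiredChildDistributionExpressions = None,
      groupingExpressions = groupingAttributes,
      aggregateExpressions = functions.map(f => AggExpr(f, Partial)),
      child = child)

    // Step 2: final aggregation over the partial results; its child has to be
    // distributed by the grouping attributes so that all partial results
    // for a group end up in the same partition
    val finalAggregate = AggregateOp(
      requiredChildDistributionExpressions = Some(groupingAttributes),
      groupingExpressions = groupingAttributes,
      aggregateExpressions = functions.map(f => AggExpr(f, Final)),
      child = partialAggregate)

    // Only the top (final) operator is returned; the partial one is its child
    Seq(finalAggregate)
  }
}
```

For example, TwoStageAggregation.planWithoutDistinct(Seq("dept"), Seq("count"), Scan("emp")) returns a single Final operator whose child is the Partial operator over Scan("emp"). Returning only the top operator in a Seq follows the Seq[SparkPlan] result type of the signature.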
planAggregateWithOneDistinct

```scala
planAggregateWithOneDistinct(
  groupingExpressions: Seq[NamedExpression],
  functionsWithDistinct: Seq[AggregateExpression],
  functionsWithoutDistinct: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  child: SparkPlan): Seq[SparkPlan]
```

planAggregateWithOneDistinct ...FIXME
Creating Physical Operator for Aggregation

```scala
createAggregate(
  requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
  groupingExpressions: Seq[NamedExpression] = Nil,
  aggregateExpressions: Seq[AggregateExpression] = Nil,
  aggregateAttributes: Seq[Attribute] = Nil,
  initialInputBufferOffset: Int = 0,
  resultExpressions: Seq[NamedExpression] = Nil,
  child: SparkPlan): SparkPlan
```

createAggregate creates one of the following physical operators based on the given AggregateExpressions (checked in the following order):
- HashAggregateExec when all the aggBufferAttributes (of the AggregateFunctions of the given AggregateExpressions) are supported by HashAggregateExec
- ObjectHashAggregateExec when all of the following hold:
    - spark.sql.execution.useObjectHashAggregateExec configuration property is enabled
    - the aggregate expressions are supported by ObjectHashAggregateExec
- SortAggregateExec otherwise
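The selection order can be sketched as a chain of conditionals. This is a simplified, hypothetical model: the three Boolean parameters stand in for the checks Spark performs, and the AggOperatorChoice values are placeholders for the physical operators, not the operators themselves. As in Spark, the sketch falls back to SortAggregateExec when neither hash-based operator applies.

```scala
// Hypothetical placeholders for the physical operators createAggregate can pick
sealed trait AggOperatorChoice
case object HashAggregate extends AggOperatorChoice
case object ObjectHashAggregate extends AggOperatorChoice
case object SortAggregate extends AggOperatorChoice

object AggregateSelection {
  // The Boolean inputs stand in for Spark's own checks (assumption):
  //  - aggBufferAttributesSupported: all aggBufferAttributes are supported by HashAggregateExec
  //  - useObjectHashAggregateExec: spark.sql.execution.useObjectHashAggregateExec is enabled
  //  - objectHashAggregateSupported: the aggregate expressions are supported by ObjectHashAggregateExec
  def select(
      aggBufferAttributesSupported: Boolean,
      useObjectHashAggregateExec: Boolean,
      objectHashAggregateSupported: Boolean): AggOperatorChoice =
    if (aggBufferAttributesSupported) HashAggregate
    else if (useObjectHashAggregateExec && objectHashAggregateSupported) ObjectHashAggregate
    else SortAggregate
}
```

Checking HashAggregateExec first means the configuration property only matters for aggregations whose buffers HashAggregateExec cannot handle.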
createAggregate is used when:
- AggUtils is requested to createStreamingAggregate, planAggregateWithoutDistinct and planAggregateWithOneDistinct
Planning Execution of Streaming Aggregation

```scala
planStreamingAggregation(
  groupingExpressions: Seq[NamedExpression],
  functionsWithoutDistinct: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  stateFormatVersion: Int,
  child: SparkPlan): Seq[SparkPlan]
```

planStreamingAggregation ...FIXME
planStreamingAggregation is used when:
- StatefulAggregationStrategy (Spark Structured Streaming) execution planning strategy is requested to plan a logical plan of a streaming aggregation (a streaming query with an Aggregate operator)
Creating Streaming Aggregate Physical Operator

```scala
createStreamingAggregate(
  requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
  groupingExpressions: Seq[NamedExpression] = Nil,
  aggregateExpressions: Seq[AggregateExpression] = Nil,
  aggregateAttributes: Seq[Attribute] = Nil,
  initialInputBufferOffset: Int = 0,
  resultExpressions: Seq[NamedExpression] = Nil,
  child: SparkPlan): SparkPlan
```

createStreamingAggregate creates an aggregate physical operator with the isStreaming flag enabled.
Note
createStreamingAggregate is exactly createAggregate with the isStreaming flag enabled.
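The delegation can be sketched as a thin wrapper. This is a hypothetical, simplified model: AggregateNode and AggUtilsSketch are made-up names, only one of the parameters is kept, and it assumes createAggregate accepts an isStreaming flag that defaults to false, as the note implies.

```scala
// Hypothetical, simplified stand-in for an aggregate physical operator
final case class AggregateNode(groupingExpressions: Seq[String], isStreaming: Boolean)

object AggUtilsSketch {
  // Assumed shape: createAggregate takes an isStreaming flag that defaults to false
  def createAggregate(
      groupingExpressions: Seq[String] = Nil,
      isStreaming: Boolean = false): AggregateNode =
    AggregateNode(groupingExpressions, isStreaming)

  // createStreamingAggregate forwards its arguments and only turns isStreaming on
  def createStreamingAggregate(groupingExpressions: Seq[String] = Nil): AggregateNode =
    createAggregate(groupingExpressions = groupingExpressions, isStreaming = true)
}
```

Keeping the operator selection in a single method means batch and streaming aggregations go through the same choice of physical operator and differ only in the flag.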
createStreamingAggregate is used when:
- AggUtils is requested to plan a regular or session-windowed streaming aggregation