
UnsafeFixedWidthAggregationMap

UnsafeFixedWidthAggregationMap is a tiny layer (extension) over Spark Core's BytesToBytesMap for UnsafeRow keys and values.

UnsafeFixedWidthAggregationMap is used when the HashAggregateExec physical operator is executed:

Creating Instance

UnsafeFixedWidthAggregationMap takes the following to be created:

Page Size

UnsafeFixedWidthAggregationMap is given the page size (in bytes) of the BytesToBytesMap when created.

The page size is what the TaskMemoryManager (Spark Core) is configured with.

Initial Capacity

UnsafeFixedWidthAggregationMap is given the initial capacity of the BytesToBytesMap when created.

The initial capacity is hard-coded to 1024 * 16 (16384).

Empty Aggregation Buffer

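There are two emptyAggregationBuffers: the InternalRow given when UnsafeFixedWidthAggregationMap is created, and the byte[] derived from it.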
InternalRow emptyAggregationBuffer

UnsafeFixedWidthAggregationMap is given an InternalRow (emptyAggregationBuffer) when created.

This InternalRow is used to initialize the other emptyAggregationBuffer, a byte[].

byte[] emptyAggregationBuffer

emptyAggregationBuffer is a private final value that has to be initialized when this UnsafeFixedWidthAggregationMap is created.

emptyAggregationBuffer is initialized to the bytes of the UnsafeRow produced by an UnsafeProjection (created for the aggregationBufferSchema) applied to the input InternalRow emptyAggregationBuffer.
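For an aggregation buffer whose fields are all fixed-width, the resulting bytes are roughly a null-tracking bitset followed by one 8-byte slot per field. The Java sketch below approximates that layout. It is not Spark's UnsafeProjection: the class and method names (EmptyBufferSketch, toBytes) are hypothetical, values are modeled as nullable Longs, and the byte order is fixed to little-endian for determinism. The bitset width follows the same formula as UnsafeRow.calculateBitSetWidthInBytes.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class EmptyBufferSketch {
  // One 64-bit word of null bits per 64 fields (same formula as UnsafeRow's bitset width).
  static int bitSetWidthInBytes(int numFields) {
    return ((numFields + 63) / 64) * 8;
  }

  // Hypothetical stand-in for UnsafeProjection.apply(row).getBytes() on an all-fixed-width
  // buffer: every value (or null) occupies one 8-byte slot after the null bitset.
  static byte[] toBytes(Long[] values) {
    int bitSetWidth = bitSetWidthInBytes(values.length);
    ByteBuffer buf = ByteBuffer.allocate(bitSetWidth + 8 * values.length)
        .order(ByteOrder.LITTLE_ENDIAN); // assumption: fixed order instead of native order
    for (int i = 0; i < values.length; i++) {
      if (values[i] == null) {
        // Mark field i as null in the bitset; its slot stays zeroed.
        int wordOffset = (i / 64) * 8;
        buf.putLong(wordOffset, buf.getLong(wordOffset) | (1L << (i % 64)));
      } else {
        buf.putLong(bitSetWidth + 8 * i, values[i]);
      }
    }
    return buf.array();
  }

  public static void main(String[] args) {
    // e.g. a sum-like slot that starts as null and a count-like slot that starts at 0
    byte[] empty = toBytes(new Long[] {null, 0L});
    System.out.println(empty.length); // 8-byte bitset + 2 slots of 8 bytes
  }
}
```

These bytes are computed once, and every new grouping key gets its own copy of them.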

emptyAggregationBuffer is used in getAggregationBufferFromUnsafeRow as the (default) value for new grouping keys (the "zero" value of the aggregate functions).

BytesToBytesMap

When created, UnsafeFixedWidthAggregationMap creates a BytesToBytesMap (Spark Core) with the TaskMemoryManager, the Initial Capacity and the Page Size.

The BytesToBytesMap is used when:

supportsAggregationBufferSchema

boolean supportsAggregationBufferSchema(
  StructType schema)

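supportsAggregationBufferSchema is enabled (true) when all the top-level fields of the given schema are mutable, i.e. of a data type whose values UnsafeRow can update in place (e.g. IntegerType or BooleanType, but not StringType).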

import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap
import org.apache.spark.sql.types._
// StringType is variable-length, so UnsafeRow cannot update it in place
val schemaWithImmutableField = StructType(
  StructField("name", StringType) :: Nil)
assert(!UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(schemaWithImmutableField))
// IntegerType and BooleanType can be updated in place
val schemaWithMutableFields = StructType(
  StructField("id", IntegerType) :: StructField("bool", BooleanType) :: Nil)
assert(UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(schemaWithMutableFields))
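The check itself is a single pass over the top-level fields that stops at the first field of an immutable type. The Java sketch below models that logic without Spark on the classpath. The DataType enum and the MUTABLE set are hypothetical simplifications, assumed to mirror the types UnsafeRow.isMutable accepts, rather than Spark's actual DataType classes.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class SupportsSchemaSketch {
  // Hypothetical, simplified type tags (Spark uses DataType objects instead).
  enum DataType {
    NULL, BOOLEAN, BYTE, SHORT, INTEGER, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, DECIMAL,
    STRING, BINARY, ARRAY, STRUCT
  }

  // Assumed to mirror UnsafeRow.isMutable: types whose values can be updated in place.
  static final Set<DataType> MUTABLE = EnumSet.of(
      DataType.NULL, DataType.BOOLEAN, DataType.BYTE, DataType.SHORT, DataType.INTEGER,
      DataType.LONG, DataType.FLOAT, DataType.DOUBLE, DataType.DATE, DataType.TIMESTAMP,
      DataType.DECIMAL);

  // true only if every top-level field type is mutable; the first immutable one returns false.
  static boolean supportsAggregationBufferSchema(List<DataType> fieldTypes) {
    for (DataType dt : fieldTypes) {
      if (!MUTABLE.contains(dt)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(supportsAggregationBufferSchema(List.of(DataType.STRING)));                   // false
    System.out.println(supportsAggregationBufferSchema(List.of(DataType.INTEGER, DataType.BOOLEAN))); // true
  }
}
```

Only top-level fields are inspected, so a single StringType field is enough to rule out this map for the whole aggregation buffer.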

supportsAggregationBufferSchema is used when:

Looking Up Aggregation Value Buffer for Grouping Key

UnsafeRow getAggregationBufferFromUnsafeRow(
  UnsafeRow key) // uses the hash code of the given key
UnsafeRow getAggregationBufferFromUnsafeRow(
  UnsafeRow key,
  int hash)

getAggregationBufferFromUnsafeRow returns one of the following:

  • currentAggregationBuffer pointing to the value for the grouping key
  • null when there was no value for the given key and inserting one failed

getAggregationBufferFromUnsafeRow looks up the given grouping key in the BytesToBytesMap.

If the key is not found, getAggregationBufferFromUnsafeRow uses the Empty Aggregation Buffer as the value instead.

null when insertion fails

A copy of the Empty Aggregation Buffer is inserted into the BytesToBytesMap as the value for the new grouping key.

If the insertion fails, getAggregationBufferFromUnsafeRow returns null.

In the end, getAggregationBufferFromUnsafeRow requests the currentAggregationBuffer to pointTo the value (that was just stored or looked up).
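The lookup-or-insert flow can be sketched in plain Java as follows. This is a simplified model, not the actual implementation: String keys and a HashMap stand in for UnsafeRow keys in the BytesToBytesMap, and a hypothetical maxEntries limit stands in for the map failing to acquire memory. Returned byte arrays play the role of the value buffers.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class AggregationMapSketch {
  private final byte[] emptyAggregationBuffer;
  private final int maxEntries; // hypothetical stand-in for BytesToBytesMap running out of memory
  private final Map<String, byte[]> map = new HashMap<>();

  AggregationMapSketch(byte[] emptyAggregationBuffer, int maxEntries) {
    this.emptyAggregationBuffer = emptyAggregationBuffer;
    this.maxEntries = maxEntries;
  }

  // Models getAggregationBufferFromUnsafeRow: returns the value buffer for the key,
  // inserting a copy of the empty buffer for a new key, or null if that insertion fails.
  byte[] getAggregationBuffer(String key) {
    byte[] value = map.get(key);
    if (value == null) {
      if (map.size() >= maxEntries) {
        return null; // insertion failed; the caller has to deal with it (e.g. spill and retry)
      }
      // Copy, so that updates to one key's buffer never touch the shared "zero" bytes.
      value = Arrays.copyOf(emptyAggregationBuffer, emptyAggregationBuffer.length);
      map.put(key, value);
    }
    return value;
  }

  public static void main(String[] args) {
    AggregationMapSketch m = new AggregationMapSketch(new byte[] {0, 0}, 2);
    byte[] a = m.getAggregationBuffer("a");
    a[0] = 42;                                          // update the buffer in place
    System.out.println(m.getAggregationBuffer("a")[0]); // 42: the stored buffer is found again
    m.getAggregationBuffer("b");
    System.out.println(m.getAggregationBuffer("c"));    // null: no room for a third key
  }
}
```

The defensive copy is the important design point: the empty buffer acts as a template for every new key, so it must never be stored (and then mutated) directly.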


getAggregationBufferFromUnsafeRow is used when:

Current Aggregation Buffer

UnsafeFixedWidthAggregationMap creates an UnsafeRow when created.

The number of fields of this UnsafeRow is the length of the aggregationBufferSchema.

The UnsafeRow is (re)used to point to the value (that was stored or looked up) in getAggregationBufferFromUnsafeRow.
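Reusing a single row object means each lookup only repoints an existing view instead of allocating a new one. The Java sketch below illustrates that flyweight idea. RowView is a hypothetical, much-simplified stand-in for UnsafeRow: it has no null bitset and addresses long[] slots instead of raw memory, and its pointTo merely mimics the role of UnsafeRow.pointTo.

```java
public class ReusableRowSketch {
  // Hypothetical view over fixed-width slots, standing in for the single reused UnsafeRow.
  static final class RowView {
    private final int numFields; // the length of the aggregation buffer schema
    private long[] base;
    private int offset;

    RowView(int numFields) {
      this.numFields = numFields;
    }

    // Repoint this same object at another value; no new object is allocated.
    void pointTo(long[] base, int offset) {
      this.base = base;
      this.offset = offset;
    }

    long getLong(int ordinal) { return base[offset + ordinal]; }
    void setLong(int ordinal, long v) { base[offset + ordinal] = v; }
    int numFields() { return numFields; }
  }

  public static void main(String[] args) {
    long[] page = new long[4];        // two values of two fields each, stored back to back
    RowView current = new RowView(2);
    current.pointTo(page, 0);
    current.setLong(1, 10);           // writes through to the first value
    current.pointTo(page, 2);         // same object, now a view over the second value
    current.setLong(1, 20);
    System.out.println(page[1] + " " + page[3]); // 10 20
  }
}
```

A consequence of this design is that a row returned by one lookup is only valid until the next lookup repoints it, so a caller that needs to keep the value has to copy it.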