UnsafeFixedWidthAggregationMap¶
UnsafeFixedWidthAggregationMap is a tiny layer (extension) over Spark Core's BytesToBytesMap for UnsafeRow keys and values.
UnsafeFixedWidthAggregationMap is used when HashAggregateExec physical operator is executed:
- Directly, in createHashMap, while generating Java code in doProduceWithKeys
- Indirectly, through TungstenAggregationIterator, in doExecute
Creating Instance¶
UnsafeFixedWidthAggregationMap takes the following to be created:
- Empty Aggregation Buffer
- Aggregation Buffer Schema (StructType)
- Grouping Key Schema (StructType)
- TaskContext (Spark Core)
- Initial Capacity
- Page Size
Page Size¶
UnsafeFixedWidthAggregationMap is given the page size (in bytes) of the BytesToBytesMap when created.
The page size is what the TaskMemoryManager (Spark Core) is configured with.
Initial Capacity¶
UnsafeFixedWidthAggregationMap is given the initial capacity of the BytesToBytesMap when created.
The initial capacity is hard-coded to 1024 * 16.
Empty Aggregation Buffer¶
There are two emptyAggregationBuffers.
InternalRow emptyAggregationBuffer
UnsafeFixedWidthAggregationMap is given an InternalRow (emptyAggregationBuffer) when created.
This emptyAggregationBuffer is used to initialize the other emptyAggregationBuffer, of type byte[].
byte[] emptyAggregationBuffer
emptyAggregationBuffer is a private final value and so has to be initialized when this UnsafeFixedWidthAggregationMap is created.
emptyAggregationBuffer is initialized to the bytes of an UnsafeProjection (over the aggregationBufferSchema) applied to the input emptyAggregationBuffer.
emptyAggregationBuffer is used in getAggregationBufferFromUnsafeRow as the (default) value for new keys (the "zero" of an aggregate function).
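The role of the byte[] buffer as a per-key "zero" can be sketched with a plain Java HashMap. This is a simplified model with a hypothetical ZeroBufferMap class, not Spark's actual BytesToBytesMap-backed code: every previously-unseen key gets its own copy of the zero bytes, so updates to one group never leak into another.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of how an empty ("zero") aggregation buffer is used:
// each previously-unseen key gets its own copy of the zero bytes.
class ZeroBufferMap {
    private final byte[] emptyAggregationBuffer;  // the "zero" of the aggregate
    private final Map<String, byte[]> map = new HashMap<>();

    ZeroBufferMap(byte[] emptyAggregationBuffer) {
        this.emptyAggregationBuffer = emptyAggregationBuffer;
    }

    byte[] getBuffer(String key) {
        // Copy the zero bytes so each group mutates its own buffer.
        return map.computeIfAbsent(key, k -> emptyAggregationBuffer.clone());
    }
}
```

Copying (rather than sharing) the zero bytes is what allows each group's buffer to be updated in place.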
BytesToBytesMap¶
When created, UnsafeFixedWidthAggregationMap creates a BytesToBytesMap (Spark Core) with the following:
- TaskMemoryManager (Spark Core) of the given TaskContext
- Initial Capacity
- Page Size
The BytesToBytesMap is used when:
- getAggregationBufferFromUnsafeRow, to look up the value buffer for a grouping key
- iterator
- getPeakMemoryUsedBytes
- free
- getAvgHashProbeBucketListIterations
- destructAndCreateExternalSorter
supportsAggregationBufferSchema¶
```java
boolean supportsAggregationBufferSchema(
  StructType schema)
```
supportsAggregationBufferSchema is enabled (true) if all of the top-level fields (of the given schema) are mutable.
```scala
import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap
import org.apache.spark.sql.types._

val schemaWithImmutableField = StructType(
  StructField("name", StringType) :: Nil)
assert(UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(schemaWithImmutableField) == false)

val schemaWithMutableFields = StructType(
  StructField("id", IntegerType) :: StructField("bool", BooleanType) :: Nil)
assert(UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(schemaWithMutableFields))
```
supportsAggregationBufferSchema is used when:
- HashAggregateExec utility is used for the selection requirements
Looking Up Aggregation Value Buffer for Grouping Key¶
```java
UnsafeRow getAggregationBufferFromUnsafeRow(
  UnsafeRow key) // (1)!

UnsafeRow getAggregationBufferFromUnsafeRow(
  UnsafeRow key,
  int hash)
```

1. Uses the hash code of the given key

getAggregationBufferFromUnsafeRow gives one of the following:
- the currentAggregationBuffer pointed to the value for the given grouping key
- null when there is no value for the given key and the insertion failed
getAggregationBufferFromUnsafeRow looks up the given (grouping) key in the BytesToBytesMap.
If the key is not found, getAggregationBufferFromUnsafeRow uses the Empty Aggregation Buffer as the value instead, copying it over to the BytesToBytesMap for the grouping key. If that insertion fails, getAggregationBufferFromUnsafeRow returns null.
In the end, getAggregationBufferFromUnsafeRow requests the currentAggregationBuffer to pointTo the value (that was just stored or looked up).
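The overall lookup-or-insert logic can be sketched as follows. This is a simplified, hypothetical model: a String-keyed HashMap stands in for BytesToBytesMap, and a capacity limit stands in for running out of memory on insertion.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the logic in getAggregationBufferFromUnsafeRow:
// look up the key; if absent, try to insert a copy of the empty buffer;
// return null when the insertion fails (modeled here as a capacity limit).
class LookupSketch {
    private final byte[] emptyBuffer;
    private final int capacity;  // stands in for available memory
    private final Map<String, byte[]> map = new HashMap<>();

    LookupSketch(byte[] emptyBuffer, int capacity) {
        this.emptyBuffer = emptyBuffer;
        this.capacity = capacity;
    }

    byte[] getAggregationBuffer(String key) {
        byte[] value = map.get(key);
        if (value != null) {
            return value;          // found: return the stored value
        }
        if (map.size() >= capacity) {
            return null;           // insertion failed: caller must spill and retry
        }
        value = emptyBuffer.clone();  // copy the "zero" buffer for the new key
        map.put(key, value);
        return value;
    }
}
```

A null result is the signal for the caller (e.g. TungstenAggregationIterator) to fall back to sort-based aggregation.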
getAggregationBufferFromUnsafeRow is used when:
- TungstenAggregationIterator is requested to process input rows
Current Aggregation Buffer¶
UnsafeFixedWidthAggregationMap creates an UnsafeRow when created.
The number of fields of this UnsafeRow is the length of the aggregationBufferSchema.
The UnsafeRow is (re)used to point to the value (that was stored or looked up) in getAggregationBufferFromUnsafeRow.
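The reuse pattern behind the current aggregation buffer can be sketched without Spark (RowCursor below is a hypothetical stand-in for UnsafeRow): a single mutable cursor is re-pointed at different backing regions instead of allocating a new row object per lookup.

```java
// Simplified sketch of the "current aggregation buffer" reuse pattern:
// one mutable cursor is re-pointed at different backing arrays rather
// than allocating a fresh row object on every lookup.
class RowCursor {
    private byte[] backing;  // region the cursor currently points to
    private int offset;

    // analogous in spirit to UnsafeRow.pointTo
    void pointTo(byte[] backing, int offset) {
        this.backing = backing;
        this.offset = offset;
    }

    byte get(int i) {
        return backing[offset + i];
    }
}
```

Reusing one cursor keeps the per-row lookup allocation-free, which matters on the hot path of hash aggregation.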