UnsafeFixedWidthAggregationMap¶
UnsafeFixedWidthAggregationMap is a tiny layer (extension) over Spark Core's BytesToBytesMap for UnsafeRow keys and values.
UnsafeFixedWidthAggregationMap is used when the HashAggregateExec physical operator is executed:
- Directly, in createHashMap, while generating the Java code in doProduceWithKeys
- Indirectly, through TungstenAggregationIterator, in doExecute
Creating Instance¶
UnsafeFixedWidthAggregationMap takes the following to be created:
- Empty Aggregation Buffer
- Aggregation Buffer Schema (StructType)
- Grouping Key Schema (StructType)
- TaskContext (Spark Core)
- Initial Capacity
- Page Size
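The following Scala sketch shows how these arguments could fit together, in the spirit of what createHashMap does. The column names, the zero-valued buffer row and the 1 MB page size are made-up values for illustration only; in Spark itself the page size comes from the task's TaskMemoryManager, and a live TaskContext is only available inside a running task.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap
import org.apache.spark.sql.types._

// Hypothetical schemas of the aggregation buffer and the grouping key
val aggregationBufferSchema = StructType(StructField("sum", LongType) :: Nil)
val groupingKeySchema = StructType(StructField("id", IntegerType) :: Nil)

// The "zero" of the aggregate function(s), e.g. sum = 0
val emptyAggregationBuffer: InternalRow = InternalRow(0L)

// Only available inside a running task (TaskContext.get() is null on the driver)
val taskContext = TaskContext.get()

val map = new UnsafeFixedWidthAggregationMap(
  emptyAggregationBuffer,
  aggregationBufferSchema,
  groupingKeySchema,
  taskContext,
  1024 * 16, // initial capacity (the hard-coded default)
  1L << 20)  // page size in bytes (illustrative value)
```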
Page Size¶
UnsafeFixedWidthAggregationMap is given the page size (in bytes) of the BytesToBytesMap when created.
The page size is what the TaskMemoryManager (Spark Core) is configured with.
Initial Capacity¶
UnsafeFixedWidthAggregationMap is given the initial capacity of the BytesToBytesMap when created.
The initial capacity is hard-coded to 1024 * 16 (i.e. 16384).
Empty Aggregation Buffer¶
There are two emptyAggregationBuffers.
InternalRow emptyAggregationBuffer
UnsafeFixedWidthAggregationMap is given an InternalRow (emptyAggregationBuffer) when created.
This emptyAggregationBuffer is used to initialize the other emptyAggregationBuffer, which is of type byte[].
byte[] emptyAggregationBuffer
emptyAggregationBuffer is a private final value that is initialized when this UnsafeFixedWidthAggregationMap is created.
emptyAggregationBuffer is initialized to the bytes of an UnsafeProjection (with the aggregationBufferSchema) applied to the input emptyAggregationBuffer.
emptyAggregationBuffer is used in getAggregationBufferFromUnsafeRow as the (default) value for new keys (a "zero" of an aggregate function).
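A minimal sketch of that initialization step, assuming a hypothetical single-column LongType buffer schema: an UnsafeProjection built from the aggregationBufferSchema converts the InternalRow buffer to the fixed-width UnsafeRow format, and the raw bytes of that row become the byte[] value.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types._

// Hypothetical aggregation buffer schema with a single long "sum" column
val aggregationBufferSchema = StructType(StructField("sum", LongType) :: Nil)

// The InternalRow emptyAggregationBuffer (the "zero" of the aggregate function)
val emptyAggregationBuffer: InternalRow = InternalRow(0L)

// Project the InternalRow to an UnsafeRow and keep its raw bytes,
// which is (roughly) how the byte[] emptyAggregationBuffer is initialized
val emptyAggregationBufferBytes: Array[Byte] =
  UnsafeProjection.create(aggregationBufferSchema)
    .apply(emptyAggregationBuffer)
    .getBytes
```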
BytesToBytesMap¶
When created, UnsafeFixedWidthAggregationMap creates a BytesToBytesMap (Spark Core) with the following:
- TaskMemoryManager (Spark Core) of the given TaskContext
- Initial Capacity
- Page Size
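A rough sketch of that delegation is shown below; createUnderlyingMap is a made-up helper name, and the three-argument BytesToBytesMap constructor is an assumption (the exact overload used may differ across Spark versions).

```scala
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.unsafe.map.BytesToBytesMap

// Hypothetical helper mirroring how the underlying map is wired up
def createUnderlyingMap(
    taskMemoryManager: TaskMemoryManager, // of the given TaskContext
    initialCapacity: Int,                 // 1024 * 16
    pageSizeBytes: Long): BytesToBytesMap =
  new BytesToBytesMap(taskMemoryManager, initialCapacity, pageSizeBytes)
```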
The BytesToBytesMap is used in:
- getAggregationBufferFromUnsafeRow (to look up the aggregation buffer for a grouping key)
- iterator
- getPeakMemoryUsedBytes
- free
- getAvgHashProbeBucketListIterations
- destructAndCreateExternalSorter
supportsAggregationBufferSchema¶
boolean supportsAggregationBufferSchema(
  StructType schema)
supportsAggregationBufferSchema is enabled (true) if all of the top-level fields (of the given schema) are mutable.
import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap
import org.apache.spark.sql.types._
val schemaWithImmutableField = StructType(
  StructField("name", StringType) :: Nil)
assert(UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(schemaWithImmutableField) == false)

val schemaWithMutableFields = StructType(
  StructField("id", IntegerType) :: StructField("bool", BooleanType) :: Nil)
assert(UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(schemaWithMutableFields))
supportsAggregationBufferSchema is used when:
- HashAggregateExec utility is used for the selection requirements (to determine whether hash-based aggregation supports a given aggregation buffer schema)
Looking Up Aggregation Value Buffer for Grouping Key¶
UnsafeRow getAggregationBufferFromUnsafeRow(
  UnsafeRow key)
UnsafeRow getAggregationBufferFromUnsafeRow(
  UnsafeRow key,
  int hash)
The single-argument variant uses the hash code of the given key.
getAggregationBufferFromUnsafeRow gives one of the following:
- currentAggregationBuffer pointed to the value for the grouping key
- null when there is no value for the given key and the insertion failed
getAggregationBufferFromUnsafeRow looks up the given (grouping) key (in the BytesToBytesMap).
Unless found, getAggregationBufferFromUnsafeRow uses the Empty Aggregation Buffer as the value instead: the Empty Aggregation Buffer is copied over to the BytesToBytesMap for the grouping key. In case the insertion fails, getAggregationBufferFromUnsafeRow returns null.
In the end, getAggregationBufferFromUnsafeRow requests the currentAggregationBuffer to pointTo the value (that was just stored or looked up).
getAggregationBufferFromUnsafeRow is used when:
- TungstenAggregationIterator is requested to process input rows
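The following hypothetical processing loop, in the spirit of how TungstenAggregationIterator processes input rows, shows the contract: a non-null buffer is updated in place, while null signals that the hash map ran out of memory and the caller has to switch to sort-based aggregation. The toUnsafeGroupingKey and updateBuffer functions are stand-ins for the generated grouping-key projection and the aggregate update expressions.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap

// Hypothetical hash-based aggregation loop (simplified)
def processInputs(
    map: UnsafeFixedWidthAggregationMap,
    inputRows: Iterator[InternalRow],
    toUnsafeGroupingKey: InternalRow => UnsafeRow,
    updateBuffer: (UnsafeRow, InternalRow) => Unit): Unit = {
  while (inputRows.hasNext) {
    val row = inputRows.next()
    val buffer = map.getAggregationBufferFromUnsafeRow(toUnsafeGroupingKey(row))
    if (buffer == null) {
      // Insertion failed (no more memory for the BytesToBytesMap): the real iterator
      // switches to sort-based aggregation (destructAndCreateExternalSorter) here
      sys.error("could not allocate an aggregation buffer; falling back to sort-based aggregation")
    } else {
      updateBuffer(buffer, row) // mutates the value bytes stored in the map
    }
  }
}
```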
Current Aggregation Buffer¶
UnsafeFixedWidthAggregationMap creates an UnsafeRow (the currentAggregationBuffer) when created.
The number of fields of this UnsafeRow is the length of the aggregationBufferSchema.
The UnsafeRow is (re)used to point to the value (that was stored or looked up) in getAggregationBufferFromUnsafeRow.
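A minimal, self-contained sketch of that pattern, assuming a hypothetical single-column buffer schema; the value bytes are produced with an UnsafeProjection here, whereas in the real map they live inside the BytesToBytesMap.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types._

// Hypothetical buffer schema: the reusable row has one field per buffer column
val aggregationBufferSchema = StructType(StructField("sum", LongType) :: Nil)
val currentAggregationBuffer = new UnsafeRow(aggregationBufferSchema.length)

// Some fixed-width value bytes to point at (in the real map these are stored in the BytesToBytesMap)
val valueBytes = UnsafeProjection.create(aggregationBufferSchema)
  .apply(InternalRow(42L))
  .getBytes

// Re-point the same UnsafeRow instance at the value bytes instead of allocating a new row
currentAggregationBuffer.pointTo(valueBytes, valueBytes.length)
assert(currentAggregationBuffer.getLong(0) == 42L)
```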