UnsafeRow¶
UnsafeRow is an InternalRow for mutable binary rows that are backed by raw memory outside the Java Virtual Machine (instead of Java objects that live in JVM memory space and may lead to more frequent GCs if created in excess).
UnsafeRow supports Java's Externalizable and Kryo's KryoSerializable serialization and deserialization protocols.
Creating Instance¶
UnsafeRow takes the following to be created:
- Number of fields
When created, UnsafeRow calculates the bitset width based on the number of fields.
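The width calculation can be sketched in plain Scala. This is a minimal sketch that assumes one null-tracking bit per field, rounded up to whole 8-byte words; `bitSetWidthInBytes` is an illustrative helper name, not necessarily the method UnsafeRow uses.

```scala
// Illustrative helper (hypothetical name): width of the null-tracking bitset in bytes.
// Assumption: one bit per field, rounded up to whole 64-bit (8-byte) words.
def bitSetWidthInBytes(numFields: Int): Int =
  ((numFields + 63) / 64) * 8

println(bitSetWidthInBytes(1))   // 8
println(bitSetWidthInBytes(64))  // 8
println(bitSetWidthInBytes(65))  // 16
```

Under this assumption, rows with up to 64 fields spend a single 8-byte word on null tracking.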
UnsafeRow is created when:
- RowBasedKeyValueBatch is created
- UnsafeArrayData is requested for getStruct
- UnsafeRow is requested to getStruct, copy, createFromByteArray
- Collect expression is requested for row
- Percentile expression is requested to deserialize
- UnsafeRowWriter is created
- many others
Mutable Data Types¶
The following DataTypes are considered mutable data types:
- BooleanType
- ByteType
- CalendarIntervalType
- DateType
- DayTimeIntervalType
- DecimalType
- DoubleType
- FloatType
- IntegerType
- LongType
- NullType
- ShortType
- TimestampNTZType
- TimestampType
- UserDefinedType (over a mutable data type)
- YearMonthIntervalType
Mutable data types have fixed length and can be mutated in place in UnsafeRows (using set methods).
Examples (possibly all) of the data types that are not mutable:
- ArrayType
- BinaryType
- StringType
- MapType
- ObjectType
- StructType
KryoSerializable SerDe Protocol¶
Learn more in KryoSerializable.
Java's Externalizable SerDe Protocol¶
Learn more in java.io.Externalizable.
sizeInBytes¶
UnsafeRow knows its size (in bytes).
scala> println(unsafeRow.getSizeInBytes)
32
Field Offsets¶
The fields of a data row are placed using field offsets.
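A field offset can be sketched as simple arithmetic. The sketch below assumes that the null bitset comes first and that every field owns exactly one 8-byte slot after it; `fieldOffset` is a hypothetical helper, and the value it returns is relative to the start of the row (in UnsafeRow it would presumably be added to the baseOffset).

```scala
// Hypothetical helper: byte offset of a field's 8-byte slot, relative to the start of the row.
// Assumptions: the null bitset (one bit per field, rounded up to 8-byte words) comes first,
// followed by one 8-byte slot per field.
def fieldOffset(numFields: Int, ordinal: Int): Long = {
  require(ordinal >= 0 && ordinal < numFields, s"ordinal $ordinal out of range")
  val bitSetWidthInBytes = ((numFields + 63) / 64) * 8
  bitSetWidthInBytes + ordinal * 8L
}

// A 3-field row: the slots start right after the 8-byte bitset.
println((0 until 3).map(fieldOffset(3, _)).toList)  // List(8, 16, 24)
```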
Mutable Types¶
UnsafeRow considers a data type mutable if it is one of the following:
- BooleanType
- ByteType
- DateType
- DecimalType (see isMutable)
- DoubleType
- FloatType
- IntegerType
- LongType
- NullType
- ShortType
- TimestampType
8-Byte Word Alignment and Three Regions¶
UnsafeRow is composed of three regions:
- Null Bit Set Bitmap Region (1 bit/field) for tracking null values
- Fixed-Length 8-Byte Values Region
- Variable-Length Data Region
UnsafeRow is always 8-byte word aligned and so its size is always a multiple of 8 bytes.
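The three regions make it possible to estimate the size of a row up front. The following is a rough sketch under the assumption that every variable-length value is padded to the next multiple of 8 bytes; `roundUpToWord` and `rowSizeInBytes` are hypothetical helpers introduced for illustration.

```scala
// Hypothetical helper: round a byte count up to the next multiple of 8.
def roundUpToWord(n: Int): Int = ((n + 7) / 8) * 8

// Hypothetical helper: total row size as the sum of the three regions.
def rowSizeInBytes(numFields: Int, variableLengths: Seq[Int]): Int = {
  val nullBitSet = ((numFields + 63) / 64) * 8                 // region 1: null bits
  val fixedLength = numFields * 8                               // region 2: one 8-byte slot per field
  val variableLength = variableLengths.map(roundUpToWord).sum  // region 3: padded values (assumption)
  nullBitSet + fixedLength + variableLength
}

// One string field holding the 11 bytes of "hello world"
println(rowSizeInBytes(1, Seq(11)))  // 32
```

The result agrees with the 32-byte row shown for the "hello world" string in this document.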
Equality and Hashing¶
Equality comparison and hashing of rows can be performed on raw bytes: if two rows are identical, so is their bit-wise representation. No type-specific interpretation is required.
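The idea can be illustrated on plain byte arrays. This is only a sketch: the arrays below are made-up stand-ins for the memory backing two rows, and `java.util.Arrays.hashCode` is a stand-in for whatever hash function UnsafeRow actually applies to its bytes.

```scala
import java.util.Arrays

// Made-up byte arrays standing in for the backing memory of three rows.
val rowA: Array[Byte] = Array[Byte](0, 0, 0, 0, 0, 0, 0, 0, 42, 0, 0, 0, 0, 0, 0, 0)
val rowB: Array[Byte] = rowA.clone()
val rowC: Array[Byte] = rowA.updated(8, 43.toByte)

// Byte-wise comparison, no knowledge of the schema needed
println(Arrays.equals(rowA, rowB))  // true
println(Arrays.equals(rowA, rowC))  // false

// Equal bytes give equal hashes (Arrays.hashCode is a stand-in hash function)
println(Arrays.hashCode(rowA) == Arrays.hashCode(rowB))  // true
```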
baseObject¶
Object baseObject
baseObject is assigned in pointTo, copyFrom, readExternal and read. In most cases, baseObject is byte[] (except a variant of pointTo that allows for Objects).
getBaseObject¶
Object getBaseObject()
getBaseObject returns the baseObject.
getBaseObject is used when:
- UnsafeWriter is requested to write an UnsafeRow
- UnsafeExternalRowSorter is requested to insertRow an UnsafeRow
- UnsafeFixedWidthAggregationMap is requested to getAggregationBufferFromUnsafeRow
- UnsafeKVExternalSorter is requested to insertKV
- ExternalAppendOnlyUnsafeRowArray is requested to add an UnsafeRow
- UnsafeHashedRelation is requested to get, getValue, getWithKeyIndex, getValueWithKeyIndex, apply
- LongToUnsafeRowMap is requested to append
- InMemoryRowQueue is requested to add an UnsafeRow
writeToStream¶
void writeToStream(
OutputStream out,
byte[] writeBuffer)
writeToStream branches off based on whether the baseObject is byte[] or not.
writeToStream ...FIXME
writeToStream is used when:
- SparkPlan is requested to compress RDD partitions (of UnsafeRows) to byte arrays
- UnsafeRowSerializerInstance is requested to serializeStream
- Percentile expression is requested to serialize
pointTo¶
void pointTo(
byte[] buf,
int sizeInBytes) // (1)
void pointTo(
Object baseObject,
long baseOffset,
int sizeInBytes)
1. Uses Platform.BYTE_ARRAY_OFFSET as baseOffset
pointTo sets the baseObject, the baseOffset and the sizeInBytes to the given values.
pointTo asserts the following:
- numFields is 0 or greater
- The given sizeInBytes is a multiple of 8
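The two checks and the three assignments can be sketched with a small stand-in class. `RowPointer` is hypothetical and not part of Spark, and the constant 16 is only an assumed value for the byte-array offset (Spark reads the real one from Platform.BYTE_ARRAY_OFFSET).

```scala
// Hypothetical stand-in for the three fields that pointTo assigns.
final class RowPointer(val numFields: Int) {
  // Assumed value for illustration only; not read from the JVM as Spark does.
  private val ByteArrayOffset: Long = 16L

  var baseObject: AnyRef = null
  var baseOffset: Long = 0L
  var sizeInBytes: Int = 0

  def pointTo(obj: AnyRef, offset: Long, size: Int): Unit = {
    require(numFields >= 0, s"numFields ($numFields) should be 0 or greater")
    require(size % 8 == 0, s"sizeInBytes ($size) should be a multiple of 8")
    baseObject = obj
    baseOffset = offset
    sizeInBytes = size
  }

  // The byte-array variant fills in the offset itself.
  def pointTo(buf: Array[Byte], size: Int): Unit = pointTo(buf, ByteArrayOffset, size)
}

val pointer = new RowPointer(numFields = 1)
pointer.pointTo(new Array[Byte](32), 32)  // accepted: 32 is a multiple of 8
val rejected = scala.util.Try(pointer.pointTo(new Array[Byte](13), 13))
println(rejected.isFailure)  // true
```

The sketch uses `require`, which always runs. A Java `assert` is only evaluated when assertions are enabled in the JVM (for example with `-ea`), so the same input may pass silently there.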
copyFrom¶
void copyFrom(
UnsafeRow row)
copyFrom ...FIXME
copyFrom is used when:
- ObjectAggregationIterator is requested to processInputs (using SortBasedAggregator)
- TungstenAggregationIterator is requested to produce the next UnsafeRow and outputForEmptyGroupingKeyWithoutInput
Deserializing UnsafeRow¶
Regardless of whether Java or Kryo is used for deserialization, both read values from the given input (ObjectInput for Java, Input for Kryo) to assign the internal registries.
Registry | Value |
---|---|
baseOffset | The offset of the first element in the storage allocation of a byte array (BYTE_ARRAY_OFFSET) |
sizeInBytes | The first four bytes (Java's int) from the ObjectInput |
numFields | The next four bytes (Java's int) from the ObjectInput |
bitSetWidthInBytes | Based on the numFields |
baseObject | byte[] (of sizeInBytes size) |
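The read order in the table can be sketched with plain Java object streams. `RowState`, `writeRow` and `readRow` are hypothetical names, and the writer side (sizeInBytes, numFields, then the raw bytes) is an assumption made so that the read side has something to consume.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical container for the values recovered from the stream.
final case class RowState(sizeInBytes: Int, numFields: Int, bitSetWidthInBytes: Int, bytes: Array[Byte])

// Assumed wire format: sizeInBytes, numFields, then the raw row bytes.
def writeRow(out: ObjectOutputStream, numFields: Int, bytes: Array[Byte]): Unit = {
  out.writeInt(bytes.length)
  out.writeInt(numFields)
  out.write(bytes)
  out.flush()
}

// Reads the values back in the order listed in the table.
def readRow(in: ObjectInputStream): RowState = {
  val sizeInBytes = in.readInt()                        // first four bytes
  val numFields = in.readInt()                          // next four bytes
  val bitSetWidthInBytes = ((numFields + 63) / 64) * 8  // derived from numFields (assumed formula)
  val bytes = new Array[Byte](sizeInBytes)              // byte[] of sizeInBytes size
  in.readFully(bytes)
  RowState(sizeInBytes, numFields, bitSetWidthInBytes, bytes)
}

val original = new Array[Byte](32)
original(8) = 11
val sink = new ByteArrayOutputStream()
val out = new ObjectOutputStream(sink)
writeRow(out, numFields = 1, bytes = original)
out.close()

val restored = readRow(new ObjectInputStream(new ByteArrayInputStream(sink.toByteArray)))
println(restored.numFields)                     // 1
println(restored.bytes.sameElements(original))  // true
```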
Kryo¶
void read(
Kryo kryo,
Input in)
read is part of the KryoSerializable (Kryo) abstraction.
Java¶
void readExternal(
ObjectInput in)
readExternal is part of the Externalizable (Java) abstraction.
Demo¶
// Use ExpressionEncoder for simplicity
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
val stringEncoder = ExpressionEncoder[String]
// Spark 3.0+ (earlier versions: stringEncoder.toRow("hello world"))
val row = stringEncoder.createSerializer().apply("hello world")
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
val unsafeRow = row match { case ur: UnsafeRow => ur }
scala> unsafeRow.getBytes
res0: Array[Byte] = Array(0, 0, 0, 0, 0, 0, 0, 0, 11, 0, 0, 0, 16, 0, 0, 0, 104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 0, 0, 0, 0, 0)
scala> unsafeRow.getUTF8String(0)
res1: org.apache.spark.unsafe.types.UTF8String = hello world
// a sample human-readable row representation
// id (long), txt (string), num (int)
val id: Long = 0
val txt: String = "hello world"
val num: Int = 110
val singleRow = Seq(id, txt, num)
val numFields = singleRow.size
// that's not enough and I learnt it a few lines down
val rowDataInBytes = Array(id.toByte) ++ txt.toArray.map(_.toByte) ++ Array(num.toByte)
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
val row = new UnsafeRow(numFields)
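sizeInBytes should be a multiple of 8, yet the following pointTo call does not reject the 13-byte array (1 + 11 + 1 bytes). pointTo verifies sizeInBytes % 8 == 0 with a Java assert, which is skipped unless assertions are enabled in the JVM, so the check most likely never runs and the demo fails later on.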
row.pointTo(rowDataInBytes, rowDataInBytes.length)
The following will certainly fail. Consider it a WIP.
assert(row.getLong(0) == id)
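The assertion fails because rowDataInBytes is a plain concatenation of bytes rather than a row laid out in regions. The following sketch shows how the bytes for (id, txt, num) might be laid out instead, using only java.nio. It rests on several assumptions: native byte order, a string slot that packs the offset (relative to the start of the row) into the upper 32 bits and the length into the lower 32 bits, and an int stored in the first 4 bytes of its 8-byte slot.

```scala
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets

val id: Long = 0
val txt: String = "hello world"
val num: Int = 110
val numFields = 3

val txtBytes = txt.getBytes(StandardCharsets.UTF_8)
val bitSetWidth = ((numFields + 63) / 64) * 8    // 8 bytes of null bits
val fixedEnd = bitSetWidth + numFields * 8       // 32: where the variable-length data starts
val paddedTxt = ((txtBytes.length + 7) / 8) * 8  // 11 bytes padded to 16

val buf = ByteBuffer.allocate(fixedEnd + paddedTxt).order(ByteOrder.nativeOrder())
buf.putLong(bitSetWidth, id)                                             // field 0: the long itself
buf.putLong(bitSetWidth + 8, (fixedEnd.toLong << 32) | txtBytes.length)  // field 1: offset and size
buf.putInt(bitSetWidth + 16, num)                                        // field 2: int in an 8-byte slot
buf.position(fixedEnd)
buf.put(txtBytes)                                                        // variable-length region

val rowDataInBytes: Array[Byte] = buf.array()
println(rowDataInBytes.length)     // 48
println(buf.getLong(bitSetWidth))  // 0

// Decode the string back from its slot
val offsetAndSize = buf.getLong(bitSetWidth + 8)
val decoded = new String(rowDataInBytes, (offsetAndSize >> 32).toInt, offsetAndSize.toInt, StandardCharsets.UTF_8)
println(decoded)                   // hello world
```

With Spark on the classpath, an array laid out this way could presumably be handed to row.pointTo(rowDataInBytes, rowDataInBytes.length) so that row.getLong(0) returns id; that step is not exercised here.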
WORD_SIZE¶
UnsafeRow uses 8 as the size of a word (in bytes). WORD_SIZE is used by the MapEntries unary expression for doGenCode and genCodeForPrimitiveElements.