VectorizedParquetRecordReader

VectorizedParquetRecordReader is a SpecificParquetRecordReaderBase for vectorized decoding in the Parquet Data Source.

Creating Instance

VectorizedParquetRecordReader takes the following to be created:

  • ZoneId (for timezone conversion)
  • Datetime Rebase Mode
  • Datetime Rebase Timezone
  • int96 Rebase Mode
  • int96 Rebase Timezone
  • useOffHeap
  • Capacity
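
The following is a minimal sketch of creating a VectorizedParquetRecordReader with the arguments above. The exact constructor signature and the argument values (rebase modes, timezones, capacity) are assumptions for illustration only, and the reader would still have to be initialized with a Parquet file split before use.

import java.time.ZoneId
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader

// Hypothetical argument values; the rebase modes/timezones and the capacity
// are assumptions, not values taken from the Spark source
val reader = new VectorizedParquetRecordReader(
  ZoneId.systemDefault(), // ZoneId (for timezone conversion)
  "CORRECTED",            // Datetime Rebase Mode
  "UTC",                  // Datetime Rebase Timezone
  "CORRECTED",            // int96 Rebase Mode
  "UTC",                  // int96 Rebase Timezone
  false,                  // useOffHeap
  4096)                   // Capacity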

VectorizedParquetRecordReader is created when:

Capacity

VectorizedParquetRecordReader is given capacity when created.

The capacity is configured using spark.sql.parquet.columnarReaderBatchSize configuration property.
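
For example, the batch size can be set when building a SparkSession (a minimal sketch; the value 8192 is arbitrary):

import org.apache.spark.sql.SparkSession

// Override the number of rows per ColumnarBatch for Parquet vectorized reads
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.parquet.columnarReaderBatchSize", 8192L)
  .getOrCreate()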

Memory Mode

VectorizedParquetRecordReader uses the given useOffHeap to initialize the internal MEMORY_MODE registry when created:

useOffHeap | MEMORY_MODE | WritableColumnVector
---------- | ----------- | --------------------
true       | OFF_HEAP    | OffHeapColumnVector
false      | ON_HEAP     | OnHeapColumnVector

The MEMORY_MODE is used to initBatch (to choose the correct implementation of WritableColumnVector).
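
A minimal sketch (not the actual Spark source) of the mapping:

import org.apache.spark.memory.MemoryMode

// useOffHeap selects between the two memory modes
def memoryMode(useOffHeap: Boolean): MemoryMode =
  if (useOffHeap) MemoryMode.OFF_HEAP else MemoryMode.ON_HEAP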

useOffHeap

useOffHeap is the value of the spark.sql.columnVector.offheap.enabled configuration property when the vectorized reader is used, i.e. with spark.sql.parquet.enableVectorizedReader enabled and a schema that is supported for columnar reads.
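
A minimal sketch of enabling both configuration properties when building a SparkSession (so that off-heap column vectors are used):

import org.apache.spark.sql.SparkSession

// Vectorized Parquet decoding (enabled by default) with off-heap column vectors
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.parquet.enableVectorizedReader", true)
  .config("spark.sql.columnVector.offheap.enabled", true)
  .getOrCreate()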

WritableColumnVectors

VectorizedParquetRecordReader defines an array of allocated WritableColumnVectors.

columnVectors is allocated when initBatch is executed.


columnVectors is used when:

enableReturningBatches

void enableReturningBatches()

enableReturningBatches simply turns the returnColumnarBatch flag on.
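
A minimal sketch (an assumption of the shape, not the actual source) of the flag flip:

// returnColumnarBatch decides between whole-batch and row-by-row reads
var returnColumnarBatch = false

def enableReturningBatches(): Unit = {
  returnColumnarBatch = true
}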


enableReturningBatches is used when:

Initializing Columnar Batch

void initBatch() // (1)!
void initBatch(
  StructType partitionColumns,
  InternalRow partitionValues) // (2)!
void initBatch(
  MemoryMode memMode,
  StructType partitionColumns,
  InternalRow partitionValues) // (3)!
  1. Uses the MEMORY_MODE and neither partitionColumns nor partitionValues
  2. Uses the MEMORY_MODE
  3. A private helper method

MemoryMode

The given MemoryMode is the value of MEMORY_MODE.
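
A minimal sketch of how the two public overloads could delegate to the private three-argument variant (the wrapper class is hypothetical; only initBatch and MEMORY_MODE come from the text above):

import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

// Sketch only: MEMORY_MODE stands in for the reader's internal memory-mode registry
class InitBatchSketch(MEMORY_MODE: MemoryMode) {
  // (1) neither partitionColumns nor partitionValues
  def initBatch(): Unit = initBatch(MEMORY_MODE, null, null)

  // (2) partition columns and values, still with MEMORY_MODE
  def initBatch(partitionColumns: StructType, partitionValues: InternalRow): Unit =
    initBatch(MEMORY_MODE, partitionColumns, partitionValues)

  // (3) the private helper that does the actual work (elided here)
  private def initBatch(
      memMode: MemoryMode,
      partitionColumns: StructType,
      partitionValues: InternalRow): Unit = ()
}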

initBatch creates a batch schema that is the sparkSchema followed by the input partitionColumns schema (if available).
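
For example (a sketch with made-up schemas), the batch schema is the data schema followed by the partition columns:

import org.apache.spark.sql.types._

// Hypothetical file (data) schema and partition schema
val sparkSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))
val partitionColumns = StructType(Seq(
  StructField("date", StringType)))

// Batch schema: data columns first, then partition columns
val batchSchema = StructType(sparkSchema.fields ++ partitionColumns.fields)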

initBatch requests OffHeapColumnVector or OnHeapColumnVector to allocate column vectors per the input memMode (i.e., OFF_HEAP or ON_HEAP memory modes, respectively). initBatch records the allocated column vectors as the internal WritableColumnVectors.

spark.sql.columnVector.offheap.enabled

Whether OffHeapColumnVector or OnHeapColumnVector is used is based on the spark.sql.columnVector.offheap.enabled configuration property.

initBatch creates a ColumnarBatch (with the allocated WritableColumnVectors).
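
A minimal sketch of allocating on-heap column vectors for a schema and wrapping them in a ColumnarBatch (the schema and capacity are made up):

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

val batchSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))

// One writable column vector per field, with room for `capacity` rows
val capacity = 4096
val vectors = OnHeapColumnVector.allocateColumns(capacity, batchSchema)

// A ColumnarBatch simply wraps the column vectors
val batch = new ColumnarBatch(vectors.map(v => v: ColumnVector))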

initBatch then does some additional maintenance on the WritableColumnVectors: it populates the partition column vectors with the given partitionValues (marking them as constant) and fills the column vectors of missing columns with nulls.


initBatch is used when:

Allocating ColumnVectors

ColumnVector[] allocateColumns(
  int capacity,
  StructType schema,
  boolean useOffHeap,
  int constantColumnLength)

allocateColumns creates ColumnVectors (one for every field in the given StructType).

With the given useOffHeap flag enabled, allocateColumns creates OffHeapColumnVectors. Otherwise, it creates OnHeapColumnVectors.

useOffHeap

The useOffHeap flag is enabled when the memMode of initBatch is MemoryMode.OFF_HEAP.
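
A minimal sketch of the choice (ignoring constantColumnLength), assuming the static allocateColumns helpers of the two WritableColumnVector implementations:

import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.types.StructType

// Off-heap or on-heap column vectors, one per field of the schema
def allocateColumns(
    capacity: Int,
    schema: StructType,
    useOffHeap: Boolean): Array[_ <: WritableColumnVector] =
  if (useOffHeap) OffHeapColumnVector.allocateColumns(capacity, schema)
  else OnHeapColumnVector.allocateColumns(capacity, schema)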

nextKeyValue

Signature
boolean nextKeyValue()

nextKeyValue is part of the RecordReader (Apache Hadoop) abstraction.

nextKeyValue executes resultBatch first (to make sure the columnar batch is initialized).

With returnColumnarBatch enabled, nextKeyValue returns the result of nextBatch.

Otherwise, nextKeyValue...FIXME


nextKeyValue is used when:

resultBatch

ColumnarBatch resultBatch()

resultBatch returns the columnarBatch if already available. Otherwise, resultBatch executes initBatch first and then returns the columnarBatch.
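
A minimal sketch (an assumption of the shape, not the actual source) of the lazy initialization:

import org.apache.spark.sql.vectorized.ColumnarBatch

// Sketch only: columnarBatch and initBatch stand in for the reader's internals
class ResultBatchSketch {
  private var columnarBatch: ColumnarBatch = _

  def resultBatch(): ColumnarBatch = {
    if (columnarBatch == null) initBatch() // allocate vectors and create the batch
    columnarBatch
  }

  private def initBatch(): Unit = {
    // elided: allocate WritableColumnVectors and create the ColumnarBatch
  }
}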