VectorizedParquetRecordReader

VectorizedParquetRecordReader is a SpecificParquetRecordReaderBase that the parquet data source uses for Vectorized Parquet Decoding.

Creating Instance

VectorizedParquetRecordReader takes the following to be created:

  • ZoneId (for timezone conversion)
  • Datetime Rebase Mode
  • Datetime Rebase Timezone
  • int96 Rebase Mode
  • int96 Rebase Timezone
  • useOffHeap flag
  • Capacity

VectorizedParquetRecordReader is created when:

WritableColumnVectors

VectorizedParquetRecordReader defines an array of allocated WritableColumnVectors.

columnVectors is allocated in initBatch.

columnVectors is used when:

enableReturningBatches

void enableReturningBatches()

enableReturningBatches simply turns the returnColumnarBatch flag on.

enableReturningBatches is used when:

Initializing Columnar Batch

void initBatch() // (1)!
void initBatch(
  StructType partitionColumns,
  InternalRow partitionValues) // (2)!
void initBatch(
  MemoryMode memMode,
  StructType partitionColumns,
  InternalRow partitionValues) // (3)!
  1. Uses the MEMORY_MODE with neither partitionColumns nor partitionValues
  2. Uses the MEMORY_MODE
  3. A private helper method

initBatch creates a batch schema that is sparkSchema and the input partitionColumns schema (if available).
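The schema composition can be pictured with a minimal sketch. It uses plain lists of column names as a stand-in for Spark's StructType. The names BatchSchemaSketch and batchSchema are hypothetical, and placing the partition columns after the data columns is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BatchSchemaSketch {
  // Hypothetical helper: column names stand in for StructType fields.
  // Partition columns are appended only when they are available (non-null).
  static List<String> batchSchema(List<String> sparkSchema, List<String> partitionColumns) {
    List<String> fields = new ArrayList<>(sparkSchema);
    if (partitionColumns != null) {
      fields.addAll(partitionColumns);
    }
    return fields;
  }

  public static void main(String[] args) {
    // Data columns followed by one partition column
    System.out.println(batchSchema(Arrays.asList("id", "name"), Arrays.asList("year")));
    // No partition columns: the batch schema is just the data columns
    System.out.println(batchSchema(Arrays.asList("id", "name"), null));
  }
}
```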

initBatch requests OffHeapColumnVector or OnHeapColumnVector to allocate column vectors per the input memMode, i.e. OFF_HEAP or ON_HEAP memory modes, respectively. initBatch records the allocated column vectors as the internal WritableColumnVectors.

Note

OnHeapColumnVector is used unless spark.sql.columnVector.offheap.enabled configuration property is enabled.
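The choice between the two vector implementations can be sketched as follows. This is a simplified illustration, not Spark's code: the MemoryMode enum below is a local stand-in for Spark's enum of the same name, and the helper methods memoryMode and vectorClassFor are hypothetical.

```java
class ColumnVectorAllocationSketch {
  // Local stand-in for Spark's MemoryMode (ON_HEAP, OFF_HEAP)
  enum MemoryMode { ON_HEAP, OFF_HEAP }

  // Hypothetical: maps the value of spark.sql.columnVector.offheap.enabled to a memory mode
  static MemoryMode memoryMode(boolean offHeapEnabled) {
    return offHeapEnabled ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
  }

  // Hypothetical: names the class that would be requested to allocate column vectors
  static String vectorClassFor(MemoryMode memMode) {
    switch (memMode) {
      case OFF_HEAP:
        return "OffHeapColumnVector";
      default:
        return "OnHeapColumnVector";
    }
  }

  public static void main(String[] args) {
    System.out.println(vectorClassFor(memoryMode(false)));
    System.out.println(vectorClassFor(memoryMode(true)));
  }
}
```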

initBatch creates a ColumnarBatch (with the allocated WritableColumnVectors) and records it as the internal ColumnarBatch.

initBatch does some additional maintenance to the columnVectors.

initBatch is used when:

Review Me

VectorizedParquetRecordReader uses OFF_HEAP memory mode when spark.sql.columnVector.offheap.enabled internal configuration property is enabled (true).

[[internal-registries]]
.VectorizedParquetRecordReader's Internal Properties (e.g. Registries, Counters and Flags)
[cols="1m,3",options="header",width="100%"]
|===
| Name
| Description

| batchIdx
a| [[batchIdx]] Current batch index, i.e. the index of an InternalRow in the ColumnarBatch. Used when VectorizedParquetRecordReader is requested to getCurrentValue with the returnColumnarBatch flag disabled

Starts at 0

Increments every nextKeyValue

Reset to 0 when nextBatch

| columnarBatch
| [[columnarBatch]] ColumnarBatch

| columnReaders
a| [[columnReaders]] VectorizedColumnReaders (one reader per column) to read rows in batches

Initialized when VectorizedParquetRecordReader is requested to nextBatch

| MEMORY_MODE
a| [[MEMORY_MODE]] Memory mode of the ColumnarBatch

Used exclusively when VectorizedParquetRecordReader is requested to initBatch.

| missingColumns
| [[missingColumns]] Bitmap of columns (per index) that are missing (or simply the ones that the reader should not read)

| returnColumnarBatch
a| [[returnColumnarBatch]] Optimization flag to control whether VectorizedParquetRecordReader offers rows as the ColumnarBatch or one row at a time only

Default: false

Enabled (true) when VectorizedParquetRecordReader is requested to enableReturningBatches

Used in nextKeyValue (to nextBatch) and getCurrentValue (to return the internal ColumnarBatch not a single InternalRow)

| rowsReturned
| [[rowsReturned]] Number of rows read already

| totalRowCount
| [[totalRowCount]] Total number of rows to be read

|===

nextKeyValue

boolean nextKeyValue()

NOTE: nextKeyValue is part of Hadoop's https://hadoop.apache.org/docs/r2.7.4/api/org/apache/hadoop/mapred/RecordReader.html[RecordReader] contract to read (key, value) pairs from a Hadoop https://hadoop.apache.org/docs/r2.7.4/api/org/apache/hadoop/mapred/InputSplit.html[InputSplit] and present a record-oriented view.

nextKeyValue...FIXME

nextKeyValue is used when:

resultBatch

ColumnarBatch resultBatch()

resultBatch gives the internal ColumnarBatch if available or does initBatch first.
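One way to read this is as lazy initialization: the batch is created on first request and reused afterwards. The sketch below is an assumption-laden illustration; the int array stands in for the internal ColumnarBatch, and the class name LazyBatchSketch and the initCount counter are hypothetical.

```java
class LazyBatchSketch {
  // Stand-in for the internal ColumnarBatch (null until first requested)
  private int[] columnarBatch;

  // Hypothetical counter, only here to show that initialization happens once
  int initCount = 0;

  // Stand-in for initBatch: allocates the batch
  void initBatch() {
    columnarBatch = new int[4096];
    initCount++;
  }

  // Returns the existing batch, initializing it first when it is not available yet
  int[] resultBatch() {
    if (columnarBatch == null) {
      initBatch();
    }
    return columnarBatch;
  }

  public static void main(String[] args) {
    LazyBatchSketch reader = new LazyBatchSketch();
    int[] first = reader.resultBatch();
    int[] second = reader.resultBatch();
    System.out.println(first == second);   // the same batch instance is reused
    System.out.println(reader.initCount);  // number of initializations
  }
}
```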

NOTE: resultBatch is used when VectorizedParquetRecordReader is requested to nextKeyValue.

Reading Next Rows Into Columnar Batch

boolean nextBatch()

nextBatch reads at most capacity rows and returns true when there are rows available. Otherwise, nextBatch returns false (to "announce" there are no rows available).

Internally, nextBatch first requests every WritableColumnVector (in the columnVectors internal registry) to reset itself.

nextBatch requests the ColumnarBatch to specify the number of rows (in batch) as 0 (effectively resetting the batch and making it available for reuse).

When rowsReturned is greater than or equal to totalRowCount, nextBatch finishes with (returns) false (to "announce" there are no rows available).

nextBatch checks whether the end of the current row group has been reached (checkEndOfRowGroup).

nextBatch calculates the number of rows left to be returned as the minimum of the capacity and the number of rows loaded so far reduced by rowsReturned.

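nextBatch requests every VectorizedColumnReader (in columnReaders) to readBatch (with the number of rows left to be returned and the associated WritableColumnVector).

NOTE: VectorizedColumnReaders use their own WritableColumnVectors for storing values read. The numbers of columnReaders and columnVectors are equal.

NOTE: The number of rows in the internal ColumnarBatch matches the number of rows that VectorizedColumnReaders decoded and stored in corresponding WritableColumnVectors.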

In the end, nextBatch registers the progress as follows:

  • The number of rows read is added to the rowsReturned counter

  • The internal ColumnarBatch is requested to set the number of rows (in batch) to be the number of rows read

  • The numBatched registry is exactly the number of rows read

  • The batchIdx registry becomes 0

nextBatch finishes with (returns) true (to "announce" there are rows available).
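The bookkeeping described above can be sketched in plain Java. This is a simplified model under stated assumptions, not Spark's implementation: it has no column readers or vectors, it treats the whole file as already loaded (so the rows left are totalRowCount minus rowsReturned), and the class name NextBatchSketch and the numBatched field as used here are illustrative.

```java
class NextBatchSketch {
  final int capacity;        // maximum number of rows per batch
  final long totalRowCount;  // total number of rows to be read
  long rowsReturned = 0;     // number of rows read already
  int numBatched = 0;        // rows in the current batch (illustrative field)
  int batchIdx = 0;          // index of the current row within the batch

  NextBatchSketch(int capacity, long totalRowCount) {
    this.capacity = capacity;
    this.totalRowCount = totalRowCount;
  }

  boolean nextBatch() {
    // Stand-in for resetting the column vectors and setting the batch row count to 0
    numBatched = 0;
    // No rows left: announce the end of input
    if (rowsReturned >= totalRowCount) {
      return false;
    }
    // Assumption: every remaining row is already loaded, so the rows left
    // are bounded only by the capacity and the rows not yet returned
    int num = (int) Math.min((long) capacity, totalRowCount - rowsReturned);
    // A real reader would request every column reader to decode num rows here
    rowsReturned += num;
    numBatched = num;
    batchIdx = 0;
    return true;
  }

  public static void main(String[] args) {
    NextBatchSketch reader = new NextBatchSketch(4096, 10000L);
    while (reader.nextBatch()) {
      System.out.println("batch of " + reader.numBatched + " rows");
    }
  }
}
```

With a capacity of 4096 and 10000 rows in total, the loop in main sees batches of 4096, 4096 and 1808 rows before nextBatch returns false.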

NOTE: nextBatch is used exclusively when VectorizedParquetRecordReader is requested to nextKeyValue.

Getting Current Value (as Columnar Batch or Single InternalRow)

Object getCurrentValue()

NOTE: getCurrentValue is part of the Hadoop https://hadoop.apache.org/docs/r2.7.5/api/org/apache/hadoop/mapreduce/RecordReader.html[RecordReader] Contract to break the data into key/value pairs for input to a Hadoop Mapper.

getCurrentValue returns the entire ColumnarBatch with the returnColumnarBatch flag enabled (true) or requests it for a single row instead.
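The two modes can be illustrated with a small sketch. It is a simplified model, not Spark's code: an int array stands in for the batch (one int per row), the advanceRow method is a hypothetical stand-in for moving to the next row, and reading the row at batchIdx - 1 assumes the index was advanced before the value is requested.

```java
class CurrentValueSketch {
  final int[] columnarBatch;            // stand-in for the batch: one int per row
  boolean returnColumnarBatch = false;  // disabled by default
  int batchIdx = 0;                     // index of the current row within the batch

  CurrentValueSketch(int[] rows) {
    this.columnarBatch = rows;
  }

  // Turns the returnColumnarBatch flag on
  void enableReturningBatches() {
    returnColumnarBatch = true;
  }

  // Hypothetical: advances to the next row of the (single) batch in this sketch
  boolean advanceRow() {
    if (batchIdx >= columnarBatch.length) {
      return false;
    }
    batchIdx++;
    return true;
  }

  // Whole batch when the flag is enabled, a single row otherwise.
  // In row mode, advanceRow must have succeeded at least once before this call.
  Object getCurrentValue() {
    if (returnColumnarBatch) {
      return columnarBatch;
    }
    return columnarBatch[batchIdx - 1];
  }

  public static void main(String[] args) {
    CurrentValueSketch reader = new CurrentValueSketch(new int[] {10, 20, 30});
    while (reader.advanceRow()) {
      System.out.println(reader.getCurrentValue());
    }
    reader.enableReturningBatches();
    System.out.println(reader.getCurrentValue() instanceof int[]);
  }
}
```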

getCurrentValue is used when:

  • NewHadoopRDD is requested to compute a partition (compute)

  • RecordReaderIterator is requested for the next internal row