VectorizedParquetRecordReader¶
VectorizedParquetRecordReader is a SpecificParquetRecordReaderBase for the parquet data source for Vectorized Parquet Decoding.
Creating Instance¶
VectorizedParquetRecordReader takes the following to be created:

- ZoneId (for timezone conversion)
- Datetime Rebase Mode
- Datetime Rebase Timezone
- int96 Rebase Mode
- int96 Rebase Timezone
- useOffHeap flag
- Capacity
VectorizedParquetRecordReader is created when:

- ParquetFileFormat is requested to buildReaderWithPartitionValues (with the enableVectorizedReader flag enabled)
- ParquetPartitionReaderFactory is requested to createParquetVectorizedReader
WritableColumnVectors¶
VectorizedParquetRecordReader defines an array of allocated WritableColumnVectors (columnVectors).

columnVectors is allocated when initBatch is executed.
enableReturningBatches¶
```java
void enableReturningBatches()
```
enableReturningBatches simply turns the returnColumnarBatch flag on.

enableReturningBatches is used when:

- ParquetFileFormat is requested to buildReaderWithPartitionValues
- ParquetPartitionReaderFactory is requested to buildColumnarReader
Initializing Columnar Batch¶
```java
void initBatch() // (1)!
void initBatch(
  StructType partitionColumns,
  InternalRow partitionValues) // (2)!
void initBatch(
  MemoryMode memMode,
  StructType partitionColumns,
  InternalRow partitionValues) // (3)!
```

1. Uses the MEMORY_MODE and no partitionColumns nor partitionValues
2. Uses the MEMORY_MODE
3. A private helper method
initBatch creates a batch schema that is the sparkSchema with the input partitionColumns schema appended (if available).

initBatch requests OffHeapColumnVector or OnHeapColumnVector to allocate column vectors per the input memMode (i.e. the OFF_HEAP or ON_HEAP memory modes, respectively) and records the allocated column vectors as the internal WritableColumnVectors.
Note

The memory mode (and hence whether OffHeapColumnVector or OnHeapColumnVector is used) is based on the spark.sql.columnVector.offheap.enabled configuration property.
initBatch creates a ColumnarBatch (with the allocated WritableColumnVectors) and records it as the internal ColumnarBatch.

In the end, initBatch populates the column vectors of the partition columns (with the input partitionValues) and fills the missing columns with nulls, marking them all as constant.
initBatch is used when:

- VectorizedParquetRecordReader is requested to resultBatch
- ParquetFileFormat is requested to build a data reader (with partition column values appended)
- ParquetPartitionReaderFactory is requested to createVectorizedReader
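The allocation steps above can be sketched in plain Scala. This is a simplified model, not the real Spark implementation: `MemoryMode`, `ColumnVector` and `initBatch` below are illustrative stand-ins for the actual Spark classes and method.

```scala
// Simplified model of initBatch's allocation logic (stand-ins, not Spark API).
sealed trait MemoryMode
case object OnHeap  extends MemoryMode
case object OffHeap extends MemoryMode

// Stand-in for a WritableColumnVector: records where it lives and
// whether it holds a constant (e.g. partition-column) value.
final case class ColumnVector(name: String, mode: MemoryMode, constant: Option[Any] = None)

def initBatch(
    sparkSchema: Seq[String],             // data columns read from parquet
    partitionColumns: Seq[(String, Any)], // partition column name -> value
    useOffHeap: Boolean): Seq[ColumnVector] = {
  // memory mode is chosen per the useOffHeap flag
  val mode: MemoryMode = if (useOffHeap) OffHeap else OnHeap
  // batch schema = sparkSchema ++ partitionColumns schema
  val dataVectors = sparkSchema.map(ColumnVector(_, mode))
  // partition columns are populated once and marked constant
  val partVectors = partitionColumns.map { case (n, v) => ColumnVector(n, mode, Some(v)) }
  dataVectors ++ partVectors
}

// Usage: two data columns plus one partition column give three vectors.
val vectors = initBatch(Seq("id", "name"), Seq("date" -> "2020-01-01"), useOffHeap = false)
```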
Review Me¶

VectorizedParquetRecordReader uses the OFF_HEAP memory mode when spark.sql.columnVector.offheap.enabled is enabled (true).

VectorizedParquetRecordReader's Internal Properties (e.g. Registries, Counters and Flags):

| Name | Description |
| ---- | ----------- |
| batchIdx | Current batch index, i.e. the index of an InternalRow in the columnarBatch. Starts at 0. Incremented every nextKeyValue. Reset to 0 when the next rows are read into the columnar batch (nextBatch). |
| columnarBatch | ColumnarBatch |
| columnReaders | VectorizedColumnReaders (one reader per column) to read rows as batches. Initialized for every new row group (checkEndOfRowGroup). |
| MEMORY_MODE | Memory mode of the columnarBatch: OFF_HEAP (when the useOffHeap flag is on, as based on the spark.sql.columnVector.offheap.enabled configuration property) or ON_HEAP. Used exclusively when initBatch. |
| missingColumns | Bitmap of columns (per index) that are missing (or simply the ones that the reader should not read) |
| returnColumnarBatch | Optimization flag to control whether VectorizedParquetRecordReader offers rows as the columnarBatch or one row at a time (as an InternalRow). Default: false. Enabled (true) when enableReturningBatches. Used in nextKeyValue and getCurrentValue. |
| rowsReturned | Number of rows read already |
| totalRowCount | Total number of rows to be read |
nextKeyValue¶
```java
boolean nextKeyValue()
```
Note

nextKeyValue is part of Hadoop's [RecordReader](https://hadoop.apache.org/docs/r2.7.4/api/org/apache/hadoop/mapred/RecordReader.html) abstraction to read (key, value) pairs from a Hadoop [InputSplit](https://hadoop.apache.org/docs/r2.7.4/api/org/apache/hadoop/mapred/InputSplit.html) to present a record-oriented view.
nextKeyValue
...FIXME
nextKeyValue is used when:

- NewHadoopRDD is requested to compute a partition (compute)
- RecordReaderIterator is requested to check whether or not there are more internal rows
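Although the walkthrough above is incomplete, the row-versus-batch bookkeeping implied by the internal registries (batchIdx, numBatched, returnColumnarBatch) can be sketched as follows. This is a minimal model under those assumptions, not the real implementation; `BatchIterator` and `batchSizes` are illustrative stand-ins.

```scala
// Simplified model of nextKeyValue's bookkeeping (not the real Spark code).
// batchSizes stands in for "reading the next batch of rows" and yields
// the number of rows in each successive batch.
final class BatchIterator(batchSizes: Iterator[Int], returnColumnarBatch: Boolean) {
  private var numBatched = 0 // rows in the current batch
  private var batchIdx = 0   // current row index within the batch

  private def nextBatch(): Boolean =
    if (batchSizes.hasNext) { numBatched = batchSizes.next(); batchIdx = 0; true }
    else false

  def nextKeyValue(): Boolean =
    if (returnColumnarBatch) nextBatch() // batch mode: one "value" per batch
    else if (batchIdx >= numBatched && !nextBatch()) false
    else { batchIdx += 1; true }         // row mode: one "value" per row
}

// Usage: batches of 2 and 1 rows yield three row-mode values, two batch-mode values.
val rows = new BatchIterator(Iterator(2, 1), returnColumnarBatch = false)
```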
resultBatch¶
```java
ColumnarBatch resultBatch()
```
resultBatch gives the columnarBatch (creating one with initBatch unless available already).

Note

resultBatch is used exclusively when VectorizedParquetRecordReader is requested to nextKeyValue.
Reading Next Rows Into Columnar Batch¶
```java
boolean nextBatch()
```
nextBatch reads at least capacity rows and returns true when there are rows available. Otherwise, nextBatch returns false (to "announce" there are no rows available).

Internally, nextBatch firstly requests every WritableColumnVector (in the columnVectors internal registry) to reset itself.

nextBatch requests the columnarBatch to specify the number of rows (in batch) to be 0 (effectively resetting the batch and making it available for reuse).

When the rowsReturned counter has reached the totalRowCount, nextBatch finishes with (returns) false (to "announce" there are no rows available).

nextBatch checks whether the end of a row group has been reached (and initializes the VectorizedColumnReaders for the next row group if so).

nextBatch calculates the number of rows left to be returned as a minimum of the capacity and the number of rows loaded (so far) that have not been returned yet.

nextBatch requests every VectorizedColumnReader (in the columnReaders internal registry) to readBatch (with the number of rows left and the corresponding WritableColumnVector).

Note

VectorizedColumnReaders are not requested to read rows for missing columns (whose column vectors were filled with nulls at initBatch time).
In the end, nextBatch registers the progress as follows:

- The number of rows read is added to the rowsReturned counter
- Requests the internal columnarBatch to set the number of rows (in batch) to be the number of rows read
- The numBatched registry is exactly the number of rows read
- The batchIdx registry becomes 0

nextBatch finishes with (returns) true (to "announce" there are rows available).
Note

nextBatch is used exclusively when VectorizedParquetRecordReader is requested to nextKeyValue.
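The progress arithmetic above (the minimum of the capacity and the rows not yet returned) can be modelled in a few lines of plain Scala. This is a sketch of the bookkeeping only, not the real reader; `Progress` is an illustrative stand-in and the actual column decoding is elided.

```scala
// Simplified model of nextBatch's progress bookkeeping (not the real Spark code).
final class Progress(capacity: Int, totalRowCount: Long) {
  var rowsReturned: Long = 0L // rows read already
  var numBatched: Int = 0     // rows in the current batch
  var batchIdx: Int = 0       // row index within the batch

  def nextBatch(): Boolean = {
    if (rowsReturned >= totalRowCount) return false
    // rows left to return, capped at the batch capacity
    val num = math.min(capacity.toLong, totalRowCount - rowsReturned).toInt
    rowsReturned += num
    numBatched = num // columnarBatch.setNumRows(num) in the real reader
    batchIdx = 0
    true
  }
}

// Usage: with capacity 4096 and 10000 total rows,
// successive batches hold 4096, 4096 and 1808 rows.
val p = new Progress(4096, 10000L)
```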
Getting Current Value (as Columnar Batch or Single InternalRow)¶
```java
Object getCurrentValue()
```
Note

getCurrentValue is part of the Hadoop [RecordReader](https://hadoop.apache.org/docs/r2.7.5/api/org/apache/hadoop/mapreduce/RecordReader.html) contract to break the data into key/value pairs for input to a Hadoop Mapper.
getCurrentValue returns the entire columnarBatch (when the returnColumnarBatch flag is on, i.e. true) or requests it for a single row instead.

getCurrentValue is used when:

- NewHadoopRDD is requested to compute a partition (compute)
- RecordReaderIterator is requested for the next internal row
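The branching described above can be sketched as follows. This is a simplified model, not the real Spark code: `Batch` stands in for ColumnarBatch, and rows are plain strings rather than InternalRows. Note that batchIdx was already advanced by nextKeyValue, so the current row sits at index batchIdx - 1.

```scala
// Simplified model of getCurrentValue (stand-ins, not Spark API).
final case class Batch(rows: Vector[String])

def getCurrentValue(batch: Batch, batchIdx: Int, returnColumnarBatch: Boolean): Any =
  if (returnColumnarBatch) batch            // whole-batch mode: hand out the batch
  else batch.rows(batchIdx - 1)             // row mode: the row nextKeyValue advanced to

// Usage: in row mode the value depends on batchIdx; in batch mode it is the batch.
val b = Batch(Vector("row0", "row1"))
```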