ParquetPartitionReaderFactory

ParquetPartitionReaderFactory is a FilePartitionReaderFactory (of ParquetScan) for batch queries in Parquet Connector.

Creating Instance

ParquetPartitionReaderFactory takes the following to be created:

ParquetPartitionReaderFactory is created when:

enableVectorizedReader

ParquetPartitionReaderFactory defines the enableVectorizedReader internal flag to indicate whether isBatchReadSupported holds for the resultSchema.

enableVectorizedReader internal flag is used for the following:

ParquetPartitionReaderFactory uses enableVectorizedReader flag to determine a Hadoop RecordReader to use when requested for a PartitionReader.

columnVector.offheap.enabled

ParquetPartitionReaderFactory uses spark.sql.columnVector.offheap.enabled configuration property when requested for the following:

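As an example, off-heap column vectors can be switched on when the SparkSession is created. This is only a configuration sketch (the `local[*]` master and app setup are illustrative); whether off-heap vectors pay off depends on the workload:

```scala
// Sketch: enabling off-heap ColumnVectors for vectorized Parquet reads.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]") // illustrative; use your own master URL
  .config("spark.sql.columnVector.offheap.enabled", true)
  .getOrCreate()
```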
supportColumnarReads

Signature
supportColumnarReads(
  partition: InputPartition): Boolean

supportColumnarReads is part of the PartitionReaderFactory abstraction.

ParquetPartitionReaderFactory supports columnar reads when the following all hold:

  1. spark.sql.parquet.enableVectorizedReader is enabled
  2. spark.sql.codegen.wholeStage is enabled
  3. The number of the resultSchema fields is at most spark.sql.codegen.maxFields
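The three conditions above can be sketched as a standalone predicate. `SqlConf` and its field names are stand-ins for the corresponding configuration properties, not Spark's internal API:

```scala
// Illustrative sketch of the supportColumnarReads decision.
case class SqlConf(
    parquetVectorizedReaderEnabled: Boolean, // spark.sql.parquet.enableVectorizedReader
    wholeStageEnabled: Boolean,              // spark.sql.codegen.wholeStage
    wholeStageMaxNumFields: Int)             // spark.sql.codegen.maxFields

def supportColumnarReads(conf: SqlConf, resultSchemaFields: Int): Boolean =
  conf.parquetVectorizedReaderEnabled &&
    conf.wholeStageEnabled &&
    resultSchemaFields <= conf.wholeStageMaxNumFields

val conf = SqlConf(
  parquetVectorizedReaderEnabled = true,
  wholeStageEnabled = true,
  wholeStageMaxNumFields = 100)

assert(supportColumnarReads(conf, resultSchemaFields = 5))
assert(!supportColumnarReads(conf, resultSchemaFields = 101))
```

Note how disabling any one of the two flags, or widening the result schema past the maximum, disables columnar reads entirely.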

Building Columnar PartitionReader

Signature
buildColumnarReader(
  file: PartitionedFile): PartitionReader[ColumnarBatch]

buildColumnarReader is part of the FilePartitionReaderFactory abstraction.

buildColumnarReader createVectorizedReader (for the given PartitionedFile) and requests it to enableReturningBatches.

In the end, buildColumnarReader returns a PartitionReader that returns ColumnarBatches (when requested for records).
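The flow above can be sketched with stubbed types (the classes below stand in for Spark's and are not its actual API): switch the vectorized reader into batch mode, then adapt it to the PartitionReader interface.

```scala
// Stub stand-ins for Spark's ColumnarBatch / PartitionReader.
class ColumnarBatch
trait PartitionReader[T] {
  def next(): Boolean
  def get(): T
  def close(): Unit
}

// Stand-in for the relevant surface of a vectorized record reader.
class StubVectorizedReader(batches: List[ColumnarBatch]) {
  private var remaining = batches
  private var current: ColumnarBatch = null
  var returningBatches = false
  def enableReturningBatches(): Unit = returningBatches = true
  def nextKeyValue(): Boolean = remaining match {
    case head :: tail => current = head; remaining = tail; true
    case Nil          => false
  }
  def getCurrentValue: ColumnarBatch = current
  def close(): Unit = ()
}

// buildColumnarReader's shape: enable batch returns, then wrap.
def buildColumnarReader(reader: StubVectorizedReader): PartitionReader[ColumnarBatch] = {
  reader.enableReturningBatches()
  new PartitionReader[ColumnarBatch] {
    def next(): Boolean = reader.nextKeyValue()
    def get(): ColumnarBatch = reader.getCurrentValue
    def close(): Unit = reader.close()
  }
}

val reader = buildColumnarReader(new StubVectorizedReader(List(new ColumnarBatch)))
assert(reader.next())   // one batch available
reader.get()
assert(!reader.next())  // exhausted
reader.close()
```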

Building PartitionReader

Signature
buildReader(
  file: PartitionedFile): PartitionReader[InternalRow]

buildReader is part of the FilePartitionReaderFactory abstraction.

buildReader determines a Hadoop RecordReader to use based on the enableVectorizedReader flag: createVectorizedReader when enabled or createRowBaseReader otherwise.

In the end, buildReader creates a PartitionReaderWithPartitionValues (that is a PartitionReader with partition values appended).
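As a sketch of what "with partition values appended" means (plain Scala collections, not Spark's InternalRow machinery): every row produced by the file reader is extended with the constant partition-column values of the file's partition.

```scala
// Sketch: appending partition values to data rows (illustrative only).
type Row = Seq[Any]

def appendPartitionValues(dataRows: Iterator[Row], partitionValues: Row): Iterator[Row] =
  dataRows.map(_ ++ partitionValues)

// Two data rows from a file in a hypothetical partition date=2024-01-01.
val rows = appendPartitionValues(
  Iterator(Seq(1, "a"), Seq(2, "b")),
  partitionValues = Seq("2024-01-01")).toList

assert(rows == List(Seq(1, "a", "2024-01-01"), Seq(2, "b", "2024-01-01")))
```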

Creating Row-Based RecordReader

createRowBaseReader(
  file: PartitionedFile): RecordReader[Void, InternalRow]

createRowBaseReader buildReaderBase for the given PartitionedFile with the createRowBaseParquetReader factory.

createRowBaseParquetReader

createRowBaseParquetReader(
  partitionValues: InternalRow,
  pushed: Option[FilterPredicate],
  convertTz: Option[ZoneId],
  datetimeRebaseSpec: RebaseSpec,
  int96RebaseSpec: RebaseSpec): RecordReader[Void, InternalRow]

createRowBaseParquetReader prints out the following DEBUG message to the logs:

Falling back to parquet-mr

createRowBaseParquetReader creates a ParquetReadSupport (with enableVectorizedReader flag disabled).

createRowBaseParquetReader creates a RecordReaderIterator with a new ParquetRecordReader.

In the end, createRowBaseParquetReader returns the ParquetRecordReader.
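The steps above can be sketched with stubbed classes. The names mirror the description, but none of this is Spark's actual API; it only shows the order of operations:

```scala
// Stub stand-ins for the classes mentioned above.
trait InternalRow
class ParquetReadSupport(val enableVectorizedReader: Boolean)
class ParquetRecordReader[T](val readSupport: ParquetReadSupport)
class RecordReaderIterator[T](reader: ParquetRecordReader[T])

def createRowBaseParquetReader(): ParquetRecordReader[InternalRow] = {
  // 1. Log the fallback (DEBUG in Spark).
  println("Falling back to parquet-mr")
  // 2. Read support with vectorization disabled.
  val readSupport = new ParquetReadSupport(enableVectorizedReader = false)
  // 3. A RecordReaderIterator over a new ParquetRecordReader.
  val reader = new ParquetRecordReader[InternalRow](readSupport)
  new RecordReaderIterator(reader)
  // 4. Return the ParquetRecordReader itself.
  reader
}

assert(!createRowBaseParquetReader().readSupport.enableVectorizedReader)
```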

Creating Vectorized Parquet RecordReader

createVectorizedReader(
  file: PartitionedFile): VectorizedParquetRecordReader

createVectorizedReader buildReaderBase for the given PartitionedFile with the createParquetVectorizedReader factory.

In the end, createVectorizedReader requests the VectorizedParquetRecordReader to initBatch (with the partitionSchema and the partitionValues of the given PartitionedFile) and returns it.


createVectorizedReader is used when ParquetPartitionReaderFactory is requested for the following:

createParquetVectorizedReader

createParquetVectorizedReader(
  partitionValues: InternalRow,
  pushed: Option[FilterPredicate],
  convertTz: Option[ZoneId],
  datetimeRebaseSpec: RebaseSpec,
  int96RebaseSpec: RebaseSpec): VectorizedParquetRecordReader

createParquetVectorizedReader creates a VectorizedParquetRecordReader (with capacity).

createParquetVectorizedReader creates a RecordReaderIterator (for the VectorizedParquetRecordReader).

createParquetVectorizedReader prints out the following DEBUG message to the logs (with the partitionSchema and the given partitionValues):

Appending [partitionSchema] [partitionValues]

In the end, createParquetVectorizedReader returns the VectorizedParquetRecordReader.

Unused RecordReaderIterator?

It appears that the RecordReaderIterator is created but not used. Feeling confused.

buildReaderBase

buildReaderBase[T](
  file: PartitionedFile,
  buildReaderFunc: (
    FileSplit,
    InternalRow,
    TaskAttemptContextImpl,
    Option[FilterPredicate],
    Option[ZoneId],
    RebaseSpec,
    RebaseSpec) => RecordReader[Void, T]): RecordReader[Void, T]

buildReaderBase creates a Hadoop FileSplit and a TaskAttemptContextImpl for the given PartitionedFile and uses the given buildReaderFunc to create a RecordReader (with the pushed filter predicate, the optional timezone to convert to, and the datetime and INT96 rebase specs).
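The shape of buildReaderBase is a classic higher-order factory: prepare the per-file inputs once, then delegate reader construction to the given function (row-based vs vectorized in ParquetPartitionReaderFactory). A minimal sketch with illustrative types and names:

```scala
// Illustrative stand-ins for the per-file inputs buildReaderBase prepares.
case class FileSplit(path: String)
case class ReaderInputs(split: FileSplit, partitionValues: Seq[Any])

// Shared setup happens here; buildReaderFunc decides which reader to build.
def buildReaderBase[T](path: String, partitionValues: Seq[Any])(
    buildReaderFunc: ReaderInputs => T): T =
  buildReaderFunc(ReaderInputs(FileSplit(path), partitionValues))

val reader = buildReaderBase("part-00000.parquet", Seq("2024-01-01")) { in =>
  s"reader for ${in.split.path}"
}
assert(reader == "reader for part-00000.parquet")
```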


buildReaderBase is used when:

Logging

Enable ALL logging level for org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory logger to see what happens inside.

Add the following line to conf/log4j2.properties:

logger.ParquetPartitionReaderFactory.name = org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory
logger.ParquetPartitionReaderFactory.level = all

Refer to Logging.