ParquetPartitionReaderFactory¶
ParquetPartitionReaderFactory is a FilePartitionReaderFactory (of ParquetScan) for batch queries in Parquet Connector.
Creating Instance¶
ParquetPartitionReaderFactory takes the following to be created:
- SQLConf
- Broadcast variable with a Hadoop Configuration
- Data schema
- Read data schema
- Partition schema
- Filters
- Aggregation
- ParquetOptions
ParquetPartitionReaderFactory is created when:
ParquetScan is requested for a PartitionReaderFactory
enableVectorizedReader¶
ParquetPartitionReaderFactory defines the enableVectorizedReader internal flag to indicate whether batch reading is supported (isBatchReadSupported) for the resultSchema or not.
enableVectorizedReader internal flag is used for the following:
- Indicate whether ParquetPartitionReaderFactory supportsColumnar
- Creating a vectorized parquet RecordReader when requested for a PartitionReader
ParquetPartitionReaderFactory uses enableVectorizedReader flag to determine a Hadoop RecordReader to use when requested for a PartitionReader.
columnVector.offheap.enabled¶
ParquetPartitionReaderFactory uses spark.sql.columnVector.offheap.enabled configuration property when requested for the following:
- Create a Vectorized Reader (and create a VectorizedParquetRecordReader)
- Build a Columnar Reader (and convertAggregatesRowToBatch)
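For illustration, the off-heap property can be set when building a SparkSession. This is a hedged configuration sketch, not taken from the page:

```scala
// Illustrative only: enable off-heap column vectors for the vectorized
// Parquet reader via spark.sql.columnVector.offheap.enabled.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.columnVector.offheap.enabled", true)
  .getOrCreate()
```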
supportColumnarReads¶
Signature
```scala
supportColumnarReads(
  partition: InputPartition): Boolean
```
supportColumnarReads is part of the PartitionReaderFactory abstraction.
ParquetPartitionReaderFactory supports columnar reads when the following all hold:
- spark.sql.parquet.enableVectorizedReader is enabled
- spark.sql.codegen.wholeStage is enabled
- The number of the resultSchema fields is at most spark.sql.codegen.maxFields
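The three conditions above can be sketched as a single conjunction. The parameter names below are stand-ins for the corresponding configuration values, not the actual Spark internals:

```scala
// A minimal sketch: columnar reads are supported only when all three
// conditions hold. Parameter names are illustrative stand-ins for:
//   spark.sql.parquet.enableVectorizedReader,
//   spark.sql.codegen.wholeStage,
//   the number of resultSchema fields and spark.sql.codegen.maxFields.
def supportColumnarReads(
    vectorizedReaderEnabled: Boolean,
    wholeStageEnabled: Boolean,
    numResultFields: Int,
    maxCodegenFields: Int): Boolean =
  vectorizedReaderEnabled &&
    wholeStageEnabled &&
    numResultFields <= maxCodegenFields
```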
Building Columnar PartitionReader¶
Signature
```scala
buildColumnarReader(
  file: PartitionedFile): PartitionReader[ColumnarBatch]
```
buildColumnarReader is part of the FilePartitionReaderFactory abstraction.
buildColumnarReader createVectorizedReader (for the given PartitionedFile) and requests it to enableReturningBatches.
In the end, buildColumnarReader returns a PartitionReader that returns ColumnarBatches (when requested for records).
Building PartitionReader¶
Signature
```scala
buildReader(
  file: PartitionedFile): PartitionReader[InternalRow]
```
buildReader is part of the FilePartitionReaderFactory abstraction.
buildReader determines a Hadoop RecordReader to use based on the enableVectorizedReader flag. With the flag enabled, buildReader createVectorizedReader; otherwise, createRowBaseReader.
In the end, buildReader creates a PartitionReaderWithPartitionValues (that is a PartitionReader with partition values appended).
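The dispatch above can be modeled as a toy sketch, with strings standing in for the real reader classes (the names are illustrative, not Spark's actual API):

```scala
// A toy model of buildReader: the reader type is chosen by the
// enableVectorizedReader flag, and the result is wrapped so that
// partition values are appended to every row.
def buildReader(enableVectorizedReader: Boolean): String = {
  val reader =
    if (enableVectorizedReader) "VectorizedParquetRecordReader"
    else "ParquetRecordReader"
  s"PartitionReaderWithPartitionValues($reader)"
}
```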
Creating Row-Based RecordReader¶
```scala
createRowBaseReader(
  file: PartitionedFile): RecordReader[Void, InternalRow]
```
createRowBaseReader buildReaderBase for the given PartitionedFile with createRowBaseParquetReader as the reader factory.
createRowBaseParquetReader¶
```scala
createRowBaseParquetReader(
  partitionValues: InternalRow,
  pushed: Option[FilterPredicate],
  convertTz: Option[ZoneId],
  datetimeRebaseSpec: RebaseSpec,
  int96RebaseSpec: RebaseSpec): RecordReader[Void, InternalRow]
```
createRowBaseParquetReader prints out the following DEBUG message to the logs:
Falling back to parquet-mr
createRowBaseParquetReader creates a ParquetReadSupport (with enableVectorizedReader flag disabled).
createRowBaseParquetReader creates a RecordReaderIterator with a new ParquetRecordReader.
In the end, createRowBaseParquetReader returns the ParquetRecordReader.
Creating Vectorized Parquet RecordReader¶
```scala
createVectorizedReader(
  file: PartitionedFile): VectorizedParquetRecordReader
```
createVectorizedReader buildReaderBase (for the given PartitionedFile and createParquetVectorizedReader).
In the end, createVectorizedReader requests the VectorizedParquetRecordReader to initBatch (with the partitionSchema and the partitionValues of the given PartitionedFile) and returns it.
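The two steps above can be sketched as a toy model: buildReaderBase constructs a reader via the factory function it is given, and createVectorizedReader then initializes the batch. All names below stand in for the real Spark internals:

```scala
// Toy model of the createVectorizedReader flow (illustrative names only).
def buildReaderBase[T](file: String, buildReaderFunc: String => T): T =
  buildReaderFunc(file)

final case class VectorizedReader(file: String, var batchInitialized: Boolean = false) {
  // In Spark, initBatch takes the partitionSchema and partitionValues.
  def initBatch(): this.type = { batchInitialized = true; this }
}

def createVectorizedReader(file: String): VectorizedReader =
  buildReaderBase(file, VectorizedReader(_)).initBatch()
```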
createVectorizedReader is used when ParquetPartitionReaderFactory is requested for the following:
- Build a partition reader (for a file) (with enableVectorizedReader enabled)
- Build a columnar partition reader (for a file)
createParquetVectorizedReader¶
```scala
createParquetVectorizedReader(
  partitionValues: InternalRow,
  pushed: Option[FilterPredicate],
  convertTz: Option[ZoneId],
  datetimeRebaseSpec: RebaseSpec,
  int96RebaseSpec: RebaseSpec): VectorizedParquetRecordReader
```
createParquetVectorizedReader creates a VectorizedParquetRecordReader (with capacity).
createParquetVectorizedReader creates a RecordReaderIterator (for the VectorizedParquetRecordReader).
createParquetVectorizedReader prints out the following DEBUG message to the logs (with the partitionSchema and the given partitionValues):
Appending [partitionSchema] [partitionValues]
In the end, createParquetVectorizedReader returns the VectorizedParquetRecordReader.
Unused RecordReaderIterator?
It appears that the RecordReaderIterator is created but not used. Feeling confused.
buildReaderBase¶
```scala
buildReaderBase[T](
  file: PartitionedFile,
  buildReaderFunc: (
    FileSplit,
    InternalRow,
    TaskAttemptContextImpl,
    Option[FilterPredicate],
    Option[ZoneId],
    RebaseSpec,
    RebaseSpec) => RecordReader[Void, T]): RecordReader[Void, T]
```
buildReaderBase...FIXME
buildReaderBase is used when:
ParquetPartitionReaderFactory is requested to createRowBaseReader and createVectorizedReader
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory logger to see what happens inside.
Add the following line to conf/log4j2.properties:
```text
logger.ParquetPartitionReaderFactory.name = org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory
logger.ParquetPartitionReaderFactory.level = all
```
Refer to Logging.