ParquetPartitionReaderFactory¶
ParquetPartitionReaderFactory is a FilePartitionReaderFactory (of ParquetScan) for batch queries in Parquet Connector.
Creating Instance¶
ParquetPartitionReaderFactory takes the following to be created:
- SQLConf
- Broadcast variable with a Hadoop Configuration
- Data schema
- Read data schema
- Partition schema
- Filters
- Aggregation
- ParquetOptions
ParquetPartitionReaderFactory is created when ParquetScan is requested for a PartitionReaderFactory.
enableVectorizedReader¶
ParquetPartitionReaderFactory defines the enableVectorizedReader internal flag that indicates whether isBatchReadSupported holds for the resultSchema.
The enableVectorizedReader internal flag is used for the following:

- To indicate whether ParquetPartitionReaderFactory supportsColumnar reads
- To create a vectorized parquet RecordReader when requested for a PartitionReader
ParquetPartitionReaderFactory uses the enableVectorizedReader flag to determine which Hadoop RecordReader to use when requested for a PartitionReader.
columnVector.offheap.enabled¶
ParquetPartitionReaderFactory uses the spark.sql.columnVector.offheap.enabled configuration property when requested for the following:

- Create a Vectorized Reader (and create a VectorizedParquetRecordReader)
- Build a Columnar Reader (and convertAggregatesRowToBatch)
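Conceptually, the off-heap property selects the memory mode for the column vectors that the vectorized reader allocates. A minimal, Spark-free sketch of that mapping (the trait and objects below are illustrative stand-ins, not Spark's actual MemoryMode enum):

```scala
// Illustrative stand-ins for Spark's memory modes (not Spark's own types).
sealed trait MemoryMode
case object OnHeap extends MemoryMode
case object OffHeap extends MemoryMode

// Sketch: the off-heap flag (spark.sql.columnVector.offheap.enabled)
// decides whether column vectors live on or off the JVM heap.
def columnVectorMemoryMode(offHeapEnabled: Boolean): MemoryMode =
  if (offHeapEnabled) OffHeap else OnHeap
```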
supportColumnarReads¶
Signature

```scala
supportColumnarReads(
  partition: InputPartition): Boolean
```
supportColumnarReads is part of the PartitionReaderFactory abstraction.
ParquetPartitionReaderFactory supports columnar reads when all of the following hold:
- spark.sql.parquet.enableVectorizedReader is enabled
- spark.sql.codegen.wholeStage is enabled
- The number of the resultSchema fields is at most spark.sql.codegen.maxFields
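The three conditions above combine with logical AND. A simplified, Spark-free sketch of the decision (the parameter names are illustrative; the real implementation reads these values from SQLConf):

```scala
// Sketch of the supportColumnarReads decision (illustrative names,
// not Spark's actual private fields).
def supportsColumnarReads(
    vectorizedReaderEnabled: Boolean, // spark.sql.parquet.enableVectorizedReader
    wholeStageEnabled: Boolean,       // spark.sql.codegen.wholeStage
    resultSchemaFields: Int,          // number of fields in the resultSchema
    maxFields: Int                    // spark.sql.codegen.maxFields
): Boolean =
  vectorizedReaderEnabled &&
    wholeStageEnabled &&
    resultSchemaFields <= maxFields
```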
Building Columnar PartitionReader¶
Signature

```scala
buildColumnarReader(
  file: PartitionedFile): PartitionReader[ColumnarBatch]
```
buildColumnarReader is part of the FilePartitionReaderFactory abstraction.

buildColumnarReader creates a vectorized reader (createVectorizedReader) for the given PartitionedFile and requests it to enableReturningBatches.

In the end, buildColumnarReader returns a PartitionReader that returns ColumnarBatches (when requested for records).
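Conceptually, the returned PartitionReader is a thin adapter over the underlying record reader: next() advances to the next batch and get() hands out the current one. A simplified, Spark-free sketch (the trait below is an illustrative stand-in for Spark's PartitionReader, not its actual definition):

```scala
// Illustrative stand-in for Spark's PartitionReader interface.
trait SimplePartitionReader[T] extends AutoCloseable {
  def next(): Boolean
  def get(): T
}

// Adapts any iterator (e.g. of columnar batches) into the reader shape.
def fromIterator[T](it: Iterator[T]): SimplePartitionReader[T] =
  new SimplePartitionReader[T] {
    private var current: Option[T] = None
    def next(): Boolean =
      if (it.hasNext) { current = Some(it.next()); true }
      else { current = None; false }
    def get(): T = current.getOrElse(throw new NoSuchElementException)
    def close(): Unit = ()
  }
```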
Building PartitionReader¶
Signature

```scala
buildReader(
  file: PartitionedFile): PartitionReader[InternalRow]
```
buildReader is part of the FilePartitionReaderFactory abstraction.

buildReader determines which Hadoop RecordReader to use based on the enableVectorizedReader flag: createVectorizedReader when enabled and createRowBaseReader otherwise.
In the end, buildReader creates a PartitionReaderWithPartitionValues (a PartitionReader with partition values appended).
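The effect of PartitionReaderWithPartitionValues can be pictured as appending the partition values to every row the file reader produces. A minimal sketch with rows modeled as Seq[Any] (the function name is illustrative; Spark internally uses InternalRow and a joined-row projection):

```scala
// Sketch: append partition values to each data row, the effect of
// PartitionReaderWithPartitionValues (rows modeled as Seq[Any] here).
def appendPartitionValues(
    rows: Iterator[Seq[Any]],
    partitionValues: Seq[Any]): Iterator[Seq[Any]] =
  rows.map(_ ++ partitionValues)
```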
Creating Row-Based RecordReader¶
```scala
createRowBaseReader(
  file: PartitionedFile): RecordReader[Void, InternalRow]
```
createRowBaseReader buildReaderBase (for the given PartitionedFile and with the createRowBaseParquetReader factory).
createRowBaseParquetReader¶
```scala
createRowBaseParquetReader(
  partitionValues: InternalRow,
  pushed: Option[FilterPredicate],
  convertTz: Option[ZoneId],
  datetimeRebaseSpec: RebaseSpec,
  int96RebaseSpec: RebaseSpec): RecordReader[Void, InternalRow]
```
createRowBaseParquetReader prints out the following DEBUG message to the logs:

```text
Falling back to parquet-mr
```
createRowBaseParquetReader creates a ParquetReadSupport (with the enableVectorizedReader flag disabled) and a RecordReaderIterator with a new ParquetRecordReader.

In the end, createRowBaseParquetReader returns the ParquetRecordReader.
Creating Vectorized Parquet RecordReader¶
```scala
createVectorizedReader(
  file: PartitionedFile): VectorizedParquetRecordReader
```
createVectorizedReader buildReaderBase (for the given PartitionedFile and with the createParquetVectorizedReader factory).

In the end, createVectorizedReader requests the VectorizedParquetRecordReader to initBatch (with the partitionSchema and the partitionValues of the given PartitionedFile) and returns it.
createVectorizedReader is used when ParquetPartitionReaderFactory is requested for the following:

- Build a partition reader (for a file) (with enableVectorizedReader enabled)
- Build a columnar partition reader (for a file)
createParquetVectorizedReader¶
```scala
createParquetVectorizedReader(
  partitionValues: InternalRow,
  pushed: Option[FilterPredicate],
  convertTz: Option[ZoneId],
  datetimeRebaseSpec: RebaseSpec,
  int96RebaseSpec: RebaseSpec): VectorizedParquetRecordReader
```
createParquetVectorizedReader creates a VectorizedParquetRecordReader (with capacity) and a RecordReaderIterator (for the VectorizedParquetRecordReader).

createParquetVectorizedReader prints out the following DEBUG message to the logs (with the partitionSchema and the given partitionValues):

```text
Appending [partitionSchema] [partitionValues]
```

In the end, createParquetVectorizedReader returns the VectorizedParquetRecordReader.
Unused RecordReaderIterator?

It appears that the RecordReaderIterator is created but never used.
buildReaderBase¶
```scala
buildReaderBase[T](
  file: PartitionedFile,
  buildReaderFunc: (
    FileSplit,
    InternalRow,
    TaskAttemptContextImpl,
    Option[FilterPredicate],
    Option[ZoneId],
    RebaseSpec,
    RebaseSpec) => RecordReader[Void, T]): RecordReader[Void, T]
```
buildReaderBase...FIXME

buildReaderBase is used when ParquetPartitionReaderFactory is requested to createRowBaseReader and createVectorizedReader.
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

```text
logger.ParquetPartitionReaderFactory.name = org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory
logger.ParquetPartitionReaderFactory.level = all
```

Refer to Logging.