ParquetFileFormat¶
Obsolete
ParquetFileFormat is a mere fallbackFileFormat of ParquetDataSourceV2.
ParquetFileFormat is the FileFormat of Parquet Data Source.
ParquetFileFormat is splitable.
ParquetFileFormat is Serializable.
Short Name¶
shortName(): String
ParquetFileFormat is a DataSourceRegister with the short name:
parquet
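As a minimal sketch of how the short name is used from the public API, the following writes a small dataset and reads it back with `format("parquet")`. The object name `ParquetShortNameDemo`, the helper `roundTrip` and the temporary path are made up for illustration. Which class actually serves the read depends on the Spark version and configuration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ParquetShortNameDemo {
  // Writes five rows and reads them back, selecting the data source by its short name.
  // `path` is a hypothetical, writable location.
  def roundTrip(spark: SparkSession, path: String): DataFrame = {
    spark.range(5).write.format("parquet").mode("overwrite").save(path)
    spark.read.format("parquet").load(path)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("parquet-short-name")
      .getOrCreate()
    val path = java.nio.file.Files.createTempDirectory("parquet-demo").resolve("data").toString
    roundTrip(spark, path).show()
    spark.stop()
  }
}
```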
Metadata Columns¶
FileFormat
metadataSchemaFields: Seq[StructField]
metadataSchemaFields is part of the FileFormat abstraction.
metadataSchemaFields is the following metadata columns:
- The default FileFormat-specific metadata columns
- row_index
row_index¶
row_index is a ParquetFileFormat-specific metadata column.
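A hedged sketch of reading the column from a query: it assumes a Spark version in which the hidden `_metadata` struct exposes `row_index` for parquet files (it does not exist in older releases). `RowIndexDemo` and `withRowIndex` are illustrative names only.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object RowIndexDemo {
  // Writes a single parquet file and selects each row together with its
  // position inside that file. `path` is a hypothetical, writable location.
  def withRowIndex(spark: SparkSession, path: String): DataFrame = {
    // coalesce(1) keeps all rows in one file so the indices are easy to follow
    spark.range(5).coalesce(1).write.mode("overwrite").parquet(path)
    spark.read.parquet(path).select(col("id"), col("_metadata.row_index"))
  }
}
```

The metadata columns are hidden: they are not part of `SELECT *` and have to be requested explicitly, as above.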
_tmp_metadata_row_index¶
isSplitable¶
FileFormat
isSplitable(
sparkSession: SparkSession,
options: Map[String, String],
path: Path): Boolean
isSplitable is part of the FileFormat abstraction.
ParquetFileFormat is splitable (true).
Building Data Reader With Partition Values¶
FileFormat
buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow]
buildReaderWithPartitionValues is part of the FileFormat abstraction.
Fixme
Review Me
buildReaderWithPartitionValues sets the following configuration options in the input hadoopConf.
| Name | Value |
|---|---|
| parquet.read.support.class | ParquetReadSupport |
| org.apache.spark.sql.parquet.row.requested_schema | JSON representation of requiredSchema |
| org.apache.spark.sql.parquet.row.attributes | JSON representation of requiredSchema |
| spark.sql.session.timeZone | spark.sql.session.timeZone |
| spark.sql.parquet.binaryAsString | spark.sql.parquet.binaryAsString |
| spark.sql.parquet.int96AsTimestamp | spark.sql.parquet.int96AsTimestamp |
buildReaderWithPartitionValues requests ParquetWriteSupport to setSchema.
buildReaderWithPartitionValues tries to push filters down to create a Parquet FilterPredicate (aka pushed).
With spark.sql.parquet.filterPushdown configuration property enabled, buildReaderWithPartitionValues takes the input Spark data source filters and converts them to Parquet filter predicates if possible. Otherwise, the Parquet filter predicate is not specified.
Note
buildReaderWithPartitionValues creates filter predicates for the following types: BooleanType, IntegerType, LongType, FloatType, DoubleType, StringType, BinaryType.
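One way to observe the pushdown from the outside is to look at the physical plan of a filtered parquet scan. The sketch below is not part of ParquetFileFormat; `FilterPushdownDemo` and `planWithFilter` are hypothetical names, and the exact plan text varies across Spark versions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object FilterPushdownDemo {
  // Returns the physical plan (as text) of a parquet scan with a filter on a
  // LongType column. `path` is a hypothetical, writable location.
  def planWithFilter(spark: SparkSession, path: String): String = {
    spark.range(10).write.mode("overwrite").parquet(path)
    val filtered = spark.read.parquet(path).filter(col("id") > 5)
    // The scan node reports the data source filters it was given as PushedFilters
    filtered.queryExecution.executedPlan.toString
  }
}
```

Printing the returned string (or calling `explain()` on the DataFrame) should show a `PushedFilters` entry on the scan node. With spark.sql.parquet.filterPushdown disabled, the filters are still listed there but no Parquet `FilterPredicate` is created from them.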
buildReaderWithPartitionValues broadcasts the input hadoopConf Hadoop Configuration.
In the end, buildReaderWithPartitionValues gives a function that takes a PartitionedFile and does the following:
- Creates a Hadoop FileSplit for the input PartitionedFile
- Creates a Parquet ParquetInputSplit for the Hadoop FileSplit created
- Gets the broadcast Hadoop Configuration
- Creates a flag that says whether to apply timezone conversions to int96 timestamps or not (aka convertTz)
- Creates a Hadoop TaskAttemptContextImpl (with the broadcast Hadoop Configuration and a Hadoop TaskAttemptID for a map task)
- Sets the Parquet FilterPredicate (only when spark.sql.parquet.filterPushdown configuration property is enabled, which it is by default)
The function then branches off on whether Parquet vectorized reader is enabled or not.
With Parquet vectorized reader enabled, the function does the following:
- Creates a VectorizedParquetRecordReader and a RecordReaderIterator
- Requests VectorizedParquetRecordReader to initialize (with the Parquet ParquetInputSplit and the Hadoop TaskAttemptContextImpl)
- Prints out the following DEBUG message to the logs:
Appending [partitionSchema] [partitionValues]
- Requests VectorizedParquetRecordReader to initBatch
- (only with supportBatch enabled) Requests VectorizedParquetRecordReader to enableReturningBatches
- In the end, the function gives the RecordReaderIterator (over the VectorizedParquetRecordReader) as the Iterator[InternalRow]
With Parquet vectorized reader disabled, the function does the following:
- FIXME (since Parquet vectorized reader is enabled by default it's of less interest)
supportBatch¶
FileFormat
supportBatch(
sparkSession: SparkSession,
schema: StructType): Boolean
supportBatch is part of the FileFormat abstraction.
Fixme
Review Me
supportBatch supports vectorized parquet decoding in whole-stage code generation when the following all hold:
- spark.sql.parquet.enableVectorizedReader configuration property is enabled
- spark.sql.codegen.wholeStage internal configuration property is enabled
- The number of fields in the schema is at most spark.sql.codegen.maxFields internal configuration property
- All the fields in the output schema are of AtomicType
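The four conditions above can be modelled as a single boolean expression. This is a simplified sketch and not Spark's code: `FieldType`, `Atomic`, `Nested` and `Conf` are hypothetical stand-ins for Spark's `DataType` hierarchy and `SQLConf`.

```scala
object SupportBatchSketch {
  // Hypothetical, simplified stand-ins for Spark's DataType hierarchy
  sealed trait FieldType
  case object Atomic extends FieldType // e.g. IntegerType, StringType
  case object Nested extends FieldType // e.g. StructType, ArrayType, MapType

  // Hypothetical holder for the three configuration properties involved
  final case class Conf(
      vectorizedReaderEnabled: Boolean, // spark.sql.parquet.enableVectorizedReader
      wholeStageEnabled: Boolean,       // spark.sql.codegen.wholeStage
      maxFields: Int)                   // spark.sql.codegen.maxFields

  // True only when all four conditions listed above hold
  def supportBatch(conf: Conf, schema: Seq[FieldType]): Boolean =
    conf.vectorizedReaderEnabled &&
      conf.wholeStageEnabled &&
      schema.length <= conf.maxFields &&
      schema.forall(_ == Atomic)
}
```

In this model a single nested field, or a schema wider than `maxFields`, is enough to turn batch support off.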
Vector Types¶
FileFormat
vectorTypes(
requiredSchema: StructType,
partitionSchema: StructType,
sqlConf: SQLConf): Option[Seq[String]]
vectorTypes is part of the FileFormat abstraction.
Fixme
Review Me
vectorTypes creates a collection of the names of OffHeapColumnVector or OnHeapColumnVector when spark.sql.columnVector.offheap.enabled property is enabled or disabled, respectively.
The size of the collection is the total number of fields in the given requiredSchema and partitionSchema schemas.
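A minimal sketch of the behaviour described above, under the assumption that only the field counts and the off-heap flag matter. `VectorTypesSketch` and its parameters are illustrative; the real method takes the two `StructType` schemas and a `SQLConf`.

```scala
object VectorTypesSketch {
  // Fully-qualified names of the two column vector implementations
  val OnHeap = "org.apache.spark.sql.execution.vectorized.OnHeapColumnVector"
  val OffHeap = "org.apache.spark.sql.execution.vectorized.OffHeapColumnVector"

  // One class name per field of the required and partition schemas.
  // offHeapEnabled models spark.sql.columnVector.offheap.enabled.
  def vectorTypes(
      requiredFields: Int,
      partitionFields: Int,
      offHeapEnabled: Boolean): Option[Seq[String]] = {
    val name = if (offHeapEnabled) OffHeap else OnHeap
    Option(Seq.fill(requiredFields + partitionFields)(name))
  }
}
```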
mergeSchemasInParallel¶
mergeSchemasInParallel(
parameters: Map[String, String],
filesToTouch: Seq[FileStatus],
sparkSession: SparkSession): Option[StructType]
mergeSchemasInParallel merges schemas in parallel with the given filesToTouch and a multi-threaded parquet footer reader.
FIXME
Describe the multi-threaded parquet footer reader.
Note
With the multi-threaded parquet footer reader, the whole mergeSchemasInParallel is distributed (using an RDD) and multithreaded (per RDD partition).
mergeSchemasInParallel is used when:
- ParquetUtils is requested to infer schema
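From the user's side, schema merging is typically triggered with the `mergeSchema` read option. The following is a sketch under that assumption: `MergeSchemaDemo`, `mergedColumns` and the directory layout are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

object MergeSchemaDemo {
  // Writes two parquet directories with different (but compatible) schemas and
  // reads them back as one dataset with schema merging requested.
  // `base` is a hypothetical, writable directory.
  def mergedColumns(spark: SparkSession, base: String): Seq[String] = {
    spark.range(3).toDF("id")
      .write.mode("overwrite").parquet(s"$base/part=1")
    spark.range(3).selectExpr("id", "id * 2 AS doubled")
      .write.mode("overwrite").parquet(s"$base/part=2")
    // With mergeSchema enabled, schema inference reads the footers of the
    // files and merges their schemas into one
    spark.read.option("mergeSchema", "true").parquet(base).columns.toSeq
  }
}
```

The resulting columns combine the fields of both directories with the `part` partition column discovered from the directory names.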
Refactoring Needed?
mergeSchemasInParallel should be moved to ParquetUtils if that's the only place it's called from, huh?!
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat logger to see what happens inside.
Add the following line to conf/log4j2.properties:
logger.ParquetFileFormat.name = org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
logger.ParquetFileFormat.level = all
Refer to Logging.