ParquetFileFormat¶
Obsolete
ParquetFileFormat is a mere fallbackFileFormat of ParquetDataSourceV2.
ParquetFileFormat is the FileFormat of Parquet Data Source.
ParquetFileFormat is splitable.
ParquetFileFormat is Serializable.
Short Name¶
shortName(): String
ParquetFileFormat is a DataSourceRegister with the short name:
parquet
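For example (a minimal sketch in spark-shell; the path is hypothetical), the short name is what spark.read.format accepts:

```scala
// "parquet" resolves to this data source through DataSourceRegister
val df = spark.read.format("parquet").load("/tmp/people.parquet")

// equivalent to the dedicated shortcut
val sameDf = spark.read.parquet("/tmp/people.parquet")
```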
Metadata Columns¶
FileFormat
metadataSchemaFields: Seq[StructField]
metadataSchemaFields is part of the FileFormat abstraction.
metadataSchemaFields consists of the following metadata columns:
- The default FileFormat-specific metadata columns
- row_index
row_index¶
row_index is a ParquetFileFormat-specific metadata column.
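As an illustration (a sketch, assuming a Spark version that exposes row_index under the hidden _metadata column of the Parquet data source; the path is hypothetical):

```scala
import org.apache.spark.sql.functions.col

// row_index is surfaced as a field of the hidden _metadata column
spark.read.parquet("/tmp/people.parquet")
  .select(col("*"), col("_metadata.row_index"))
  .show()
```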
_tmp_metadata_row_index¶
isSplitable¶
FileFormat
isSplitable(
sparkSession: SparkSession,
options: Map[String, String],
path: Path): Boolean
isSplitable is part of the FileFormat abstraction.
ParquetFileFormat is splitable (true).
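A minimal sketch of the contract (ParquetFileFormat reports true regardless of the options or the path; the path below is made up):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

val parquet = new ParquetFileFormat()
// always true, so Spark may split a single Parquet file across multiple partitions
assert(parquet.isSplitable(spark, Map.empty, new Path("/tmp/any.parquet")))
```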
Building Data Reader With Partition Values¶
FileFormat
buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow]
buildReaderWithPartitionValues is part of the FileFormat abstraction.
Fixme
Review Me
buildReaderWithPartitionValues sets the following configuration options in the input hadoopConf.
Name | Value |
---|---|
parquet.read.support.class | ParquetReadSupport |
org.apache.spark.sql.parquet.row.requested_schema | JSON representation of requiredSchema |
org.apache.spark.sql.parquet.row.attributes | JSON representation of requiredSchema |
spark.sql.session.timeZone | spark.sql.session.timeZone |
spark.sql.parquet.binaryAsString | spark.sql.parquet.binaryAsString |
spark.sql.parquet.int96AsTimestamp | spark.sql.parquet.int96AsTimestamp |
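These values mirror session-level Spark SQL properties, so they can be adjusted before a read (a sketch; the values shown are examples only):

```scala
// the session configuration is what ends up in the Hadoop Configuration above
spark.conf.set("spark.sql.session.timeZone", "UTC")
spark.conf.set("spark.sql.parquet.binaryAsString", "false")
spark.conf.set("spark.sql.parquet.int96AsTimestamp", "true")
```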
buildReaderWithPartitionValues requests ParquetWriteSupport to setSchema.
buildReaderWithPartitionValues tries to push filters down to create a Parquet FilterPredicate (aka pushed).
With spark.sql.parquet.filterPushdown configuration property enabled, buildReaderWithPartitionValues takes the input Spark data source filters and converts them to Parquet filter predicates if possible. Otherwise, the Parquet filter predicate is not specified.
Note
buildReaderWithPartitionValues creates filter predicates for the following types: BooleanType, IntegerType, LongType, FloatType, DoubleType, StringType, BinaryType.
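To see the effect, explain a filtered Parquet scan; supported predicates should show up as PushedFilters in the FileScan node (a sketch; the path and the column are hypothetical):

```scala
import org.apache.spark.sql.functions.col

spark.conf.set("spark.sql.parquet.filterPushdown", "true")  // the default
spark.read.parquet("/tmp/people.parquet")
  .where(col("id") > 10)
  .explain()
// expect something like: PushedFilters: [IsNotNull(id), GreaterThan(id,10)]
```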
buildReaderWithPartitionValues broadcasts the input hadoopConf Hadoop Configuration.
In the end, buildReaderWithPartitionValues gives a function that takes a PartitionedFile and does the following:
- Creates a Hadoop FileSplit for the input PartitionedFile
- Creates a Parquet ParquetInputSplit for the Hadoop FileSplit created
- Gets the broadcast Hadoop Configuration
- Creates a flag that says whether to apply timezone conversions to int96 timestamps or not (aka convertTz)
- Creates a Hadoop TaskAttemptContextImpl (with the broadcast Hadoop Configuration and a Hadoop TaskAttemptID for a map task)
- Sets the Parquet FilterPredicate (only when the spark.sql.parquet.filterPushdown configuration property is enabled, which it is by default)
The function then branches off on whether Parquet vectorized reader is enabled or not.
With Parquet vectorized reader enabled, the function does the following:
- Creates a VectorizedParquetRecordReader and a RecordReaderIterator
- Requests VectorizedParquetRecordReader to initialize (with the Parquet ParquetInputSplit and the Hadoop TaskAttemptContextImpl)
- Prints out the following DEBUG message to the logs:
Appending [partitionSchema] [partitionValues]
- Requests VectorizedParquetRecordReader to initBatch
- (only with supportBatch enabled) Requests VectorizedParquetRecordReader to enableReturningBatches
- In the end, the function gives the RecordReaderIterator (over the VectorizedParquetRecordReader) as the Iterator[InternalRow]
With Parquet vectorized reader disabled, the function does the following:
- FIXME (since Parquet vectorized reader is enabled by default it's of less interest)
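To exercise the non-vectorized (row-based) path, the vectorized reader can be turned off (a sketch; the path is hypothetical):

```scala
// forces buildReaderWithPartitionValues to fall back to the row-based Parquet reader
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.read.parquet("/tmp/people.parquet").show()
```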
supportBatch¶
FileFormat
supportBatch(
sparkSession: SparkSession,
schema: StructType): Boolean
supportBatch is part of the FileFormat abstraction.
Fixme
Review Me
supportBatch supports vectorized parquet decoding in whole-stage code generation when all of the following hold (see the sketch after this list):
- spark.sql.parquet.enableVectorizedReader configuration property is enabled
- spark.sql.codegen.wholeStage internal configuration property is enabled
- The number of fields in the schema is at most the value of the spark.sql.codegen.maxFields internal configuration property
- All the fields in the output schema are of AtomicType
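A sketch of checking the conditions directly (internal API; the schema is made up, and with the default configuration all the conditions above should hold):

```scala
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// two AtomicType fields, well below spark.sql.codegen.maxFields (100 by default)
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))
val batchSupported = new ParquetFileFormat().supportBatch(spark, schema)
```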
Vector Types¶
FileFormat
vectorTypes(
requiredSchema: StructType,
partitionSchema: StructType,
sqlConf: SQLConf): Option[Seq[String]]
vectorTypes is part of the FileFormat abstraction.
Fixme
Review Me
vectorTypes creates a collection of the names of OffHeapColumnVector or OnHeapColumnVector when the spark.sql.columnVector.offheap.enabled property is enabled or disabled, respectively.
The size of the collection is the total number of fields in the given requiredSchema and partitionSchema schemas.
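A sketch that peeks at the internal API (the schemas are made up; with spark.sql.columnVector.offheap.enabled disabled, which is the default, OnHeapColumnVector is reported):

```scala
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}

val requiredSchema = StructType(Seq(StructField("id", LongType)))
val partitionSchema = StructType(Seq(StructField("year", IntegerType)))
// one vector type name per field of requiredSchema and partitionSchema
val vectorTypeNames = new ParquetFileFormat().vectorTypes(
  requiredSchema, partitionSchema, spark.sessionState.conf)
```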
mergeSchemasInParallel¶
mergeSchemasInParallel(
parameters: Map[String, String],
filesToTouch: Seq[FileStatus],
sparkSession: SparkSession): Option[StructType]
mergeSchemasInParallel merges the schemas of the given filesToTouch using a multi-threaded parquet footer reader.
FIXME
Describe the multi-threaded parquet footer reader.
Note
With the multi-threaded parquet footer reader, the whole mergeSchemasInParallel operation is both distributed (using an RDD over the files) and multithreaded (per RDD partition).
mergeSchemasInParallel is used when:
- ParquetUtils is requested to infer schema
Refactoring Needed?
mergeSchemasInParallel should be moved to ParquetUtils if that's the only place it's called from, huh?!
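For instance, schema inference with schema merging across part-files (where ParquetUtils comes in) can be triggered as follows (a sketch; the path is hypothetical):

```scala
// mergeSchema=true makes schema inference read and merge the footers of all part-files
val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("/tmp/parquet-table")
merged.printSchema()
```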
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.ParquetFileFormat.name = org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
logger.ParquetFileFormat.level = all
Refer to Logging.