Skip to content

ParquetFileFormat

Obsolete

ParquetFileFormat is a mere fallbackFileFormat of ParquetDataSourceV2.

ParquetFileFormat is the FileFormat of Parquet Data Source.

ParquetFileFormat is splitable.

ParquetFileFormat is Serializable.

Short Name

shortName(): String

ParquetFileFormat is a DataSourceRegister with the short name:

parquet

Metadata Columns

FileFormat
metadataSchemaFields: Seq[StructField]

metadataSchemaFields is part of the FileFormat abstraction.

metadataSchemaFields is the following metadata columns:

row_index

row_index is a ParquetFileFormat-specific metadata column.

_tmp_metadata_row_index

isSplitable

FileFormat
isSplitable(
  sparkSession: SparkSession,
  options: Map[String, String],
  path: Path): Boolean

isSplitable is part of the FileFormat abstraction.

ParquetFileFormat is splitable (true).

Building Data Reader With Partition Values

FileFormat
buildReaderWithPartitionValues(
  sparkSession: SparkSession,
  dataSchema: StructType,
  partitionSchema: StructType,
  requiredSchema: StructType,
  filters: Seq[Filter],
  options: Map[String, String],
  hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow]

buildReaderWithPartitionValues is part of the FileFormat abstraction.

Fixme

Review Me

buildReaderWithPartitionValues sets the following configuration options in the input hadoopConf.

Name Value
parquet.read.support.class ParquetReadSupport
org.apache.spark.sql.parquet.row.requested_schema JSON representation of requiredSchema
org.apache.spark.sql.parquet.row.attributes JSON representation of requiredSchema
spark.sql.session.timeZone spark.sql.session.timeZone
spark.sql.parquet.binaryAsString spark.sql.parquet.binaryAsString
spark.sql.parquet.int96AsTimestamp spark.sql.parquet.int96AsTimestamp

buildReaderWithPartitionValues requests ParquetWriteSupport to setSchema.

buildReaderWithPartitionValues tries to push filters down to create a Parquet FilterPredicate (aka pushed).

With spark.sql.parquet.filterPushdown configuration property enabled, buildReaderWithPartitionValues takes the input Spark data source filters and converts them to Parquet filter predicates if possible. Otherwise, the Parquet filter predicate is not specified.

Note

buildReaderWithPartitionValues creates filter predicates for the following types: BooleanType, IntegerType, (LongType, FloatType, DoubleType, StringType, BinaryType.

buildReaderWithPartitionValues broadcasts the input hadoopConf Hadoop Configuration.

In the end, buildReaderWithPartitionValues gives a function that takes a PartitionedFile and does the following:

  1. Creates a Hadoop FileSplit for the input PartitionedFile

  2. Creates a Parquet ParquetInputSplit for the Hadoop FileSplit created

  3. Gets the broadcast Hadoop Configuration

  4. Creates a flag that says whether to apply timezone conversions to int96 timestamps or not (aka convertTz)

  5. Creates a Hadoop TaskAttemptContextImpl (with the broadcast Hadoop Configuration and a Hadoop TaskAttemptID for a map task)

  6. Sets the Parquet FilterPredicate (only when spark.sql.parquet.filterPushdown configuration property is enabled and it is by default)

The function then branches off on whether Parquet vectorized reader is enabled or not.

With Parquet vectorized reader enabled, the function does the following:

With Parquet vectorized reader disabled, the function does the following:

  • FIXME (since Parquet vectorized reader is enabled by default it's of less interest)

supportBatch

FileFormat
supportBatch(
  sparkSession: SparkSession,
  schema: StructType): Boolean

supportBatch is part of the FileFormat abstraction.

Fixme

Review Me

supportBatch supports vectorized parquet decoding in whole-stage code generation when the following all hold:

  1. spark.sql.parquet.enableVectorizedReader configuration property is enabled

  2. spark.sql.codegen.wholeStage internal configuration property is enabled

  3. The number of fields in the schema is at most spark.sql.codegen.maxFields internal configuration property

  4. All the fields in the output schema are of AtomicType

Vector Types

FileFormat
vectorTypes(
  requiredSchema: StructType,
  partitionSchema: StructType,
  sqlConf: SQLConf): Option[Seq[String]]

vectorTypes is part of the FileFormat abstraction.

Fixme

Review Me

vectorTypes creates a collection of the names of OffHeapColumnVector or OnHeapColumnVector when spark.sql.columnVector.offheap.enabled property is enabled or disabled, respectively.

The size of the collection are all the fields of the given requiredSchema and partitionSchema schemas.

mergeSchemasInParallel

mergeSchemasInParallel(
  parameters: Map[String, String],
  filesToTouch: Seq[FileStatus],
  sparkSession: SparkSession): Option[StructType]

mergeSchemasInParallel mergeSchemasInParallel with the given filesToTouch and a multi-threaded parquet footer reader.

FIXME

Describe the multi-threaded parquet footer reader.

Note

With the multi-threaded parquet footer reader, the whole mergeSchemasInParallel is distributed (using RDD while mergeSchemasInParallel) and multithreaded (per RDD partition).


mergeSchemasInParallel is used when:

Refactoring Needed?

mergeSchemasInParallel should be moved to ParquetUtils if that's the only place it's called from, huh?!

Logging

Enable ALL logging level for org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat logger to see what happens inside.

Add the following line to conf/log4j2.properties:

logger.ParquetFileFormat.name = org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
logger.ParquetFileFormat.level = all

Refer to Logging.