
Parquet Connector

Apache Parquet is a columnar storage format for the Apache Hadoop ecosystem with support for efficient storage and encoding of data.

Parquet Connector uses ParquetDataSourceV2 for parquet datasets and tables, with ParquetScan for table scanning (reading) and ParquetWrite for writing data.
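
From a user's perspective, the connector is exercised through the standard DataFrameReader and DataFrameWriter APIs; a minimal round trip (the path is just an example):

// write and read back a Parquet dataset using the Parquet connector
spark.range(5).write.mode("overwrite").parquet("/tmp/parquet_demo")
spark.read.parquet("/tmp/parquet_demo").show()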

ParquetFileFormat is Fallback FileFormat

The older ParquetFileFormat is used as the fallbackFileFormat for backward compatibility and Hive support (to name a couple of use cases).
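
Whether reads take the newer ParquetDataSourceV2 path or fall back to ParquetFileFormat is governed by the spark.sql.sources.useV1SourceList configuration property, which includes parquet by default (hence the FileSourceScanExec in the Demo below). A quick check:

// parquet in this list means file scans use the V1 ParquetFileFormat code path
spark.conf.get("spark.sql.sources.useV1SourceList")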

Parquet is the default connector format based on the spark.sql.sources.default configuration property.

Parquet Connector uses the spark.sql.parquet prefix for parquet-specific configuration properties.
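
Both can be verified in spark-shell (the output path below is just an example):

spark.conf.get("spark.sql.sources.default")             // parquet
spark.conf.get("spark.sql.parquet.compression.codec")   // snappy (the default)

// format-less save and load use the default format, i.e. parquet
spark.range(3).write.mode("overwrite").save("/tmp/default_format_demo")
spark.read.load("/tmp/default_format_demo").show()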

Options

ParquetOptions

Configuration Properties

Reading

Schema Discovery (Inference)

Parquet Connector uses a distributed and multi-threaded (concurrent) process for schema discovery.

Schema discovery can be configured using knobs such as the mergeSchema data source option and the spark.sql.parquet.mergeSchema configuration property.
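
A minimal sketch of schema merging on read, assuming the example paths below are free to overwrite:

// two part directories with compatible but different schemas (example paths)
spark.range(3).selectExpr("id", "id * 2 as double_id")
  .write.mode("overwrite").parquet("/tmp/schema_demo/key=1")
spark.range(3).selectExpr("id", "id * 3 as triple_id")
  .write.mode("overwrite").parquet("/tmp/schema_demo/key=2")

// mergeSchema makes schema discovery reconcile the per-file schemas
val merged = spark.read.option("mergeSchema", "true").parquet("/tmp/schema_demo")
merged.printSchema()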

Vectorized Parquet Decoding

Parquet Connector uses VectorizedParquetRecordReader for Vectorized Parquet Decoding (and ParquetReadSupport otherwise).
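
Vectorized decoding is controlled by the spark.sql.parquet.enableVectorizedReader configuration property (enabled by default); the Batched flag of a FileScan in a physical plan (see the Demo below) shows whether it is in effect. A quick sketch, assuming a Parquet dataset at /tmp/nums.parquet as in the Demo:

spark.conf.get("spark.sql.parquet.enableVectorizedReader")  // true by default

// disabling it switches reads to the row-based ParquetReadSupport path
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.read.parquet("/tmp/nums.parquet").explain()           // Batched: false

// restore the default
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")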

Parquet CLI

parquet-cli is Apache Parquet's set of command-line tools and utilities.

brew install parquet-cli
$ parquet help meta

Usage: parquet [general options] meta <parquet path> [command options]

  Description:

    Print a Parquet file's metadata
Create a demo Parquet file to inspect its metadata:

spark.range(0, 5, 1, numPartitions = 1)
  .write
  .mode("overwrite")
  .parquet("demo.parquet")

$ parquet meta demo.parquet/part-00000-9cb6054e-9986-4f04-8ae7-730aac93e7db-c000.snappy.parquet

File path:  demo.parquet/part-00000-9cb6054e-9986-4f04-8ae7-730aac93e7db-c000.snappy.parquet
Created by: parquet-mr version 1.12.3 (build f8dced182c4c1fbdec6ccb3185537b5a01e6ed6b)
Properties:
                   org.apache.spark.version: 3.4.0
  org.apache.spark.sql.parquet.row.metadata: {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}}]}
Schema:
message spark_schema {
  required int64 id;
}


Row group 0:  count: 5  10.60 B records  start: 4  total(compressed): 53 B  total(uncompressed): 63 B
--------------------------------------------------------------------------------
    type      encodings count     avg size   nulls   min / max
id  INT64     S   _     5         10.60 B    0       "0" / "4"

Demo

val p = spark.read.parquet("/tmp/nums.parquet")
scala> p.explain
== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [id#3L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/nums.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

val executedPlan = p.queryExecution.executedPlan
scala> executedPlan.foreachUp { op => println(op.getClass) }
class org.apache.spark.sql.execution.FileSourceScanExec
class org.apache.spark.sql.execution.InputAdapter
class org.apache.spark.sql.execution.ColumnarToRowExec
class org.apache.spark.sql.execution.WholeStageCodegenExec

import org.apache.spark.sql.execution.FileSourceScanExec
val scan = executedPlan.collectFirst { case scan: FileSourceScanExec => scan }.get

import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
val parquetFF = scan.relation.fileFormat.asInstanceOf[ParquetFileFormat]
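
As a follow-up, the ParquetFileFormat instance can be asked whether it supports columnar (vectorized) reads for the dataset's schema, which should line up with Batched: true in the plan above (supportBatch is an internal API; a sketch):

// expected to be true for the single bigint column with default configuration
parquetFF.supportBatch(spark, p.schema)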