Parquet Connector¶
Apache Parquet is a columnar storage format for the Apache Hadoop ecosystem with support for efficient storage and encoding of data.
Parquet Connector uses ParquetDataSourceV2 for parquet datasets and tables, with ParquetScan for table scanning (reading) and ParquetWrite for writing data.
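For example, both writing and reading parquet data with the DataFrame API are handled by the Parquet connector (a minimal sketch; /tmp/users.parquet is a hypothetical path):

// Write a small dataset in parquet format
spark.range(5).write.mode("overwrite").parquet("/tmp/users.parquet")

// Read it back; format("parquet") is equivalent to the parquet shorthand
val users = spark.read.format("parquet").load("/tmp/users.parquet")
users.show()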
ParquetFileFormat is the Fallback FileFormat
The older ParquetFileFormat is still used as the fallbackFileFormat for backward compatibility and Hive support (to name a few use cases).
Parquet is the default connector format based on the spark.sql.sources.default configuration property.
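A quick way to see the default in action (a sketch; demo_default is a made-up table name):

// The default data source format is parquet
assert(spark.conf.get("spark.sql.sources.default") == "parquet")

// With no format specified, saveAsTable (and save) use the default parquet format
spark.range(3).write.mode("overwrite").saveAsTable("demo_default")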
Parquet connector uses the spark.sql.parquet prefix for parquet-specific configuration properties.
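One such property is the compression codec (a sketch; snappy is the default, and zstd is one of the alternative values):

// Current parquet compression codec (snappy by default)
spark.conf.get("spark.sql.parquet.compression.codec")

// Override it for the current session
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")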
Options¶
Configuration Properties¶
Reading¶
Schema Discovery (Inference)¶
Parquet Connector uses a distributed and multi-threaded (concurrent) process for schema discovery.
Schema discovery can be configured using parquet-specific options and configuration properties (e.g., whether to merge schemas across part-files), as sketched below.
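A hedged sketch of schema merging during discovery (the /tmp/events.parquet path and the two writes are made up):

import org.apache.spark.sql.functions.lit

// Two appends with different (but compatible) schemas into the same directory
spark.range(3).write.mode("append").parquet("/tmp/events.parquet")
spark.range(3).withColumn("extra", lit("x")).write.mode("append").parquet("/tmp/events.parquet")

// Ask for schema merging per read...
spark.read.option("mergeSchema", true).parquet("/tmp/events.parquet").printSchema()

// ...or session-wide
spark.conf.set("spark.sql.parquet.mergeSchema", true)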
Vectorized Parquet Decoding¶
Parquet Connector uses VectorizedParquetRecordReader for Vectorized Parquet Decoding (and ParquetReadSupport otherwise).
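Vectorized decoding is controlled by the spark.sql.parquet.enableVectorizedReader configuration property (enabled by default) and is reported as Batched: true in the physical plan. A sketch (reusing the /tmp/nums.parquet dataset from the demo below):

// Vectorized (columnar) decoding is on by default
spark.conf.get("spark.sql.parquet.enableVectorizedReader")

// Turn it off to fall back to row-based decoding (ParquetReadSupport)
spark.conf.set("spark.sql.parquet.enableVectorizedReader", false)
spark.read.parquet("/tmp/nums.parquet").explain()  // FileScan parquet ... Batched: false

// Turn it back on
spark.conf.set("spark.sql.parquet.enableVectorizedReader", true)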
Parquet CLI¶
parquet-cli is Apache Parquet's collection of command-line tools and utilities.
brew install parquet-cli
Print Parquet Metadata¶
$ parquet help meta
Usage: parquet [general options] meta <parquet path> [command options]
Description:
Print a Parquet file's metadata
Write a single-partition demo parquet file to inspect:

spark.range(0, 5, 1, numPartitions = 1)
  .write
  .mode("overwrite")
  .parquet("demo.parquet")
$ parquet meta demo.parquet/part-00000-9cb6054e-9986-4f04-8ae7-730aac93e7db-c000.snappy.parquet
File path: demo.parquet/part-00000-9cb6054e-9986-4f04-8ae7-730aac93e7db-c000.snappy.parquet
Created by: parquet-mr version 1.12.3 (build f8dced182c4c1fbdec6ccb3185537b5a01e6ed6b)
Properties:
org.apache.spark.version: 3.4.0
org.apache.spark.sql.parquet.row.metadata: {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}}]}
Schema:
message spark_schema {
required int64 id;
}
Row group 0: count: 5 10.60 B records start: 4 total(compressed): 53 B total(uncompressed):63 B
--------------------------------------------------------------------------------
     type      encodings count     avg size   nulls   min / max
id   INT64     S _       5         10.60 B    0       "0" / "4"
Demo¶
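The demo reads a parquet dataset from /tmp/nums.parquet; a minimal sketch to create one first (any small numeric dataset will do):

// Write a small dataset to the location the demo reads from
spark.range(5).write.mode("overwrite").parquet("/tmp/nums.parquet")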
val p = spark.read.parquet("/tmp/nums.parquet")
scala> p.explain
== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [id#3L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/nums.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
val executedPlan = p.queryExecution.executedPlan
scala> executedPlan.foreachUp { op => println(op.getClass) }
class org.apache.spark.sql.execution.FileSourceScanExec
class org.apache.spark.sql.execution.InputAdapter
class org.apache.spark.sql.execution.ColumnarToRowExec
class org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = executedPlan.collectFirst { case scan: FileSourceScanExec => scan }.get
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
val parquetFF = scan.relation.fileFormat.asInstanceOf[ParquetFileFormat]
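As a follow-up sketch, the file format (and the scan operator itself) can be asked whether vectorized (columnar) decoding applies to this schema, which is what Batched: true in the plan above reflects:

// Does the parquet format support columnar (vectorized) reads for this schema?
parquetFF.supportBatch(spark, scan.schema)

// The physical operator exposes the same decision
scan.supportsColumnar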