Demo: Hive Partitioned Parquet Table and Partition Pruning

The demo shows partition pruning optimization in Spark SQL for Hive partitioned tables in parquet format.

NOTE: The demo is a follow-up to Demo: Connecting Spark SQL to Hive Metastore (with Remote Metastore Server). Please finish it first before this demo.

The demo features the following:

Create Hive Partitioned Table in Parquet Format

Create a Hive partitioned table in parquet format with some data.

CREATE TABLE hive_partitioned_table
(id BIGINT, name STRING)
COMMENT 'Demo: Hive Partitioned Parquet Table and Partition Pruning'

INSERT INTO hive_partitioned_table
PARTITION (city="Warsaw")
VALUES (0, 'Jacek');

INSERT INTO hive_partitioned_table
PARTITION (city="Paris")
VALUES (1, 'Agata');

Accessing Hive Table in Spark Shell

Make sure that the table is accessible in Spark SQL.


val tableName = "hive_partitioned_table"
assert(spark.table(tableName).collect.length == 2 /* rows */)

// Use the default value of spark.sql.hive.convertMetastoreParquet

Query the available partitions.

val parts = spark
  .listPartitions("default", tableName)
scala> parts.foreach(println)
    Partition Values: [city=Warsaw]
    Location: hdfs://localhost:9000/user/hive/warehouse/hive_partitioned_table/city=Warsaw
    Serde Library:
    Storage Properties: [serialization.format=1]
    Partition Parameters: {rawDataSize=2, numFiles=1, transient_lastDdlTime=1585478385, totalSize=345, COLUMN_STATS_ACCURATE={"BASIC_STATS":"true"}, numRows=1}
    Created Time: Sun Mar 29 12:39:45 CEST 2020
    Last Access: Thu Jan 01 01:00:00 CET 1970
    Partition Statistics: 345 bytes, 1 rows)
    Partition Values: [city=Paris]
    Location: hdfs://localhost:9000/user/hive/warehouse/hive_partitioned_table/city=Paris
    Serde Library:
    Storage Properties: [serialization.format=1]
    Partition Parameters: {rawDataSize=2, numFiles=1, transient_lastDdlTime=1585478387, totalSize=345, COLUMN_STATS_ACCURATE={"BASIC_STATS":"true"}, numRows=1}
    Created Time: Sun Mar 29 12:39:47 CEST 2020
    Last Access: Thu Jan 01 01:00:00 CET 1970
    Partition Statistics: 345 bytes, 1 rows)

Create another Hive table using Spark.


Check out the table in Hive using beeline.

0: jdbc:hive2://localhost:10000> desc formatted cities;
|           col_name            |                     data_type                      |                      comment                       |
| # col_name                    | data_type                                          | comment                                            |
|                               | NULL                                               | NULL                                               |
| name                          | string                                             |                                                    |
|                               | NULL                                               | NULL                                               |
| # Detailed Table Information  | NULL                                               | NULL                                               |
| Database:                     | default                                            | NULL                                               |
| Owner:                        | jacek                                              | NULL                                               |
| CreateTime:                   | Sun Mar 29 14:39:40 CEST 2020                      | NULL                                               |
| LastAccessTime:               | UNKNOWN                                            | NULL                                               |
| Retention:                    | 0                                                  | NULL                                               |
| Location:                     | hdfs://localhost:9000/user/hive/warehouse/cities   | NULL                                               |
| Table Type:                   | MANAGED_TABLE                                      | NULL                                               |
| Table Parameters:             | NULL                                               | NULL                                               |
|                               | numFiles                                           | 1                                                  |
|                               | spark.sql.create.version                           | 2.4.5                                              |
|                               | spark.sql.sources.provider                         | parquet                                            |
|                               | spark.sql.sources.schema.numParts                  | 1                                                  |
|                               | spark.sql.sources.schema.part.0                    | {\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]} |
|                               | totalSize                                          | 425                                                |
|                               | transient_lastDdlTime                              | 1585485580                                         |
|                               | NULL                                               | NULL                                               |
| # Storage Information         | NULL                                               | NULL                                               |
| SerDe Library:                | | NULL                                               |
| InputFormat:                  | | NULL                                               |
| OutputFormat:                 | | NULL                                               |
| Compressed:                   | No                                                 | NULL                                               |
| Num Buckets:                  | -1                                                 | NULL                                               |
| Bucket Columns:               | []                                                 | NULL                                               |
| Sort Columns:                 | []                                                 | NULL                                               |
| Storage Desc Params:          | NULL                                               | NULL                                               |
|                               | path                                               | hdfs://localhost:9000/user/hive/warehouse/cities   |
|                               | serialization.format                               | 1                                                  |
32 rows selected (0.075 seconds)

Explore Partition Pruning

You'll be using ../[In] expression in structured queries to learn more on partition pruning.

TIP: Enable INFO logging level for ../[org.apache.spark.sql.execution.datasources.PrunedInMemoryFileIndex] logger.

Use a fixed list of cities to filter by (which should trigger partition pruning).

val q = sql(s"""SELECT * FROM $tableName WHERE city IN ('Warsaw')""")
scala> q.explain(extended = true)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter 'city IN (Warsaw)
   +- 'UnresolvedRelation `hive_partitioned_table`

== Analyzed Logical Plan ==
id: bigint, name: string, city: string
Project [id#101L, name#102, city#103]
+- Filter city#103 IN (Warsaw)
   +- SubqueryAlias `default`.`hive_partitioned_table`
      +- Relation[id#101L,name#102,city#103] parquet

== Optimized Logical Plan ==
Project [id#101L, name#102, city#103]
+- Filter (isnotnull(city#103) && (city#103 = Warsaw))
   +- Relation[id#101L,name#102,city#103] parquet

== Physical Plan ==
*(1) FileScan parquet default.hive_partitioned_table[id#101L,name#102,city#103] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://localhost:9000/user/hive/warehouse/hive_partitioned_table/city=War..., PartitionCount: 1, PartitionFilters: [isnotnull(city#103), (city#103 = Warsaw)], PushedFilters: [], ReadSchema: struct<id:bigint,name:string>

Note the PartitionFilters field of the leaf FileScan node in the physical plan. It uses an ../[PrunedInMemoryFileIndex] (for the partition index). Let's explore it.

import org.apache.spark.sql.execution.FileSourceScanExec
val scan = q.queryExecution.executedPlan.collect { case op: FileSourceScanExec => op }.head

val index = scan.relation.location
scala> println(s"Time of partition metadata listing: ${index.metadataOpsTimeNs.get}ns")
Time of partition metadata listing: 41703540ns

// You may also want to review metadataTime metric in web UI
// Includes the above time and the time to list files

// You should see the following value (YMMV)
scala> println(scan.metrics("metadataTime").value)

Use a subquery to filter by and note the PartitionFilters field of FileScan operator (which is not supported for partition pruning since the values to filter partitions by are not known until the execution time).

val q = sql(s"""SELECT * FROM $tableName WHERE city IN (SELECT * FROM cities)""")
scala> q.explain(extended = true)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter 'city IN (list#104 [])
   :  +- 'Project [*]
   :     +- 'UnresolvedRelation `cities`
   +- 'UnresolvedRelation `hive_partitioned_table`

== Analyzed Logical Plan ==
id: bigint, name: string, city: string
Project [id#113L, name#114, city#115]
+- Filter city#115 IN (list#104 [])
   :  +- Project [name#108]
   :     +- SubqueryAlias `default`.`cities`
   :        +- Relation[name#108] parquet
   +- SubqueryAlias `default`.`hive_partitioned_table`
      +- Relation[id#113L,name#114,city#115] parquet

== Optimized Logical Plan ==
Join LeftSemi, (city#115 = name#108)
:- Relation[id#113L,name#114,city#115] parquet
+- Relation[name#108] parquet

== Physical Plan ==
*(2) BroadcastHashJoin [city#115], [name#108], LeftSemi, BuildRight
:- *(2) FileScan parquet default.hive_partitioned_table[id#113L,name#114,city#115] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://localhost:9000/user/hive/warehouse/hive_partitioned_table], PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,name:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   +- *(1) FileScan parquet default.cities[name#108] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://localhost:9000/user/hive/warehouse/cities], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:string>