Demo: Connecting Spark SQL to Hive Metastore (with Remote Metastore Server)

The demo shows how to run Apache Spark 2.4.5 with Apache Hive 2.3.6 (on Apache Hadoop 2.10.0).

You'll be using a separate Remote Metastore Server to access table metadata via the Thrift protocol. It is up to the Remote Metastore Server to connect to the underlying JDBC-accessible relational database (e.g. PostgreSQL).

Tip

Read External Apache Hive metastore in the official documentation of the Databricks platform, which describes the topic in more detail from the perspective of Apache Spark developers.

Install Java 8

As per Hadoop's Hadoop Java Versions:

Apache Hadoop from 2.7.x to 2.x support Java 7 and 8

As per Spark's Downloading:

Spark runs on Java 8

Make sure you have Java 8 installed.

$ java -version
openjdk version "1.8.0_242"
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_242-b08)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.242-b08, mixed mode)
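
The check above can also be scripted. The following is a minimal sketch (not part of the original demo) that turns a Java version string into its major version, so a setup script could stop early when Java 8 is not the active JDK. The function names java_major_version and current_java_version are hypothetical helpers introduced here for illustration.

```shell
# Print the major Java version for a version string
# such as "1.8.0_242" (legacy scheme) or "11.0.2" (modern scheme).
java_major_version() {
  version=$1
  case $version in
    1.*) version=${version#1.} ;;   # legacy scheme: 1.8.0_242 -> 8.0_242
  esac
  printf '%s\n' "${version%%.*}"    # keep everything before the first dot
}

# Extract the quoted version from `java -version` (which prints to stderr).
current_java_version() {
  java -version 2>&1 | sed -n 's/.* version "\([^"]*\)".*/\1/p' | head -n 1
}

# Possible usage (requires java on the PATH):
#   [ "$(java_major_version "$(current_java_version)")" = "8" ] || echo "Java 8 is required"
```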

Build Apache Spark for Apache Hadoop

Build Apache Spark with support for Apache Hadoop 2.10.0.

$ cd $SPARK_HOME
$ ./build/mvn \
    -Dhadoop.version=2.10.0 \
    -Pyarn,hive,hive-thriftserver \
    -Pscala-2.12 \
    -Pkubernetes \
    -DskipTests \
    clean install
$ ./bin/spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/

Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_242
Branch HEAD
Compiled by user centos on 2020-02-02T21:10:50Z
Revision cee4ecbb16917fa85f02c635925e2687400aa56b
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.

Assert the versions work in spark-shell before proceeding.

$ ./bin/spark-shell
scala> assert(spark.version == "2.4.5")

scala> assert(org.apache.hadoop.util.VersionInfo.getVersion == "2.10.0")

scala> assert(org.apache.hadoop.hive.shims.ShimLoader.getMajorVersion == "0.23")

Set Up Single-Node Hadoop Cluster

Hive uses Hadoop.

Download and install Hadoop 2.10.0 (or a more recent stable release of the Apache Hadoop 2 line, if available).

export HADOOP_HOME=/Users/jacek/dev/apps/hadoop

Follow the official documentation in Hadoop: Setting up a Single Node Cluster to set up a single-node Hadoop installation.

$ $HADOOP_HOME/bin/hadoop version
Hadoop 2.10.0
Subversion ssh://git.corp.linkedin.com:29418/hadoop/hadoop.git -r e2f1f118e465e787d8567dfa6e2f3b72a0eb9194
Compiled by jhung on 2019-10-22T19:10Z
Compiled with protoc 2.5.0
From source with checksum 7b2d8877c5ce8c9a2cca5c7e81aa4026
This command was run using /Users/jacek/dev/apps/hadoop-2.10.0/share/hadoop/common/hadoop-common-2.10.0.jar

This demo assumes a single-node cluster running in pseudo-distributed mode, where each Hadoop daemon runs in a separate Java process.

Tip

Use hadoop.tmp.dir configuration property as the base for temporary directories.

<property>
  <name>hadoop.tmp.dir</name>
  <value>/tmp/my-hadoop-tmp-dir/hdfs/tmp</value>
  <description>The base for temporary directories.</description>
</property>

Use ./bin/hdfs getconf -confKey hadoop.tmp.dir to check the value.

$ ./bin/hdfs getconf -confKey hadoop.tmp.dir
/tmp/my-hadoop-tmp-dir/hdfs/tmp

fs.defaultFS Configuration Property (core-site.xml)

Edit etc/hadoop/core-site.xml and define the fs.defaultFS and hadoop.proxyuser.* properties.

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
    <property>
      <name>hadoop.proxyuser.[username].groups</name>
      <value>*</value>
    </property>
    <property>
      <name>hadoop.proxyuser.[username].hosts</name>
      <value>*</value>
    </property>
</configuration>

Note

Replace [username] above with the local user (e.g. jacek) that will be used in beeline. Consult this question on StackOverflow.
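
To avoid editing the [username] placeholder by hand, the file can be generated. This is a sketch only: render_core_site is a hypothetical helper (not part of Hadoop) that prints the same three properties shown above with the user name filled in.

```shell
# Print a core-site.xml with the proxy-user properties for the given user.
# The property names and values are the ones used in this demo.
render_core_site() {
  proxy_user=$1
  cat <<EOF
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
    <property>
        <name>hadoop.proxyuser.${proxy_user}.groups</name>
        <value>*</value>
    </property>
    <property>
        <name>hadoop.proxyuser.${proxy_user}.hosts</name>
        <value>*</value>
    </property>
</configuration>
EOF
}

# Possible usage (overwrites the existing file):
#   render_core_site "$(whoami)" > "$HADOOP_HOME/etc/hadoop/core-site.xml"
```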

dfs.replication Configuration Property (hdfs-site.xml)

Edit etc/hadoop/hdfs-site.xml and define dfs.replication property as follows:

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

Passphrase-less SSH (macOS)

Turn on Remote Login in the Sharing preferences of macOS to allow remote users to connect to the Mac using OpenSSH.

$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa_hadoop
$ cat ~/.ssh/id_rsa_hadoop.pub >> ~/.ssh/authorized_keys
$ chmod 0600 ~/.ssh/authorized_keys

Other Steps

You may want to set JAVA_HOME in etc/hadoop/hadoop-env.sh, as recommended in the file itself:

Quote

The only required environment variable is JAVA_HOME. All others are optional. When running a distributed configuration it is best to set JAVA_HOME in this file, so that it is correctly defined on remote nodes.

Format the NameNode storage directory before starting HDFS for the first time.

$ $HADOOP_HOME/bin/hdfs namenode -format
...
INFO common.Storage: Storage directory /tmp/hadoop-jacek/dfs/name has been successfully formatted.
...

Note

If you skip the format step, ./bin/hdfs namenode fails to start and reports that the storage directory does not exist or is not accessible.

$ ./bin/hdfs namenode
18/01/09 15:43:11 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = japila.local/192.168.1.2
STARTUP_MSG:   args = []
STARTUP_MSG:   version = 2.7.5
...
18/01/09 15:43:11 INFO namenode.NameNode: fs.defaultFS is hdfs://localhost:9000
18/01/09 15:43:11 INFO namenode.NameNode: Clients are to use localhost:9000 to access this namenode/service.
...
18/01/09 15:43:12 INFO hdfs.DFSUtil: Starting Web-server for hdfs at: http://0.0.0.0:50070
...
18/01/09 15:43:13 WARN common.Storage: Storage directory /private/tmp/hadoop-jacek/dfs/name does not exist
18/01/09 15:43:13 WARN namenode.FSNamesystem: Encountered exception loading fsimage
org.apache.hadoop.hdfs.server.common.InconsistentFSStateException: Directory /private/tmp/hadoop-jacek/dfs/name is in an inconsistent state: storage directory does not exist or is not accessible.
  at org.apache.hadoop.hdfs.server.namenode.FSImage.recoverStorageDirs(FSImage.java:382)
  at org.apache.hadoop.hdfs.server.namenode.FSImage.recoverTransitionRead(FSImage.java:233)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.loadFSImage(FSNamesystem.java:984)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.loadFromDisk(FSNamesystem.java:686)
  at org.apache.hadoop.hdfs.server.namenode.NameNode.loadNamesystem(NameNode.java:586)
  at org.apache.hadoop.hdfs.server.namenode.NameNode.initialize(NameNode.java:646)
  at org.apache.hadoop.hdfs.server.namenode.NameNode.<init>(NameNode.java:820)
  at org.apache.hadoop.hdfs.server.namenode.NameNode.<init>(NameNode.java:804)
  at org.apache.hadoop.hdfs.server.namenode.NameNode.createNameNode(NameNode.java:1516)
  at org.apache.hadoop.hdfs.server.namenode.NameNode.main(NameNode.java:1582)
...
18/01/09 15:43:13 ERROR namenode.NameNode: Failed to start namenode.
org.apache.hadoop.hdfs.server.common.InconsistentFSStateException: Directory /private/tmp/hadoop-jacek/dfs/name is in an inconsistent state: storage directory does not exist or is not accessible.
  at org.apache.hadoop.hdfs.server.namenode.FSImage.recoverStorageDirs(FSImage.java:382)
  at org.apache.hadoop.hdfs.server.namenode.FSImage.recoverTransitionRead(FSImage.java:233)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.loadFSImage(FSNamesystem.java:984)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.loadFromDisk(FSNamesystem.java:686)
  at org.apache.hadoop.hdfs.server.namenode.NameNode.loadNamesystem(NameNode.java:586)
  at org.apache.hadoop.hdfs.server.namenode.NameNode.initialize(NameNode.java:646)
  at org.apache.hadoop.hdfs.server.namenode.NameNode.<init>(NameNode.java:820)
  at org.apache.hadoop.hdfs.server.namenode.NameNode.<init>(NameNode.java:804)
  at org.apache.hadoop.hdfs.server.namenode.NameNode.createNameNode(NameNode.java:1516)
  at org.apache.hadoop.hdfs.server.namenode.NameNode.main(NameNode.java:1582)

Start Hadoop DFS using start-dfs.sh (and watch the DataNode logs with tail -f logs/hadoop-*-datanode-*.log).

$ $HADOOP_HOME/sbin/start-dfs.sh
Starting namenodes on [localhost]
localhost: starting namenode, logging to /Users/jacek/dev/apps/hadoop-2.10.0/logs/hadoop-jacek-namenode-japila-new.local.out
localhost: starting datanode, logging to /Users/jacek/dev/apps/hadoop-2.10.0/logs/hadoop-jacek-datanode-japila-new.local.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /Users/jacek/dev/apps/hadoop-2.10.0/logs/hadoop-jacek-secondarynamenode-japila-new.local.out

List Hadoop's JVM processes using jps -lm.

$ jps -lm
50773 org.apache.hadoop.hdfs.server.datanode.DataNode
50870 org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode
50695 org.apache.hadoop.hdfs.server.namenode.NameNode
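
Rather than reading the jps listing by eye, a small filter can report which of the three HDFS daemons are not running. This is a sketch under the assumption that the main class is the second column of the jps -lm output, as in the listing above. missing_hdfs_daemons is a hypothetical helper name.

```shell
# Read `jps -lm` output on stdin and print every expected HDFS daemon
# class that does not appear as the main class (second column).
missing_hdfs_daemons() {
  jps_output=$(cat)
  for daemon in \
    org.apache.hadoop.hdfs.server.namenode.NameNode \
    org.apache.hadoop.hdfs.server.datanode.DataNode \
    org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode
  do
    printf '%s\n' "$jps_output" |
      awk -v d="$daemon" '$2 == d { found = 1 } END { exit !found }' ||
      printf '%s\n' "$daemon"
  done
}

# Possible usage: prints nothing when all three daemons are up.
#   jps -lm | missing_hdfs_daemons
```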

Note

FIXME Are the steps in YARN on a Single Node required for Hive?

Running Hive

Note

The following steps are based on Running Hive in the official documentation of Hive.

$HADOOP_HOME/bin/hadoop fs -mkdir /tmp
$HADOOP_HOME/bin/hadoop fs -chmod g+w /tmp
$HADOOP_HOME/bin/hadoop fs -mkdir -p /user/hive/warehouse
$HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse

Download and install Hive 2.3.6 (or a more recent stable release of the Apache Hive 2 line, if available).

export HIVE_HOME=/Users/jacek/dev/apps/hive

Install PostgreSQL

You'll set up a remote metastore database. As the Hive Metastore Administration manual (https://cwiki.apache.org/confluence/display/Hive/AdminManual+Metastore+Administration#AdminManualMetastoreAdministration-RemoteMetastoreDatabase) puts it, "This configuration of metastore database is recommended for any real use." You'll be using PostgreSQL 12.2 (https://www.enterprisedb.com/downloads/postgres-postgresql-downloads).

$ pg_ctl -D /usr/local/var/postgres start
server started

Download the most current version of PostgreSQL JDBC Driver (e.g. PostgreSQL JDBC 4.2 Driver, 42.2.11). Save the jar file (postgresql-42.2.11.jar) in $HIVE_HOME/lib.

Setting Up Remote Metastore Database

Create a database and a user in PostgreSQL for Hive.

createdb hive_demo
createuser APP

Create conf/hive-site.xml (based on conf/hive-default.xml.template) with the following properties:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:postgresql://localhost:5432/hive_demo</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.postgresql.Driver</value>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>hdfs://localhost:9000/user/hive/warehouse</value>
  </property>
</configuration>

Use the Hive Schema Tool to create the metastore tables.

$ $HIVE_HOME/bin/schematool -dbType postgres -initSchema
Metastore connection URL:    jdbc:postgresql://localhost:5432/hive_demo
Metastore Connection Driver :    org.postgresql.Driver
Metastore connection User:   APP
Starting metastore schema initialization to 2.3.0
Initialization script hive-schema-2.3.0.postgres.sql
Initialization script completed
schemaTool completed
$ $HIVE_HOME/bin/schematool -dbType postgres -info
Metastore connection URL:    jdbc:postgresql://localhost:5432/hive_demo
Metastore Connection Driver :    org.postgresql.Driver
Metastore connection User:   APP
Hive distribution version:   2.3.0
Metastore schema version:    2.3.0
schemaTool completed
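
The -info output can be checked mechanically. The sketch below assumes the two "version" lines look like the ones printed above and succeeds only when both are present and equal. schema_versions_match is a hypothetical helper, not part of the Hive Schema Tool.

```shell
# Read `schematool -info` output on stdin. Exit with status 0 only if
# the Hive distribution version and the metastore schema version
# are both reported and identical.
schema_versions_match() {
  awk -F':[ \t]*' '
    /^Hive distribution version:/ { hive = $2 }
    /^Metastore schema version:/  { schema = $2 }
    END { exit !(hive != "" && hive == schema) }
  '
}

# Possible usage:
#   $HIVE_HOME/bin/schematool -dbType postgres -info | schema_versions_match \
#     && echo "metastore schema is in sync"
```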

As per the official documentation of Hive:

HiveCLI is now deprecated in favor of Beeline

Run HiveServer2.

$HIVE_HOME/bin/hiveserver2

Run Beeline (the HiveServer2 CLI).

$ $HIVE_HOME/bin/beeline -u jdbc:hive2://localhost:10000
...
Connecting to jdbc:hive2://localhost:10000
Connected to: Apache Hive (version 2.3.6)
Driver: Hive JDBC (version 2.3.6)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 2.3.6 by Apache Hive
0: jdbc:hive2://localhost:10000>

Start Hive Metastore Server

Start the Hive Metastore Server (as described in Remote Metastore Server).

$HIVE_HOME/bin/hive --service metastore
...
Starting Hive Metastore Server

That is the server Spark SQL applications are going to connect to for metadata of Hive tables.
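
Before moving on to Spark, it can help to confirm that something is listening on the metastore port. The following sketch splits a single Thrift URI (thrift://localhost:9083 is the address used later in this demo) into host and port and probes it. Both function names are hypothetical, and the probe assumes that nc (netcat) with the -z option is installed.

```shell
# Split a single thrift://host:port URI into "host port".
metastore_host_port() {
  hostport=${1#thrift://}
  printf '%s %s\n' "${hostport%%:*}" "${hostport##*:}"
}

# Succeed if a TCP connection to host ($1) and port ($2) can be opened.
# Assumption: `nc` supports -z (connect and close without sending data).
port_is_open() {
  nc -z "$1" "$2" >/dev/null 2>&1
}

# Possible usage:
#   set -- $(metastore_host_port thrift://localhost:9083)
#   port_is_open "$1" "$2" && echo "Hive Metastore Server is listening on $1:$2"
```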

Connecting Apache Spark to Apache Hive

Create $SPARK_HOME/conf/hive-site.xml and define the hive.metastore.uris configuration property (the Thrift URL of the Hive Metastore Server).

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
  </property>
</configuration>

Optionally, you may want to add the following to conf/log4j.properties for more low-level logging:

log4j.logger.org.apache.spark.sql.hive.HiveUtils$=ALL
log4j.logger.org.apache.spark.sql.internal.SharedState=ALL
log4j.logger.org.apache.spark.sql.hive.client.HiveClientImpl=ALL

Start spark-shell.

$SPARK_HOME/bin/spark-shell \
  --jars \
    $HIVE_HOME/lib/hive-metastore-2.3.6.jar,\
    $HIVE_HOME/lib/hive-exec-2.3.6.jar,\
    $HIVE_HOME/lib/hive-common-2.3.6.jar,\
    $HIVE_HOME/lib/hive-serde-2.3.6.jar,\
    $HIVE_HOME/lib/guava-14.0.1.jar \
  --conf spark.sql.hive.metastore.version=2.3 \
  --conf spark.sql.hive.metastore.jars=$HIVE_HOME"/lib/*" \
  --conf spark.sql.warehouse.dir=hdfs://localhost:9000/user/hive/warehouse
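
The --jars list above repeats the Hive version four times. As a sketch, the list could be assembled from a single version argument so that it stays in step with the installed Hive release. hive_jars is a hypothetical helper, and the Guava version is the one hard-coded in the command above.

```shell
# Print the comma-separated --jars value for the given Hive home and version.
hive_jars() {
  hive_home=$1
  hive_version=$2
  jars=
  for module in metastore exec common serde; do
    jars="${jars}${hive_home}/lib/hive-${module}-${hive_version}.jar,"
  done
  # Guava does not follow the Hive version number.
  printf '%s\n' "${jars}${hive_home}/lib/guava-14.0.1.jar"
}

# Possible usage:
#   $SPARK_HOME/bin/spark-shell --jars "$(hive_jars "$HIVE_HOME" 2.3.6)" ...
```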

You should see the following welcome message:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/

Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
Type in expressions to have them evaluated.
Type :help for more information.

Reaching the scala> prompt confirms that spark.sql.hive.metastore.version and the JAR files are all correct (the check happens while the SparkSession is created). Congratulations!

You may also want to check out the spark.sql.catalogImplementation internal property that should be hive. With the extra logging turned on, you should also see the configuration file loaded (hive-site.xml) and the warehouse location.

scala> spark.conf.get("spark.sql.catalogImplementation")
INFO SharedState: loading hive config file: file:/Users/jacek/dev/oss/spark/conf/hive-site.xml
INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('hdfs://localhost:9000/user/hive/warehouse').
INFO SharedState: Warehouse path is 'hdfs://localhost:9000/user/hive/warehouse'.
res0: String = hive

The most critical step is to verify the remote connection to the Hive Metastore Server (via the Thrift protocol). Execute the following command to list all tables known to Spark SQL (including Hive tables if there were any, but there are none by default).

scala> spark.catalog.listTables.show
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
+----+--------+-----------+---------+-----------+

There is one database in Hive by default.

0: jdbc:hive2://localhost:10000> show databases;
+----------------+
| database_name  |
+----------------+
| default        |
+----------------+
1 row selected (0.067 seconds)

List the tables in the default database. The list should be empty until you create a table.

scala> spark.sharedState.externalCatalog.listTables("default")

Create a partitioned table in Hive (based on the official documentation of Hive).

Execute the following DDL in beeline.

CREATE TABLE demo_sales
(id BIGINT, qty BIGINT, name STRING)
COMMENT 'Demo: Connecting Spark SQL to Hive Metastore'
PARTITIONED BY (rx_mth_cd STRING COMMENT 'Prescription Date YYYYMM aggregated')
STORED AS PARQUET;

CREATE TABLE ... USING hive

You can also create a Hive table from spark-shell using CREATE TABLE ... USING hive queries.

CREATE TABLE hive_table_name (id LONG) USING hive

In case of permission denied errors such as the one below:

MetaException(message:Got exception: org.apache.hadoop.security.AccessControlException Permission denied: user=anonymous, access=WRITE, inode="/user/hive/warehouse":jacek:supergroup:drwxrwxr-x

you may want to simply change the permissions of the warehouse directory to allow anybody to write:

$HADOOP_HOME/bin/hadoop fs -chmod 777 /user/hive/warehouse
$HADOOP_HOME/bin/hadoop fs -ls /user/hive
Found 1 items
drwxrwxrwx   - jacek supergroup          0 2020-03-21 11:15 /user/hive/warehouse

Check out the table directory on HDFS.

$HADOOP_HOME/bin/hadoop fs -ls /user/hive/warehouse
Found 1 items
drwxrwxrwx   - anonymous supergroup          0 2020-03-22 16:07 /user/hive/warehouse/demo_sales

Insert some data.

-- (id BIGINT, qty BIGINT, name STRING)
-- PARTITIONED BY (rx_mth_cd STRING COMMENT 'Prescription Date YYYYMM aggregated')
INSERT INTO demo_sales PARTITION (rx_mth_cd="202002") VALUES (2, 2000, 'two');

Query the records in the table.

0: jdbc:hive2://localhost:10000> SELECT * FROM demo_sales;
+----------------+-----------------+------------------+-----------------------+
| demo_sales.id  | demo_sales.qty  | demo_sales.name  | demo_sales.rx_mth_cd  |
+----------------+-----------------+------------------+-----------------------+
| 2              | 2000            | two              | 202002                |
+----------------+-----------------+------------------+-----------------------+
1 row selected (0.112 seconds)

Display the partitions (there should be exactly one).

0: jdbc:hive2://localhost:10000> SHOW PARTITIONS demo_sales;
+-------------------+
|     partition     |
+-------------------+
| rx_mth_cd=202002  |
+-------------------+
1 row selected (0.084 seconds)

Check out the table directory on HDFS.

$ $HADOOP_HOME/bin/hadoop fs -ls -R /user/hive/warehouse/demo_sales
drwxrwxrwx   - anonymous supergroup          0 2020-03-22 16:10 /user/hive/warehouse/demo_sales/rx_mth_cd=202002
-rwxrwxrwx   1 anonymous supergroup        454 2020-03-22 16:10 /user/hive/warehouse/demo_sales/rx_mth_cd=202002/000000_0
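
The listing shows how Hive lays out a partitioned table: one directory per partition, named column=value. As a sketch, the partition specs can be recovered from the recursive listing alone and compared with the output of SHOW PARTITIONS. list_partition_dirs is a hypothetical helper and assumes that paths contain no spaces.

```shell
# Read `hadoop fs -ls -R <table dir>` output on stdin and print the
# name of every directory whose last path segment looks like key=value.
list_partition_dirs() {
  awk '$1 ~ /^d/ {
    n = split($NF, parts, "/")          # $NF is the full HDFS path
    if (parts[n] ~ /=/) print parts[n]  # last segment, e.g. rx_mth_cd=202002
  }'
}

# Possible usage:
#   $HADOOP_HOME/bin/hadoop fs -ls -R /user/hive/warehouse/demo_sales | list_partition_dirs
```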

Time for some Spark.

Query the tables in the default database. There should be at least the one you've just created.

scala> spark.sharedState.externalCatalog.listTables("default")
res6: Seq[String] = Buffer(demo_sales)

Query the rows in the table.

scala> spark.table("demo_sales").show
+---+----+----+---------+
| id| qty|name|rx_mth_cd|
+---+----+----+---------+
|  2|2000| two|   202002|
+---+----+----+---------+

Display the metadata of the table from the Spark catalog (DESCRIBE EXTENDED SQL command).

scala> sql("DESCRIBE EXTENDED demo_sales").show(Integer.MAX_VALUE, truncate = false)
+----------------------------+--------------------------------------------------------------+-----------------------------------+
|col_name                    |data_type                                                     |comment                            |
+----------------------------+--------------------------------------------------------------+-----------------------------------+
|id                          |bigint                                                        |null                               |
|qty                         |bigint                                                        |null                               |
|name                        |string                                                        |null                               |
|rx_mth_cd                   |string                                                        |Prescription Date YYYYMM aggregated|
|# Partition Information     |                                                              |                                   |
|# col_name                  |data_type                                                     |comment                            |
|rx_mth_cd                   |string                                                        |Prescription Date YYYYMM aggregated|
|                            |                                                              |                                   |
|# Detailed Table Information|                                                              |                                   |
|Database                    |default                                                       |                                   |
|Table                       |demo_sales                                                    |                                   |
|Owner                       |anonymous                                                     |                                   |
|Created Time                |Sun Mar 22 16:09:18 CET 2020                                  |                                   |
|Last Access                 |Thu Jan 01 01:00:00 CET 1970                                  |                                   |
|Created By                  |Spark 2.2 or prior                                            |                                   |
|Type                        |MANAGED                                                       |                                   |
|Provider                    |hive                                                          |                                   |
|Comment                     |Demo: Connecting Spark SQL to Hive Metastore                  |                                   |
|Table Properties            |[transient_lastDdlTime=1584889905]                            |                                   |
|Location                    |hdfs://localhost:9000/user/hive/warehouse/demo_sales          |                                   |
|Serde Library               |org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe   |                                   |
|InputFormat                 |org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat |                                   |
|OutputFormat                |org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat|                                   |
|Storage Properties          |[serialization.format=1]                                      |                                   |
|Partition Provider          |Catalog                                                       |                                   |
+----------------------------+--------------------------------------------------------------+-----------------------------------+

It all worked fine. Congratulations!