Demo: Spark Connector

This demo shows how to set up a Spark application (namely spark-shell and pyspark) with Unity Catalog using the Spark Connector module.

Build Spark Connector

Build the Spark Connector module.

build/sbt '++2.13' clean package publishLocal

Scala 2.13

This demo uses the Scala 2.13 builds of Apache Spark, Delta Lake and Unity Catalog.

Unless you built Apache Spark with Scala 2.13, replace any 2.13 in this demo with 2.12.

Use spark-shell --version or pyspark --version to learn about your Spark build:

❯ ./bin/pyspark --version
Welcome to
    ____              __
    / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
/___/ .__/\_,_/_/ /_/\_\   version 3.5.2

Using Scala version 2.13.8, OpenJDK 64-Bit Server VM, 17.0.12
Branch heads/v3.5.2
Compiled by user jacek on 2024-08-19T17:24:05Z
Revision a26a80208d7c96b3e3e898f137a05d69cba759d6
Url <>
Type --help for more information.

Once the build is finished, you should find the following directories in the local ~/.ivy2 directory.

❯ ls -l ~/.ivy2/local/io.unitycatalog
total 0
drwxr-xr-x@ 3 jacek  staff  96 Sep 22 18:53 unitycatalog-client
drwxr-xr-x@ 3 jacek  staff  96 Sep 22 18:53 unitycatalog-server
drwxr-xr-x@ 3 jacek  staff  96 Sep 22 18:53 unitycatalog-spark_2.13

The Spark Connector module is under unitycatalog-spark_2.13/0.3.0-SNAPSHOT/jars directory.

Run Spark Application with Unity Catalog

./bin/spark-shell \
  --packages \,io.unitycatalog:unitycatalog-spark_2.13:0.3.0-SNAPSHOT \
  --conf \
  --conf spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog \
  --conf spark.sql.catalog.spark_catalog.uri=http://localhost:8080 \
  --conf spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog \
  --conf spark.sql.catalog.unity.uri=http://localhost:8080
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.2

Using Scala version 2.13.8 (OpenJDK 64-Bit Server VM, Java 17.0.12)
scala> assert(spark.version == "3.5.3")
scala> assert( == "3.2.1")
./bin/pyspark \
  --packages \,io.unitycatalog:unitycatalog-spark_2.13:0.3.0-SNAPSHOT \
  --conf \
  --conf spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog \
  --conf spark.sql.catalog.spark_catalog.uri=http://localhost:8080 \
  --conf spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog \
  --conf spark.sql.catalog.unity.uri=http://localhost:8080
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.2

Using Python version 3.9.16 (main, May 15 2023 18:51:40)

Download Apache Spark 4.0.0-preview1 from Preview release of Spark 4.0.

./bin/spark-shell \
  --conf spark.jars.ivy=$HOME/.ivy2 \
  --packages \,io.unitycatalog:unitycatalog-spark_2.13:0.3.0-SNAPSHOT \
  --conf \
  --conf spark.sql.catalog.spark_catalog=io.unitycatalog.connectors.spark.UCSingleCatalog \
  --conf spark.sql.catalog.spark_catalog.uri=http://localhost:8080 \
  --conf spark.sql.catalog.unity=io.unitycatalog.connectors.spark.UCSingleCatalog \
  --conf spark.sql.catalog.unity.uri=http://localhost:8080
Welcome to
      ____              __
    / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
  /___/ .__/\_,_/_/ /_/\_\   version 4.0.0-preview1

Using Scala version 2.13.14 (OpenJDK 64-Bit Server VM, Java 17.0.11)
assert(spark4.version == "4.0.0-preview1", "Unity Catalog 0.3.0-SNAPSHOT supports Apache Spark 4.0.0-preview1 only")
assert(io.delta4.version == "4.0.0rc1", "Unity Catalog 0.3.0-SNAPSHOT supports Delta Lake 4.0.0rc1 only")

Default spark_catalog

The following is a list of the catalogs Unity Catalog comes with and knows about.

$ ./bin/uc catalog list
│NAME │  COMMENT   │PROPERTIES│ CREATED_AT  │UPDATED_AT│                 ID                 │
│unity│Main catalog│{}        │1720429531248│null      │f68e0329-d510-4558-b984-457218f573d9│

There is no spark_catalog catalog listed (as it is the default internal Spark SQL catalog).

The following is a list of the catalogs Spark SQL knows about.

|         name|description|
|spark_catalog|       NULL|

There is the default spark_catalog catalog (but no Unity Catalog-specific unity catalog).

|name|      catalog|description|locationUri|
|demo|spark_catalog|       NULL|       NULL|
assert(spark.catalog.currentCatalog() == "spark_catalog")
scala> spark.catalog.listTables("default")
scala> spark.catalog.listTables("default")
io.unitycatalog.client.ApiException: listTables call failed with: 404 - {"error_code":"NOT_FOUND","details":[{"reason":"NOT_FOUND","metadata":{},"@type":"google.rpc.ErrorInfo"}],"stack_trace":null,"message":"Schema not found: default"}
  at io.unitycatalog.client.api.TablesApi.getApiException(
  at io.unitycatalog.client.api.TablesApi.listTablesWithHttpInfo(
  at io.unitycatalog.client.api.TablesApi.listTables(
  at io.unitycatalog.spark.UCProxy.listTables(UCSingleCatalog.scala:168)
  at org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension.listTables(
  at io.unitycatalog.spark.UCSingleCatalog.listTables(UCSingleCatalog.scala:52)

Let's create spark_catalog in Unity Catalog.

./bin/uc catalog create --name spark_catalog
./bin/uc schema create --catalog spark_catalog --name default
./bin/uc table create \
    --full_name spark_catalog.default.uc_demo \
    --columns 'id INT' \
    --storage_location /tmp/uc_demo \
    --format delta
❯ tree /tmp/uc_demo
└── _delta_log
    └── 00000000000000000000.json

2 directories, 1 file
scala> spark.catalog.listTables()
[PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 0)

== SQL ==


  at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(parsers.scala:257)
  at org.apache.spark.sql.catalyst.parser.AbstractParser.parse(parsers.scala:98)
  at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:54)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseMultipartIdentifier(AbstractSqlParser.scala:54)
  at org.apache.spark.sql.internal.CatalogImpl.parseIdent(CatalogImpl.scala:51)
  at org.apache.spark.sql.internal.CatalogImpl.resolveNamespace(CatalogImpl.scala:427)
  at org.apache.spark.sql.internal.CatalogImpl.listTablesInternal(CatalogImpl.scala:143)
  at org.apache.spark.sql.internal.CatalogImpl.listTables(CatalogImpl.scala:127)
  at org.apache.spark.sql.internal.CatalogImpl.listTables(CatalogImpl.scala:118)
  ... 42 elided

Switch back to spark_catalog catalog.

sql("SET CATALOG spark_catalog")

We've registered an empty uc_demo table in Unity Catalog already.


It was equivalent to the following query:


Let's change (write to) the uc_demo delta table that was registered to Unity Catalog.

sql("INSERT INTO uc_demo(id) VALUES (1)").show(truncate=false)

It works! 🥳

Let's query (read from) the uc_demo delta table.

| id|
|  1|

unity Catalog

Unity Catalog is accessible using unity catalog ID.

sql("SET CATALOG unity")
|         name|description|
|spark_catalog|       NULL|
|        unity|       NULL|
|   name|catalog|   description|locationUri|
|default|  unity|Default schema|       NULL|
sql("DESC EXTENDED unity.default.numbers").show(truncate=false)
|col_name                    |data_type                                                                                |comment|
|as_int                      |int                                                                                      |NULL   |
|as_double                   |double                                                                                   |NULL   |
|                            |                                                                                         |       |
|# Detailed Table Information|                                                                                         |       |
|Name                        |unity.default.numbers                                                                    |       |
|Type                        |EXTERNAL                                                                                 |       |
|Location                    |file:/Users/jacek/dev/oss/unitycatalog/etc/data/external/unity/default/tables/numbers    |       |
|Provider                    |delta                                                                                    |       |
|Table Properties            |[delta.minReaderVersion=1,delta.minWriterVersion=2,option.key1=value1,option.key2=value2]|       |
./bin/uc table get --full_name unity.default.numbers
│              KEY              │                                                                    VALUE                                                                    │
│NAME                           │numbers                                                                                                                                      │
│CATALOG_NAME                   │unity                                                                                                                                        │
│SCHEMA_NAME                    │default                                                                                                                                      │
│TABLE_TYPE                     │EXTERNAL                                                                                                                                     │
│DATA_SOURCE_FORMAT             │DELTA                                                                                                                                        │
│COLUMNS                        │{"name":"as_int","type_text":"int","type_json":"{\"name\":\"as_int\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}","type_name":"I│
│                               │NT","type_precision":0,"type_scale":0,"type_interval_type":null,"position":0,"comment":"Int column","nullable":false,"partition_index":null} │
│                               │{"name":"as_double","type_text":"double","type_json":"{\"name\":\"as_double\",\"type\":\"double\",\"nullable\":false,\"metadata\":{}}","type_│
│                               │name":"DOUBLE","type_precision":0,"type_scale":0,"type_interval_type":null,"position":1,"comment":"Double                                    │
│                               │column","nullable":false,"partition_index":null}                                                                                             │
│STORAGE_LOCATION               │file:///Users/jacek/dev/oss/unitycatalog/etc/data/external/unity/default/tables/numbers/                                                     │
│COMMENT                        │External table                                                                                                                               │
│PROPERTIES                     │{"key1":"value1","key2":"value2"}                                                                                                            │
│OWNER                          │null                                                                                                                                         │
│CREATED_AT                     │1721234405617                                                                                                                                │
│CREATED_BY                     │null                                                                                                                                         │
│UPDATED_AT                     │1721234405617                                                                                                                                │
│UPDATED_BY                     │null                                                                                                                                         │
│TABLE_ID                       │32025924-be53-4d67-ac39-501a86046c01                                                                                                         │