
Demo: Developing CatalogPlugin

This demo walks through the internals of a custom CatalogPlugin that supports TableCatalog and SupportsNamespaces.

Demo CatalogPlugin

Find the sources of a demo CatalogPlugin in the GitHub repo.

Install Demo CatalogPlugin (Spark Shell)

./bin/spark-shell \
  --packages pl.japila.spark:spark-examples_2.13:1.0.0-SNAPSHOT \
  --conf spark.sql.catalog.demo=pl.japila.spark.sql.DemoCatalog \
  --conf spark.sql.catalog.demo.use-thing=true \
  --conf spark.sql.catalog.demo.delete-supported=false
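
The two `spark.sql.catalog.demo.*` properties above reach the plugin as the options argument of `initialize`, with the `spark.sql.catalog.demo.` prefix removed. The following is a minimal plain-Scala sketch of that idea, not Spark's actual code. The `CatalogOptionsSketch` object and the `catalogOptions` helper are hypothetical names introduced for illustration, and a regular `Map` stands in for Spark's configuration.

```scala
// A simplified model of how catalog options are derived from configuration.
// CatalogOptionsSketch and catalogOptions are hypothetical names (assumptions),
// and Map[String, String] stands in for the session configuration.
object CatalogOptionsSketch {

  // Keeps only the entries under spark.sql.catalog.<name>. and strips that prefix.
  def catalogOptions(name: String, conf: Map[String, String]): Map[String, String] = {
    val prefix = s"spark.sql.catalog.$name."
    conf.collect {
      case (key, value) if key.startsWith(prefix) => key.stripPrefix(prefix) -> value
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = Map(
      // The plugin class itself is not an option (no trailing dot after "demo").
      "spark.sql.catalog.demo" -> "pl.japila.spark.sql.DemoCatalog",
      "spark.sql.catalog.demo.use-thing" -> "true",
      "spark.sql.catalog.demo.delete-supported" -> "false",
      // Unrelated settings are ignored.
      "spark.sql.shuffle.partitions" -> "8"
    )
    println(catalogOptions("demo", conf))
  }
}
```

Only `use-thing` and `delete-supported` survive the filter, which matches the options printed by `initialize` in the session output of this demo.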

SET

You could instead use the following at runtime:

sql("SET spark.sql.catalog.demo=pl.japila.spark.sql.DemoCatalog")

Show Time

sql("SHOW CATALOGS").show(truncate = false)
scala> sql("SET CATALOG demo")
>>> initialize(demo, Map(use-thing -> true, delete-supported -> false))
scala> sql("SHOW NAMESPACES IN demo_db").show(false)
defaultNamespace=<EMPTY>
>>> listNamespaces(namespace=ArraySeq(demo_db))
defaultNamespace=<EMPTY>
+---------+
|namespace|
+---------+
+---------+
-- FIXME Make it work
SELECT * FROM demo_db.demo_schema.demo_table LIMIT 10

Access Demo Catalog using CatalogManager

Let's use the CatalogManager to access the demo catalog.

val demo = spark.sessionState.catalogManager.catalog("demo")
scala> val demo = spark.sessionState.catalogManager.catalog("demo")
>>> initialize(demo, Map())
demo.defaultNamespace

Show Tables

Let's use the SHOW TABLES SQL command to list the tables in the demo catalog.

scala> sql("SHOW TABLES IN demo").show(truncate = false)
defaultNamespace=<EMPTY>
>>> listTables(ArraySeq())
defaultNamespace=<EMPTY>
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+

Create Namespace

sql("CREATE NAMESPACE IF NOT EXISTS demo.hello").show(truncate = false)
scala> sql("CREATE NAMESPACE IF NOT EXISTS demo.hello").show(truncate = false)
>>> loadNamespaceMetadata(WrappedArray(hello))
++
||
++
++

Show Namespaces

Let's use the SHOW NAMESPACES SQL command to list the namespaces in the demo catalog.

sql("SHOW NAMESPACES IN demo").show(truncate = false)
scala> sql("SHOW NAMESPACES IN demo").show(truncate = false)
>>> listNamespaces()
+---------+
|namespace|
+---------+
+---------+

Append Data to Table

spark.range(5).writeTo("demo.t1").append
scala> spark.range(5).writeTo("demo.t1").append
>>> loadTable(t1)
scala.NotImplementedError: an implementation is missing
  at scala.Predef$.$qmark$qmark$qmark(Predef.scala:288)
  at pl.japila.spark.sql.DemoCatalog.loadTable(<pastie>:67)
  at org.apache.spark.sql.connector.catalog.CatalogV2Util$.loadTable(CatalogV2Util.scala:283)
  at org.apache.spark.sql.DataFrameWriterV2.append(DataFrameWriterV2.scala:156)
  ... 47 elided

Possible Exceptions

Failed to get database

scala> spark.range(5).writeTo("demo.t1").append
20/12/28 20:01:30 WARN ObjectStore: Failed to get database demo, returning NoSuchObjectException
org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table demo.t1 not found;
  at org.apache.spark.sql.DataFrameWriterV2.append(DataFrameWriterV2.scala:162)
  ... 47 elided

Cannot find catalog plugin class

scala> spark.range(5).writeTo("demo.t1").append
org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'demo': pl.japila.spark.sql.DemoCatalog
  at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:66)
  at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:52)
  at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
  at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:52)
  at org.apache.spark.sql.connector.catalog.LookupCatalog$CatalogAndIdentifier$.unapply(LookupCatalog.scala:128)
  at org.apache.spark.sql.DataFrameWriterV2.<init>(DataFrameWriterV2.scala:52)
  at org.apache.spark.sql.Dataset.writeTo(Dataset.scala:3359)
  ... 47 elided
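
This error usually means the plugin class could not be loaded, for example because the jar was not passed with `--packages` or `--jars`. A quick way to check from the shell is sketched below. `PluginClassCheck` and `isLoadable` are hypothetical names introduced for illustration, and the class loader choice is an assumption that may differ from what Spark does internally.

```scala
import scala.util.Try

// Hypothetical diagnostic helper (not part of Spark or the demo project).
object PluginClassCheck {

  // True when the class can be found by the current thread's context class
  // loader, falling back to the loader of this object. The class is looked up
  // without being initialized.
  def isLoadable(className: String): Boolean = {
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    Try(Class.forName(className, false, loader)).isSuccess
  }

  def main(args: Array[String]): Unit = {
    val pluginClass = "pl.japila.spark.sql.DemoCatalog"
    if (isLoadable(pluginClass)) println(s"$pluginClass is on the classpath")
    else println(s"$pluginClass is missing; check --packages or --jars")
  }
}
```

In spark-shell, `PluginClassCheck.isLoadable("pl.japila.spark.sql.DemoCatalog")` returning `false` would point to a classpath problem rather than a bug in the catalog itself.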

Cannot use catalog demo: not a TableCatalog

scala> spark.range(5).writeTo("demo.t1").append
>>> initialize(demo, Map())
org.apache.spark.sql.AnalysisException: Cannot use catalog demo: not a TableCatalog;
  at org.apache.spark.sql.connector.catalog.CatalogV2Implicits$CatalogHelper.asTableCatalog(CatalogV2Implicits.scala:76)
  at org.apache.spark.sql.DataFrameWriterV2.<init>(DataFrameWriterV2.scala:53)
  at org.apache.spark.sql.Dataset.writeTo(Dataset.scala:3359)
  ... 47 elided
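
The plugin was loaded and initialized here, but it does not implement TableCatalog. The sketch below shows a catalog in that state. It assumes Spark is on the classpath (as in spark-shell, using `:paste`). `NameOnlyCatalog` and `NotATableCatalogSketch` are hypothetical names and not the DemoCatalog from the repo. The pattern match only approximates the type check behind the message and is not Spark's actual code.

```scala
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, TableCatalog}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical plugin that implements only the base CatalogPlugin contract.
class NameOnlyCatalog extends CatalogPlugin {
  private var catalogName: String = _

  // Remembers the name the catalog was registered under; options are ignored.
  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    catalogName = name
  }

  override def name(): String = catalogName
}

object NotATableCatalogSketch {
  // An approximation of the check: a plugin serves tables only if it is a TableCatalog.
  def describe(plugin: CatalogPlugin): String = plugin match {
    case _: TableCatalog => s"${plugin.name()} can serve tables"
    case _               => s"Cannot use catalog ${plugin.name()}: not a TableCatalog"
  }

  def main(args: Array[String]): Unit = {
    val plugin = new NameOnlyCatalog
    plugin.initialize("demo", CaseInsensitiveStringMap.empty())
    println(describe(plugin))
  }
}
```

Mixing TableCatalog into the class (and implementing its methods) is what lets `writeTo("demo.t1")` get past this check. The "does not support namespaces" error follows the same pattern with SupportsNamespaces.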

Cannot use catalog demo: does not support namespaces

scala> sql("SHOW NAMESPACES IN demo").show(false)
org.apache.spark.sql.AnalysisException: Cannot use catalog demo: does not support namespaces;
  at org.apache.spark.sql.connector.catalog.CatalogV2Implicits$CatalogHelper.asNamespaceCatalog(CatalogV2Implicits.scala:83)
  at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.apply(DataSourceV2Strategy.scala:277)