Demo: Developing CatalogPlugin¶
The demo shows how to develop a custom CatalogPlugin with support for TableCatalog and SupportsNamespaces.
Demo CatalogPlugin¶
Project Configuration¶
Use the following build.sbt
for the necessary dependencies.
name := "spark-sql-demo-catalog-plugin"
organization := "pl.japila.spark.sql"
version := "0.1"
scalaVersion := "2.12.12"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.1" % Provided
Code¶
Tip
Use :paste -raw
in spark-shell
to enter paste mode and paste the code (incl. the package declaration).
:paste -raw
package pl.japila.spark.sql
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import java.util.{Map => JMap}
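// A demo catalog that only prints out what Spark calls (and when),
// returning empty or dummy results for every operation.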
class DemoCatalog
extends CatalogPlugin
with TableCatalog
with SupportsNamespaces {
val Success = true
override def name(): String = DemoCatalog.NAME
override def defaultNamespace(): Array[String] = {
val ns = super.defaultNamespace()
println(s"defaultNamespace = ${ns.toSeq}")
ns
}
override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
import scala.collection.JavaConverters._
println(s">>> initialize($name, ${options.asScala})")
}
override def listNamespaces(): Array[Array[String]] = {
println(">>> listNamespaces()")
Array.empty
}
override def listNamespaces(namespace: Array[String]): Array[Array[String]] = {
println(s">>> listNamespaces($namespace)")
Array.empty
}
override def loadNamespaceMetadata(namespace: Array[String]): JMap[String, String] = {
println(s">>> loadNamespaceMetadata(${namespace.toSeq})")
import scala.collection.JavaConverters._
Map.empty[String, String].asJava
}
override def createNamespace(namespace: Array[String], metadata: JMap[String, String]): Unit = {
import scala.collection.JavaConverters._
println(s">>> createNamespace($namespace, ${metadata.asScala})")
}
override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
println(s">>> alterNamespace($namespace, $changes)")
}
override def dropNamespace(namespace: Array[String]): Boolean = {
println(s">>> dropNamespace($namespace)")
Success
}
override def listTables(namespace: Array[String]): Array[Identifier] = {
println(s">>> listTables(${namespace.toSeq})")
Array.empty
}
override def loadTable(ident: Identifier): Table = {
println(s">>> loadTable($ident)")
???
}
override def createTable(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: JMap[String, String]): Table = {
import scala.collection.JavaConverters._
println(s">>> createTable($ident, $schema, $partitions, ${properties.asScala})")
???
}
override def alterTable(ident: Identifier, changes: TableChange*): Table = {
println(s">>> alterTable($ident, $changes)")
???
}
override def dropTable(ident: Identifier): Boolean = {
println(s">>> dropTable($ident)")
Success
}
override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
println(s">>> renameTable($oldIdent, $newIdent)")
}
override def toString = s"${this.getClass.getCanonicalName}($name)"
}
object DemoCatalog {
val NAME = "demo"
}
Configure SparkSession¶
Let's "install" this custom CatalogPlugin
in the current SparkSession.
There are various ways to do it, and the SET
SQL command is as good as any (for the demo at least).
sql("SET spark.sql.catalog.demo=pl.japila.spark.sql.DemoCatalog")
Show Time¶
Access Demo Catalog using CatalogManager¶
Let's use the CatalogManager to access the demo catalog.
val demo = spark.sessionState.catalogManager.catalog("demo")
scala> val demo = spark.sessionState.catalogManager.catalog("demo")
>>> initialize(demo, Map())
demo: org.apache.spark.sql.connector.catalog.CatalogPlugin = pl.japila.spark.sql.DemoCatalog(demo)
demo.defaultNamespace
scala> demo.defaultNamespace
defaultNamespace = WrappedArray()
res1: Array[String] = Array()
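Since DemoCatalog also mixes in TableCatalog and SupportsNamespaces, the returned CatalogPlugin can be cast and used directly. A small sketch (the casts are safe here only because we know the concrete class):

import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog}

// Both calls go through our println-only implementations
demo.asInstanceOf[TableCatalog].listTables(Array.empty)
demo.asInstanceOf[SupportsNamespaces].listNamespaces()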
Show Tables¶
Let's use the SHOW TABLES
SQL command to show the tables in the demo catalog.
sql("SHOW TABLES IN demo").show(truncate = false)
scala> sql("SHOW TABLES IN demo").show(truncate = false)
>>> initialize(demo, Map())
>>> listTables(WrappedArray())
+---------+---------+
|namespace|tableName|
+---------+---------+
+---------+---------+
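SHOW TABLES can also be pointed at a namespace within the catalog, in which case the namespace should simply be passed through to listTables. A sketch (the ns namespace name is made up; our listTables never checks that it exists):

sql("SHOW TABLES IN demo.ns").show(truncate = false)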
Show Namespaces¶
Let's use the SHOW NAMESPACES
SQL command to show the namespaces in the demo catalog.
sql("SHOW NAMESPACES IN demo").show(truncate = false)
scala> sql("SHOW NAMESPACES IN demo").show(truncate = false)
>>> listNamespaces()
+---------+
|namespace|
+---------+
+---------+
Create Namespace¶
sql("CREATE NAMESPACE IF NOT EXISTS demo.hello").show(truncate = false)
scala> sql("CREATE NAMESPACE IF NOT EXISTS demo.hello").show(truncate = false)
>>> loadNamespaceMetadata(WrappedArray(hello))
++
||
++
++
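Note that createNamespace is not invoked. With IF NOT EXISTS, Spark first checks whether the namespace exists, and the default check (namespaceExists) relies on loadNamespaceMetadata: as long as it returns without throwing NoSuchNamespaceException (ours returns an empty map), the namespace is considered to exist and creation is skipped. A sketch of a loadNamespaceMetadata variant that would let the command reach createNamespace:

import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException

override def loadNamespaceMetadata(namespace: Array[String]): JMap[String, String] = {
  println(s">>> loadNamespaceMetadata(${namespace.toSeq})")
  // Pretend that no namespace ever exists so CREATE NAMESPACE proceeds to createNamespace
  throw new NoSuchNamespaceException(namespace)
}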
Append Data to Table¶
spark.range(5).writeTo("demo.t1").append
scala> spark.range(5).writeTo("demo.t1").append
>>> loadTable(t1)
scala.NotImplementedError: an implementation is missing
at scala.Predef$.$qmark$qmark$qmark(Predef.scala:288)
at pl.japila.spark.sql.DemoCatalog.loadTable(<pastie>:67)
at org.apache.spark.sql.connector.catalog.CatalogV2Util$.loadTable(CatalogV2Util.scala:283)
at org.apache.spark.sql.DataFrameWriterV2.append(DataFrameWriterV2.scala:156)
... 47 elided
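The append fails because loadTable is left unimplemented (???). To get past this point, loadTable would have to return an actual Table. A minimal sketch (the DemoTable name is made up; without a SupportsWrite mix-in and with no capabilities it still cannot accept writes, so append would fail later with a different error):

import java.util.{Set => JSet}
import scala.collection.JavaConverters._
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCapability}
import org.apache.spark.sql.types.StructType

// Hypothetical bare-bones table; name, schema and capabilities are the only required methods
class DemoTable(ident: Identifier, tableSchema: StructType) extends Table {
  override def name(): String = ident.toString
  override def schema(): StructType = tableSchema
  override def capabilities(): JSet[TableCapability] = Set.empty[TableCapability].asJava
}

loadTable could then return, say, new DemoTable(ident, new StructType().add("id", "long")).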
Possible Exceptions¶
Failed to get database¶
You may run into this when the spark.sql.catalog.demo property is not set, so demo.t1 is resolved against the default session catalog and demo is looked up as a (non-existent) Hive database.
scala> spark.range(5).writeTo("demo.t1").append
20/12/28 20:01:30 WARN ObjectStore: Failed to get database demo, returning NoSuchObjectException
org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table demo.t1 not found;
at org.apache.spark.sql.DataFrameWriterV2.append(DataFrameWriterV2.scala:162)
... 47 elided
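A quick way to tell whether the demo catalog is registered in the current session:

// Some("pl.japila.spark.sql.DemoCatalog") when registered, None otherwise
spark.conf.getOption("spark.sql.catalog.demo")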
Cannot find catalog plugin class¶
Here the property is set, but the class cannot be loaded, e.g. the code was not pasted into spark-shell (or a jar with the class is not on the classpath).
scala> spark.range(5).writeTo("demo.t1").append
org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'demo': pl.japila.spark.sql.DemoCatalog
at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:66)
at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:52)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:52)
at org.apache.spark.sql.connector.catalog.LookupCatalog$CatalogAndIdentifier$.unapply(LookupCatalog.scala:128)
at org.apache.spark.sql.DataFrameWriterV2.<init>(DataFrameWriterV2.scala:52)
at org.apache.spark.sql.Dataset.writeTo(Dataset.scala:3359)
... 47 elided
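A quick sanity check that the class can be loaded at all:

// Throws ClassNotFoundException when the class is not available
Class.forName("pl.japila.spark.sql.DemoCatalog")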
Cannot use catalog demo: not a TableCatalog¶
You may see this when the registered class implements CatalogPlugin but does not mix in TableCatalog.
scala> spark.range(5).writeTo("demo.t1").append
>>> initialize(demo, Map())
org.apache.spark.sql.AnalysisException: Cannot use catalog demo: not a TableCatalog;
at org.apache.spark.sql.connector.catalog.CatalogV2Implicits$CatalogHelper.asTableCatalog(CatalogV2Implicits.scala:76)
at org.apache.spark.sql.DataFrameWriterV2.<init>(DataFrameWriterV2.scala:53)
at org.apache.spark.sql.Dataset.writeTo(Dataset.scala:3359)
... 47 elided
Cannot use catalog demo: does not support namespaces¶
Similarly, this one shows up when the registered class does not mix in SupportsNamespaces.
scala> sql("SHOW NAMESPACES IN demo").show(false)
org.apache.spark.sql.AnalysisException: Cannot use catalog demo: does not support namespaces;
at org.apache.spark.sql.connector.catalog.CatalogV2Implicits$CatalogHelper.asNamespaceCatalog(CatalogV2Implicits.scala:83)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.apply(DataSourceV2Strategy.scala:277)