SchemaMergingUtils

Asserting No Column Name Duplication

checkColumnNameDuplication(
  schema: StructType,
  colType: String): Unit

checkColumnNameDuplication explodes the nested field names in the given schema (StructType) and throws a DeltaAnalysisException if there are duplicates (column names are compared case-insensitively, hence the lowercased name in the error message below).
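
In other words, the check amounts to finding repeated (lowercased) names among the exploded nested field names. A minimal sketch of that logic (findDuplicateColumns is a hypothetical helper, not part of Delta Lake):

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.delta.schema.SchemaMergingUtils

// Explode the nested field names, lowercase them, and report the
// names that occur more than once.
def findDuplicateColumns(schema: StructType): Seq[String] =
  SchemaMergingUtils.explodeNestedFieldNames(schema)
    .map(_.toLowerCase)
    .groupBy(identity)
    .collect { case (name, occurrences) if occurrences.size > 1 => name }
    .toSeq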

Possible performance improvement

I think it's possible to make checkColumnNameDuplication faster, as it currently seems to do more work than is strictly required to detect column duplication.

A schema is a tree, so a duplicate means two nodes with the same (lowercased) name under the same parent. It is therefore enough to check every level of the tree for duplicates independently, without first exploding the full (dotted) field names of the whole schema, as sketched below.
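
A minimal sketch of that idea (hasDuplicates is a hypothetical helper, not Delta Lake code), recursing only into the nested types that explode handles:

import org.apache.spark.sql.types._

// Check every level of the schema tree for (case-insensitive) duplicates,
// without building full dotted field names first.
def hasDuplicates(schema: StructType): Boolean = {
  val names = schema.fieldNames.map(_.toLowerCase)
  names.length != names.distinct.length || schema.fields.map(_.dataType).exists {
    case s: StructType                => hasDuplicates(s)
    case ArrayType(s: StructType, _)  => hasDuplicates(s)
    case MapType(_, s: StructType, _) => hasDuplicates(s)
    case _                            => false
  }
}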

colType

The name of the colType input argument is misleading and does not really say what it is for: it is used only in an error message, to describe the operation that led to the column duplication.

checkColumnNameDuplication is used when:

Demo

import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.types._
val duplicatedCol = StructField("duplicatedCol", StringType)
val schema = (new StructType)
  .add(duplicatedCol)
  .add(duplicatedCol)
SchemaMergingUtils.checkColumnNameDuplication(schema, colType = "in the demo")
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the demo: duplicatedcol
  at org.apache.spark.sql.delta.DeltaAnalysisException$.apply(DeltaSharedExceptions.scala:57)
  at org.apache.spark.sql.delta.schema.SchemaMergingUtils$.checkColumnNameDuplication(SchemaMergingUtils.scala:117)
  ... 49 elided
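
With unique (case-insensitively compared) column names, checkColumnNameDuplication simply returns (continuing the session above):

val unique = (new StructType)
  .add(StructField("id", LongType))
  .add(StructField("name", StringType))
SchemaMergingUtils.checkColumnNameDuplication(unique, colType = "in the demo")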

explodeNestedFieldNames

explodeNestedFieldNames(
  schema: StructType): Seq[String]

explodeNestedFieldNames explodes the given schema into a collection of the names of all (nested) fields, with name parts separated by . (dot).

explodeNestedFieldNames is used when:

Demo

import org.apache.spark.sql.types._
val m = MapType(keyType = LongType, valueType = StringType)
val s = (new StructType)
  .add(StructField("id", LongType))
  .add(StructField("name", StringType))
val a = ArrayType(elementType = (new StructType)
  .add(StructField("id", LongType))
  .add(StructField("name", StringType)))
val schema = (new StructType)
  .add(StructField("l", LongType))
  .add(StructField("s", s))
  .add(StructField("a", a))
  .add(StructField("m", m))
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
val colNames = SchemaMergingUtils.explodeNestedFieldNames(schema)
colNames.foreach(println)
l
s
s.id
s.name
a
a.element.id
a.element.name
m

Exploding Schema

explode(
  schema: StructType): Seq[(Seq[String], StructField)]

explode explodes the given schema (StructType) into a collection of pairs of column name parts and the associated StructField. Nested fields (inside StructTypes, ArrayTypes and MapTypes) are flattened out.

explode is used when:

Demo

FIXME

Move the examples to Delta Lake (as unit tests).

MapType

import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.types._

val m = MapType(keyType = LongType, valueType = StringType)
val schemaWithMap = (new StructType).add(StructField("m", m))
val r = SchemaMergingUtils.explode(schemaWithMap)
r.foreach(println)
(List(m),StructField(m,MapType(LongType,StringType,true),true))
r.map { case (ns, f) => s"${ns.mkString(".")} -> ${f.dataType.sql}" }.foreach(println)
m -> MAP<BIGINT, STRING>

ArrayType

import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.types._

val idName = (new StructType)
  .add(StructField("id", LongType))
  .add(StructField("name", StringType))
val a = ArrayType(elementType = idName)
val schemaWithArray = (new StructType).add(StructField("a", a))
val r = SchemaMergingUtils.explode(schemaWithArray)
r.foreach(println)
(List(a),StructField(a,ArrayType(StructType(StructField(id,LongType,true), StructField(name,StringType,true)),true),true))
(List(a, element, id),StructField(id,LongType,true))
(List(a, element, name),StructField(name,StringType,true))
r.map { case (ns, f) => s"${ns.mkString(".")} -> ${f.dataType.sql}" }.foreach(println)
a -> ARRAY<STRUCT<`id`: BIGINT, `name`: STRING>>
a.element.id -> BIGINT
a.element.name -> STRING

StructType

import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.types._

val s = (new StructType)
  .add(StructField("id", LongType))
  .add(StructField("name", StringType))
val schemaWithStructType = (new StructType).add(StructField("s", s))
val r = SchemaMergingUtils.explode(schemaWithStructType)
r.foreach(println)
(List(s),StructField(s,StructType(StructField(id,LongType,true), StructField(name,StringType,true)),true))
(List(s, id),StructField(id,LongType,true))
(List(s, name),StructField(name,StringType,true))
r.map { case (ns, f) => s"${ns.mkString(".")} -> ${f.dataType.sql}" }.foreach(println)
s -> STRUCT<`id`: BIGINT, `name`: STRING>
s.id -> BIGINT
s.name -> STRING

Complex Schema

import org.apache.spark.sql.types._
val m = MapType(keyType = LongType, valueType = StringType)
val s = (new StructType)
  .add(StructField("id", LongType))
  .add(StructField("name", StringType))
val a = ArrayType(elementType = (new StructType)
  .add(StructField("id", LongType))
  .add(StructField("name", StringType)))
val schema = (new StructType)
  .add(StructField("l", LongType))
  .add(StructField("s", s))
  .add(StructField("a", a))
  .add(StructField("m", m))
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
val r = SchemaMergingUtils.explode(schema)
r.foreach(println)
(List(l),StructField(l,LongType,true))
(List(s),StructField(s,StructType(StructField(id,LongType,true), StructField(name,StringType,true)),true))
(List(s, id),StructField(id,LongType,true))
(List(s, name),StructField(name,StringType,true))
(List(a),StructField(a,ArrayType(StructType(StructField(id,LongType,true), StructField(name,StringType,true)),true),true))
(List(a, element, id),StructField(id,LongType,true))
(List(a, element, name),StructField(name,StringType,true))
(List(m),StructField(m,MapType(LongType,StringType,true),true))
r.map { case (ns, f) => s"${ns.mkString(".")} -> ${f.dataType.sql}" }.foreach(println)
l -> BIGINT
s -> STRUCT<`id`: BIGINT, `name`: STRING>
s.id -> BIGINT
s.name -> STRING
a -> ARRAY<STRUCT<`id`: BIGINT, `name`: STRING>>
a.element.id -> BIGINT
a.element.name -> STRING
m -> MAP<BIGINT, STRING>
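
Comparing the output above with the explodeNestedFieldNames demo suggests that explodeNestedFieldNames boils down to explode followed by joining the name parts with dots (modulo any quoting of special characters in names, which is an assumption here):

r.map { case (ns, _) => ns.mkString(".") }.foreach(println)
l
s
s.id
s.name
a
a.element.id
a.element.name
m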