SchemaMergingUtils¶
Asserting No Column Name Duplication¶
checkColumnNameDuplication(
  schema: StructType,
  colType: String): Unit
checkColumnNameDuplication explodes the nested field names in the given schema (StructType) and throws a DeltaAnalysisException if there are duplicates.
Possible performance improvement
I think it's possible to make checkColumnNameDuplication faster, as it currently seems to do more than is really required to check for column duplication. A schema is a tree, so a duplication occurs when two nodes at the same level share the same (lower-cased) name. Duplicates could therefore be detected level by level, without exploding all the nested field names up front, and the check could stop at the first level where a duplicate is found.
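A minimal sketch of such a level-by-level check (a hypothetical helper, not the actual Delta Lake implementation; the name and error wording are made up for illustration):

import org.apache.spark.sql.types._

// Hypothetical level-by-level duplicate check (illustration only):
// compare sibling names (lower-cased) at every level and recurse into
// nested structs, including those inside array elements and map values.
def assertNoSiblingDuplicates(struct: StructType, path: Seq[String] = Nil): Unit = {
  val names = struct.fields.toSeq.map(_.name.toLowerCase)
  val dups = names.diff(names.distinct).distinct
  require(dups.isEmpty,
    s"Found duplicate column(s) at ${if (path.isEmpty) "<root>" else path.mkString(".")}: ${dups.mkString(", ")}")
  struct.fields.foreach { f =>
    f.dataType match {
      case s: StructType => assertNoSiblingDuplicates(s, path :+ f.name)
      case ArrayType(s: StructType, _) => assertNoSiblingDuplicates(s, path ++ Seq(f.name, "element"))
      case MapType(_, s: StructType, _) => assertNoSiblingDuplicates(s, path ++ Seq(f.name, "value"))
      case _ => ()
    }
  }
}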
colType
The name of the colType input argument is misleading and does not really say what it is for. It is used only in the error message, to describe the operation that led to the column duplication.
checkColumnNameDuplication is used when:

- DeltaLog is requested to upgrade the protocol
- OptimisticTransactionImpl is requested to verify a new metadata
- AlterTableAddColumnsDeltaCommand and AlterTableReplaceColumnsDeltaCommand are executed
- SchemaMergingUtils utility is used to mergeSchemas
Demo¶
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.types._
val duplicatedCol = StructField("duplicatedCol", StringType)
val schema = (new StructType)
  .add(duplicatedCol)
  .add(duplicatedCol)
SchemaMergingUtils.checkColumnNameDuplication(schema, colType = "in the demo")
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the demo: duplicatedcol
at org.apache.spark.sql.delta.DeltaAnalysisException$.apply(DeltaSharedExceptions.scala:57)
at org.apache.spark.sql.delta.schema.SchemaMergingUtils$.checkColumnNameDuplication(SchemaMergingUtils.scala:117)
... 49 elided
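The check is case-insensitive (note the lower-cased duplicatedcol in the message above) and covers nested fields, too, since the whole schema is exploded first. A follow-up sketch (the variable names are made up and the exact error message may differ):

val nested = (new StructType)
  .add(StructField("ID", LongType))
  .add(StructField("id", LongType))
val schemaWithNestedDups = (new StructType).add(StructField("s", nested))
// throws a DeltaAnalysisException reporting the duplicated (lower-cased) nested column
SchemaMergingUtils.checkColumnNameDuplication(schemaWithNestedDups, colType = "in the demo")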
explodeNestedFieldNames¶
explodeNestedFieldNames(
  schema: StructType): Seq[String]
explodeNestedFieldNames explodes the given schema into a collection of column names (name parts separated by .).
explodeNestedFieldNames is used when:

- SchemaMergingUtils utility is used to checkColumnNameDuplication
- SchemaUtils utility is used to normalizeColumnNames and checkSchemaFieldNames
Demo¶
import org.apache.spark.sql.types._
val m = MapType(keyType = LongType, valueType = StringType)
val s = (new StructType)
  .add(StructField("id", LongType))
  .add(StructField("name", StringType))
val a = ArrayType(elementType = (new StructType)
  .add(StructField("id", LongType))
  .add(StructField("name", StringType)))
val schema = (new StructType)
  .add(StructField("l", LongType))
  .add(StructField("s", s))
  .add(StructField("a", a))
  .add(StructField("m", m))
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
val colNames = SchemaMergingUtils.explodeNestedFieldNames(schema)
colNames.foreach(println)
l
s
s.id
s.name
a
a.element.id
a.element.name
m
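The exploded names are what checkColumnNameDuplication scans for duplicates. A minimal sketch of such a scan (not the actual Delta Lake code; names are compared lower-cased, as the exception in the first demo shows):

// collect the (lower-cased) names that occur more than once
val duplicates = colNames
  .map(_.toLowerCase)
  .groupBy(identity)
  .collect { case (name, occurrences) if occurrences.size > 1 => name }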
Exploding Schema¶
explode(
  schema: StructType): Seq[(Seq[String], StructField)]
explode explodes the given schema (StructType) into a collection of pairs of column name parts and the associated StructField. The nested fields (StructTypes, ArrayTypes and MapTypes) are flattened out.
explode is used when:

- DeltaColumnMappingBase is requested to getPhysicalNameFieldMap
- SchemaMergingUtils utility is used to explodeNestedFieldNames
Demo¶
FIXME
Move the examples to Delta Lake (as unit tests).
MapType¶
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.types._
val m = MapType(keyType = LongType, valueType = StringType)
val schemaWithMap = (new StructType).add(StructField("m", m))
val r = SchemaMergingUtils.explode(schemaWithMap)
r.foreach(println)
(List(m),StructField(m,MapType(LongType,StringType,true),true))
r.map { case (ns, f) => s"${ns.mkString(".")} -> ${f.dataType.sql}" }.foreach(println)
m -> MAP<BIGINT, STRING>
ArrayType¶
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.types._
val idName = (new StructType)
  .add(StructField("id", LongType))
  .add(StructField("name", StringType))
val a = ArrayType(elementType = idName)
val schemaWithArray = (new StructType).add(StructField("a", a))
val r = SchemaMergingUtils.explode(schemaWithArray)
r.foreach(println)
(List(a),StructField(a,ArrayType(StructType(StructField(id,LongType,true), StructField(name,StringType,true)),true),true))
(List(a, element, id),StructField(id,LongType,true))
(List(a, element, name),StructField(name,StringType,true))
r.map { case (ns, f) => s"${ns.mkString(".")} -> ${f.dataType.sql}" }.foreach(println)
a -> ARRAY<STRUCT<`id`: BIGINT, `name`: STRING>>
a.element.id -> BIGINT
a.element.name -> STRING
StructType¶
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.types._
val s = (new StructType)
  .add(StructField("id", LongType))
  .add(StructField("name", StringType))
val schemaWithStructType = (new StructType).add(StructField("s", s))
val r = SchemaMergingUtils.explode(schemaWithStructType)
r.foreach(println)
(List(s),StructField(s,StructType(StructField(id,LongType,true), StructField(name,StringType,true)),true))
(List(s, id),StructField(id,LongType,true))
(List(s, name),StructField(name,StringType,true))
r.map { case (ns, f) => s"${ns.mkString(".")} -> ${f.dataType.sql}" }.foreach(println)
s -> STRUCT<`id`: BIGINT, `name`: STRING>
s.id -> BIGINT
s.name -> STRING
Complex Schema¶
import org.apache.spark.sql.types._
val m = MapType(keyType = LongType, valueType = StringType)
val s = (new StructType)
  .add(StructField("id", LongType))
  .add(StructField("name", StringType))
val a = ArrayType(elementType = (new StructType)
  .add(StructField("id", LongType))
  .add(StructField("name", StringType)))
val schema = (new StructType)
  .add(StructField("l", LongType))
  .add(StructField("s", s))
  .add(StructField("a", a))
  .add(StructField("m", m))
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
val r = SchemaMergingUtils.explode(schema)
r.foreach(println)
(List(l),StructField(l,LongType,true))
(List(s),StructField(s,StructType(StructField(id,LongType,true), StructField(name,StringType,true)),true))
(List(s, id),StructField(id,LongType,true))
(List(s, name),StructField(name,StringType,true))
(List(a),StructField(a,ArrayType(StructType(StructField(id,LongType,true), StructField(name,StringType,true)),true),true))
(List(a, element, id),StructField(id,LongType,true))
(List(a, element, name),StructField(name,StringType,true))
(List(m),StructField(m,MapType(LongType,StringType,true),true))
r.map { case (ns, f) => s"${ns.mkString(".")} -> ${f.dataType.sql}" }.foreach(println)
l -> BIGINT
s -> STRUCT<`id`: BIGINT, `name`: STRING>
s.id -> BIGINT
s.name -> STRING
a -> ARRAY<STRUCT<`id`: BIGINT, `name`: STRING>>
a.element.id -> BIGINT
a.element.name -> STRING
m -> MAP<BIGINT, STRING>
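Since explode already produces the name parts, explodeNestedFieldNames can be thought of as joining those parts with dots. A minimal sketch (assuming no field names with embedded dots, which the actual implementation may have to quote):

// prints the very same names as in the explodeNestedFieldNames demo above
SchemaMergingUtils.explode(schema)
  .map { case (parts, _) => parts.mkString(".") }
  .foreach(println)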