Demo: Merge Operation¶
This demo shows DeltaTable.merge operation (and the underlying MergeIntoCommand) in action.
Tip
Enable ALL logging level for the org.apache.spark.sql.delta.commands.MergeIntoCommand logger as described in Logging.
Target Table¶
Create Table¶
val path = "/tmp/delta/demo"
import io.delta.tables.DeltaTable
val target = DeltaTable.create.addColumn("id", "long").location(path).execute
assert(target.isInstanceOf[io.delta.tables.DeltaTable])
assert(target.history.count == 1, "There must be version 0 only")
DROP TABLE IF EXISTS merge_demo;
CREATE TABLE merge_demo (id LONG)
USING delta;
Please note that the above commands leave us with an empty Delta table. Let's fix it.
import org.apache.spark.sql.SaveMode
spark.range(5).write.format("delta").mode(SaveMode.Append).save(path)
assert(target.history.count == 2)
INSERT INTO merge_demo
SELECT * FROM range(5);
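As a quick sanity check (my addition, not part of the original demo), the target should now hold ids 0 through 4:

// Expect five rows: 0, 1, 2, 3, 4
target.toDF.orderBy("id").show()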
Source Table¶
case class Person(id: Long, name: String)
val source = Seq(Person(0, "Zero"), Person(1, "One")).toDF
DROP TABLE IF EXISTS merge_demo_source;
CREATE TABLE merge_demo_source (id LONG, name STRING)
USING delta;
-- Populate the source to mirror the Scala dataset above (the merges below need matching rows)
INSERT INTO merge_demo_source VALUES (0, 'Zero'), (1, 'One');
Note the difference in the schemas of the target and source datasets.
target.toDF.printSchema
DESC merge_demo;
root
 |-- id: long (nullable = true)
source.printSchema
DESC merge_demo_source;
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
MERGE MATCHED DELETE with Schema Evolution¶
Not only are we about to delete the matching rows, but we are also dealing with a schema change (schema evolution), as the source carries an extra name column.
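For schema evolution to kick in during MERGE, automatic schema merging has to be enabled first. An aside of mine; at the time of writing this is the configuration property Delta Lake uses for it:

// Allow MERGE to evolve the target table's schema automatically
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")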
val mergeBuilder = target.as("to")
.merge(
source = source.as("from"),
condition = $"to.id" === $"from.id")
assert(mergeBuilder.isInstanceOf[io.delta.tables.DeltaMergeBuilder])
scala> mergeBuilder.execute
org.apache.spark.sql.AnalysisException: There must be at least one WHEN clause in a MERGE statement
at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.apply(deltaMerge.scala:253)
at io.delta.tables.DeltaMergeBuilder.mergePlan(DeltaMergeBuilder.scala:268)
at io.delta.tables.DeltaMergeBuilder.$anonfun$execute$1(DeltaMergeBuilder.scala:215)
at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:87)
at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:73)
at io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:120)
at io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:204)
... 47 elided
val mergeMatchedBuilder = mergeBuilder.whenMatched()
assert(mergeMatchedBuilder.isInstanceOf[io.delta.tables.DeltaMergeMatchedActionBuilder])
val mergeBuilderDeleteMatched = mergeMatchedBuilder.delete()
assert(mergeBuilderDeleteMatched.isInstanceOf[io.delta.tables.DeltaMergeBuilder])
mergeBuilderDeleteMatched.execute()
MERGE INTO merge_demo to
USING merge_demo_source from
ON to.id = from.id;
Error in query:
There must be at least one WHEN clause in a MERGE statement(line 1, pos 0)
MERGE INTO merge_demo to
USING merge_demo_source from
ON to.id = from.id
WHEN MATCHED THEN DELETE;
assert(target.history.count == 3)
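To see what the matched delete left behind (a sanity check of mine): source ids 0 and 1 are gone, so only 2, 3 and 4 should remain.

// Rows 0 and 1 matched the source and were deleted
target.toDF.orderBy("id").show()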
Update All Columns Except One¶
This demo shows how to update all columns except one on a match.
val targetDF = target
.toDF
.withColumn("num", lit(1))
.withColumn("updated", lit(false))
targetDF.sort('id.asc).show
+---+---+-------+
| id|num|updated|
+---+---+-------+
| 2| 1| false|
| 3| 1| false|
| 4| 1| false|
+---+---+-------+
Write the modified data out to the delta table (this creates a new version with the changed schema).
targetDF
.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", true)
.save(path)
Reload the delta table (with the new column changes).
val target = DeltaTable.forPath(path)
val targetDF = target.toDF
val sourceDF = Seq(0, 1, 2).toDF("num")
Create an update map (mapping columns of the target delta table to their new values). Columns left out of the map keep their current values.
val updates = Map(
"updated" -> lit(true))
target.as("to")
.merge(
source = sourceDF.as("from"),
condition = $"to.id" === $"from.num")
.whenMatched.update(updates)
.execute()
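If you wanted to spell "all columns except one" out literally, the update map could also be derived from the target's columns. A hypothetical variant of mine; the self-assignments are no-ops, so the merge result would be the same:

import org.apache.spark.sql.functions.{col, lit}
// Every target column except "num"; "updated" then gets its new value
val explicitUpdates = targetDF.columns
  .filterNot(_ == "num")
  .map(c => c -> col(s"to.$c"))
  .toMap + ("updated" -> lit(true))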
Reload the delta table (with the merge changes).
val target = DeltaTable.forPath(path)
target.toDF.sort('id.asc).show
+---+---+-------+
| id|num|updated|
+---+---+-------+
| 2| 1| true|
| 3| 1| false|
| 4| 1| false|
+---+---+-------+
assert(target.history.count == 5)
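The five versions correspond to the table creation, the initial append, the matched delete, the schema-changing overwrite, and the merge above. A quick way to eyeball them (my addition):

// List all commits with their operations
target.history
  .select("version", "operation")
  .orderBy("version")
  .show(truncate = false)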
MERGE NOT MATCHED INSERT¶
Use spark-sql
to execute the following query.
MERGE INTO merge_demo to
USING merge_demo_source from
ON to.id = from.id
WHEN NOT MATCHED THEN INSERT *;
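For the record, the same WHEN NOT MATCHED INSERT can be sketched with the DeltaMergeBuilder API in spark-shell. This reuses the target and source values from earlier and, since their schemas differ by now, assumes the schema-evolution setting shown above is enabled:

target.as("to")
  .merge(
    source = source.as("from"),
    condition = $"to.id" === $"from.id")
  .whenNotMatched.insertAll()
  .execute()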
Streaming CDF Read and MERGE¶
Note
Until I figure out how to run two concurrent Spark applications using the same metastore, I'm going to use spark-sql and spark-shell interchangeably.
Use spark-sql to enable Change Data Feed on a delta table.
ALTER TABLE merge_demo
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
SHOW TBLPROPERTIES merge_demo;
Exit spark-sql and open spark-shell.
Run a streaming CDF scan over the delta table.
spark
.readStream
.format("delta")
.option("readChangeFeed", true)
.table("merge_demo")
.writeStream
.format("console")
.start
Execute the MERGE command and observe the output of the streaming query.
sql("""
MERGE INTO merge_demo to
USING merge_demo_source from
ON to.id = from.id
WHEN NOT MATCHED THEN INSERT *;
""").show(truncate = false)
-------------------------------------------
Batch: 1
-------------------------------------------
+---+------------+---------------+-----------------+
| id|_change_type|_commit_version|_commit_timestamp|
+---+------------+---------------+-----------------+
+---+------------+---------------+-----------------+
Why is the output empty? And why is a batch generated at all if there is no data?
A likely explanation: the WHEN NOT MATCHED INSERT merge already ran in the previous section, so this time every source row matches and no rows change. MERGE still commits a new version of the table, though, and it is that data-less commit that the streaming query picks up as an empty micro-batch.
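One way to investigate (a sketch of mine, not in the original demo) is a batch CDF read starting at the version where Change Data Feed was enabled; CDF reads fail for earlier versions, hence the history lookup:

import org.apache.spark.sql.functions.max
// Find the commit of the ALTER TABLE ... SET TBLPROPERTIES above
val cdfFrom = sql("DESCRIBE HISTORY merge_demo")
  .where("operation = 'SET TBLPROPERTIES'")
  .agg(max("version"))
  .head.getLong(0)

// Show every recorded change since then, commit by commit
spark.read
  .format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", cdfFrom)
  .table("merge_demo")
  .orderBy("_commit_version")
  .show(truncate = false)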