Demo: Merge Operation

This demo shows the DeltaTable.merge operation (and the underlying MergeIntoCommand) in action.

Tip

Enable the ALL logging level for the org.apache.spark.sql.delta.commands.MergeIntoCommand logger as described in Logging.

Target Table

Create Table

Scala

val path = "/tmp/delta/demo"
import io.delta.tables.DeltaTable
val target = DeltaTable.create.addColumn("id", "long").location(path).execute
assert(target.isInstanceOf[io.delta.tables.DeltaTable])
assert(target.history.count == 1, "There must be version 0 only")

SQL

DROP TABLE IF EXISTS merge_demo;

CREATE TABLE merge_demo (id LONG)
USING delta;

The commands above leave us with an empty Delta table. Let's add some rows.

Scala

import org.apache.spark.sql.SaveMode
spark.range(5).write.format("delta").mode(SaveMode.Append).save(path)
assert(target.history.count == 2)

SQL

INSERT INTO merge_demo
SELECT * FROM range(5);

Source Table

Scala

case class Person(id: Long, name: String)
val source = Seq(Person(0, "Zero"), Person(1, "One")).toDF

SQL

DROP TABLE IF EXISTS merge_demo_source;

CREATE TABLE merge_demo_source (id LONG, name STRING)
USING delta;

Note the difference between the schemas of the target and source datasets: the source has an extra name column.

Scala

target.toDF.printSchema
root
 |-- id: long (nullable = true)

source.printSchema
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)

SQL

DESC merge_demo;
DESC merge_demo_source;
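To find the source columns that the target lacks without reading the schemas by eye, you can compare column names. The helper below is a minimal sketch in plain Scala. `extraColumns` is a hypothetical name, not a Delta Lake API. In spark-shell you would pass it `source.columns.toSeq` and `target.toDF.columns.toSeq`; here it is called on the names from the schemas printed above.

```scala
// Hypothetical helper (not part of Delta Lake): names of the columns that appear
// in the source schema but not in the target schema, in source order.
def extraColumns(sourceColumns: Seq[String], targetColumns: Seq[String]): Seq[String] = {
  val inTarget = targetColumns.toSet
  sourceColumns.filterNot(inTarget.contains)
}

// Column names taken from the printSchema output above
val extra = extraColumns(Seq("id", "name"), Seq("id"))
println(extra) // List(name)
```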

MERGE MATCHED DELETE with Schema Evolution

Let's delete the target rows that match the source, even though the two datasets have different schemas (the source has an extra name column).

Scala

val mergeBuilder = target.as("to")
  .merge(
    source = source.as("from"),
    condition = $"to.id" === $"from.id")
assert(mergeBuilder.isInstanceOf[io.delta.tables.DeltaMergeBuilder])

A merge with no WHEN clause is rejected when it is executed.

scala> mergeBuilder.execute
org.apache.spark.sql.AnalysisException: There must be at least one WHEN clause in a MERGE statement
  at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.apply(deltaMerge.scala:253)
  at io.delta.tables.DeltaMergeBuilder.mergePlan(DeltaMergeBuilder.scala:268)
  at io.delta.tables.DeltaMergeBuilder.$anonfun$execute$1(DeltaMergeBuilder.scala:215)
  at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:87)
  at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:73)
  at io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:120)
  at io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:204)
  ... 47 elided

Add a WHEN MATCHED clause with a DELETE action and execute the merge.

val mergeMatchedBuilder = mergeBuilder.whenMatched()
assert(mergeMatchedBuilder.isInstanceOf[io.delta.tables.DeltaMergeMatchedActionBuilder])

val mergeBuilderDeleteMatched = mergeMatchedBuilder.delete()
assert(mergeBuilderDeleteMatched.isInstanceOf[io.delta.tables.DeltaMergeBuilder])

mergeBuilderDeleteMatched.execute()
assert(target.history.count == 3)

SQL

MERGE INTO merge_demo to
USING merge_demo_source from
ON to.id = from.id;
Error in query:
There must be at least one WHEN clause in a MERGE statement(line 1, pos 0)

MERGE INTO merge_demo to
USING merge_demo_source from
ON to.id = from.id
WHEN MATCHED THEN DELETE;
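To see what the WHEN MATCHED THEN DELETE clause does at the row level, here is a toy in-memory model in plain Scala. It is an illustration only, not how MergeIntoCommand works (there are no files, transactions or joins). `SourceRow`, `WhenClause`, `MatchedDelete` and `toyMerge` are hypothetical names. The model also rejects an empty clause list with the same message Delta reports, although it throws an IllegalArgumentException rather than an AnalysisException.

```scala
// Toy, in-memory model of MERGE ... ON to.id = from.id (illustration only).
final case class SourceRow(id: Long, name: String)

trait WhenClause
case object MatchedDelete extends WhenClause

def toyMerge(targetIds: Seq[Long], source: Seq[SourceRow], clauses: Seq[WhenClause]): Seq[Long] = {
  // Same message as Delta's error, but raised here as an IllegalArgumentException
  require(clauses.nonEmpty, "There must be at least one WHEN clause in a MERGE statement")
  val sourceIds = source.map(_.id).toSet
  val deleteMatched = clauses.contains(MatchedDelete)
  // A target row is matched when some source row has the same id;
  // with a MATCHED DELETE clause, matched rows are dropped.
  targetIds.filterNot(id => deleteMatched && sourceIds.contains(id))
}

val remaining = toyMerge(Seq(0L, 1L, 2L, 3L, 4L), Seq(SourceRow(0, "Zero"), SourceRow(1, "One")), Seq(MatchedDelete))
println(remaining) // List(2, 3, 4)

// A merge without any WHEN clause is rejected
val rejected =
  try { toyMerge(Seq(0L), Seq.empty, Seq.empty); false }
  catch { case _: IllegalArgumentException => true }
println(rejected) // true
```

The remaining ids 2, 3 and 4 agree with the target table shown in the next section.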

Update All Columns Except One

This demo shows how to update all columns except one on a match.

import org.apache.spark.sql.functions.lit

val targetDF = target
  .toDF
  .withColumn("num", lit(1))
  .withColumn("updated", lit(false))
targetDF.sort($"id".asc).show
+---+---+-------+
| id|num|updated|
+---+---+-------+
|  2|  1|  false|
|  3|  1|  false|
|  4|  1|  false|
+---+---+-------+

Write the modified data back to the delta table. This creates a new table version with the changed schema, which is why the overwriteSchema option is needed.

targetDF
  .write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", true)
  .save(path)

Reload the delta table (to pick up the new columns).

val target = DeltaTable.forPath(path)
val targetDF = target.toDF
val sourceDF = Seq(0, 1, 2).toDF("num")

Create an update map from the target columns to update to their new values (here, only updated is set to true), and use it in a WHEN MATCHED clause.

val updates = Map(
  "updated" -> lit(true))
target.as("to")
  .merge(
    source = sourceDF.as("from"),
    condition = $"to.id" === $"from.num")
  .whenMatched.update(updates)
  .execute()
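Only the row with id 2 should change. The source nums are 0, 1 and 2, and ids 0 and 1 were already deleted by the earlier merge. The toy in-memory model below makes that concrete. It is plain Scala and an illustration only, not Delta's implementation. `TargetRow` and `toyMatchedUpdate` are hypothetical names.

```scala
// Toy, in-memory model (illustration only) of
// MERGE ... ON to.id = from.num WHEN MATCHED THEN UPDATE SET updated = true
final case class TargetRow(id: Long, num: Int, updated: Boolean)

def toyMatchedUpdate(target: Seq[TargetRow], sourceNums: Seq[Int]): Seq[TargetRow] = {
  val nums = sourceNums.map(_.toLong).toSet
  target.map { row =>
    if (nums.contains(row.id)) row.copy(updated = true) // matched: apply the update map
    else row // unmatched target rows are left unchanged
  }
  // Source rows without a matching target row (0 and 1 here) are ignored,
  // since there is no WHEN NOT MATCHED clause.
}

val before = Seq(TargetRow(2, 1, false), TargetRow(3, 1, false), TargetRow(4, 1, false))
val after = toyMatchedUpdate(before, Seq(0, 1, 2))
after.foreach(println)
```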

Reload the delta table (with the merge changes).

val target = DeltaTable.forPath(path)
target.toDF.sort($"id".asc).show
+---+---+-------+
| id|num|updated|
+---+---+-------+
|  2|  1|   true|
|  3|  1|  false|
|  4|  1|  false|
+---+---+-------+
assert(target.history.count == 5)
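The merge above sets a single column (updated). To update every target column except one (say id), you could build the map from the target's column names instead of listing them by hand. This is a minimal sketch under two assumptions: `allColumnsExcept` is a hypothetical helper, and the source, aliased as `from`, has columns with the same names as the target. The sourceDF in this demo does not satisfy the second assumption.

```scala
// Hypothetical helper (not part of Delta Lake): an updateExpr-style map that sets
// every target column except `excluded` to the same-named column of the source alias.
def allColumnsExcept(columns: Seq[String], excluded: String, sourceAlias: String = "from"): Map[String, String] =
  columns.filterNot(_ == excluded).map(c => c -> s"$sourceAlias.$c").toMap

val allButId = allColumnsExcept(Seq("id", "num", "updated"), excluded = "id")
println(allButId) // Map(num -> from.num, updated -> from.updated)
```

In spark-shell, a map like this would be passed to `whenMatched.updateExpr(...)`, for example `updateExpr(allColumnsExcept(target.toDF.columns.toSeq, "id"))`. That call uses string expressions rather than the `Column` values that `update` takes.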

MERGE NOT MATCHED INSERT

Use spark-sql to execute the following query.

MERGE INTO merge_demo to
USING merge_demo_source from
ON to.id = from.id
WHEN NOT MATCHED THEN INSERT *;
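In WHEN NOT MATCHED THEN INSERT, a source row is inserted only if no target row satisfies the ON condition. The toy model below shows this in plain Scala. It is an illustration only that tracks just the id column. `toyNotMatchedInsert` is a hypothetical name, and the input ids are made up for concreteness. The model also shows that running the same insert-only merge a second time inserts nothing, because every source id then has a match.

```scala
// Toy, in-memory model (illustration only) of
// MERGE ... ON to.id = from.id WHEN NOT MATCHED THEN INSERT *, tracking ids only.
// Returns the new target ids and the ids that were inserted.
def toyNotMatchedInsert(targetIds: Seq[Long], sourceIds: Seq[Long]): (Seq[Long], Seq[Long]) = {
  val existing = targetIds.toSet
  val inserted = sourceIds.filterNot(existing.contains) // source rows with no matching target row
  (targetIds ++ inserted, inserted)
}

val (afterFirst, insertedFirst) = toyNotMatchedInsert(Seq(2L, 3L, 4L), Seq(0L, 1L))
val (afterSecond, insertedSecond) = toyNotMatchedInsert(afterFirst, Seq(0L, 1L))
println(insertedFirst)
println(insertedSecond)
```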

Streaming CDF Read and MERGE

Note

Until I figure out how to run two concurrent Spark applications using the same metastore, I'm going to use spark-sql and spark-shell interchangeably.

Use spark-sql to enable Change Data Feed on a delta table.

ALTER TABLE merge_demo
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
SHOW TBLPROPERTIES merge_demo;
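With the Change Data Feed enabled, changes are reported as rows whose _change_type is insert, delete, update_preimage or update_postimage. The toy model below shows which change rows a pair of before and after snapshots (keyed by id) would produce. It is plain Scala and an illustration only. `ChangeRow` and `toyChanges` are hypothetical names, not Delta APIs, and real change rows also carry every table column plus _commit_version and _commit_timestamp.

```scala
// Toy model (illustration only) of the change rows for one commit, keyed by id.
final case class ChangeRow(id: Long, changeType: String)

def toyChanges(before: Map[Long, String], after: Map[Long, String]): Seq[ChangeRow] = {
  val ids = (before.keySet ++ after.keySet).toSeq.sorted
  ids.flatMap { id =>
    (before.get(id), after.get(id)) match {
      case (None, Some(_)) => Seq(ChangeRow(id, "insert"))
      case (Some(_), None) => Seq(ChangeRow(id, "delete"))
      case (Some(b), Some(a)) if b != a =>
        Seq(ChangeRow(id, "update_preimage"), ChangeRow(id, "update_postimage"))
      case _ => Seq.empty // unchanged rows produce no change rows
    }
  }
}

val changes = toyChanges(Map(2L -> "Two", 3L -> "Three"), Map(2L -> "Two", 3L -> "THREE", 5L -> "Five"))
changes.foreach(println)
```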

Exit spark-sql and open spark-shell.

Run a streaming CDF scan over the delta table.

spark
  .readStream
  .format("delta")
  .option("readChangeFeed", true)
  .table("merge_demo")
  .writeStream
  .format("console")
  .start

Execute a MERGE command and observe the output of the streaming query.

sql("""
  MERGE INTO merge_demo to
  USING merge_demo_source from
  ON to.id = from.id
  WHEN NOT MATCHED THEN INSERT *;
""").show(truncate = false)
-------------------------------------------
Batch: 1
-------------------------------------------
+---+------------+---------------+-----------------+
| id|_change_type|_commit_version|_commit_timestamp|
+---+------------+---------------+-----------------+
+---+------------+---------------+-----------------+

Why is the output empty?

Why is a batch generated at all when there is no data?