Demo: Merge Operation
This demo shows the DeltaTable.merge operation (and the underlying MergeIntoCommand) in action.
Tip
Enable the ALL logging level for the org.apache.spark.sql.delta.commands.MergeIntoCommand logger as described in Logging.
Target Table
Create Table
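For example (a minimal sketch; the exact schema is an assumption, an id / num pair chosen to match the outputs later in this demo):

sql("""
  CREATE TABLE merge_demo (
    id INT,
    num INT)
  USING delta
  """)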
Please note that the above commands leave us with an empty Delta table. Let's fix it.
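For example (the sample rows and the target handle used by the merge examples below are assumptions):

import io.delta.tables.DeltaTable

// Hypothetical sample rows; enough to have both matching and non-matching ids later on.
sql("INSERT INTO merge_demo VALUES (0, 1), (1, 1), (2, 1), (3, 1), (4, 1)")

// The target DeltaTable used in the merge examples below.
val target = DeltaTable.forName("merge_demo")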
Source Table
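For example (a sketch; the extra name column is what makes the source schema different from the target's):

// Hypothetical source dataset: ids 0 and 1 exist in the target, id 5 does not.
val source = Seq(
  (0, 1, "zero"),
  (1, 1, "one"),
  (5, 1, "five"))
  .toDF("id", "num", "name")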
Note the difference in the schema of the target and source datasets.
MERGE MATCHED DELETE with Schema Evolution
Not only are we about to delete the matching rows, but we are also going to update the schema of the target table (schema evolution).
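Schema evolution in MERGE requires spark.databricks.delta.schema.autoMerge.enabled, so turn it on first (unless it is already enabled in your session):

// Required for schema evolution in MERGE (disabled by default)
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", true)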
val mergeBuilder = target.as("to")
  .merge(
    source = source.as("from"),
    condition = $"to.id" === $"from.id")
Executing the merge builder with no WHEN clauses fails analysis:
scala> mergeBuilder.execute
org.apache.spark.sql.AnalysisException: There must be at least one WHEN clause in a MERGE statement
at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.apply(deltaMerge.scala:253)
at io.delta.tables.DeltaMergeBuilder.mergePlan(DeltaMergeBuilder.scala:268)
at io.delta.tables.DeltaMergeBuilder.$anonfun$execute$1(DeltaMergeBuilder.scala:215)
at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:87)
at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:73)
at io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:120)
at io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:204)
... 47 elided
Let's register a WHEN MATCHED clause with a DELETE action and execute the merge.
val mergeMatchedBuilder = mergeBuilder.whenMatched()
assert(mergeMatchedBuilder.isInstanceOf[io.delta.tables.DeltaMergeMatchedActionBuilder])
val mergeBuilderDeleteMatched = mergeMatchedBuilder.delete()
assert(mergeBuilderDeleteMatched.isInstanceOf[io.delta.tables.DeltaMergeBuilder])
mergeBuilderDeleteMatched.execute()
Update All Columns Except One
This demo shows how to update all columns except one on a match.
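Start from the current content of the merge_demo table and add a new updated column (a sketch; the column name and the false default are assumptions):

import org.apache.spark.sql.functions.lit

// Hypothetical: extend the current target data with a new updated column.
val modified = spark.table("merge_demo").withColumn("updated", lit(false))
modified.show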
+---+---+-------+
| id|num|updated|
+---+---+-------+
| 2| 1| false|
| 3| 1| false|
| 4| 1| false|
+---+---+-------+
Write the modified data out to the delta table (that will create a new version with the schema changed).
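For example (overwriteSchema is what lets the overwrite change the table schema):

modified.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", true)
  .saveAsTable("merge_demo")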
Reload the delta table (with the new column changes).
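For example:

import io.delta.tables.DeltaTable
val target = DeltaTable.forName("merge_demo")
target.toDF.printSchema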
Create an update map (with the columns of the target delta table and the new values).
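A possible shape of the source dataset and the update map (the exact values are assumptions; with the merge condition below, only the target row with id = 2 matches, and every column except num gets updated):

import org.apache.spark.sql.functions.lit

// Hypothetical source: num = 2 matches the target row with id = 2.
val sourceDF = Seq(2).toDF("num")

// Update all target columns except num: id is taken from the source's num column
// (the join key) and updated is flagged true.
val updates = Map(
  "id"      -> $"from.num",
  "updated" -> lit(true))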
target.as("to")
  .merge(
    source = sourceDF.as("from"),
    condition = $"to.id" === $"from.num")
  .whenMatched.update(updates)
  .execute()
Reload the delta table (with the merge changes).
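For example:

spark.table("merge_demo").orderBy("id").show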
+---+---+-------+
| id|num|updated|
+---+---+-------+
| 2| 1| true|
| 3| 1| false|
| 4| 1| false|
+---+---+-------+
MERGE NOT MATCHED INSERT
Use spark-sql to execute the following query.
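A query like the following should do (assuming a merge_demo_source delta table with rows that are not in merge_demo yet):

MERGE INTO merge_demo to
USING merge_demo_source from
ON to.id = from.id
WHEN NOT MATCHED THEN INSERT *;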
Streaming CDF Read and MERGE
Note
Until I figure out how to run two concurrent Spark applications using the same metastore, I'm going to use spark-sql and spark-shell interchangeably.
Use spark-sql to enable Change Data Feed on a delta table.
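For example (delta.enableChangeDataFeed is the table property that turns Change Data Feed on):

ALTER TABLE merge_demo SET TBLPROPERTIES (delta.enableChangeDataFeed = true);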
Exit spark-sql and open spark-shell.
Run a streaming CDF scan over the delta table.
spark
  .readStream
  .format("delta")
  .option("readChangeFeed", true)
  .table("merge_demo")
  .writeStream
  .format("console")
  .start
Execute the MERGE command and observe the output of the streaming query.
sql("""
MERGE INTO merge_demo to
USING merge_demo_source from
ON to.id = from.id
WHEN NOT MATCHED THEN INSERT *;
""").show(truncate = false)
-------------------------------------------
Batch: 1
-------------------------------------------
+---+------------+---------------+-----------------+
| id|_change_type|_commit_version|_commit_timestamp|
+---+------------+---------------+-----------------+
+---+------------+---------------+-----------------+
Why is the output empty?
Why is the batch generated at all when there is no data?