Demo: Change Data Feed¶
This demo shows Change Data Feed in action.
Create Delta Table with Change Data Feed Enabled¶
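The exact CREATE TABLE statement isn't shown above. A minimal sketch consistent with the rest of the demo (a managed table with id and name columns and CDF enabled at creation time) could be the following; note the demo is inconsistent about the table name (the directory listing below shows cdf_demo while the queries use delta_demo), and this sketch follows the queries:

sql("""
CREATE TABLE delta_demo (id INT, name STRING)
USING delta
TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")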
INSERT INTO¶
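The INSERT itself isn't reproduced either; judging by the change feed output further below (an insert of id 0 with name 'insert into' at commit version 1), it was presumably along these lines:

sql("INSERT INTO delta_demo VALUES (0, 'insert into')")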
UPDATE¶
UPDATE is among the commands supported by Change Data Feed. As the batch scan output further below shows, an update is recorded as a pair of rows: an update_preimage (the row before the update) and an update_postimage (the row after).
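The UPDATE statement isn't shown; a sketch that matches the pre- and post-image rows in that output could be:

sql("UPDATE delta_demo SET name = 'update' WHERE id = 0")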
_change_data¶
After executing the above UPDATE command, Delta Lake creates a _change_data directory (with cdc-prefixed parquet files).
$ tree spark-warehouse/cdf_demo
spark-warehouse/cdf_demo
├── _change_data
│ └── cdc-00000-d5a2730f-de81-4bc7-8bb1-b6c0ff5fec37.c000.snappy.parquet
├── _delta_log
│ ├── 00000000000000000000.json
│ ├── 00000000000000000001.json
│ └── 00000000000000000002.json
├── part-00000-088e28d1-b95f-46e2-812a-5389ae58af28-c000.snappy.parquet
└── part-00000-0947d1e8-a398-4e2c-8afe-db734b84f6b4.c000.snappy.parquet
2 directories, 6 files
CDC-Aware Batch Scan¶
val changes = spark
  .read
  .format("delta")
  .option("readChangeFeed", true)
  .option("startingVersion", "0")
  .table("delta_demo")
changes.show(truncate = false) // renders the change feed shown below
+---+-----------+----------------+---------------+-----------------------+
|id |name |_change_type |_commit_version|_commit_timestamp |
+---+-----------+----------------+---------------+-----------------------+
|0 |insert into|update_preimage |2 |2022-07-24 18:23:42.102|
|0 |update |update_postimage|2 |2022-07-24 18:23:42.102|
|0 |insert into|insert |1 |2022-07-24 18:15:48.892|
+---+-----------+----------------+---------------+-----------------------+
CDC-Aware Streaming Query¶
spark
  .readStream
  .format("delta")
  .option("readChangeFeed", true)
  .table("delta_demo")
  .writeStream
  .format("console")
  .option("truncate", false)
  .queryName("Change feed from delta_demo")
  .start()
Single Insert-Only Merge¶
sql("""
MERGE INTO delta_demo target
USING (VALUES 5 source(id))
ON target.id = source.id
WHEN NOT MATCHED THEN INSERT *;
""")
Streaming Micro-Batch¶
You should see the following output from the streaming query:
-------------------------------------------
Batch: 1
-------------------------------------------
+---+------------+---------------+-----------------------+
|id |_change_type|_commit_version|_commit_timestamp |
+---+------------+---------------+-----------------------+
|5 |insert |9 |2022-08-05 14:38:49.305|
+---+------------+---------------+-----------------------+
INSERT INTO and Streaming Query¶
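The INSERT that triggers this micro-batch isn't shown. A sketch consistent with the output below (id 6 committed as version 10) could be the following; the name value is a guess, as the console output doesn't include that column:

sql("INSERT INTO delta_demo VALUES (6, 'insert into')")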
You should see the following output from the streaming query:
-------------------------------------------
Batch: 1
-------------------------------------------
+---+------------+---------------+-----------------------+
|id |_change_type|_commit_version|_commit_timestamp |
+---+------------+---------------+-----------------------+
|6 |insert |10 |2022-08-05 16:35:08.657|
+---+------------+---------------+-----------------------+
Review and Merge¶
The following are loose notes (findings) from investigating CDF.
overwrite Save Mode¶
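The setup of the table at /tmp/delta-xxx isn't part of these notes. A sketch that would produce the commit history shown in the DELETE FROM section below (version 0: initial write, version 1: CDF enabled, version 2: overwrite) is:

// version 0: initial write (SaveMode.ErrorIfExists is the default)
spark.range(5).write.format("delta").save("/tmp/delta-xxx")

// version 1: enable Change Data Feed on the existing table
sql("ALTER TABLE delta.`/tmp/delta-xxx` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

// version 2: overwrite with the same five rows
spark.range(5).write.format("delta").mode("overwrite").save("/tmp/delta-xxx")

A single Overwrite commit thus surfaces in the change feed as a delete row for every pre-existing record plus an insert row for every new one, all sharing the same _commit_version, as the scan below demonstrates.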
val startingVersion = 2
val v2 = spark
  .read
  .format("delta")
  .option("readChangeFeed", true)
  .option("startingVersion", startingVersion)
  .load("/tmp/delta-xxx")
v2.show(truncate = false)
+---+------------+---------------+-----------------------+
|id |_change_type|_commit_version|_commit_timestamp |
+---+------------+---------------+-----------------------+
|0 |insert |2 |2022-07-31 17:51:25.777|
|1 |insert |2 |2022-07-31 17:51:25.777|
|2 |insert |2 |2022-07-31 17:51:25.777|
|3 |insert |2 |2022-07-31 17:51:25.777|
|4 |insert |2 |2022-07-31 17:51:25.777|
|1 |delete |2 |2022-07-31 17:51:25.777|
|0 |delete |2 |2022-07-31 17:51:25.777|
|2 |delete |2 |2022-07-31 17:51:25.777|
|3 |delete |2 |2022-07-31 17:51:25.777|
|4 |delete |2 |2022-07-31 17:51:25.777|
+---+------------+---------------+-----------------------+
scala> v2.orderBy('id).show(truncate = false)
+---+------------+---------------+-----------------------+
|id |_change_type|_commit_version|_commit_timestamp |
+---+------------+---------------+-----------------------+
|0 |delete |2 |2022-07-31 17:51:25.777|
|0 |insert |2 |2022-07-31 17:51:25.777|
|1 |insert |2 |2022-07-31 17:51:25.777|
|1 |delete |2 |2022-07-31 17:51:25.777|
|2 |insert |2 |2022-07-31 17:51:25.777|
|2 |delete |2 |2022-07-31 17:51:25.777|
|3 |insert |2 |2022-07-31 17:51:25.777|
|3 |delete |2 |2022-07-31 17:51:25.777|
|4 |insert |2 |2022-07-31 17:51:25.777|
|4 |delete |2 |2022-07-31 17:51:25.777|
+---+------------+---------------+-----------------------+
DELETE FROM¶
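The DELETE statement itself isn't reproduced; the predicate recorded in the table history below implies it was equivalent to:

sql("DELETE FROM delta.`/tmp/delta-xxx` WHERE id > 3")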
val descHistory = sql("desc history delta.`/tmp/delta-xxx`").select('version, 'operation, 'operationParameters)
scala> descHistory.show(truncate = false)
+-------+-----------------+-----------------------------------------------------------------+
|version|operation |operationParameters |
+-------+-----------------+-----------------------------------------------------------------+
|3 |DELETE |{predicate -> ["(spark_catalog.delta.`/tmp/delta-xxx`.id > 3L)"]}|
|2 |WRITE |{mode -> Overwrite, partitionBy -> []} |
|1 |SET TBLPROPERTIES|{properties -> {"delta.enableChangeDataFeed":"true"}} |
|0 |WRITE |{mode -> ErrorIfExists, partitionBy -> []} |
+-------+-----------------+-----------------------------------------------------------------+
val startingVersion = 3
val v3 = spark
  .read
  .format("delta")
  .option("readChangeFeed", true)
  .option("startingVersion", startingVersion)
  .load("/tmp/delta-xxx")
v3.orderBy('id).show(truncate = false)
+---+------------+---------------+-----------------------+
|id |_change_type|_commit_version|_commit_timestamp |
+---+------------+---------------+-----------------------+
|4 |delete |3 |2022-08-01 10:22:21.402|
+---+------------+---------------+-----------------------+