Skip to content

Demo: Change Data Feed

This demo shows Change Data Feed in action.

Create Delta Table with Change Data Feed Enabled

CREATE TABLE cdf_demo (id INT, name STRING)
USING delta
TBLPROPERTIES (delta.enableChangeDataFeed = true);

INSERT INTO

INSERT INTO cdf_demo VALUES (0, 'insert into');
SELECT * FROM cdf_demo;
+---+-----------+
|id |name       |
+---+-----------+
|0  |insert into|
+---+-----------+

UPDATE

UPDATE is among commands supported by Change Data Feed.

UPDATE cdf_demo SET name = 'update' WHERE id = 0;
SELECT * FROM cdf_demo;
+---+------+
|id |name  |
+---+------+
|0  |update|
+---+------+

_change_data

After executing the above UPDATE command, Delta Lake creates a _change_data directory (with cdc files).

$ tree spark-warehouse/cdf_demo
spark-warehouse/cdf_demo
├── _change_data
│   └── cdc-00000-d5a2730f-de81-4bc7-8bb1-b6c0ff5fec37.c000.snappy.parquet
├── _delta_log
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.json
│   └── 00000000000000000002.json
├── part-00000-088e28d1-b95f-46e2-812a-5389ae58af28-c000.snappy.parquet
└── part-00000-0947d1e8-a398-4e2c-8afe-db734b84f6b4.c000.snappy.parquet

2 directories, 6 files

CDC-Aware Batch Scan

val changes = spark
  .read
  .format("delta")
  .option("readChangeFeed", true)
  .option("startingVersion", "0")
  .table("delta_demo")
changes.show(truncate = false)
+---+-----------+----------------+---------------+-----------------------+
|id |name       |_change_type    |_commit_version|_commit_timestamp      |
+---+-----------+----------------+---------------+-----------------------+
|0  |insert into|update_preimage |2              |2022-07-24 18:23:42.102|
|0  |update     |update_postimage|2              |2022-07-24 18:23:42.102|
|0  |insert into|insert          |1              |2022-07-24 18:15:48.892|
+---+-----------+----------------+---------------+-----------------------+

CDC-Aware Streaming Query

spark
  .readStream
  .format("delta")
  .option("readChangeFeed", true)
  .table("delta_demo")
  .writeStream
  .format("console")
  .option("truncate", false)
  .queryName("Change feed from delta_demo")
  .start
spark.table("delta_demo").show
+---+
| id|
+---+
|  1|
|  0|
|  2|
|  3|
|  4|
+---+

Single Insert-Only Merge

sql("""
  MERGE INTO delta_demo target
  USING (VALUES 5 source(id))
  ON target.id = source.id
  WHEN NOT MATCHED THEN INSERT *;
""")

Streaming Micro-Batch

You should see the following output from the streaming query:

-------------------------------------------
Batch: 1
-------------------------------------------
+---+------------+---------------+-----------------------+
|id |_change_type|_commit_version|_commit_timestamp      |
+---+------------+---------------+-----------------------+
|5  |insert      |9              |2022-08-05 14:38:49.305|
+---+------------+---------------+-----------------------+

INSERT INTO and Streaming Query

sql("""
INSERT INTO delta_demo VALUES (6);
""")

You should see the following output from the streaming query.

-------------------------------------------
Batch: 1
-------------------------------------------
+---+------------+---------------+-----------------------+
|id |_change_type|_commit_version|_commit_timestamp      |
+---+------------+---------------+-----------------------+
|6  |insert      |10             |2022-08-05 16:35:08.657|
+---+------------+---------------+-----------------------+

Review and Merge

The following are loose notes (findings) while investigating CDF.

overwrite Save Mode

spark
  .range(5)
  .write
  .format("delta")
  .mode("overwrite")
  .save("/tmp/delta-xxx")
val startingVersion = 2
val v2 = spark
  .read
  .format("delta")
  .option("readChangeFeed", true)
  .option("startingVersion", startingVersion)
  .load("/tmp/delta-xxx")
v2.show(truncate = false)
+---+------------+---------------+-----------------------+
|id |_change_type|_commit_version|_commit_timestamp      |
+---+------------+---------------+-----------------------+
|0  |insert      |2              |2022-07-31 17:51:25.777|
|1  |insert      |2              |2022-07-31 17:51:25.777|
|2  |insert      |2              |2022-07-31 17:51:25.777|
|3  |insert      |2              |2022-07-31 17:51:25.777|
|4  |insert      |2              |2022-07-31 17:51:25.777|
|1  |delete      |2              |2022-07-31 17:51:25.777|
|0  |delete      |2              |2022-07-31 17:51:25.777|
|2  |delete      |2              |2022-07-31 17:51:25.777|
|3  |delete      |2              |2022-07-31 17:51:25.777|
|4  |delete      |2              |2022-07-31 17:51:25.777|
+---+------------+---------------+-----------------------+
scala> v2.orderBy('id).show(truncate = false)
+---+------------+---------------+-----------------------+
|id |_change_type|_commit_version|_commit_timestamp      |
+---+------------+---------------+-----------------------+
|0  |delete      |2              |2022-07-31 17:51:25.777|
|0  |insert      |2              |2022-07-31 17:51:25.777|
|1  |insert      |2              |2022-07-31 17:51:25.777|
|1  |delete      |2              |2022-07-31 17:51:25.777|
|2  |insert      |2              |2022-07-31 17:51:25.777|
|2  |delete      |2              |2022-07-31 17:51:25.777|
|3  |insert      |2              |2022-07-31 17:51:25.777|
|3  |delete      |2              |2022-07-31 17:51:25.777|
|4  |insert      |2              |2022-07-31 17:51:25.777|
|4  |delete      |2              |2022-07-31 17:51:25.777|
+---+------------+---------------+-----------------------+

DELETE FROM

sql("DELETE FROM delta.`/tmp/delta-xxx` WHERE id > 3").show
val descHistory = sql("desc history delta.`/tmp/delta-xxx`").select('version, 'operation, 'operationParameters)
scala> descHistory.show(truncate = false)
+-------+-----------------+-----------------------------------------------------------------+
|version|operation        |operationParameters                                              |
+-------+-----------------+-----------------------------------------------------------------+
|3      |DELETE           |{predicate -> ["(spark_catalog.delta.`/tmp/delta-xxx`.id > 3L)"]}|
|2      |WRITE            |{mode -> Overwrite, partitionBy -> []}                           |
|1      |SET TBLPROPERTIES|{properties -> {"delta.enableChangeDataFeed":"true"}}            |
|0      |WRITE            |{mode -> ErrorIfExists, partitionBy -> []}                       |
+-------+-----------------+-----------------------------------------------------------------+
val startingVersion = 3
val v3 = spark
  .read
  .format("delta")
  .option("readChangeFeed", true)
  .option("startingVersion", startingVersion)
  .load("/tmp/delta-xxx")
v3.orderBy('id).show(truncate = false)
+---+------------+---------------+-----------------------+
|id |_change_type|_commit_version|_commit_timestamp      |
+---+------------+---------------+-----------------------+
|4  |delete      |3              |2022-08-01 10:22:21.402|
+---+------------+---------------+-----------------------+

endingVersion Option

val startingVersion = 3
val endingVersion = 3
val v3 = spark
  .read
  .format("delta")
  .option("readChangeFeed", true)
  .option("startingVersion", startingVersion)
  .option("endingVersion", endingVersion)
  .load("/tmp/delta-xxx")
v3.orderBy('id).show(truncate = false)
+---+------------+---------------+-----------------------+
|id |_change_type|_commit_version|_commit_timestamp      |
+---+------------+---------------+-----------------------+
|4  |delete      |3              |2022-08-01 10:22:21.402|
+---+------------+---------------+-----------------------+