CDF Table-Valued Functions

Delta Lake 2.3.0 comes with the following table-valued functions for Change Data Feed:

  • table_changes (table changes by Delta table name)
  • table_changes_by_path (table changes by Delta table path)

spark-sql> SHOW FUNCTIONS LIKE 'table_*';
table_changes
table_changes_by_path

spark-sql> DESC FUNCTION table_changes;
Function: table_changes
Class: org.apache.spark.sql.delta.CDCNameBased
Usage: N/A.

spark-sql> DESC FUNCTION table_changes_by_path;
Function: table_changes_by_path
Class: org.apache.spark.sql.delta.CDCPathBased
Usage: N/A.

Besides the table name (or path), the functions accept two arguments for the version or timestamp range:

  • startingVersion or startingTimestamp
  • endingVersion or endingTimestamp
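
A minimal sketch of how the range arguments can be passed, assuming the delta_demo table from the Demo section and a hypothetical table path /tmp/delta/delta_demo. It also assumes that string arguments are interpreted as timestamps and that the ending argument may be omitted; both should be checked against the Delta Lake version in use.

```sql
-- Version range: changes committed in versions 3 through 5
SELECT * FROM table_changes('delta_demo', 3, 5);

-- Timestamp range (assumption: string arguments are treated as timestamps)
SELECT * FROM table_changes('delta_demo', '2023-03-26 19:48:00', '2023-03-26 19:54:00');

-- Open-ended range (assumption: the ending argument is optional)
SELECT * FROM table_changes('delta_demo', 3);

-- Path-based variant; the path below is hypothetical
SELECT * FROM table_changes_by_path('/tmp/delta/delta_demo', 3, 5);
```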

Learn More

Learn more about table-valued functions in The Internals of Spark SQL.

Internals

The CDF Table-Valued Functions are DeltaTableValueFunctions that are registered (injected) using DeltaSparkSessionExtension.

The CDF Table-Valued Functions are resolved (using DeltaAnalysis logical resolution rule) into LogicalRelations (Spark SQL) with the BaseRelations of the respective DeltaTableV2s with the following CDC Options:

  • startingVersion or startingTimestamp
  • endingVersion or endingTimestamp
  • readChangeFeed enabled
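
One way to observe this resolution is to print the query plans with Spark SQL's EXPLAIN EXTENDED. This is only a sketch: it assumes the delta_demo table from the Demo section exists with change data feed enabled from version 3. The exact plan text depends on the Spark and Delta Lake versions, so no output is shown here.

```sql
-- Print the parsed, analyzed, optimized and physical plans.
-- The analyzed plan is where the resolved relation should show up.
EXPLAIN EXTENDED
SELECT * FROM table_changes('delta_demo', 3, 5);
```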

Demo

CREATE TABLE delta_demo (
  ID LONG,
  name STRING)
USING delta;
INSERT INTO delta_demo VALUES (0, "zero");
INSERT INTO delta_demo VALUES (1, "one");
DESC HISTORY delta_demo;
2   2023-03-26 19:14:47.895 NULL    NULL    WRITE   {"mode":"Append","partitionBy":"[]"}    NULL    NULL    NULL    1   Serializable    true    {"numFiles":"1","numOutputBytes":"705","numOutputRows":"1"} NULL    Apache-Spark/3.3.2 Delta-Lake/2.3.0rc1
1   2023-03-26 19:14:38.828 NULL    NULL    WRITE   {"mode":"Append","partitionBy":"[]"}    NULL    NULL    NULL    0   Serializable    true    {"numFiles":"1","numOutputBytes":"712","numOutputRows":"1"} NULL    Apache-Spark/3.3.2 Delta-Lake/2.3.0rc1
0   2023-03-26 19:14:06.261 NULL    NULL    CREATE TABLE    {"description":null,"isManaged":"true","partitionBy":"[]","properties":"{}"}    NULL    NULL    NULL    NULL    Serializable    true    {}  NULL    Apache-Spark/3.3.2 Delta-Lake/2.3.0rc1
SELECT * FROM table_changes("delta_demo", 0, 2);
Error in query: Error getting change data for range [0 , 2] as change data was not
recorded for version [0]. If you've enabled change data feed on this table,
use `DESCRIBE HISTORY` to see when it was first enabled.
Otherwise, to start recording change data, use `ALTER TABLE table_name SET TBLPROPERTIES
(delta.enableChangeDataFeed=true)`.
ALTER TABLE delta_demo SET TBLPROPERTIES
(delta.enableChangeDataFeed=true);
INSERT INTO delta_demo VALUES (2, "two");
INSERT INTO delta_demo VALUES (3, "three");
DESC HISTORY delta_demo;
5   2023-03-26 19:53:51.526 NULL    NULL    WRITE   {"mode":"Append","partitionBy":"[]"}    NULL    NULL    NULL    4   Serializable    true    {"numFiles":"1","numOutputBytes":"719","numOutputRows":"1"} NULL    Apache-Spark/3.3.2 Delta-Lake/2.3.0rc1
4   2023-03-26 19:53:48.756 NULL    NULL    WRITE   {"mode":"Append","partitionBy":"[]"}    NULL    NULL    NULL    3   Serializable    true    {"numFiles":"1","numOutputBytes":"705","numOutputRows":"1"} NULL    Apache-Spark/3.3.2 Delta-Lake/2.3.0rc1
3   2023-03-26 19:48:42.872 NULL    NULL    SET TBLPROPERTIES   {"properties":"{"delta.enableChangeDataFeed":"true"}"}  NULL    NULL    NULL    2   Serializable    true    {}  NULL    Apache-Spark/3.3.2 Delta-Lake/2.3.0rc1
2   2023-03-26 19:14:47.895 NULL    NULL    WRITE   {"mode":"Append","partitionBy":"[]"}    NULL    NULL    NULL    1   Serializable    true    {"numFiles":"1","numOutputBytes":"705","numOutputRows":"1"} NULL    Apache-Spark/3.3.2 Delta-Lake/2.3.0rc1
1   2023-03-26 19:14:38.828 NULL    NULL    WRITE   {"mode":"Append","partitionBy":"[]"}    NULL    NULL    NULL    0   Serializable    true    {"numFiles":"1","numOutputBytes":"712","numOutputRows":"1"} NULL    Apache-Spark/3.3.2 Delta-Lake/2.3.0rc1
0   2023-03-26 19:14:06.261 NULL    NULL    CREATE TABLE    {"description":null,"isManaged":"true","partitionBy":"[]","properties":"{}"}    NULL    NULL    NULL    NULL    Serializable    true    {}  NULL    Apache-Spark/3.3.2 Delta-Lake/2.3.0rc1
SELECT * FROM table_changes("delta_demo", 3, 5);
3   three   insert  5   2023-03-26 19:53:51.526
2   two     insert  4   2023-03-26 19:53:48.756
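
The three trailing columns in the output above are the change data feed metadata columns. A short sketch that selects them by name and filters on the change type, assuming the standard Delta Lake column names _change_type, _commit_version and _commit_timestamp:

```sql
-- Select the data columns together with the CDF metadata columns
SELECT ID, name, _change_type, _commit_version, _commit_timestamp
FROM table_changes('delta_demo', 3, 5)
WHERE _change_type = 'insert'   -- keep only inserted rows
ORDER BY _commit_version;       -- oldest commit first
```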