CDF Table-Valued Functions¶
Delta Lake 2.3.0 comes with the following table-valued functions for Change Data Feed:
- table_changes (table changes by delta table name)
- table_changes_by_path
spark-sql> SHOW FUNCTIONS LIKE 'table_*';
table_changes
table_changes_by_path
spark-sql> DESC FUNCTION table_changes;
Function: table_changes
Class: org.apache.spark.sql.delta.CDCNameBased
Usage: N/A.
spark-sql> DESC FUNCTION table_changes_by_path;
Function: table_changes_by_path
Class: org.apache.spark.sql.delta.CDCPathBased
Usage: N/A.
The functions accept two arguments for the time or version range:
- startingVersion or startingTimestamp
- endingVersion or endingTimestamp
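As a rough illustration of the call shape, the following Python sketch renders a table_changes query from a table name and a range. The helper name table_changes_query is hypothetical and not part of Delta Lake. The sketch assumes that integer bounds are read as versions and string bounds as timestamps, which is worth verifying against the Delta Lake version in use.

```python
from typing import Union

# A range bound: an int is treated as a version, a str as a timestamp (assumption).
Bound = Union[int, str]


def _quote(text: str) -> str:
    """Wrap text in single quotes; refuse input that would need escaping."""
    if "'" in text or "\\" in text:
        raise ValueError(f"unsupported character in {text!r}")
    return f"'{text}'"


def _literal(bound: Bound) -> str:
    """Render a range bound as a SQL literal."""
    # bool is a subclass of int, so rule it out before the int check.
    if isinstance(bound, bool) or not isinstance(bound, (int, str)):
        raise TypeError("a bound must be an int (version) or a str (timestamp)")
    if isinstance(bound, int):
        return str(bound)
    return _quote(bound)


def table_changes_query(table_name: str, start: Bound, end: Bound) -> str:
    """Build a SELECT over table_changes (hypothetical helper, not a Delta API)."""
    args = ", ".join([_quote(table_name), _literal(start), _literal(end)])
    return f"SELECT * FROM table_changes({args})"
```

Calling table_changes_query("delta_demo", 3, 5) produces a version-based query, while passing two timestamp strings produces the time-based form of the same call.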
Learn More
Learn more about table-valued functions in The Internals of Spark SQL.
Internals¶
The CDF Table-Valued Functions are DeltaTableValueFunctions that are registered (injected) using DeltaSparkSessionExtension.
The CDF Table-Valued Functions are resolved (using DeltaAnalysis logical resolution rule) into LogicalRelations (Spark SQL) with the BaseRelations of the respective DeltaTableV2s with the following CDC Options:
- startingVersion or startingTimestamp
- endingVersion or endingTimestamp
- readChangeFeed enabled
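The CDC Options listed above are the same option names that the Delta DataFrame reader understands, so a comparable read can be sketched in PySpark. The helper names cdc_options and read_changes are hypothetical, and the sketch assumes that exactly one starting and one ending bound are given, mirroring the list above. The spark argument is assumed to be an existing SparkSession configured with Delta Lake.

```python
from typing import Dict, Optional


def cdc_options(
    starting_version: Optional[int] = None,
    starting_timestamp: Optional[str] = None,
    ending_version: Optional[int] = None,
    ending_timestamp: Optional[str] = None,
) -> Dict[str, str]:
    """Build the CDC options for a change data feed read (hypothetical helper)."""
    # Exactly one of the two starting bounds must be set.
    if (starting_version is None) == (starting_timestamp is None):
        raise ValueError("give exactly one of starting_version or starting_timestamp")
    # Exactly one of the two ending bounds must be set (assumption of this sketch).
    if (ending_version is None) == (ending_timestamp is None):
        raise ValueError("give exactly one of ending_version or ending_timestamp")

    options = {"readChangeFeed": "true"}
    if starting_version is not None:
        options["startingVersion"] = str(starting_version)
    else:
        options["startingTimestamp"] = starting_timestamp
    if ending_version is not None:
        options["endingVersion"] = str(ending_version)
    else:
        options["endingTimestamp"] = ending_timestamp
    return options


def read_changes(spark, table_name: str, options: Dict[str, str]):
    """Read the change feed of a Delta table using the given CDC options.

    `spark` is an existing SparkSession with Delta Lake configured (assumption).
    """
    return spark.read.format("delta").options(**options).table(table_name)
```

With a running session, read_changes(spark, "delta_demo", cdc_options(starting_version=3, ending_version=5)) should describe the same range as the corresponding table_changes call.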
Demo¶
CREATE TABLE delta_demo (
ID LONG,
name STRING)
USING delta;
INSERT INTO delta_demo VALUES (0, "zero");
INSERT INTO delta_demo VALUES (1, "one");
DESC HISTORY delta_demo;
2 2023-03-26 19:14:47.895 NULL NULL WRITE {"mode":"Append","partitionBy":"[]"} NULL NULL NULL 1 Serializable true {"numFiles":"1","numOutputBytes":"705","numOutputRows":"1"} NULL Apache-Spark/3.3.2 Delta-Lake/2.3.0rc1
1 2023-03-26 19:14:38.828 NULL NULL WRITE {"mode":"Append","partitionBy":"[]"} NULL NULL NULL 0 Serializable true {"numFiles":"1","numOutputBytes":"712","numOutputRows":"1"} NULL Apache-Spark/3.3.2 Delta-Lake/2.3.0rc1
0 2023-03-26 19:14:06.261 NULL NULL CREATE TABLE {"description":null,"isManaged":"true","partitionBy":"[]","properties":"{}"} NULL NULL NULL NULL Serializable true {} NULL Apache-Spark/3.3.2 Delta-Lake/2.3.0rc1
SELECT * FROM table_changes("delta_demo", 0, 2);
Error in query: Error getting change data for range [0 , 2] as change data was not
recorded for version [0]. If you've enabled change data feed on this table,
use `DESCRIBE HISTORY` to see when it was first enabled.
Otherwise, to start recording change data, use `ALTER TABLE table_name SET TBLPROPERTIES
(delta.enableChangeDataFeed=true)`.
ALTER TABLE delta_demo SET TBLPROPERTIES
(delta.enableChangeDataFeed=true);
INSERT INTO delta_demo VALUES (2, "two");
INSERT INTO delta_demo VALUES (3, "three");
DESC HISTORY delta_demo;
5 2023-03-26 19:53:51.526 NULL NULL WRITE {"mode":"Append","partitionBy":"[]"} NULL NULL NULL 4 Serializable true {"numFiles":"1","numOutputBytes":"719","numOutputRows":"1"} NULL Apache-Spark/3.3.2 Delta-Lake/2.3.0rc1
4 2023-03-26 19:53:48.756 NULL NULL WRITE {"mode":"Append","partitionBy":"[]"} NULL NULL NULL 3 Serializable true {"numFiles":"1","numOutputBytes":"705","numOutputRows":"1"} NULL Apache-Spark/3.3.2 Delta-Lake/2.3.0rc1
3 2023-03-26 19:48:42.872 NULL NULL SET TBLPROPERTIES {"properties":"{"delta.enableChangeDataFeed":"true"}"} NULL NULL NULL 2 Serializable true {} NULL Apache-Spark/3.3.2 Delta-Lake/2.3.0rc1
2 2023-03-26 19:14:47.895 NULL NULL WRITE {"mode":"Append","partitionBy":"[]"} NULL NULL NULL 1 Serializable true {"numFiles":"1","numOutputBytes":"705","numOutputRows":"1"} NULL Apache-Spark/3.3.2 Delta-Lake/2.3.0rc1
1 2023-03-26 19:14:38.828 NULL NULL WRITE {"mode":"Append","partitionBy":"[]"} NULL NULL NULL 0 Serializable true {"numFiles":"1","numOutputBytes":"712","numOutputRows":"1"} NULL Apache-Spark/3.3.2 Delta-Lake/2.3.0rc1
0 2023-03-26 19:14:06.261 NULL NULL CREATE TABLE {"description":null,"isManaged":"true","partitionBy":"[]","properties":"{}"} NULL NULL NULL NULL Serializable true {} NULL Apache-Spark/3.3.2 Delta-Lake/2.3.0rc1
SELECT * FROM table_changes("delta_demo", 3, 5);
3 three insert 5 2023-03-26 19:53:51.526
2 two insert 4 2023-03-26 19:53:48.756
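The result has no header row, so as a reading aid the sketch below labels its five columns. It assumes the first two are the table's ID and name columns and the last three are the Change Data Feed metadata columns _change_type, _commit_version and _commit_timestamp. ChangeRow and parse_change_row are hypothetical names, and the sketch assumes tab-separated output from the spark-sql shell.

```python
from typing import NamedTuple


class ChangeRow(NamedTuple):
    """One row of the table_changes result for delta_demo (hypothetical type)."""
    id: int
    name: str
    change_type: str       # _change_type
    commit_version: int    # _commit_version
    commit_timestamp: str  # _commit_timestamp


def parse_change_row(line: str) -> ChangeRow:
    """Parse one tab-separated output line into a ChangeRow."""
    fields = line.rstrip("\n").split("\t")
    if len(fields) != 5:
        raise ValueError(f"expected 5 tab-separated fields, got {len(fields)}")
    id_, name, change_type, version, timestamp = fields
    return ChangeRow(int(id_), name, change_type, int(version), timestamp)
```

Read this way, both rows are inserts committed in versions 5 and 4. Version 3, the SET TBLPROPERTIES commit, is inside the requested range but contributes no rows because it changed no data.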