Change Data Feed¶
Change Data Feed (CDF), formerly known as Change Data Capture (CDC), is a table feature in Delta Lake that allows tracking row-level changes between versions of a delta table.
Change Data Feed can be enabled on a delta table using the delta.enableChangeDataFeed table property.
Change Data Feed can be enabled globally (on all new delta tables) using the spark.databricks.delta.properties.defaults.enableChangeDataFeed system-wide configuration property.
CDF data changes are written out (by DelayedCommitProtocol) to the _change_data directory as parquet-encoded change data files with the cdc- prefix.
With CDF-Aware Table Scan (CDF Read) (based on readChangeFeed read option), loading a delta table gives data changes (not the data of a particular version of the delta table).
CDCReader is used to build a DataFrame of the row-level changes for all the possible structured query types (described using the DataFrame API):
- Batch queries (Spark SQL)
- Streaming queries (Spark Structured Streaming)
Change Data Feed was released in Delta Lake 2.0.0 (and was tracked under Support for Change Data Feed in Delta Lake).
delta.enableChangeDataFeed¶
Change Data Feed can be enabled on a delta table using delta.enableChangeDataFeed table property (through ChangeDataFeedTableFeature).
-- Enable Change Data Feed on an existing table
ALTER TABLE delta_demo
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- Enable Change Data Feed when creating a table
CREATE TABLE delta_demo (id INT, name STRING, age INT)
USING delta
TBLPROPERTIES (delta.enableChangeDataFeed = true);
Additionally, this property can be set for all new tables by default.
SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
Options¶
Change Data Feed is enabled in batch and streaming queries using the readChangeFeed option.
// Batch query
spark
  .read
  .format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", startingVersion)
  .option("endingVersion", endingVersion)
  .table("source")

// Streaming query
spark
  .readStream
  .format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", startingVersion)
  .table("source")
readChangeFeed is used alongside the other CDF options (e.g., startingVersion and endingVersion in the queries above).
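To make the combination of options concrete, here is a minimal Python sketch that assembles the options used in the queries above into a dictionary. The helper name `cdf_read_options` and its validation rules are assumptions made for illustration; they are not part of Delta Lake or PySpark.

```python
def cdf_read_options(starting_version, ending_version=None):
    """Build reader options for a CDF read (hypothetical helper, not a Delta Lake API).

    Only the option names that appear in this document are used:
    readChangeFeed, startingVersion and endingVersion.
    """
    # Assumption of this sketch: versions are non-negative integers
    # and the ending version is not before the starting one.
    if starting_version < 0:
        raise ValueError("starting_version must be non-negative")
    if ending_version is not None and ending_version < starting_version:
        raise ValueError("ending_version must not be less than starting_version")

    options = {
        "readChangeFeed": "true",
        "startingVersion": str(starting_version),
    }
    # endingVersion is left out entirely when no upper bound is requested
    # (as in the streaming query above).
    if ending_version is not None:
        options["endingVersion"] = str(ending_version)
    return options


batch_options = cdf_read_options(starting_version=0, ending_version=5)
streaming_options = cdf_read_options(starting_version=0)
```

With PySpark, such a dictionary could be passed to a reader as `spark.read.format("delta").options(**batch_options).table("source")`.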
CDF-Aware Read Schema¶
Change Data Feed uses the following metadata columns for CDF-aware scans (in addition to the data schema):
Column Name | Data Type | Description |
---|---|---|
_change_type | StringType | The type of a data change |
_commit_version | LongType | The version of the delta table (commit) with the change |
_commit_timestamp | TimestampType | The timestamp of the commit with the change |
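The relationship between a data schema and the CDF-aware read schema can be sketched in plain Python as follows. This is only an illustration: the `cdf_read_schema` helper, the `(name, type)` tuple representation, the column order (data columns first) and the name-clash check are assumptions of the sketch, not the CDCReader implementation.

```python
# The three metadata columns listed in the table above.
CDF_METADATA_COLUMNS = [
    ("_change_type", "StringType"),
    ("_commit_version", "LongType"),
    ("_commit_timestamp", "TimestampType"),
]


def cdf_read_schema(data_schema):
    """Return the data schema followed by the CDF metadata columns.

    Hypothetical helper: a schema is modelled as a list of (name, type) pairs.
    """
    data_names = {name for name, _ in data_schema}
    clashes = [name for name, _ in CDF_METADATA_COLUMNS if name in data_names]
    # Assumption of this sketch: a data column must not reuse a metadata column name.
    if clashes:
        raise ValueError(f"data schema uses CDF metadata column names: {clashes}")
    return list(data_schema) + CDF_METADATA_COLUMNS


# The schema of the delta_demo table used earlier in this document.
schema = cdf_read_schema([("id", "INT"), ("name", "STRING"), ("age", "INT")])
column_names = [name for name, _ in schema]
```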
Change Data Directory¶
_change_data is the name of the directory (under the top-level data directory) for change data files.
This directory may contain partition directories (i.e. _change_data/part1=value1/...) with changes to data with partition values.
_change_data is a hidden directory and must not be considered in delta-related file operations (e.g., VACUUM and FSCK).
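The directory layout described above can be illustrated with a small path-classification sketch in Python. The function names and the example file name are hypothetical, paths are assumed to be relative to the table's top-level data directory, and this is not how Delta Lake itself filters hidden directories.

```python
from pathlib import PurePosixPath

CHANGE_DATA_DIR = "_change_data"


def is_change_data_file(relative_path):
    """True if the path (relative to the data directory) is inside _change_data."""
    parts = PurePosixPath(relative_path).parts
    # At least one more path segment must follow the directory name itself.
    return len(parts) > 1 and parts[0] == CHANGE_DATA_DIR


def partition_values(relative_path):
    """Parse name=value partition directories of a change data file path."""
    if not is_change_data_file(relative_path):
        return {}
    parts = PurePosixPath(relative_path).parts
    # Skip the leading _change_data segment and the trailing file name.
    return dict(part.split("=", 1) for part in parts[1:-1] if "=" in part)


# Hypothetical file name, used for illustration only.
path = "_change_data/part1=value1/cdc-example.parquet"
values = partition_values(path)
```

A file operation that has to ignore change data files (the text mentions VACUUM and FSCK as examples) could use a predicate like `is_change_data_file` to skip them.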
Change Type Column¶
Change data files contain the additional _change_type column that identifies the type of change event (beside the data columns).
_change_type | Command | Description |
---|---|---|
delete | DELETE | The value has been deleted |
insert | | |
update_postimage | UPDATE | The value after UPDATE |
update_preimage | UPDATE | The value before UPDATE |
When writing out changes to a delta table, the _change_type column is used to partition rows with change events and write them out to the _change_data directory (as AddCDCFiles).
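One way to see what the four change types mean for a consumer is to replay change rows onto a keyed snapshot. The following Python sketch does that with plain dictionaries. The `apply_changes` helper, the `id` key column and the sample rows are assumptions for illustration, not part of Delta Lake, and the sketch assumes at most one net change per key within a commit.

```python
METADATA_COLUMNS = {"_change_type", "_commit_version", "_commit_timestamp"}


def apply_changes(snapshot, changes, key="id"):
    """Replay CDF rows onto a snapshot (dict of key -> row) and return a new dict."""
    result = dict(snapshot)
    # Process commits in version order; sorted() is stable, so rows of the
    # same commit keep their original relative order.
    for row in sorted(changes, key=lambda r: r.get("_commit_version", 0)):
        change_type = row["_change_type"]
        data = {k: v for k, v in row.items() if k not in METADATA_COLUMNS}
        if change_type in ("insert", "update_postimage"):
            result[data[key]] = data          # new or updated value
        elif change_type == "delete":
            result.pop(data[key], None)       # value has been deleted
        elif change_type == "update_preimage":
            continue                          # old value, replaced by the postimage
        else:
            raise ValueError(f"unknown change type: {change_type}")
    return result


before = {1: {"id": 1, "name": "a"}, 2: {"id": 2, "name": "b"}}
changes = [
    {"id": 1, "name": "a", "_change_type": "update_preimage", "_commit_version": 1},
    {"id": 1, "name": "a2", "_change_type": "update_postimage", "_commit_version": 1},
    {"id": 2, "name": "b", "_change_type": "delete", "_commit_version": 2},
    {"id": 3, "name": "c", "_change_type": "insert", "_commit_version": 2},
]
after = apply_changes(before, changes)
```

The update_preimage rows are skipped because the matching update_postimage rows already carry the value to keep; they remain useful when a consumer needs to compare old and new values.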
Column Mapping Not Supported¶
Change data feed reads are currently not supported on tables with column mapping enabled (and a DeltaUnsupportedOperationException is thrown).
CDF Table-Valued Functions¶
CDF Table-Valued Functions are provided to read the table changes of delta tables.
File Indices¶
FIXME What's the purpose of these indices?!
Change Data Feed in Streaming and Batch Queries¶
DataFrame API-based Spark modules (Spark SQL and Spark Structured Streaming)...
FIXME
Describe the following (move parts from the intro):
- The entry points (abstractions) of each query type (batch and streaming) for loading CDF changes (table scans)
- Writing data out