CDCReader

CDCReader utility is the key class for Change Data Feed (CDF) and Change Data Capture (CDC) in Delta Lake (per this comment).

getCDCRelation

getCDCRelation(
  spark: SparkSession,
  deltaLog: DeltaLog,
  snapshotToUse: Snapshot,
  partitionFilters: Seq[Expression],
  conf: SQLConf,
  options: CaseInsensitiveStringMap): BaseRelation

Note

partitionFilters argument is not used.

getCDCRelation uses getVersionForCDC to resolve the starting version (with startingVersion and startingTimestamp as the version and timestamp keys, respectively).

getCDCRelation...FIXME

getCDCRelation is used when:

Resolving Version

getVersionForCDC(
  spark: SparkSession,
  deltaLog: DeltaLog,
  conf: SQLConf,
  options: CaseInsensitiveStringMap,
  versionKey: String,
  timestampKey: String): Option[Long]

getVersionForCDC uses the given options map to get the value of the given versionKey key, if available.

When versionKey and timestampKey are specified

versionKey and timestampKey are specified in the given options argument that is passed down through getCDCRelation unmodified when DeltaLog is requested to create a relation with non-empty cdcOptions.

Otherwise, getVersionForCDC uses the given options map to get the value of the given timestampKey key, if available. getVersionForCDC...FIXME

If neither the given versionKey nor the timestampKey key is available in the options map, getVersionForCDC returns None (undefined value).
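The lookup order described above (version key first, then timestamp key, otherwise None) can be sketched in plain Scala. This is a minimal illustration and not Delta Lake's code: the options are modelled as a plain Map instead of CaseInsensitiveStringMap, and resolveTimestamp is a hypothetical stand-in for the timestamp-to-version step that the text leaves as FIXME.

```scala
object VersionForCdcSketch {
  // Hypothetical stand-in: how a timestamp string becomes a table version
  // is not described in the text, so the caller supplies it here.
  type TimestampResolver = String => Long

  def versionForCdc(
      options: Map[String, String],
      versionKey: String,
      timestampKey: String,
      resolveTimestamp: TimestampResolver): Option[Long] =
    options.get(versionKey).map(_.toLong)                       // 1. version key wins
      .orElse(options.get(timestampKey).map(resolveTimestamp))  // 2. else timestamp key
                                                                // 3. else None
}
```

With startingVersion and startingTimestamp as the keys, an options map that has both entries resolves to the version, since the timestamp is only consulted when the version key is missing.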

_change_data Directory

CDCReader defines _change_data as the name of the directory (under the data directory) where data changes of a delta table are written out (using DelayedCommitProtocol). This directory may contain partition directories.

Used when:

CDF Virtual Columns

CDC_COLUMNS_IN_DATA: Seq[String]

CDCReader defines a CDC_COLUMNS_IN_DATA collection with __is_cdc and _change_type CDF-specific column names.

__is_cdc Partition Column

CDCReader defines __is_cdc column name to partition on with Change Data Feed enabled.

__is_cdc column is added when TransactionalWrite is requested to performCDCPartition with CDF enabled on a delta table (and _change_type among the columns).

If added, __is_cdc column becomes the first partitioning column. It is then "consumed" by DelayedCommitProtocol (to write changes to files with the cdc- prefix rather than part-).

__is_cdc is one of the CDF Virtual Columns.

Used when:

_change_type Column

CDCReader defines _change_type column name for the column that represents a change type.

_change_type is one of the CDF Virtual Columns and among the columns in the CDF-aware read schema.

CDC_TYPE_COLUMN_NAME is used when:

CDC_TYPE_NOT_CDC Literal

CDC_TYPE_NOT_CDC: Literal

CDCReader creates CDC_TYPE_NOT_CDC value to be a Literal expression with null value (of StringType type).

CDC_TYPE_NOT_CDC is used as a special sentinel value for rows that are part of the main table rather than change data.

CDC_TYPE_NOT_CDC is used by DML commands when executed with Change Data Feed enabled:

All commands except DeleteCommand use CDC_TYPE_NOT_CDC with _change_type as follows:

Column(CDC_TYPE_NOT_CDC).as("_change_type")

DeleteCommand uses CDC_TYPE_NOT_CDC as follows:

.withColumn(
  "_change_type",
  Column(If(filterCondition, CDC_TYPE_NOT_CDC, Literal("delete")))
)
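The effect of the If expression above can be illustrated without Spark. In this sketch None plays the role of the null CDC_TYPE_NOT_CDC literal, and keepRow is an assumed name for the outcome of filterCondition on a single row; neither is part of Delta Lake's API.

```scala
object ChangeTypeSketch {
  // None models the null sentinel: the row belongs to the main table,
  // not to the change data.
  val NotCdc: Option[String] = None

  // Mirrors If(filterCondition, CDC_TYPE_NOT_CDC, Literal("delete")):
  // rows for which the condition holds carry no change type,
  // all other rows are tagged as deletes.
  def changeTypeForDelete(keepRow: Boolean): Option[String] =
    if (keepRow) NotCdc else Some("delete")
}
```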

CDC_TYPE_NOT_CDC is used when (with Change Data Feed enabled):

CDC-Aware Table Scan (CDC Read)

isCDCRead(
  options: CaseInsensitiveStringMap): Boolean

isCDCRead is true when one of the following options is specified (in the given options):

  1. readChangeFeed with true value
  2. (legacy) readChangeData with true value

Otherwise, isCDCRead is false.
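A dependency-free sketch of this check follows. It assumes a plain Map in place of CaseInsensitiveStringMap (keys are lower-cased to mimic the case-insensitive lookup) and an exact match on the string "true"; how the real method parses the value is an assumption here.

```scala
import java.util.Locale

object IsCdcReadSketch {
  // The two option names listed above: the current one and the legacy one.
  private val CdcOptionKeys = Seq("readChangeFeed", "readChangeData")

  def isCDCRead(options: Map[String, String]): Boolean = {
    // Normalize keys so that lookups ignore case.
    val normalized = options.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }
    CdcOptionKeys.exists { key =>
      normalized.get(key.toLowerCase(Locale.ROOT)).contains("true")
    }
  }
}
```

These are the options that a batch read such as spark.read.format("delta").option("readChangeFeed", "true") carries.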

isCDCRead is used when:

CDF-Aware Read Schema (Adding CDF Columns)

cdcReadSchema(
  deltaSchema: StructType): StructType

cdcReadSchema adds the CDF columns to the given deltaSchema.

Name               Type
_change_type       StringType
_commit_version    LongType
_commit_timestamp  TimestampType
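Appending these three columns to a schema could look as follows. This is a sketch that assumes spark-sql is on the classpath; it uses only StructType.add and is not necessarily how CDCReader builds the schema internally.

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructType, TimestampType}

object CdcReadSchemaSketch {
  // Returns a new schema: the table columns followed by the CDF columns.
  def cdcReadSchema(deltaSchema: StructType): StructType =
    deltaSchema
      .add("_change_type", StringType)
      .add("_commit_version", LongType)
      .add("_commit_timestamp", TimestampType)
}
```

StructType is immutable, so the given deltaSchema is left unchanged and the CDF columns only appear in the returned schema.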

cdcReadSchema is used when:

changesToDF

changesToDF(
  deltaLog: DeltaLog,
  start: Long,
  end: Long,
  changes: Iterator[(Long, Seq[Action])],
  spark: SparkSession,
  isStreaming: Boolean = false): CDCVersionDiffInfo

changesToDF...FIXME

changesToDF is used when:

DeltaUnsupportedOperationException

changesToDF makes sure that the DeltaColumnMappingMode is NoMapping or throws a DeltaUnsupportedOperationException:

Change data feed (CDF) reads are currently not supported on tables with column mapping enabled.
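The guard can be pictured as below. The MappingMode hierarchy and the use of java.lang.UnsupportedOperationException are stand-ins chosen for this sketch; Delta Lake uses its own DeltaColumnMappingMode and DeltaUnsupportedOperationException types.

```scala
object ColumnMappingGuardSketch {
  // Stand-in for DeltaColumnMappingMode (illustrative only).
  sealed trait MappingMode
  case object NoMapping extends MappingMode
  case object NameMapping extends MappingMode // any mode other than NoMapping

  // Fails fast unless column mapping is disabled.
  def assertCdfReadable(mode: MappingMode): Unit =
    if (mode != NoMapping) {
      throw new UnsupportedOperationException(
        "Change data feed (CDF) reads are currently not supported " +
          "on tables with column mapping enabled.")
    }
}
```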

scanIndex

scanIndex(
  spark: SparkSession,
  index: TahoeFileIndex,
  metadata: Metadata,
  isStreaming: Boolean = false): DataFrame

scanIndex creates a LogicalRelation (Spark SQL) with a HadoopFsRelation (Spark SQL) (with the given TahoeFileIndex, cdcReadSchema, no bucketing, DeltaParquetFileFormat).

In the end, scanIndex wraps it up as a DataFrame.

isCDCEnabledOnTable

isCDCEnabledOnTable(
  metadata: Metadata): Boolean

isCDCEnabledOnTable is the value of the delta.enableChangeDataFeed table property.
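As a rough illustration, the check amounts to reading one boolean table property. The sketch assumes the table configuration is available as a plain Map and that a missing property means CDF is disabled; both are assumptions made for this example.

```scala
object CdcEnabledSketch {
  val EnableCdfKey = "delta.enableChangeDataFeed"

  // Absent property => false (assumed default); otherwise parse the boolean.
  def isCDCEnabledOnTable(configuration: Map[String, String]): Boolean =
    configuration.get(EnableCdfKey).exists(_.toBoolean)
}
```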

isCDCEnabledOnTable is used when:

insert Change Type

CDCReader defines insert as a value of the _change_type column in the following: