CDCReaderImpl is a marker abstraction of Change Data Feed-aware Readers.

Fun Fact

Despite the Impl suffix, CDCReaderImpl is a trait, not an implementation (class).


Creating CDF-Aware Relation

getCDCRelation(
  spark: SparkSession,
  snapshotToUse: Snapshot,
  isTimeTravelQuery: Boolean,
  conf: SQLConf,
  options: CaseInsensitiveStringMap): BaseRelation

getCDCRelation uses getVersionForCDC to resolve the starting version.

getCDCRelation calls getBatchSchemaModeForTable.

getCDCRelation uses getVersionForCDC again to resolve the (optional) ending version.

getCDCRelation prints out the following INFO message to the logs:

startingVersion: [startingVersion], endingVersion: [endingVersionOpt]

In the end, getCDCRelation creates a DeltaCDFRelation.

getCDCRelation is used when:

Resolving Version

getVersionForCDC(
  spark: SparkSession,
  deltaLog: DeltaLog,
  conf: SQLConf,
  options: CaseInsensitiveStringMap,
  versionKey: String,
  timestampKey: String): Option[ResolvedCDFVersion]

FIXME Review Me

getVersionForCDC uses the given options map to get the value of the given versionKey key, if available.

Otherwise, getVersionForCDC uses the given options map to get the value of the given timestampKey key, if available. getVersionForCDC...FIXME

If neither the given versionKey nor the timestampKey key is available in the options map, getVersionForCDC returns None (undefined value).
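The lookup order described above (version key first, then timestamp key, else None) can be sketched in plain Scala. This is a minimal illustration, not the Delta Lake implementation: a Map with normalized keys stands in for CaseInsensitiveStringMap, and CdfBoundary, ByVersion, ByTimestamp and resolveBoundary are hypothetical names introduced only for this example. How a timestamp is turned into a version is left out, as it is in the text above.

```scala
import java.util.Locale

object VersionResolutionSketch {
  // Hypothetical stand-ins for the resolved value; these are not Delta Lake types.
  sealed trait CdfBoundary
  final case class ByVersion(version: Long) extends CdfBoundary
  final case class ByTimestamp(timestamp: String) extends CdfBoundary

  // A plain Map with lower-cased keys stands in for CaseInsensitiveStringMap.
  def resolveBoundary(
      options: Map[String, String],
      versionKey: String,
      timestampKey: String): Option[CdfBoundary] = {
    val normalized = options.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }

    // The version key takes precedence (toLong throws on a non-numeric value).
    val byVersion: Option[CdfBoundary] =
      normalized.get(versionKey.toLowerCase(Locale.ROOT)).map(v => ByVersion(v.toLong))

    // The timestamp key is consulted only when the version key is absent.
    val byTimestamp: Option[CdfBoundary] =
      normalized.get(timestampKey.toLowerCase(Locale.ROOT)).map(ts => ByTimestamp(ts))

    byVersion.orElse(byTimestamp)
  }
}
```

For instance, with startingVersion and startingTimestamp as the two keys, an options map that defines both resolves to ByVersion, and an empty map resolves to None.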

Creating Batch DataFrame of Changes

changesToBatchDF(
  deltaLog: DeltaLog,
  start: Long,
  end: Long,
  spark: SparkSession,
  readSchemaSnapshot: Option[Snapshot] = None,
  useCoarseGrainedCDC: Boolean = false): DataFrame

changesToBatchDF requests the given DeltaLog for the changes from the given start version (inclusive) until the given end version.

In the end, changesToBatchDF creates a DataFrame of changes (with the changes per version and isStreaming flag disabled).

changesToBatchDF is used when:

Creating DataFrame of Changes

changesToDF(
  readSchemaSnapshot: SnapshotDescriptor,
  start: Long,
  end: Long,
  changes: Iterator[(Long, Seq[Action])],
  spark: SparkSession,
  isStreaming: Boolean = false,
  useCoarseGrainedCDC: Boolean = false): CDCVersionDiffInfo

isStreaming Input Argument

Value   Caller
true    DeltaSourceCDCSupport is requested to getCDCFileChangesAndCreateDataFrame

useCoarseGrainedCDC Input Argument

useCoarseGrainedCDC is disabled (false) by default and for all the other known use cases.

changesToDF calls getTimestampsByVersion.

changesToDF requests the DeltaLog (of the given SnapshotDescriptor) for the Snapshot at the given start.

changesToDF asserts that one of the following is enabled (or throws a DeltaAnalysisException):

changesToDF reads a configuration property (that can potentially block batch reads when the given isStreaming flag is disabled).


In the end, changesToDF creates a new CDCVersionDiffInfo (with the DataFrame of the changes).

changesToDF is used when:


  addFileSpecs: Seq[CDCDataSpec[AddFile]],
  removeFileSpecs: Seq[CDCDataSpec[RemoveFile]],
  deltaLog: DeltaLog,
  snapshot: SnapshotDescriptor,
  isStreaming: Boolean,
  spark: SparkSession): Seq[DataFrame]



buildCDCDataSpecSeq[T <: FileAction](
  actionsByVersion: MutableMap[TableVersion, ListBuffer[T]],
  versionToCommitInfo: MutableMap[Long, CommitInfo]): Seq[CDCDataSpec[T]]

buildCDCDataSpecSeq converts the given actionsByVersion into CDCDataSpecs (with CommitInfos from the given versionToCommitInfo mapping).
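The conversion can be pictured with a small, self-contained sketch. The case classes below are simplified, hypothetical stand-ins for the Delta Lake types named in the signature (their fields are assumptions made for illustration), buildSpecs is a hypothetical name, and sorting by version is added here only to make the output deterministic.

```scala
import scala.collection.mutable

object CdcDataSpecSketch {
  // Simplified, hypothetical stand-ins; the real Delta Lake types carry more fields.
  final case class TableVersion(version: Long, timestamp: Long)
  final case class CommitInfo(operation: String)
  final case class CDCDataSpec[T](
      version: Long,
      timestamp: Long,
      actions: Seq[T],
      commitInfo: Option[CommitInfo])

  // Turns every (table version -> actions) entry into one spec and attaches
  // the CommitInfo registered for that version, if there is one.
  def buildSpecs[T](
      actionsByVersion: mutable.Map[TableVersion, mutable.ListBuffer[T]],
      versionToCommitInfo: mutable.Map[Long, CommitInfo]): Seq[CDCDataSpec[T]] =
    actionsByVersion.toSeq
      .sortBy { case (tableVersion, _) => tableVersion.version } // assumption: ordered output
      .map { case (tableVersion, actions) =>
        CDCDataSpec(
          tableVersion.version,
          tableVersion.timestamp,
          actions.toSeq,
          versionToCommitInfo.get(tableVersion.version))
      }
}
```

A version without an entry in versionToCommitInfo simply yields a spec whose commitInfo is None.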


  addFilesMap: Map[FilePathWithTableVersion, AddFile],
  removeFilesMap: Map[FilePathWithTableVersion, RemoveFile],
  versionToCommitInfo: Map[Long, CommitInfo],
  deltaLog: DeltaLog,
  snapshot: SnapshotDescriptor,
  isStreaming: Boolean,
  spark: SparkSession): Seq[DataFrame]



  add: AddFile,
  remove: RemoveFile,
  dvStore: DeletionVectorStore,
  deltaLog: DeltaLog): Seq[FileAction]


Creating DataFrame over Delta-Aware FileIndex

scanIndex(
  spark: SparkSession,
  index: TahoeFileIndexWithSnapshotDescriptor,
  isStreaming: Boolean = false): DataFrame


To understand scanIndex, it helps to first understand the role of HadoopFsRelation.

scanIndex creates a HadoopFsRelation (Spark SQL) based on the given TahoeFileIndexWithSnapshotDescriptor as follows:

Property          Value
location          The given TahoeFileIndexWithSnapshotDescriptor
partitionSchema   The partitionSchema of the given TahoeFileIndexWithSnapshotDescriptor
dataSchema        The CDF-aware read schema based on the schema of the given TahoeFileIndexWithSnapshotDescriptor
bucketSpec        Undefined (None)
fileFormat        A new DeltaParquetFileFormat
options           The options of the DeltaLog of the given TahoeFileIndexWithSnapshotDescriptor

scanIndex creates a LogicalRelation (Spark SQL) for the HadoopFsRelation (and the given isStreaming flag).

In the end, scanIndex creates a DataFrame for the LogicalRelation.

scanIndex is used when:

CDF-Aware Read Schema (Adding CDF Columns)

cdcReadSchema(
  deltaSchema: StructType): StructType

cdcReadSchema makes the given schema (StructType) of a delta table CDF-aware by appending the following CDF metadata fields:

Column Name         Data Type
_change_type        StringType
_commit_version     LongType
_commit_timestamp   TimestampType
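Appending the three columns from the table above can be sketched with Spark SQL's StructType.add. The sketch needs spark-sql on the classpath; withCdfColumns is a hypothetical helper name, and the default (nullable) field settings are an assumption rather than something the text above states.

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructType, TimestampType}

object CdcReadSchemaSketch {
  // Returns a new schema: the table columns followed by the CDF metadata columns.
  // StructType is immutable, so the input schema is left untouched.
  def withCdfColumns(deltaSchema: StructType): StructType =
    deltaSchema
      .add("_change_type", StringType)
      .add("_commit_version", LongType)
      .add("_commit_timestamp", TimestampType)
}
```

Given a one-column schema (say, id of LongType), the result has four fields, with the CDF columns last and in the order listed in the table.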

cdcReadSchema is used when:

CDC-Aware Table Scan (CDC Read)

isCDCRead(
  options: CaseInsensitiveStringMap): Boolean

isCDCRead is true when one of the following options is specified (in the given options) with the value true (case-insensitive):

  1. readChangeFeed
  2. (legacy) readChangeData

Otherwise, isCDCRead is false.
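The check above can be sketched in plain Scala. A Map with normalized keys stands in for CaseInsensitiveStringMap, isCdcRead is a hypothetical name, and treating both the option names and the true value as case-insensitive is an assumption based on one reading of the description.

```scala
import java.util.Locale

object IsCdcReadSketch {
  // The two option names listed above (the second one is the legacy name).
  private val cdcOptionKeys = Seq("readChangeFeed", "readChangeData")

  def isCdcRead(options: Map[String, String]): Boolean = {
    // Lower-case the keys to mimic a case-insensitive options map.
    val normalized = options.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }
    cdcOptionKeys.exists { key =>
      normalized.get(key.toLowerCase(Locale.ROOT)).exists(_.equalsIgnoreCase("true"))
    }
  }
}
```

Either option alone is enough; a missing option or any value other than true leaves the read a regular (non-CDC) one.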

isCDCRead is used when:


isCDCEnabledOnTable(
  metadata: Metadata,
  spark: SparkSession): Boolean

isCDCEnabledOnTable checks whether the given metadata requires the Change Data Feed feature to be enabled (based on the delta.enableChangeDataFeed table property).

isCDCEnabledOnTable is used when:


CDCReaderImpl is a trait, so logging is configured using the logger of the implementations.