
DeltaSourceCDCSupport

DeltaSourceCDCSupport is an abstraction that brings Change Data Capture (CDC) support to DeltaSource.

DeltaSourceCDCSupport is used to create a streaming DataFrame of changes (between start and end versions) in streaming queries over delta tables.

Creating Streaming DataFrame of Changes

getCDCFileChangesAndCreateDataFrame(
  startVersion: Long,
  startIndex: Long,
  isStartingVersion: Boolean,
  endOffset: DeltaSourceOffset): DataFrame

getCDCFileChangesAndCreateDataFrame creates a streaming DataFrame of changes with the following:

Metrics Discarded

Although the CDCVersionDiffInfo returned when creating the streaming DataFrame of changes contains some metrics, they are discarded.


getCDCFileChangesAndCreateDataFrame is used when:

getFileChangesForCDC

getFileChangesForCDC(
  fromVersion: Long,
  fromIndex: Long,
  isStartingVersion: Boolean,
  limits: Option[AdmissionLimits],
  endOffset: Option[DeltaSourceOffset]): Iterator[(Long, Iterator[IndexedFile])]

With isStartingVersion on (true), getFileChangesForCDC gets the snapshot at the fromVersion version and turns dataChange on for all AddFiles. getFileChangesForCDC creates an IndexedChangeFileSeq (with the snapshot and the isInitialSnapshot flag enabled). getFileChangesForCDC...FIXME

With isStartingVersion off (false), getFileChangesForCDC uses filterAndIndexDeltaLogs for the fromVersion version.

That gives a collection of pairs, each a version and an IndexedChangeFileSeq.

In the end, getFileChangesForCDC requests every IndexedChangeFileSeq to filterFiles (with the fromVersion, fromIndex, limits and endOffset arguments).
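The branching described above can be sketched in Scala as follows. This is a minimal sketch, not Delta Lake's implementation: AddFile, IndexedFile and IndexedChangeFileSeq are hypothetical simplified case classes, and snapshotFilesAt, filterAndIndexDeltaLogs and filterFiles are passed in as hypothetical function parameters (filterFiles is assumed to already capture fromIndex, limits and endOffset). Indexing snapshot files by their position is also an assumption, and the steps marked FIXME are not modeled.

```scala
// Hypothetical, simplified models; NOT Delta Lake's real classes.
final case class AddFile(path: String, dataChange: Boolean)
final case class IndexedFile(version: Long, index: Long, add: AddFile)
final case class IndexedChangeFileSeq(files: Seq[IndexedFile], isInitialSnapshot: Boolean)

object GetFileChangesForCDCSketch {
  // All three function parameters are hypothetical stand-ins:
  //   snapshotFilesAt         - AddFiles of the snapshot at a version
  //   filterAndIndexDeltaLogs - per-version change sequences from a version on
  //   filterFiles             - assumed to already capture fromIndex, limits and endOffset
  def getFileChangesForCDC(
      fromVersion: Long,
      isStartingVersion: Boolean,
      snapshotFilesAt: Long => Seq[AddFile],
      filterAndIndexDeltaLogs: Long => Iterator[(Long, IndexedChangeFileSeq)],
      filterFiles: IndexedChangeFileSeq => Iterator[IndexedFile]
  ): Iterator[(Long, Iterator[IndexedFile])] = {
    val changes: Iterator[(Long, IndexedChangeFileSeq)] =
      if (isStartingVersion) {
        // Turn dataChange on for every AddFile of the snapshot (assumed: index by position).
        val files = snapshotFilesAt(fromVersion).zipWithIndex.map { case (add, i) =>
          IndexedFile(fromVersion, i.toLong, add.copy(dataChange = true))
        }
        Iterator((fromVersion, IndexedChangeFileSeq(files, isInitialSnapshot = true)))
      } else {
        filterAndIndexDeltaLogs(fromVersion)
      }
    // In the end, every IndexedChangeFileSeq is filtered.
    changes.map { case (version, seq) => (version, filterFiles(seq)) }
  }
}
```

Passing the collaborators as functions keeps the sketch independent of a DeltaLog or a Spark session, so only the control flow is shown.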


getFileChangesForCDC is used when:

isStartingVersion

getFileChangesForCDC is given the isStartingVersion flag when executed:

filterAndIndexDeltaLogs

filterAndIndexDeltaLogs(
  startVersion: Long): Iterator[(Long, IndexedChangeFileSeq)]

filterAndIndexDeltaLogs requests the DeltaLog to get the changes from the given startVersion version onwards (as an Iterator[(Long, Seq[Action])]).

filterAndIndexDeltaLogs uses the failOnDataLoss option to get the changes.

filterAndIndexDeltaLogs uses filterCDCActions on the actions of every version and converts the resulting AddFiles, AddCDCFiles and RemoveFiles to IndexedFiles.

In the end, for every version, filterAndIndexDeltaLogs creates an IndexedChangeFileSeq with the IndexedFiles (and the isInitialSnapshot flag off).
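The per-version pipeline above can be sketched in Scala like this. It is a minimal sketch under stated assumptions, not Delta Lake's code: Action, AddFile, RemoveFile, AddCDCFile, CommitInfo, IndexedFile and IndexedChangeFileSeq are hypothetical simplified case classes, getChanges is a hypothetical function standing in for asking the DeltaLog for changes (with failOnDataLoss), and numbering files by their position within a version is an assumption.

```scala
// Hypothetical, simplified models; NOT Delta Lake's real classes.
sealed trait Action
sealed trait FileAction extends Action
final case class AddFile(path: String, dataChange: Boolean) extends FileAction
final case class RemoveFile(path: String, dataChange: Boolean) extends FileAction
final case class AddCDCFile(path: String) extends FileAction
final case class CommitInfo(operation: String) extends Action

// A file action tagged with its commit version and (assumed) position in that commit.
final case class IndexedFile(version: Long, index: Long, action: FileAction)
final case class IndexedChangeFileSeq(files: Seq[IndexedFile], isInitialSnapshot: Boolean)

object FilterAndIndexDeltaLogsSketch {
  // Same rule as filterCDCActions: AddCDCFiles if any, else data-changing adds and removes.
  private def filterCDCActions(actions: Seq[Action]): Seq[FileAction] = {
    val cdcFiles = actions.collect { case c: AddCDCFile => c }
    if (cdcFiles.nonEmpty) cdcFiles
    else actions.collect {
      case a: AddFile if a.dataChange    => a
      case r: RemoveFile if r.dataChange => r
    }
  }

  // getChanges is a hypothetical stand-in for asking the DeltaLog for the changes
  // from startVersion onwards, honoring failOnDataLoss.
  def filterAndIndexDeltaLogs(
      startVersion: Long,
      failOnDataLoss: Boolean,
      getChanges: (Long, Boolean) => Iterator[(Long, Seq[Action])]
  ): Iterator[(Long, IndexedChangeFileSeq)] =
    getChanges(startVersion, failOnDataLoss).map { case (version, actions) =>
      val indexed = filterCDCActions(actions).zipWithIndex.map { case (fa, i) =>
        IndexedFile(version, i.toLong, fa)
      }
      // Change sequences read from the log are never the initial snapshot.
      (version, IndexedChangeFileSeq(indexed, isInitialSnapshot = false))
    }
}
```

The result stays an Iterator, so versions are only filtered and indexed as the caller consumes them.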

filterCDCActions

filterCDCActions(
  actions: Seq[Action],
  version: Long): Seq[FileAction]

Note

The version argument is ignored.

filterCDCActions collects the AddCDCFile actions from the given actions (if there are any).

Otherwise, filterCDCActions collects AddFiles and RemoveFiles with dataChange enabled.
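The selection rule above can be sketched in Scala as follows. This is a minimal sketch, not Delta Lake's implementation: Action, FileAction, AddFile, RemoveFile, AddCDCFile and CommitInfo are hypothetical simplified case classes that model only the fields the rule needs (AddCDCFile is given dataChange off as a modeling assumption).

```scala
// Hypothetical, simplified stand-ins for Delta Lake's action classes
// (NOT the real org.apache.spark.sql.delta.actions API).
sealed trait Action
sealed trait FileAction extends Action { def path: String; def dataChange: Boolean }
final case class AddFile(path: String, dataChange: Boolean) extends FileAction
final case class RemoveFile(path: String, dataChange: Boolean) extends FileAction
final case class AddCDCFile(path: String) extends FileAction { val dataChange = false }
final case class CommitInfo(operation: String) extends Action

object FilterCDCActionsSketch {
  // Prefer AddCDCFiles when the version has any; otherwise fall back to
  // AddFiles and RemoveFiles with dataChange enabled.
  // The version parameter only mirrors the signature; it is ignored.
  def filterCDCActions(actions: Seq[Action], version: Long): Seq[FileAction] = {
    val cdcFiles = actions.collect { case c: AddCDCFile => c }
    if (cdcFiles.nonEmpty) cdcFiles
    else actions.collect {
      case a: AddFile if a.dataChange    => a
      case r: RemoveFile if r.dataChange => r
    }
  }
}
```

For example, a version with both an AddFile and an AddCDCFile yields only the AddCDCFile, while a version with no AddCDCFiles yields its data-changing AddFiles and RemoveFiles and drops non-file actions such as CommitInfo.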