DeltaSourceCDCSupport¶
DeltaSourceCDCSupport is an abstraction to bring CDC support to DeltaSource.
DeltaSourceCDCSupport is used to create a streaming DataFrame of changes (between start and end versions) in streaming queries over delta tables.
Creating Streaming DataFrame of Changes¶
getCDCFileChangesAndCreateDataFrame(
  startVersion: Long,
  startIndex: Long,
  isStartingVersion: Boolean,
  endOffset: DeltaSourceOffset): DataFrame
getCDCFileChangesAndCreateDataFrame creates a streaming DataFrame of changes with the following:
- getFileChangesForCDC (with no AdmissionLimits) for the versions and their FileActions
- isStreaming flag enabled
Metrics Discarded
Although CDCVersionDiffInfo returned from creating the streaming DataFrame of changes contains some metrics, they are discarded.
getCDCFileChangesAndCreateDataFrame is used when:
- DeltaSourceBase is requested for a streaming DataFrame between versions with readChangeFeed option enabled
getFileChangesForCDC¶
getFileChangesForCDC(
  fromVersion: Long,
  fromIndex: Long,
  isStartingVersion: Boolean,
  limits: Option[AdmissionLimits],
  endOffset: Option[DeltaSourceOffset]): Iterator[(Long, Iterator[IndexedFile])]
With isStartingVersion on (true), getFileChangesForCDC gets the snapshot at the fromVersion version and turns dataChange on for all AddFiles. getFileChangesForCDC creates an IndexedChangeFileSeq (with the snapshot and isInitialSnapshot flag enabled). getFileChangesForCDC...FIXME
With isStartingVersion off (false), getFileChangesForCDC runs filterAndIndexDeltaLogs for the fromVersion version.
That gives a collection of a version and IndexedChangeFileSeq pairs.
In the end, getFileChangesForCDC requests all the IndexedChangeFileSeqs to filterFiles (with fromVersion, fromIndex, limits and endOffset arguments).
getFileChangesForCDC is used when:
- DeltaSourceBase is requested to getFileChangesWithRateLimit
- DeltaSourceCDCSupport is requested to getCDCFileChangesAndCreateDataFrame
isStartingVersion¶
getFileChangesForCDC is given the isStartingVersion flag when executed:
- true for the following:
    - DeltaSource when getStartingVersion is undefined (returns None)
    - DeltaSource when getBatch with startOffsetOption and getStartingVersion both undefined (Nones)
- false for the following:
    - DeltaSource when getBatch with startOffsetOption undefined but getStartingVersion specified
- true or false for the following:
    - DeltaSourceBase when getNextOffsetFromPreviousOffset based on isStartingVersion (of the previous offset)
    - DeltaSource when getBatch with startOffsetOption specified and based on the isStartingVersion (of the start offset)
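The getBatch cases in the list above can be summarized as a small decision function. This is a minimal sketch only: Offset is a hypothetical, simplified stand-in for DeltaSourceOffset, and isStartingVersionForBatch is an illustrative name that does not exist in Delta Lake.

```scala
object IsStartingVersionSketch {
  // Hypothetical stand-in for DeltaSourceOffset (the real class has more fields)
  final case class Offset(version: Long, isStartingVersion: Boolean)

  // Illustrative helper (not part of Delta Lake) mirroring the getBatch cases:
  // - start offset defined: reuse the flag stored in the offset
  // - start offset undefined: true only when no starting version is given
  def isStartingVersionForBatch(
      startOffsetOption: Option[Offset],
      startingVersion: Option[Long]): Boolean =
    startOffsetOption match {
      case Some(offset) => offset.isStartingVersion
      case None         => startingVersion.isEmpty
    }
}
```

With neither a start offset nor a starting version, the sketch returns true, which corresponds to reading the initial snapshot first.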
filterAndIndexDeltaLogs¶
filterAndIndexDeltaLogs(
  startVersion: Long): Iterator[(Long, IndexedChangeFileSeq)]
filterAndIndexDeltaLogs requests the DeltaLog to get the changes from the given startVersion version onwards (as an Iterator[(Long, Seq[Action])]).
filterAndIndexDeltaLogs uses the failOnDataLoss option to get the changes.
filterAndIndexDeltaLogs applies filterCDCActions to the actions of every version and converts the AddFiles, AddCDCFiles and RemoveFiles to IndexedFiles.
In the end, for every version, filterAndIndexDeltaLogs creates an IndexedChangeFileSeq with the IndexedFiles (and the isInitialSnapshot flag off).
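The per-version flow described above can be sketched as follows. All types here are hypothetical, simplified stand-ins for the Delta Lake classes of the same names, the filter parameter stands in for filterCDCActions, and assigning indices by position within a version is an assumption of this sketch rather than something stated above.

```scala
object FilterAndIndexDeltaLogsSketch {
  // Hypothetical, simplified stand-ins; the real classes carry many more fields
  final case class FileAction(path: String)
  final case class IndexedFile(version: Long, index: Long, action: FileAction)
  final case class IndexedChangeFileSeq(files: Seq[IndexedFile], isInitialSnapshot: Boolean)

  // `changes` plays the role of the (version, actions) pairs from the DeltaLog.
  // `filter` plays the role of filterCDCActions.
  def filterAndIndex(
      changes: Iterator[(Long, Seq[FileAction])],
      filter: Seq[FileAction] => Seq[FileAction]): Iterator[(Long, IndexedChangeFileSeq)] =
    changes.map { case (version, actions) =>
      // Assumption: the index is the position of the action within its version
      val indexed = filter(actions).zipWithIndex.map {
        case (action, i) => IndexedFile(version, i.toLong, action)
      }
      // Changes read from the log are never the initial snapshot
      version -> IndexedChangeFileSeq(indexed, isInitialSnapshot = false)
    }
}
```

The result stays an iterator, so versions are processed lazily as the caller consumes them.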
filterCDCActions¶
filterCDCActions(
actions: Seq[Action],
version: Long): Seq[FileAction]
Note
The version argument is ignored.
filterCDCActions collects the AddCDCFile actions from the given actions (if there are any).
Otherwise, filterCDCActions collects AddFiles and RemoveFiles with dataChange enabled.
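The selection rule above can be illustrated with a minimal, self-contained sketch. The action classes below are hypothetical, heavily simplified stand-ins for the Delta Lake classes of the same names (the real ones have many more fields), so this shows the logic as described here, not the actual implementation.

```scala
object FilterCDCActionsSketch {
  // Hypothetical, simplified stand-ins for Delta Lake's actions
  sealed trait Action
  sealed trait FileAction extends Action { def path: String }
  final case class AddFile(path: String, dataChange: Boolean = true) extends FileAction
  final case class RemoveFile(path: String, dataChange: Boolean = true) extends FileAction
  final case class AddCDCFile(path: String) extends FileAction
  final case class CommitInfo(operation: String) extends Action

  // `version` is kept only to mirror the signature above; it is not used
  def filterCDCActions(actions: Seq[Action], version: Long): Seq[FileAction] = {
    val cdcFiles = actions.collect { case c: AddCDCFile => c }
    if (cdcFiles.nonEmpty) {
      // CDC files are present: they alone describe the changes of this commit
      cdcFiles
    } else {
      // No CDC files: fall back to adds and removes that changed data
      actions.collect {
        case a: AddFile if a.dataChange    => a
        case r: RemoveFile if r.dataChange => r
      }
    }
  }
}
```

In the sketch, a single AddCDCFile in a commit is enough for every AddFile and RemoveFile of that commit to be left out.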