DeltaSourceCDCSupport¶
DeltaSourceCDCSupport is an abstraction to bring CDC support to DeltaSource.
DeltaSourceCDCSupport is used to create a streaming DataFrame of changes (between start and end versions) in streaming queries over delta tables.
Creating Streaming DataFrame of Changes¶
getCDCFileChangesAndCreateDataFrame(
startVersion: Long,
startIndex: Long,
isStartingVersion: Boolean,
endOffset: DeltaSourceOffset): DataFrame
getCDCFileChangesAndCreateDataFrame creates a streaming DataFrame of changes with the following:
- getFileChangesForCDC (with no AdmissionLimits) for the versions and their FileActions
- isStreaming flag enabled
Metrics Discarded
Although CDCVersionDiffInfo returned from creating the streaming DataFrame of changes contains some metrics, they are discarded.
getCDCFileChangesAndCreateDataFrame is used when:
- DeltaSourceBase is requested for a streaming DataFrame between versions with readChangeFeed option enabled
getFileChangesForCDC¶
getFileChangesForCDC(
fromVersion: Long,
fromIndex: Long,
isStartingVersion: Boolean,
limits: Option[AdmissionLimits],
endOffset: Option[DeltaSourceOffset]): Iterator[(Long, Iterator[IndexedFile])]
With isStartingVersion on (true), getFileChangesForCDC gets the snapshot at the fromVersion version and turns dataChange on for all AddFiles. getFileChangesForCDC creates an IndexedChangeFileSeq (with the snapshot and isInitialSnapshot flag enabled). getFileChangesForCDC ...FIXME
With isStartingVersion off (false), getFileChangesForCDC runs filterAndIndexDeltaLogs for the fromVersion version.
That gives a collection of pairs of a version and an IndexedChangeFileSeq.
In the end, getFileChangesForCDC requests all the IndexedChangeFileSeqs to filterFiles (with fromVersion, fromIndex, limits and endOffset arguments).
getFileChangesForCDC is used when:
- DeltaSourceBase is requested to getFileChangesWithRateLimit
- DeltaSourceCDCSupport is requested to getCDCFileChangesAndCreateDataFrame
isStartingVersion¶
getFileChangesForCDC is given the isStartingVersion flag when executed:
- true for the following:
  - DeltaSource when getStartingVersion is undefined (returns None)
  - DeltaSource when getBatch with startOffsetOption and getStartingVersion both undefined (Nones)
- false for the following:
  - DeltaSource when getBatch with startOffsetOption undefined but getStartingVersion specified
- true or false for the following:
  - DeltaSourceBase when getNextOffsetFromPreviousOffset based on isStartingVersion (of the previous offset)
  - DeltaSource when getBatch with startOffsetOption specified and based on the isStartingVersion (of the start offset)
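The getBatch cases above can be read as a small decision table. The following is a minimal sketch of that table only, not Delta Lake code: OffsetSketch and IsStartingVersionSketch are hypothetical names, and OffsetSketch is a simplified stand-in that keeps just the fields needed here.

```scala
// Hypothetical, simplified stand-in for DeltaSourceOffset (illustration only).
final case class OffsetSketch(version: Long, isStartingVersion: Boolean)

object IsStartingVersionSketch {
  /** Picks the isStartingVersion flag for the getBatch cases listed above. */
  def forGetBatch(
      startOffsetOption: Option[OffsetSketch],
      startingVersion: Option[Long]): Boolean =
    (startOffsetOption, startingVersion) match {
      // A start offset is specified: reuse the flag it carries.
      case (Some(offset), _) => offset.isStartingVersion
      // No start offset, but a starting version is specified.
      case (None, Some(_)) => false
      // Neither is defined: start from the initial snapshot.
      case (None, None) => true
    }
}
```

For example, IsStartingVersionSketch.forGetBatch(None, Some(5L)) gives false, while passing None for both arguments gives true.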
filterAndIndexDeltaLogs¶
filterAndIndexDeltaLogs(
startVersion: Long): Iterator[(Long, IndexedChangeFileSeq)]
filterAndIndexDeltaLogs requests the DeltaLog to get the changes at the given startVersion version and later (Iterator[(Long, Seq[Action])]).
filterAndIndexDeltaLogs uses failOnDataLoss option to get the changes.
filterAndIndexDeltaLogs runs filterCDCActions (on the actions of every version) and converts the AddFiles, AddCDCFiles and RemoveFiles to IndexedFiles.
In the end, for every version, filterAndIndexDeltaLogs creates an IndexedChangeFileSeq with the IndexedFiles (and the isInitialSnapshot flag off).
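The per-version flow described above (filter, index, wrap) can be sketched with plain Scala collections. This is a simplified model under stated assumptions, not the Delta Lake implementation: IndexedFile and IndexedChangeFileSeq here are hypothetical generic stand-ins, the filter is passed in as a function, and assigning the index by position within a version is an assumption made for illustration.

```scala
object FilterAndIndexSketch {
  // Hypothetical, simplified stand-ins for the Delta Lake types of the same name.
  final case class IndexedFile[F](version: Long, index: Long, action: F)
  final case class IndexedChangeFileSeq[F](
      files: Seq[IndexedFile[F]],
      isInitialSnapshot: Boolean)

  /**
   * For every (version, actions) pair: filter the actions, index what is left
   * (by position, an assumption) and wrap the result with isInitialSnapshot off.
   */
  def filterAndIndex[A, F](
      changes: Iterator[(Long, Seq[A])])(
      filterCDCActions: Seq[A] => Seq[F]): Iterator[(Long, IndexedChangeFileSeq[F])] =
    changes.map { case (version, actions) =>
      val indexed = filterCDCActions(actions).zipWithIndex.map {
        case (action, i) => IndexedFile(version, i.toLong, action)
      }
      version -> IndexedChangeFileSeq(indexed, isInitialSnapshot = false)
    }
}
```

Because the result is an Iterator, versions are processed lazily, one at a time, as the caller consumes them.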
filterCDCActions¶
filterCDCActions(
actions: Seq[Action],
version: Long): Seq[FileAction]
Note
The version argument is ignored.
filterCDCActions collects the AddCDCFile actions from the given actions (if there are any).
Otherwise, filterCDCActions collects AddFiles and RemoveFiles with dataChange enabled.
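The selection rule above can be illustrated with a small self-contained model. This is a sketch only: the case classes below are hypothetical, heavily simplified stand-ins for the Delta Lake actions of the same names (the real ones carry many more fields), and CommitInfo is included just to show a non-file action being dropped.

```scala
object FilterCDCActionsSketch {
  // Hypothetical, simplified action hierarchy (illustration only).
  sealed trait Action
  sealed trait FileAction extends Action
  final case class AddFile(path: String, dataChange: Boolean) extends FileAction
  final case class RemoveFile(path: String, dataChange: Boolean) extends FileAction
  final case class AddCDCFile(path: String) extends FileAction
  final case class CommitInfo(operation: String) extends Action

  /** The version parameter mirrors the documented signature and is ignored. */
  def filterCDCActions(actions: Seq[Action], version: Long): Seq[FileAction] = {
    val cdcFiles = actions.collect { case c: AddCDCFile => c }
    if (cdcFiles.nonEmpty) {
      // AddCDCFiles are present: they alone describe the changes of the commit.
      cdcFiles
    } else {
      // No AddCDCFiles: fall back to data-changing AddFiles and RemoveFiles.
      actions.collect {
        case a: AddFile if a.dataChange => a
        case r: RemoveFile if r.dataChange => r
      }
    }
  }
}
```

In this model, a commit that contains at least one AddCDCFile yields only its AddCDCFiles, even when it also contains AddFiles or RemoveFiles with dataChange enabled.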