DeltaSourceBase
DeltaSourceBase is an extension of the Source (Spark Structured Streaming) abstraction for DeltaSource.
Schema
schema takes the readSchemaAtSourceInit and removes the internal table metadata from it.
With the readChangeFeed option enabled, schema adds the CDF columns to the schema.
In the end, schema returns the schema of a delta table with or without the CDF columns (based on the readChangeFeed option).
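The two steps above can be sketched in plain Scala. This is a minimal model under stated assumptions, not Delta Lake code: Field stands in for Spark's StructField, the internalMetadataKeys set is hypothetical, and only the three CDF column names (_change_type, _commit_version and _commit_timestamp) come from Delta Lake's Change Data Feed.

```scala
// Simplified stand-in for Spark's StructField (hypothetical, for illustration only).
case class Field(name: String, dataType: String, metadata: Map[String, String] = Map.empty)

// Hypothetical keys standing in for Delta Lake's internal column metadata.
val internalMetadataKeys: Set[String] = Set("internal.key.a", "internal.key.b")

// The three columns that Change Data Feed adds to the read schema.
val cdfColumns: Seq[Field] = Seq(
  Field("_change_type", "string"),
  Field("_commit_version", "long"),
  Field("_commit_timestamp", "timestamp"))

def sourceSchema(readSchemaAtSourceInit: Seq[Field], readChangeFeed: Boolean): Seq[Field] = {
  // Step 1: drop the internal metadata entries from every field.
  val cleaned = readSchemaAtSourceInit.map { f =>
    f.copy(metadata = f.metadata.filterNot { case (key, _) => internalMetadataKeys.contains(key) })
  }
  // Step 2: append the CDF columns only when readChangeFeed is enabled.
  if (readChangeFeed) cleaned ++ cdfColumns else cleaned
}

// Example: a two-column table whose "id" field carries one internal key.
val tableSchema = Seq(
  Field("id", "long", Map("internal.key.a" -> "x", "comment" -> "primary key")),
  Field("name", "string"))

val withCdf = sourceSchema(tableSchema, readChangeFeed = true)
```

The order mirrors the text: internal metadata is stripped first, and the CDF columns are appended afterwards, so they never carry table-internal metadata.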
Creating Streaming DataFrame Between Offsets
```scala
createDataFrameBetweenOffsets(
  startVersion: Long,
  startIndex: Long,
  isStartingVersion: Boolean,
  startSourceVersion: Option[Long],
  startOffsetOption: Option[Offset],
  endOffset: DeltaSourceOffset): DataFrame
```
createDataFrameBetweenOffsets creates a streaming DataFrame between versions (possibly CDF-aware).
Obsolete Soon?
createDataFrameBetweenOffsets is simply an alias of getFileChangesAndCreateDataFrame.
Moreover, the startSourceVersion and startOffsetOption input arguments are ignored.
It looks like this method should be marked as @deprecated and removed soon.
createDataFrameBetweenOffsets is used when:
DeltaSource is requested for a streaming micro-batch DataFrame
Creating Streaming DataFrame Between Versions (Possibly CDF-Aware)
```scala
getFileChangesAndCreateDataFrame(
  startVersion: Long,
  startIndex: Long,
  isStartingVersion: Boolean,
  endOffset: DeltaSourceOffset): DataFrame
```
With the readChangeFeed option enabled, getFileChangesAndCreateDataFrame delegates to getCDCFileChangesAndCreateDataFrame.
Otherwise, getFileChangesAndCreateDataFrame gets the file changes (as IndexedFiles with AddFiles, RemoveFiles or AddCDCFiles) and takes them only as long as their version and index (of the actions they belong to) are up to and including the given endOffset DeltaSourceOffset (based on its reservoirVersion and index). getFileChangesAndCreateDataFrame then filters out the file changes whose path matches the excludeRegex option. In the end, getFileChangesAndCreateDataFrame uses createDataFrame with the filtered file changes.
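The offset cut-off and the excludeRegex filter of the non-CDF branch above can be sketched as follows. IndexedFile, EndOffset and selectFileChanges are hypothetical, simplified stand-ins (not the real Delta Lake classes). The sketch assumes file changes arrive ordered by version and then index, which is what makes takeWhile (rather than filter) a valid way to stop at the end offset.

```scala
import scala.util.matching.Regex

// Simplified, hypothetical models: a file change at (version, index) with a file path,
// and the end offset's (reservoirVersion, index) pair.
case class IndexedFile(version: Long, index: Long, path: String)
case class EndOffset(reservoirVersion: Long, index: Long)

// True while the file change is up to and including the end offset.
def upToEndOffset(file: IndexedFile, end: EndOffset): Boolean =
  file.version < end.reservoirVersion ||
    (file.version == end.reservoirVersion && file.index <= end.index)

def selectFileChanges(
    changes: Iterator[IndexedFile],
    endOffset: EndOffset,
    excludeRegex: Option[Regex]): List[IndexedFile] =
  changes
    .takeWhile(upToEndOffset(_, endOffset)) // stop at the first change past endOffset
    .filter(file => excludeRegex.forall(_.findFirstIn(file.path).isEmpty)) // drop excluded paths
    .toList

val selected = selectFileChanges(
  Iterator(
    IndexedFile(3, 0, "part-0.parquet"),
    IndexedFile(3, 1, "_tmp/part-1.parquet"),
    IndexedFile(4, 0, "part-2.parquet"),
    IndexedFile(4, 1, "part-3.parquet")),
  EndOffset(4, 0),
  Some("_tmp/".r))
```

Here selected keeps the changes at (3, 0) and (4, 0): the change at (3, 1) matches the regex, and (4, 1) lies past the end offset.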
createDataFrame
```scala
createDataFrame(
  indexedFiles: Iterator[IndexedFile]): DataFrame
```
createDataFrame collects AddFiles from the given indexedFiles collection.
In the end, createDataFrame requests the DeltaLog to createDataFrame (for the AddFiles, with the isStreaming flag enabled).
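Collecting the AddFiles can be sketched with a pattern-matching collect. The FileAction hierarchy and IndexedFile below are simplified, hypothetical models (the real IndexedFile carries its actions differently), and the final DeltaLog.createDataFrame call is not modeled since it needs a live Spark session.

```scala
// Simplified, hypothetical stand-ins for Delta Lake's file actions and IndexedFile.
sealed trait FileAction { def path: String }
case class AddFile(path: String) extends FileAction
case class RemoveFile(path: String) extends FileAction
case class AddCDCFile(path: String) extends FileAction
case class IndexedFile(version: Long, index: Long, action: FileAction)

// Keeps only the AddFiles; RemoveFiles and AddCDCFiles are skipped.
def collectAddFiles(indexedFiles: Iterator[IndexedFile]): List[AddFile] =
  indexedFiles.collect { case IndexedFile(_, _, add: AddFile) => add }.toList

val addFiles = collectAddFiles(Iterator(
  IndexedFile(1, 0, AddFile("a.parquet")),
  IndexedFile(1, 1, RemoveFile("b.parquet")),
  IndexedFile(2, 0, AddCDCFile("c.parquet")),
  IndexedFile(2, 1, AddFile("d.parquet"))))
```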
getStartingOffsetFromSpecificDeltaVersion
```scala
getStartingOffsetFromSpecificDeltaVersion(
  fromVersion: Long,
  isStartingVersion: Boolean,
  limits: Option[AdmissionLimits]): Option[Offset]
```
getStartingOffsetFromSpecificDeltaVersion uses getFileChangesWithRateLimit (starting from fromVersion) and takes the last IndexedFile (if any).
getStartingOffsetFromSpecificDeltaVersion returns None when there is no last IndexedFile (i.e., no file changes). Otherwise, getStartingOffsetFromSpecificDeltaVersion uses buildOffsetFromIndexedFile with the last IndexedFile.
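The "last IndexedFile or None" logic can be sketched as below. buildOffset is a hypothetical placeholder for buildOffsetFromIndexedFile (whose logic is not described on this page), the models are simplified, and the sketch ignores closing the ClosableIterator that getFileChangesWithRateLimit returns.

```scala
// Simplified, hypothetical models of a file change and a source offset.
case class IndexedFile(version: Long, index: Long)
case class Offset(reservoirVersion: Long, index: Long)

// Hypothetical placeholder for buildOffsetFromIndexedFile.
def buildOffset(last: IndexedFile): Option[Offset] =
  Some(Offset(last.version, last.index))

// Drains the iterator and keeps only its last element, if any.
def lastOption[A](it: Iterator[A]): Option[A] = {
  var last: Option[A] = None
  while (it.hasNext) last = Some(it.next())
  last
}

// None when there are no file changes, otherwise an offset built from the last one.
def startingOffset(changes: Iterator[IndexedFile]): Option[Offset] =
  lastOption(changes).flatMap(buildOffset)
```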
getStartingOffsetFromSpecificDeltaVersion is used when:
DeltaSource is requested for the starting offset
getNextOffsetFromPreviousOffset
```scala
getNextOffsetFromPreviousOffset(
  previousOffset: DeltaSourceOffset,
  limits: Option[AdmissionLimits]): Option[Offset]
```
getNextOffsetFromPreviousOffset...FIXME
getNextOffsetFromPreviousOffset is used when:
DeltaSource is requested for the latest offset
getFileChangesWithRateLimit
```scala
getFileChangesWithRateLimit(
  fromVersion: Long,
  fromIndex: Long,
  isStartingVersion: Boolean,
  limits: Option[AdmissionLimits] = Some(new AdmissionLimits())): ClosableIterator[IndexedFile]
```
getFileChangesWithRateLimit...FIXME
getFileChangesWithRateLimit is used when:
DeltaSourceBase is requested to getStartingOffsetFromSpecificDeltaVersion and getNextOffsetFromPreviousOffset
buildOffsetFromIndexedFile
```scala
buildOffsetFromIndexedFile(
  indexedFile: IndexedFile,
  version: Long,
  isStartingVersion: Boolean): Option[DeltaSourceOffset]
```
buildOffsetFromIndexedFile...FIXME
buildOffsetFromIndexedFile is used when:
DeltaSourceBase is requested to getStartingOffsetFromSpecificDeltaVersion and getNextOffsetFromPreviousOffset
SupportsAdmissionControl
DeltaSourceBase is a SupportsAdmissionControl (Spark Structured Streaming).
Note
All the methods of SupportsAdmissionControl are implemented in DeltaSource.
allowUnsafeStreamingReadOnColumnMappingSchemaChanges
```scala
allowUnsafeStreamingReadOnColumnMappingSchemaChanges: Boolean
```
allowUnsafeStreamingReadOnColumnMappingSchemaChanges is the value of the spark.databricks.delta.streaming.unsafeReadOnIncompatibleColumnMappingSchemaChanges.enabled configuration property.
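A minimal sketch of reading the flag, assuming a plain Map[String, String] stands in for the session configuration and assuming the property counts as false when unset. allowUnsafeStreamingRead is a hypothetical helper, not the Delta Lake implementation.

```scala
// The configuration key as given above.
val UnsafeReadKey =
  "spark.databricks.delta.streaming.unsafeReadOnIncompatibleColumnMappingSchemaChanges.enabled"

// Reads the flag from a key-value map standing in for the session configuration.
// Assumption: a missing key (or any value other than "true") means false.
def allowUnsafeStreamingRead(conf: Map[String, String]): Boolean =
  conf.get(UnsafeReadKey).exists(_.trim.equalsIgnoreCase("true"))
```

In an actual Spark session, the property would be set through the session configuration (for example, with spark.conf.set).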
allowUnsafeStreamingReadOnColumnMappingSchemaChanges is used when:
DeltaSourceBase is requested to checkReadIncompatibleSchemaChangeOnStreamStartOnce and checkReadIncompatibleSchemaChanges
DeltaSourceMetadataEvolutionSupport is requested to readyToInitializeMetadataTrackingEagerly and trackingMetadataChange
isStreamingFromColumnMappingTable
```scala
isStreamingFromColumnMappingTable: Boolean
```
DeltaSourceBase initializes the isStreamingFromColumnMappingTable internal flag when created.
isStreamingFromColumnMappingTable is enabled (true) when streaming from a delta table with Column Mapping.
In other words, isStreamingFromColumnMappingTable is enabled when the DeltaColumnMappingMode of the Metadata of this SnapshotDescriptor is any value but NoMapping.
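The NoMapping check can be sketched with a small sealed hierarchy. The three mode names match Delta Lake's column mapping modes (none, id and name), but the types and the Metadata case class here are simplified, hypothetical stand-ins.

```scala
// Simplified model of DeltaColumnMappingMode.
sealed trait DeltaColumnMappingMode
case object NoMapping extends DeltaColumnMappingMode
case object IdMapping extends DeltaColumnMappingMode
case object NameMapping extends DeltaColumnMappingMode

// Hypothetical stand-in for the table Metadata; only the column mapping mode matters here.
case class Metadata(columnMappingMode: DeltaColumnMappingMode)

// Enabled for any column mapping mode but NoMapping.
def isStreamingFromColumnMappingTable(metadata: Metadata): Boolean =
  metadata.columnMappingMode != NoMapping
```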
isStreamingFromColumnMappingTable is used when:
DeltaSourceBase is requested to checkReadIncompatibleSchemaChangeOnStreamStartOnce and checkReadIncompatibleSchemaChanges
persistedMetadataAtSourceInit
```scala
persistedMetadataAtSourceInit: Option[PersistedMetadata]
```
DeltaSourceBase initializes the persistedMetadataAtSourceInit internal property when created.
persistedMetadataAtSourceInit is the PersistedMetadata tracked by this DeltaSourceMetadataTrackingLog, if both the tracking log and its tracked metadata are defined.
In other words, persistedMetadataAtSourceInit is defined only when this DeltaSourceMetadataTrackingLog is defined (at the very minimum).
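The rule that the tracking log and the metadata it tracks must both be defined maps naturally onto Option.flatMap. MetadataTrackingLog and its currentTrackedMetadata field are hypothetical, simplified stand-ins for DeltaSourceMetadataTrackingLog, and PersistedMetadata is reduced to a single field.

```scala
// Simplified, hypothetical models of persisted metadata and a metadata tracking log
// that may or may not have tracked any metadata yet.
case class PersistedMetadata(schemaJson: String)
case class MetadataTrackingLog(currentTrackedMetadata: Option[PersistedMetadata])

// Defined only when there is a tracking log AND it has tracked metadata.
def persistedMetadataAtSourceInit(log: Option[MetadataTrackingLog]): Option[PersistedMetadata] =
  log.flatMap(_.currentTrackedMetadata)
```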
persistedMetadataAtSourceInit is used when:
DeltaSourceBase is requested for a SnapshotDescriptor
DeltaSourceMetadataEvolutionSupport is requested to hasMetadataOrProtocolChangeComparedToStreamMetadata