DeltaSourceBase

DeltaSourceBase is an extension of the Source (Spark Structured Streaming) abstraction for DeltaSource.

Schema

schema: StructType

schema is part of the Source (Spark Structured Streaming) abstraction.

schema removes the internal table metadata from the readSchemaAtSourceInit.

With readChangeFeed option enabled, schema adds the CDF columns to the schema.

In the end, schema returns the schema of a delta table with or without CDF columns (based on readChangeFeed option).
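The effect of the readChangeFeed option on the schema can be sketched in plain Scala. This is only an illustration, not the Delta Lake implementation: Field and sourceSchema are hypothetical stand-ins for Spark's StructField and the schema method, and the three column names are the Change Data Feed columns that Delta Lake documents for change rows.

```scala
object SchemaSketch {
  // Hypothetical, simplified stand-in for a Spark StructField
  final case class Field(name: String, dataType: String)

  // Change Data Feed columns added to the rows of a change feed
  val cdfColumns: Seq[Field] = Seq(
    Field("_change_type", "string"),
    Field("_commit_version", "long"),
    Field("_commit_timestamp", "timestamp"))

  // Returns the table schema as-is, or extended with the CDF columns
  def sourceSchema(tableSchema: Seq[Field], readChangeFeed: Boolean): Seq[Field] =
    if (readChangeFeed) tableSchema ++ cdfColumns else tableSchema
}
```

With a one-column table schema, sourceSchema returns one field with readChangeFeed disabled and four fields with it enabled.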

Creating Streaming DataFrame Between Offsets

createDataFrameBetweenOffsets(
  startVersion: Long,
  startIndex: Long,
  isStartingVersion: Boolean,
  startSourceVersion: Option[Long],
  startOffsetOption: Option[Offset],
  endOffset: DeltaSourceOffset): DataFrame

createDataFrameBetweenOffsets creates a streaming DataFrame between versions (possibly CDF-aware).

Obsolete Soon?

createDataFrameBetweenOffsets is simply an alias of getFileChangesAndCreateDataFrame.

Moreover, the startSourceVersion and startOffsetOption input arguments are ignored.

It looks like this method should be marked as deprecated (@deprecated) and removed soon.


createDataFrameBetweenOffsets is used when:

Creating Streaming DataFrame Between Versions (Possibly CDF-Aware)

getFileChangesAndCreateDataFrame(
  startVersion: Long,
  startIndex: Long,
  isStartingVersion: Boolean,
  endOffset: DeltaSourceOffset): DataFrame

With readChangeFeed option enabled, getFileChangesAndCreateDataFrame calls getCDCFileChangesAndCreateDataFrame.

Otherwise, getFileChangesAndCreateDataFrame gets the file changes (as IndexedFiles with AddFiles, RemoveFiles or AddCDCFiles) and takes as many file changes as have their version and index (that these actions belong to) up to and including the given DeltaSourceOffset (based on the reservoirVersion and index).

getFileChangesAndCreateDataFrame filters out the file changes with the path that matches the excludeRegex option.

In the end, getFileChangesAndCreateDataFrame creates a DataFrame (using createDataFrame) from the filtered file changes.
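The non-CDF branch described above can be sketched as a take-then-filter pipeline. This is a minimal model under stated assumptions, not the actual Delta Lake code: IndexedFile and EndOffset are hypothetical, simplified types, the file changes are assumed to arrive ordered by version and index, and a path is assumed to be excluded when the regular expression matches anywhere in it.

```scala
object FileChangesSketch {
  // Hypothetical, simplified stand-ins for IndexedFile and DeltaSourceOffset
  final case class IndexedFile(version: Long, index: Long, path: String)
  final case class EndOffset(reservoirVersion: Long, index: Long)

  def selectFileChanges(
      changes: Iterator[IndexedFile],
      end: EndOffset,
      excludeRegex: Option[scala.util.matching.Regex]): List[IndexedFile] =
    changes
      // Keep file changes up to and including the end offset
      // (assumes the iterator is ordered by version, then index)
      .takeWhile { f =>
        f.version < end.reservoirVersion ||
          (f.version == end.reservoirVersion && f.index <= end.index)
      }
      // Drop file changes whose path matches the excludeRegex option (if set)
      .filter(f => excludeRegex.forall(_.findFirstIn(f.path).isEmpty))
      .toList
}
```

The surviving file changes are what would then be handed over to createDataFrame.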

createDataFrame

createDataFrame(
  indexedFiles: Iterator[IndexedFile]): DataFrame

createDataFrame collects AddFiles from the given indexedFiles collection.

In the end, createDataFrame requests the DeltaLog to createDataFrame (for the AddFiles and with isStreaming flag enabled).
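Collecting only the AddFiles can be illustrated with a small model. The types below are hypothetical simplifications (the real IndexedFile and file actions carry more fields), and the last step of requesting the DeltaLog for a streaming DataFrame is left out since it needs a Spark session.

```scala
object CollectAddFilesSketch {
  // Hypothetical, simplified stand-ins for the Delta file actions
  sealed trait FileAction { def path: String }
  final case class AddFile(path: String) extends FileAction
  final case class RemoveFile(path: String) extends FileAction
  final case class AddCDCFile(path: String) extends FileAction

  final case class IndexedFile(version: Long, index: Long, action: FileAction)

  // Keeps the AddFile actions only; all other file actions are skipped
  def collectAddFiles(indexedFiles: Iterator[IndexedFile]): List[AddFile] =
    indexedFiles.map(_.action).collect { case add: AddFile => add }.toList
}
```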

getStartingOffsetFromSpecificDeltaVersion

getStartingOffsetFromSpecificDeltaVersion(
  fromVersion: Long,
  isStartingVersion: Boolean,
  limits: Option[AdmissionLimits]): Option[Offset]

getStartingOffsetFromSpecificDeltaVersion gets the file changes (using getFileChangesWithRateLimit) and takes the last IndexedFile (if any).

getStartingOffsetFromSpecificDeltaVersion returns None when there is no (last) IndexedFile. Otherwise, getStartingOffsetFromSpecificDeltaVersion builds an offset from the last IndexedFile (using buildOffsetFromIndexedFile).
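The "last IndexedFile or None" shape can be sketched as follows. IndexedFile, Offset and startingOffset are hypothetical names, and the mapping from the last file to an offset is a placeholder only: what buildOffsetFromIndexedFile really does is not described on this page.

```scala
object StartingOffsetSketch {
  // Hypothetical, simplified stand-ins for IndexedFile and DeltaSourceOffset
  final case class IndexedFile(version: Long, index: Long)
  final case class Offset(version: Long, index: Long)

  def startingOffset(changes: Iterator[IndexedFile]): Option[Offset] = {
    // Walk the whole iterator, remembering only the last element (if any)
    val last = changes.foldLeft(Option.empty[IndexedFile])((_, file) => Some(file))
    // Placeholder for buildOffsetFromIndexedFile: copy the version and index
    last.map(file => Offset(file.version, file.index))
  }
}
```

An empty iterator gives None, which corresponds to the case of no file changes from the given version.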


getStartingOffsetFromSpecificDeltaVersion is used when:

getNextOffsetFromPreviousOffset

getNextOffsetFromPreviousOffset(
  previousOffset: DeltaSourceOffset,
  limits: Option[AdmissionLimits]): Option[Offset]

getNextOffsetFromPreviousOffset...FIXME


getNextOffsetFromPreviousOffset is used when:

getFileChangesWithRateLimit

getFileChangesWithRateLimit(
  fromVersion: Long,
  fromIndex: Long,
  isStartingVersion: Boolean,
  limits: Option[AdmissionLimits] = Some(new AdmissionLimits())): ClosableIterator[IndexedFile]

getFileChangesWithRateLimit...FIXME


getFileChangesWithRateLimit is used when:

buildOffsetFromIndexedFile

buildOffsetFromIndexedFile(
  indexedFile: IndexedFile,
  version: Long,
  isStartingVersion: Boolean): Option[DeltaSourceOffset]

buildOffsetFromIndexedFile...FIXME


buildOffsetFromIndexedFile is used when:

SupportsAdmissionControl

DeltaSourceBase is a SupportsAdmissionControl (Spark Structured Streaming).

Note

All the methods of SupportsAdmissionControl are in DeltaSource.

allowUnsafeStreamingReadOnColumnMappingSchemaChanges

allowUnsafeStreamingReadOnColumnMappingSchemaChanges: Boolean

allowUnsafeStreamingReadOnColumnMappingSchemaChanges is the value of spark.databricks.delta.streaming.unsafeReadOnIncompatibleColumnMappingSchemaChanges.enabled configuration property.


allowUnsafeStreamingReadOnColumnMappingSchemaChanges is used when:

isStreamingFromColumnMappingTable

isStreamingFromColumnMappingTable: Boolean

DeltaSourceBase initializes isStreamingFromColumnMappingTable internal flag when created.

isStreamingFromColumnMappingTable is enabled (true) when streaming from a delta table with Column Mapping.

In other words, isStreamingFromColumnMappingTable is enabled when the DeltaColumnMappingMode of the Metadata of this SnapshotDescriptor is any value but NoMapping.
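The rule can be modeled in a few lines. The sealed trait below is a simplified, hypothetical re-creation of DeltaColumnMappingMode with its NoMapping, IdMapping and NameMapping values, not the Delta Lake type itself.

```scala
object ColumnMappingSketch {
  // Simplified model of the column mapping modes of a delta table
  sealed trait DeltaColumnMappingMode
  case object NoMapping extends DeltaColumnMappingMode
  case object IdMapping extends DeltaColumnMappingMode
  case object NameMapping extends DeltaColumnMappingMode

  // Enabled for any mode but NoMapping
  def isStreamingFromColumnMappingTable(mode: DeltaColumnMappingMode): Boolean =
    mode != NoMapping
}
```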


isStreamingFromColumnMappingTable is used when:

persistedMetadataAtSourceInit

persistedMetadataAtSourceInit: Option[PersistedMetadata]

DeltaSourceBase initializes persistedMetadataAtSourceInit internal value when created.

persistedMetadataAtSourceInit is the PersistedMetadata of this DeltaSourceMetadataTrackingLog, if both are defined.

In other words, persistedMetadataAtSourceInit is defined only when this DeltaSourceMetadataTrackingLog was (at the very minimum).


persistedMetadataAtSourceInit is used when: