DeltaSourceBase

DeltaSourceBase is an extension of the Source (Spark Structured Streaming) abstraction for DeltaSource.

Read Schema

schema: StructType

schema is part of the Source (Spark Structured Streaming) abstraction.


schema removes the default expressions from the table schema (from the Metadata of the Snapshot of the DeltaLog).

In the end, schema adds the CDF columns to the schema when the readChangeFeed option is enabled. Otherwise, schema returns the table schema without the CDF columns (and with the default expressions removed).
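The two steps can be sketched as follows. This is a minimal illustration, not the Delta Lake implementation: Field is a simplified stand-in for Spark's StructField, and the DefaultExprKey metadata key and the removeDefaultExpressions and readSchema helpers are hypothetical names introduced for this example. Only the three CDF column names are taken from Delta Lake's Change Data Feed.

```scala
// Simplified stand-ins for Spark's StructField/StructType (hypothetical, for illustration only).
object ReadSchemaSketch {
  final case class Field(
      name: String,
      dataType: String,
      metadata: Map[String, String] = Map.empty)

  // Hypothetical metadata key that marks a default expression on a column.
  val DefaultExprKey = "default.expression"

  // Column names used by Delta Lake's Change Data Feed.
  val cdfColumns: Seq[Field] = Seq(
    Field("_change_type", "string"),
    Field("_commit_version", "long"),
    Field("_commit_timestamp", "timestamp"))

  // Step 1: strip the default expression from every column of the table schema.
  def removeDefaultExpressions(schema: Seq[Field]): Seq[Field] =
    schema.map(f => f.copy(metadata = f.metadata - DefaultExprKey))

  // Step 2: append the CDF columns only when readChangeFeed is enabled.
  def readSchema(tableSchema: Seq[Field], readChangeFeed: Boolean): Seq[Field] = {
    val cleaned = removeDefaultExpressions(tableSchema)
    if (readChangeFeed) cleaned ++ cdfColumns else cleaned
  }
}
```

Under these assumptions, a one-column table read with readChangeFeed enabled yields the original column (without its default expression) followed by the three CDF columns.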

createDataFrameBetweenOffsets

createDataFrameBetweenOffsets(
  startVersion: Long,
  startIndex: Long,
  isStartingVersion: Boolean,
  startSourceVersion: Option[Long],
  startOffsetOption: Option[Offset],
  endOffset: DeltaSourceOffset): DataFrame

createDataFrameBetweenOffsets simply calls getFileChangesAndCreateDataFrame (with the startVersion, startIndex, isStartingVersion and endOffset).

Note

The startSourceVersion and startOffsetOption input arguments are ignored. It looks like the method should be marked as @deprecated and eventually removed.

createDataFrameBetweenOffsets is used when:

getFileChangesAndCreateDataFrame

getFileChangesAndCreateDataFrame(
  startVersion: Long,
  startIndex: Long,
  isStartingVersion: Boolean,
  endOffset: DeltaSourceOffset): DataFrame

With the readChangeFeed option enabled, getFileChangesAndCreateDataFrame calls getCDCFileChangesAndCreateDataFrame.

Otherwise, getFileChangesAndCreateDataFrame gets the file changes (as IndexedFiles with AddFiles, RemoveFiles or AddCDCFiles) and takes file changes for as long as their version and index are at or before the given DeltaSourceOffset (based on its reservoirVersion and index). getFileChangesAndCreateDataFrame then filters out the file changes whose path matches the excludeRegex option. In the end, getFileChangesAndCreateDataFrame calls createDataFrame (with the filtered file changes).
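The take-then-filter logic of the non-CDF path could look roughly like the sketch below. IndexedFile and EndOffset are simplified, hypothetical stand-ins (the real IndexedFile wraps a file action and the real offset is a DeltaSourceOffset), and selectFileChanges is a name made up for this example.

```scala
import scala.util.matching.Regex

object FileChangesSketch {
  // Simplified stand-ins (hypothetical): only the fields needed for the comparison.
  final case class IndexedFile(version: Long, index: Long, path: String)
  final case class EndOffset(reservoirVersion: Long, index: Long)

  def selectFileChanges(
      changes: Iterator[IndexedFile],
      endOffset: EndOffset,
      excludeRegex: Option[Regex]): Iterator[IndexedFile] =
    changes
      // Keep taking changes while (version, index) is at or before the end offset.
      .takeWhile { f =>
        f.version < endOffset.reservoirVersion ||
          (f.version == endOffset.reservoirVersion && f.index <= endOffset.index)
      }
      // Drop changes whose path matches the exclude pattern (if one is set).
      .filter(f => excludeRegex.forall(_.findFirstIn(f.path).isEmpty))
}
```

Note that takeWhile stops at the first change past the end offset, so the sketch assumes the file changes arrive ordered by version and then by index.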

createDataFrame

createDataFrame(
  indexedFiles: Iterator[IndexedFile]): DataFrame

createDataFrame collects AddFiles from the given indexedFiles collection.

In the end, createDataFrame requests the DeltaLog to createDataFrame (for the AddFiles and with the isStreaming flag enabled).
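The collection step can be illustrated with a small sketch. The action classes below are simplified, hypothetical stand-ins that carry only a path, and collectAddFiles is a name introduced for this example. The final call to the DeltaLog is left out, since it needs a running Spark session.

```scala
object CreateDataFrameSketch {
  // Simplified stand-ins (hypothetical) for the Delta Lake file actions.
  sealed trait FileAction { def path: String }
  final case class AddFile(path: String) extends FileAction
  final case class RemoveFile(path: String) extends FileAction
  final case class AddCDCFile(path: String) extends FileAction

  final case class IndexedFile(version: Long, index: Long, action: FileAction)

  // Keep only the AddFile actions; other kinds of file changes are skipped.
  def collectAddFiles(indexedFiles: Iterator[IndexedFile]): List[AddFile] =
    indexedFiles.map(_.action).collect { case add: AddFile => add }.toList
}
```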

getStartingOffsetFromSpecificDeltaVersion

getStartingOffsetFromSpecificDeltaVersion(
  fromVersion: Long,
  isStartingVersion: Boolean,
  limits: Option[AdmissionLimits]): Option[Offset]

getStartingOffsetFromSpecificDeltaVersion calls getFileChangesWithRateLimit and takes the last IndexedFile (if any).

getStartingOffsetFromSpecificDeltaVersion returns None when there is no (last) IndexedFile. Otherwise, getStartingOffsetFromSpecificDeltaVersion calls buildOffsetFromIndexedFile (with the last IndexedFile).
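A minimal sketch of the "last file change or None" pattern, under stated assumptions: IndexedFile and Offset are simplified stand-ins, buildOffset is a hypothetical placeholder for buildOffsetFromIndexedFile (whose actual logic is not described here), and rate limiting and closing of the iterator are not modelled.

```scala
object StartingOffsetSketch {
  // Simplified stand-ins (hypothetical) for IndexedFile and DeltaSourceOffset.
  final case class IndexedFile(version: Long, index: Long)
  final case class Offset(reservoirVersion: Long, index: Long)

  // Hypothetical placeholder for buildOffsetFromIndexedFile.
  def buildOffset(file: IndexedFile): Option[Offset] =
    Some(Offset(file.version, file.index))

  def startingOffset(changes: Iterator[IndexedFile]): Option[Offset] = {
    // Walk the iterator, remembering only the most recent element.
    val last = changes.foldLeft(Option.empty[IndexedFile])((_, f) => Some(f))
    // None when there were no file changes; otherwise build an offset from the last one.
    last.flatMap(buildOffset)
  }
}
```

Folding over the iterator avoids materializing all file changes just to read the last one.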

getStartingOffsetFromSpecificDeltaVersion is used when:

getNextOffsetFromPreviousOffset

getNextOffsetFromPreviousOffset(
  previousOffset: DeltaSourceOffset,
  limits: Option[AdmissionLimits]): Option[Offset]

getNextOffsetFromPreviousOffset...FIXME

getNextOffsetFromPreviousOffset is used when:

getFileChangesWithRateLimit

getFileChangesWithRateLimit(
  fromVersion: Long,
  fromIndex: Long,
  isStartingVersion: Boolean,
  limits: Option[AdmissionLimits] = Some(new AdmissionLimits())): ClosableIterator[IndexedFile]

getFileChangesWithRateLimit...FIXME

getFileChangesWithRateLimit is used when:

buildOffsetFromIndexedFile

buildOffsetFromIndexedFile(
  indexedFile: IndexedFile,
  version: Long,
  isStartingVersion: Boolean): Option[DeltaSourceOffset]

buildOffsetFromIndexedFile...FIXME

buildOffsetFromIndexedFile is used when:

SupportsAdmissionControl

DeltaSourceBase is a SupportsAdmissionControl (Spark Structured Streaming).

Note

All the methods of SupportsAdmissionControl are in DeltaSource.