
DeltaSourceBase

DeltaSourceBase is an extension of the Source (Spark Structured Streaming) abstraction for DeltaSource.

Schema

Source
schema: StructType

schema is part of the Source (Spark Structured Streaming) abstraction.

schema takes the readSchemaAtSourceInit and removes the internal table metadata from it.

With readChangeFeed option enabled, schema adds the CDF columns to the schema.

In the end, schema returns the schema of a delta table with or without CDF columns (based on readChangeFeed option).
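The steps above can be sketched in plain Scala, assuming a schema is modelled as a sequence of hypothetical Field values rather than Spark's StructType. The set of "internal" metadata keys below is made up for illustration. The three CDF column names are the ones Delta's change data feed exposes.

```scala
object SchemaSketch {
  // Hypothetical, simplified stand-in for a StructField (not Spark's API)
  final case class Field(name: String, dataType: String, metadata: Map[String, String] = Map.empty)

  // Assumption: which metadata keys count as "internal" is invented for this sketch
  val internalMetadataKeys: Set[String] = Set("internalKey")

  // CDF columns added when readChangeFeed is enabled
  val cdfColumns: Seq[Field] = Seq(
    Field("_change_type", "string"),
    Field("_commit_version", "long"),
    Field("_commit_timestamp", "timestamp"))

  // Drops internal metadata entries from every field
  def removeInternalMetadata(fields: Seq[Field]): Seq[Field] =
    fields.map(f => f.copy(metadata = f.metadata -- internalMetadataKeys))

  // Schema with or without CDF columns, based on readChangeFeed
  def schema(readSchemaAtSourceInit: Seq[Field], readChangeFeed: Boolean): Seq[Field] = {
    val cleaned = removeInternalMetadata(readSchemaAtSourceInit)
    if (readChangeFeed) cleaned ++ cdfColumns else cleaned
  }
}
```

With readChangeFeed disabled, only the cleaned table columns come back; enabling it appends the three CDF columns after them.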

createDataFrameBetweenOffsets

createDataFrameBetweenOffsets(
  startVersion: Long,
  startIndex: Long,
  isStartingVersion: Boolean,
  startSourceVersion: Option[Long],
  startOffsetOption: Option[Offset],
  endOffset: DeltaSourceOffset): DataFrame

createDataFrameBetweenOffsets calls getFileChangesAndCreateDataFrame (with the startVersion, startIndex, isStartingVersion and endOffset).

Note

The startSourceVersion and startOffsetOption input arguments are ignored. It looks like the method should be marked as @deprecated and removed soon.
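A minimal sketch of what the note suggests, using Scala's standard @deprecated annotation. SimpleOffset and the String result are hypothetical simplifications (a String stands in for the DataFrame so the sketch runs without Spark). The point is only that the two extra arguments are never forwarded.

```scala
object DelegationSketch {
  // Hypothetical stand-in for DeltaSourceOffset
  final case class SimpleOffset(reservoirVersion: Long, index: Long)

  // Stand-in for getFileChangesAndCreateDataFrame: describes the range instead of building a DataFrame
  def getFileChangesAndCreateDataFrame(
      startVersion: Long,
      startIndex: Long,
      isStartingVersion: Boolean,
      endOffset: SimpleOffset): String =
    s"changes from ($startVersion, $startIndex) to (${endOffset.reservoirVersion}, ${endOffset.index})"

  @deprecated("startSourceVersion and startOffsetOption are ignored", "sketch")
  def createDataFrameBetweenOffsets(
      startVersion: Long,
      startIndex: Long,
      isStartingVersion: Boolean,
      startSourceVersion: Option[Long],
      startOffsetOption: Option[SimpleOffset],
      endOffset: SimpleOffset): String =
    // startSourceVersion and startOffsetOption are not passed on
    getFileChangesAndCreateDataFrame(startVersion, startIndex, isStartingVersion, endOffset)
}
```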

createDataFrameBetweenOffsets is used when:

getFileChangesAndCreateDataFrame

getFileChangesAndCreateDataFrame(
  startVersion: Long,
  startIndex: Long,
  isStartingVersion: Boolean,
  endOffset: DeltaSourceOffset): DataFrame

With readChangeFeed option enabled, getFileChangesAndCreateDataFrame delegates to getCDCFileChangesAndCreateDataFrame.

Otherwise, getFileChangesAndCreateDataFrame gets the file changes (as IndexedFiles with AddFiles, RemoveFiles or AddCDCFiles) and takes them as long as the version and index (of the action they belong to) are up to and including the given DeltaSourceOffset (its reservoirVersion and index). getFileChangesAndCreateDataFrame then filters out the file changes whose path matches the excludeRegex option. In the end, getFileChangesAndCreateDataFrame creates a DataFrame (createDataFrame) from the remaining file changes.
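The non-CDF path above can be sketched as a takeWhile followed by a filter. The case classes are hypothetical, simplified models (not Delta's classes), and the sketch assumes the file changes arrive ordered by (version, index), which is what makes stopping at the first change past the end offset correct.

```scala
import scala.util.matching.Regex

object FileChangesSketch {
  // Hypothetical, simplified models of file actions, IndexedFile and the end offset
  sealed trait FileAction { def path: String }
  final case class AddFile(path: String) extends FileAction
  final case class RemoveFile(path: String) extends FileAction
  final case class IndexedFile(version: Long, index: Long, fileAction: FileAction)
  final case class EndOffset(reservoirVersion: Long, index: Long)

  def fileChangesUpTo(
      changes: Iterator[IndexedFile],
      endOffset: EndOffset,
      excludeRegex: Option[Regex]): Iterator[IndexedFile] =
    changes
      // stop at the first change past (reservoirVersion, index); assumes (version, index) order
      .takeWhile { f =>
        f.version < endOffset.reservoirVersion ||
          (f.version == endOffset.reservoirVersion && f.index <= endOffset.index)
      }
      // drop changes whose path matches excludeRegex (if the option is set)
      .filter(f => excludeRegex.forall(_.findFirstIn(f.fileAction.path).isEmpty))
}
```

For example, with an end offset of version 2, index 0 and `excludeRegex` set to `^tmp/`, a change at version 1 with path `tmp/b.parquet` is excluded, and a change at version 2, index 1 ends the iteration.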

createDataFrame

createDataFrame(
  indexedFiles: Iterator[IndexedFile]): DataFrame

createDataFrame collects AddFiles from the given indexedFiles collection.

In the end, createDataFrame requests the DeltaLog to createDataFrame (for the AddFiles and with isStreaming flag enabled).
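The collecting step can be sketched with a pattern-matching `collect`, using hypothetical, simplified models of the file actions. The final call to the DeltaLog is left out because it needs Spark and a real delta table.

```scala
object CreateDataFrameSketch {
  // Hypothetical, simplified models of file actions and IndexedFile
  sealed trait FileAction { def path: String }
  final case class AddFile(path: String) extends FileAction
  final case class RemoveFile(path: String) extends FileAction
  final case class AddCDCFile(path: String) extends FileAction
  final case class IndexedFile(version: Long, index: Long, fileAction: FileAction)

  // Keeps only AddFiles; RemoveFiles and AddCDCFiles are dropped
  def collectAddFiles(indexedFiles: Iterator[IndexedFile]): List[AddFile] =
    indexedFiles.map(_.fileAction).collect { case add: AddFile => add }.toList
}
```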

getStartingOffsetFromSpecificDeltaVersion

getStartingOffsetFromSpecificDeltaVersion(
  fromVersion: Long,
  isStartingVersion: Boolean,
  limits: Option[AdmissionLimits]): Option[Offset]

getStartingOffsetFromSpecificDeltaVersion calls getFileChangesWithRateLimit (with the fromVersion, isStartingVersion and limits) and takes the last IndexedFile (if any).

getStartingOffsetFromSpecificDeltaVersion returns None when there is no last IndexedFile. Otherwise, getStartingOffsetFromSpecificDeltaVersion builds the offset from the last IndexedFile (buildOffsetFromIndexedFile).
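A minimal sketch of the "last IndexedFile or None" logic. The iterator argument stands in for the result of getFileChangesWithRateLimit, and both SimpleOffset and the body of buildOffsetFromIndexedFile are hypothetical (the real method is not described on this page).

```scala
object StartingOffsetSketch {
  // Hypothetical, simplified models
  final case class IndexedFile(version: Long, index: Long)
  final case class SimpleOffset(reservoirVersion: Long, index: Long, isStartingVersion: Boolean)

  // Drains the iterator and keeps its last element, if any
  def lastOption[A](it: Iterator[A]): Option[A] =
    it.foldLeft(Option.empty[A])((_, a) => Some(a))

  // Hypothetical stand-in for buildOffsetFromIndexedFile
  def buildOffsetFromIndexedFile(f: IndexedFile, version: Long, isStartingVersion: Boolean): Option[SimpleOffset] =
    Some(SimpleOffset(f.version, f.index, isStartingVersion))

  def getStartingOffsetFromSpecificDeltaVersion(
      changes: Iterator[IndexedFile], // stands in for getFileChangesWithRateLimit's result
      fromVersion: Long,
      isStartingVersion: Boolean): Option[SimpleOffset] =
    // None when there are no changes; otherwise an offset built from the last one
    lastOption(changes).flatMap(buildOffsetFromIndexedFile(_, fromVersion, isStartingVersion))
}
```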

getStartingOffsetFromSpecificDeltaVersion is used when:

getNextOffsetFromPreviousOffset

getNextOffsetFromPreviousOffset(
  previousOffset: DeltaSourceOffset,
  limits: Option[AdmissionLimits]): Option[Offset]

getNextOffsetFromPreviousOffset...FIXME

getNextOffsetFromPreviousOffset is used when:

getFileChangesWithRateLimit

getFileChangesWithRateLimit(
  fromVersion: Long,
  fromIndex: Long,
  isStartingVersion: Boolean,
  limits: Option[AdmissionLimits] = Some(new AdmissionLimits())): ClosableIterator[IndexedFile]

getFileChangesWithRateLimit...FIXME

getFileChangesWithRateLimit is used when:

buildOffsetFromIndexedFile

buildOffsetFromIndexedFile(
  indexedFile: IndexedFile,
  version: Long,
  isStartingVersion: Boolean): Option[DeltaSourceOffset]

buildOffsetFromIndexedFile...FIXME

buildOffsetFromIndexedFile is used when:

SupportsAdmissionControl

DeltaSourceBase is a SupportsAdmissionControl (Spark Structured Streaming).

Note

All the methods of SupportsAdmissionControl are implemented in DeltaSource.