DeltaSource

DeltaSource is a DeltaSourceBase of the Delta Connector for streaming queries.

DeltaSource is a DeltaSourceCDCSupport.

Creating Instance

DeltaSource takes the following to be created:

DeltaSource is created when:

DeltaSourceMetadataTrackingLog

DeltaSource can be given a DeltaSourceMetadataTrackingLog when created. It is undefined (None) by default.

DeltaSourceMetadataTrackingLog is given when DeltaDataSource is requested for a MetadataTrackingLogForDeltaSource.

Streaming Micro-Batch DataFrame

Source
getBatch(
  start: Option[Offset],
  end: Offset): DataFrame

getBatch is part of the Source (Spark Structured Streaming) abstraction.

getBatch creates a DeltaSourceOffset for the tableId (aka reservoirId) and the given end offset.

getBatch determines the startVersion, startIndex, isStartingVersion and startSourceVersion based on the given startOffsetOption:

If undefined, getBatch uses getStartingVersion to compute them.

If specified, getBatch creates a DeltaSourceOffset. Unless the DeltaSourceOffset has isStartingVersion enabled, getBatch calls cleanUpSnapshotResources. getBatch uses the DeltaSourceOffset for the versions and the index.

getBatch prints out the following DEBUG message to the logs:

start: [startOffsetOption] end: [end]

In the end, getBatch calls createDataFrameBetweenOffsets (for the startVersion, startIndex, isStartingVersion and endOffset).

Latest Available Streaming Offset

SupportsAdmissionControl
latestOffset(
  startOffset: streaming.Offset,
  limit: ReadLimit): streaming.Offset

latestOffset is part of the SupportsAdmissionControl (Spark Structured Streaming) abstraction.

latestOffset determines the latest offset (currentOffset) based on whether the previousOffset internal registry is initialized or not.

latestOffset prints out the following DEBUG message to the logs (using the previousOffset internal registry).

previousOffset -> currentOffset: [previousOffset] -> [currentOffset]

In the end, latestOffset returns the currentOffset if defined or null.

No previousOffset

With no previousOffset, latestOffset retrieves the starting offset using getStartingOffset (with a new AdmissionLimits for the given ReadLimit).

previousOffset Available

When the previousOffset is defined (which is when the DeltaSource is requested for another micro-batch), latestOffset gets the changes as indexed AddFiles (with the previousOffset and a new AdmissionLimits for the given ReadLimit).

latestOffset takes the last AddFile if available.

With no AddFile, latestOffset returns the previousOffset.

With the previousOffset and the last indexed AddFile both available, latestOffset creates a new DeltaSourceOffset for the version, index, and isLast flag from the last indexed AddFile.
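The two cases above can be sketched in plain Scala as follows. This is only an illustration under simplifying assumptions, not the Delta Lake implementation: IndexedFile and SourceOffset are hypothetical stand-ins for the indexed AddFile and DeltaSourceOffset, an undefined previousOffset is modeled as None rather than null, and rate limiting is assumed to have been applied already by the changes function.

```scala
object LatestOffsetSketch {
  // Hypothetical, simplified stand-ins for the Delta Lake types
  final case class IndexedFile(version: Long, index: Long, isLast: Boolean)
  final case class SourceOffset(version: Long, index: Long, isLast: Boolean)

  // previousOffset: None models the uninitialized (null) internal registry
  // startingOffset: what getStartingOffset would return (evaluated lazily)
  // changes: the indexed files that follow the given offset
  def nextOffset(
      previousOffset: Option[SourceOffset],
      startingOffset: => Option[SourceOffset],
      changes: SourceOffset => Iterator[IndexedFile]): Option[SourceOffset] =
    previousOffset match {
      // No previousOffset: fall back to the starting offset
      case None => startingOffset
      case Some(prev) =>
        // Keep only the last indexed file (if any)
        val last = changes(prev).foldLeft(Option.empty[IndexedFile])((_, f) => Some(f))
        last match {
          // No new files: stay at the previous offset
          case None => Some(prev)
          // Otherwise build the offset from the last indexed file
          case Some(f) => Some(SourceOffset(f.version, f.index, f.isLast))
        }
    }
}
```

Passing the starting offset by name mirrors the idea that it is only computed for the very first micro-batch.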

Note

isStartingVersion local value is enabled (true) when the following holds:

getStartingOffset

getStartingOffset(
  limits: Option[AdmissionLimits]): Option[Offset]

getStartingOffset...FIXME (review me)

getStartingOffset requests the DeltaLog for the version of the delta table (by requesting for the current state (snapshot) and then for the version).

getStartingOffset takes the last file from the files added (with rate limit) for the version of the delta table, -1L as the fromIndex, and the isStartingVersion flag enabled (true).

getStartingOffset returns a new DeltaSourceOffset for the tableId, the version and the index of the last file added, and whether the last file added is the last file of its version.

getStartingOffset returns None (offset not available) when either happens:

  • the version of the delta table is negative (below 0)

  • no files were added in the version

getStartingOffset throws an AssertionError when the version of the last file added is smaller than the delta table's version:

assertion failed: getChangesWithRateLimit returns an invalid version: [v] (expected: >= [version])
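The behaviour described above (None for a negative version or no files, an assertion on the version, otherwise an offset built from the last file) can be sketched as follows. The types and the startingOffset function are hypothetical simplifications introduced for illustration; the changesWithRateLimit parameter merely stands in for getChangesWithRateLimit.

```scala
object StartingOffsetSketch {
  // Hypothetical, simplified stand-ins for the Delta Lake types
  final case class IndexedFile(version: Long, index: Long, isLast: Boolean)
  final case class SourceOffset(tableId: String, version: Long, index: Long, isLast: Boolean)

  def startingOffset(
      tableId: String,
      tableVersion: Long,
      changesWithRateLimit: (Long, Long, Boolean) => Iterator[IndexedFile]): Option[SourceOffset] =
    if (tableVersion < 0) {
      None // negative table version: no offset available
    } else {
      // fromIndex = -1L and isStartingVersion = true, as in the description above
      val files = changesWithRateLimit(tableVersion, -1L, true)
      val last = files.foldLeft(Option.empty[IndexedFile])((_, f) => Some(f))
      // No files added gives None; otherwise validate and build the offset
      last.map { f =>
        assert(
          f.version >= tableVersion,
          s"getChangesWithRateLimit returns an invalid version: ${f.version} (expected: >= $tableVersion)")
        SourceOffset(tableId, f.version, f.index, f.isLast)
      }
    }
}
```

Scala's assert prefixes the message with "assertion failed: " and throws a java.lang.AssertionError, which matches the message format shown above.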

getChangesWithRateLimit

getChangesWithRateLimit(
  fromVersion: Long,
  fromIndex: Long,
  isStartingVersion: Boolean): Iterator[IndexedFile]

getChangesWithRateLimit gets the changes (as indexed AddFiles) for the given fromVersion, fromIndex, and isStartingVersion flag.

getOffset

getOffset: Option[Offset]

getOffset is part of the Source (Spark Structured Streaming) abstraction.

getOffset has been replaced by the newer latestOffset and so throws an UnsupportedOperationException when called:

latestOffset(Offset, ReadLimit) should be called instead of this method

Snapshot Management

DeltaSource uses internal registries for the DeltaSourceSnapshot and the version to avoid requesting the DeltaLog for getSnapshotAt.

Snapshot

DeltaSource uses initialState internal registry for the DeltaSourceSnapshot of the state of the delta table at the initialStateVersion.

DeltaSourceSnapshot is used for AddFiles of the delta table at a given version.

Initially uninitialized (null).

DeltaSourceSnapshot is created (initialized) when uninitialized or the version requested is different from the current one.

DeltaSourceSnapshot is closed and dereferenced (null) when DeltaSource is requested to cleanUpSnapshotResources (due to version change, another micro-batch or stop).

Version

DeltaSource uses initialStateVersion internal registry to keep track of the version of DeltaSourceSnapshot (when requested for AddFiles of the delta table at a given version).

initialStateVersion changes (alongside the initialState) to the requested version when DeltaSource is requested for the snapshot at a given version (only when the versions are different).

Used when:

Stopping

Source
stop(): Unit

stop is part of the Source (Spark Structured Streaming) abstraction.

stop simply calls cleanUpSnapshotResources.

Previous Offset

Ending DeltaSourceOffset of the latest micro-batch

Starts uninitialized (null).

Used when DeltaSource is requested for the latest available offset.

AddFiles of Delta Table at Given Version

getSnapshotAt(
  version: Long): Iterator[IndexedFile]

getSnapshotAt requests the DeltaSourceSnapshot for the data files (as indexed AddFiles).

In case the DeltaSourceSnapshot hasn't been initialized yet (null) or the requested version is different from the initialStateVersion, getSnapshotAt does the following:

  1. Calls cleanUpSnapshotResources

  2. Requests the DeltaLog for the state (snapshot) of the delta table at the version

  3. Creates a new DeltaSourceSnapshot for the state (snapshot) as the current DeltaSourceSnapshot

  4. Changes the initialStateVersion internal registry to the requested version

getSnapshotAt is used when:

  • DeltaSource is requested to getChanges (with isStartingVersion flag enabled)
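The version-keyed caching described above can be sketched as follows. SourceSnapshot and SnapshotCache are hypothetical names made up for this illustration (they stand in for DeltaSourceSnapshot and the two internal registries), and the load function is an assumed substitute for requesting the DeltaLog for the snapshot at a version.

```scala
object SnapshotCacheSketch {
  // Hypothetical stand-in for DeltaSourceSnapshot
  final class SourceSnapshot(val version: Long) {
    var closed: Boolean = false
    def close(): Unit = { closed = true }
    def files: Iterator[String] = Iterator(s"file-at-v$version")
  }

  // load: assumed stand-in for loading the table state at a version
  final class SnapshotCache(load: Long => SourceSnapshot) {
    private var initialState: SourceSnapshot = null // initially uninitialized
    private var initialStateVersion: Long = -1L

    // Close and dereference the cached snapshot, if any
    def cleanUp(): Unit =
      if (initialState != null) {
        initialState.close()
        initialState = null
      }

    // Reload only when nothing is cached or a different version is requested
    def filesAt(version: Long): Iterator[String] = {
      if (initialState == null || version != initialStateVersion) {
        cleanUp()
        initialState = load(version)
        initialStateVersion = version
      }
      initialState.files
    }
  }
}
```

Repeated requests for the same version reuse the cached snapshot, while a request for another version closes the old snapshot before loading the new one.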

getChanges

getChanges(
  fromVersion: Long,
  fromIndex: Long,
  isStartingVersion: Boolean): Iterator[IndexedFile]

getChanges branches based on isStartingVersion flag (enabled or not):

Note

When enabled (true), the isStartingVersion flag simply adds the state (snapshot) before the (filtered) indexed AddFiles.

isStartingVersion flag is enabled when DeltaSource is requested for the following:

In the end, getChanges keeps only the indexed AddFiles with a version later than the given fromVersion or an index greater than the given fromIndex (all others are excluded).
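The final filtering step can be sketched as a simple predicate over an iterator. IndexedFile here is a hypothetical two-field simplification and afterOffset is a made-up helper name; the predicate follows the description above literally and may omit special cases of the real implementation.

```scala
object ChangesFilterSketch {
  // Hypothetical, simplified stand-in for an indexed AddFile
  final case class IndexedFile(version: Long, index: Long)

  // Keep files from a later version, or with a greater index
  def afterOffset(
      files: Iterator[IndexedFile],
      fromVersion: Long,
      fromIndex: Long): Iterator[IndexedFile] =
    files.filter(f => f.version > fromVersion || f.index > fromIndex)
}
```

For example, with fromVersion 3 and fromIndex 1, the files (3, 0) and (3, 1) are dropped, while (3, 2) and (4, 0) are kept.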

getChanges is used when:

filterAndIndexDeltaLogs

filterAndIndexDeltaLogs(
  startVersion: Long): Iterator[IndexedFile]

filterAndIndexDeltaLogs...FIXME

verifyStreamHygieneAndFilterAddFiles

verifyStreamHygieneAndFilterAddFiles(
  actions: Seq[Action],
  version: Long): Seq[Action]

verifyStreamHygieneAndFilterAddFiles...FIXME

cleanUpSnapshotResources

cleanUpSnapshotResources(): Unit

cleanUpSnapshotResources does the following when the initial DeltaSourceSnapshot internal registry is not empty:

Otherwise, cleanUpSnapshotResources does nothing.


cleanUpSnapshotResources is used when:

ReadLimit

SupportsAdmissionControl
getDefaultReadLimit: ReadLimit

getDefaultReadLimit is part of the SupportsAdmissionControl (Spark Structured Streaming) abstraction.

getDefaultReadLimit creates an AdmissionLimits and requests it for a corresponding ReadLimit.

Retrieving Last Element From Iterator

iteratorLast[T](
  iter: Iterator[T]): Option[T]

iteratorLast simply returns the last element of the given Iterator (Scala) or None.
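One possible way to write such a helper is shown below. It is a minimal sketch with the same signature, not necessarily how DeltaSource implements it; it consumes the iterator once and keeps only the most recent element.

```scala
object IteratorLastSketch {
  // Returns the last element of the iterator, or None when it is empty
  def iteratorLast[T](iter: Iterator[T]): Option[T] = {
    var last: Option[T] = None
    while (iter.hasNext) {
      last = Some(iter.next())
    }
    last
  }
}
```

The iterator is exhausted after the call, so it cannot be traversed again.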


iteratorLast is used when:

excludeRegex Option

excludeRegex: Option[Regex]

excludeRegex requests the DeltaOptions for the value of excludeRegex option.
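A rough sketch of the idea follows, assuming the options are available as a plain Map[String, String] rather than a DeltaOptions instance. The keep helper is hypothetical and only illustrates how an optional regex could be applied to file paths; it is not taken from this page.

```scala
import scala.util.matching.Regex

object ExcludeRegexSketch {
  // Assumption: options modeled as a plain Map instead of DeltaOptions
  def excludeRegex(options: Map[String, String]): Option[Regex] =
    options.get("excludeRegex").map(_.r)

  // Hypothetical helper: a path is kept when no regex is defined
  // or when the regex does not match anywhere in the path
  def keep(path: String, exclude: Option[Regex]): Boolean =
    exclude.forall(_.findFirstIn(path).isEmpty)
}
```

With the option set to "_tmp", a path such as /data/_tmp/part-0 would be excluded while /data/part-0 would be kept.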

Refactor It

excludeRegex should not really be part of DeltaSource (more of DeltaSourceBase) since it's used elsewhere anyway.


excludeRegex is used when:

Demo

val q = spark
  .readStream                // Creating a streaming query
  .format("delta")           // Using delta data source
  .load("/tmp/users")        // Over data in a delta table
  .writeStream
  .format("memory")
  .option("queryName", "demo")
  .start()

// Access the logical plan of the underlying micro-batch stream execution
import org.apache.spark.sql.execution.streaming.{MicroBatchExecution, StreamingQueryWrapper}
val plan = q.asInstanceOf[StreamingQueryWrapper]
  .streamingQuery
  .asInstanceOf[MicroBatchExecution]
  .logicalPlan

// Find the streaming relation that wraps the streaming source
import org.apache.spark.sql.execution.streaming.StreamingExecutionRelation
val relation = plan.collect { case r: StreamingExecutionRelation => r }.head

// assert expects a Boolean, so check the type with isInstanceOf
import org.apache.spark.sql.delta.sources.DeltaSource
assert(relation.source.isInstanceOf[DeltaSource])

scala> println(relation.source)
DeltaSource[file:/tmp/users]

Logging

Enable ALL logging level for org.apache.spark.sql.delta.sources.DeltaSource logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

logger.DeltaSource.name = org.apache.spark.sql.delta.sources.DeltaSource
logger.DeltaSource.level = all

Refer to Logging.