DeltaSource — Streaming Source of Delta Data Source

DeltaSource is the streaming source of the delta data source for streaming queries in Spark Structured Streaming.

DeltaSource is created when DeltaDataSource is requested for a streaming source.

import org.apache.spark.sql.delta.sources.DeltaSource
import org.apache.spark.sql.execution.streaming.{MicroBatchExecution, StreamingExecutionRelation, StreamingQueryWrapper}

// Start a streaming query over a delta table (the table must already exist)
val q = spark
  .readStream               // Creating a streaming query
  .format("delta")          // Using delta data source
  .load("/tmp/delta/users") // Over data in a delta table
  .writeStream
  .format("memory")
  .queryName("demo")        // Name of the in-memory table of the memory sink
  .start()

// Dig out the logical plan of the micro-batch execution behind the query
val plan = q.asInstanceOf[StreamingQueryWrapper]
  .streamingQuery
  .asInstanceOf[MicroBatchExecution]
  .logicalPlan

// Find the relation that wraps the streaming source
val relation = plan.collect { case r: StreamingExecutionRelation => r }.head

// The streaming source is a DeltaSource
assert(relation.source.isInstanceOf[DeltaSource])

println(relation.source)
// DeltaSource[file:/tmp/delta/users]

DeltaSource uses the maxFilesPerTrigger option to limit the number of files to process when requested for the file additions (with rate limit).

Enable ALL logging level for the org.apache.spark.sql.delta.sources.DeltaSource logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.delta.sources.DeltaSource=ALL

Refer to Logging.

Creating DeltaSource Instance

DeltaSource takes the following to be created:

DeltaSource initializes the internal properties.

Micro-Batch With Data Between Start And End Offsets (Streaming DataFrame) — getBatch Method

getBatch(
  start: Option[Offset],
  end: Offset): DataFrame
getBatch is part of the Source contract (Spark Structured Streaming) for a streaming DataFrame with data between the start and end offsets.

getBatch creates a DeltaSourceOffset for the tableId (aka reservoirId) and the given end offset.

getBatch gets the changes as follows…FIXME

Latest Available Offset — getOffset Method

getOffset: Option[Offset]
getOffset is part of the Source abstraction (Spark Structured Streaming) for the latest available offset of this streaming source.

getOffset calculates the latest offset (that a streaming query can use for the data of the next micro-batch) based on the previousOffset internal registry.

When there is no ending offset of the latest micro-batch yet (previousOffset is null), getOffset simply retrieves the starting offset (based on the latest version of the delta table).

When the ending offset of the latest micro-batch is defined (which means that DeltaSource is being requested for a subsequent micro-batch), getOffset takes the last indexed AddFile from getChangesWithRateLimit for the previous ending offset. When there is no such last element (no new files), getOffset returns the previous ending offset.

With the ending offset of the latest micro-batch and the last indexed AddFile both available, getOffset creates a new DeltaSourceOffset for the version, index, and isLast flag from the last indexed AddFile.
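The branching getOffset performs can be modeled in plain Scala. This is a hypothetical, simplified sketch, not the Delta Lake code. IndexedFileLite and SimpleOffset are illustrative stand-ins for IndexedFile and DeltaSourceOffset that keep only the version, index and isLast flag. The startingOffset and changesWithRateLimit function parameters stand in for getStartingOffset and getChangesWithRateLimit.

```scala
object GetOffsetSketch {
  // Hypothetical stand-ins for IndexedFile and DeltaSourceOffset (not the Delta Lake classes)
  final case class IndexedFileLite(version: Long, index: Long, isLast: Boolean)
  final case class SimpleOffset(version: Long, index: Long, isLast: Boolean)

  // Last element of an iterator (consumes the iterator), or None when it is empty
  private def lastOf[T](iter: Iterator[T]): Option[T] =
    iter.foldLeft(Option.empty[T])((_, elem) => Some(elem))

  /** Models the getOffset branching.
    * previousOffset: ending offset of the latest micro-batch (None before the first one)
    * startingOffset: hypothetical stand-in for getStartingOffset
    * changesWithRateLimit: hypothetical stand-in for getChangesWithRateLimit(version, index)
    */
  def latestOffset(
      previousOffset: Option[SimpleOffset],
      startingOffset: () => Option[SimpleOffset],
      changesWithRateLimit: (Long, Long) => Iterator[IndexedFileLite]): Option[SimpleOffset] =
    previousOffset match {
      case None => startingOffset() // no micro-batch yet: use the starting offset
      case Some(prev) =>
        lastOf(changesWithRateLimit(prev.version, prev.index)) match {
          case None    => Some(prev) // no new files: keep the previous ending offset
          case Some(f) => Some(SimpleOffset(f.version, f.index, f.isLast)) // offset of the last file
        }
    }
}
```

Passing the two lookups in as functions keeps the sketch free of any Spark or Delta Lake dependency while still showing the order of decisions.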

isStartingVersion local value is enabled (true) when the following holds:

In the end, getOffset prints out the following DEBUG message to the logs (using the previousOffset internal registry):

previousOffset -> currentOffset: [previousOffset] -> [currentOffset]

Stopping — stop Method

stop(): Unit
stop is part of the streaming Source contract (Spark Structured Streaming) to stop this source and free up any resources allocated.

Retrieving Starting Offset — getStartingOffset Internal Method

getStartingOffset(): Option[Offset]

getStartingOffset requests the DeltaLog for the current state (snapshot) of the delta table and takes the version of the snapshot.

getStartingOffset then requests the file additions (with rate limit) for that version as the fromVersion, -1L as the fromIndex, and the isStartingVersion flag enabled (true), and takes the last file (using iteratorLast).

getStartingOffset returns a new DeltaSourceOffset for the tableId, the version and the index of the last file added, and whether the last file added is the last file of its version.

getStartingOffset returns None (offset not available) when either happens:

  • the version of the delta table is negative (below 0)

  • no files were added in the version

getStartingOffset throws an AssertionError when the version of the last file added is smaller than the delta table’s version:

assertion failed: getChangesWithRateLimit returns an invalid version: [v] (expected: >= [version])

getStartingOffset is used exclusively when DeltaSource is requested for the latest available offset.
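The steps of getStartingOffset can be sketched in plain Scala as follows. This is a hypothetical, simplified model rather than the Delta Lake code. IndexedFileLite and SimpleOffset are illustrative stand-ins, tableVersion stands in for the version of the current snapshot, and changesWithRateLimit stands in for getChangesWithRateLimit. Scala's assert prefixes its message with "assertion failed: ", which produces the same message shape as the one above.

```scala
object StartingOffsetSketch {
  // Hypothetical stand-ins for IndexedFile and DeltaSourceOffset (not the Delta Lake classes)
  final case class IndexedFileLite(version: Long, index: Long, isLast: Boolean)
  final case class SimpleOffset(tableId: String, version: Long, index: Long, isLast: Boolean)

  /** tableVersion: version of the current snapshot of the delta table (hypothetical input)
    * changesWithRateLimit: stand-in for getChangesWithRateLimit(fromVersion, fromIndex, isStartingVersion)
    */
  def startingOffset(
      tableId: String,
      tableVersion: Long,
      changesWithRateLimit: (Long, Long, Boolean) => Iterator[IndexedFileLite]): Option[SimpleOffset] =
    if (tableVersion < 0) None // the delta table has no version yet
    else {
      // Last file added (with rate limit), starting at index -1 with isStartingVersion enabled
      val last = changesWithRateLimit(tableVersion, -1L, true)
        .foldLeft(Option.empty[IndexedFileLite])((_, f) => Some(f))
      last.map { f =>
        assert(f.version >= tableVersion,
          s"getChangesWithRateLimit returns an invalid version: ${f.version} (expected: >= $tableVersion)")
        SimpleOffset(tableId, f.version, f.index, f.isLast)
      } // None when no files were added
    }
}
```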

getChanges Internal Method

getChanges(
  fromVersion: Long,
  fromIndex: Long,
  isStartingVersion: Boolean): Iterator[IndexedFile]

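getChanges branches on the given isStartingVersion flag:

  • When enabled (true), getChanges puts the state (snapshot) of the delta table at fromVersion (getSnapshotAt) before the filtered and indexed AddFiles from the delta log (filterAndIndexDeltaLogs)

  • When disabled (false), getChanges uses the filtered and indexed AddFiles from the delta log only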

isStartingVersion flag is enabled when DeltaSource is requested for the following:

In the end, getChanges keeps only the indexed AddFiles with a version later than the given fromVersion or an index greater than the given fromIndex, and excludes the others.

getChanges is used when DeltaSource is requested for the file additions with rate limit (getChangesWithRateLimit, used for the latest available offset) and for a micro-batch (getBatch).
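A minimal sketch of the getChanges logic, as a hypothetical plain-Scala model rather than the Delta Lake code. snapshotAt and logsFrom are illustrative stand-ins for getSnapshotAt and filterAndIndexDeltaLogs. The sketch assumes that, with isStartingVersion enabled, the delta log is read from the version after the snapshot (fromVersion + 1). That detail is not stated in this section.

```scala
object GetChangesSketch {
  // Hypothetical stand-in for IndexedFile: version and index of an added file
  final case class IndexedFileLite(version: Long, index: Long)

  /** snapshotAt: stand-in for getSnapshotAt
    * logsFrom: stand-in for filterAndIndexDeltaLogs
    */
  def changes(
      fromVersion: Long,
      fromIndex: Long,
      isStartingVersion: Boolean,
      snapshotAt: Long => Iterator[IndexedFileLite],
      logsFrom: Long => Iterator[IndexedFileLite]): Iterator[IndexedFileLite] = {
    val all =
      if (isStartingVersion) snapshotAt(fromVersion) ++ logsFrom(fromVersion + 1) // assumption: log starts after the snapshot
      else logsFrom(fromVersion)
    // Keep files from later versions, or past fromIndex within fromVersion
    all.filter(f => f.version > fromVersion || f.index > fromIndex)
  }
}
```

Because Iterator's ++ and filter are lazy, nothing is read until the caller consumes the result. This suits the rate limiting applied on top of getChanges.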

filterAndIndexDeltaLogs Internal Method

filterAndIndexDeltaLogs(
  startVersion: Long): Iterator[IndexedFile]

filterAndIndexDeltaLogs…FIXME

Retrieving File Additions (With Rate Limit) — getChangesWithRateLimit Internal Method

getChangesWithRateLimit(
  fromVersion: Long,
  fromIndex: Long,
  isStartingVersion: Boolean): Iterator[IndexedFile]

getChangesWithRateLimit gets the changes (as indexed AddFiles) for the given fromVersion, fromIndex, and isStartingVersion flag.

getChangesWithRateLimit takes at most maxFilesPerTrigger indexed AddFiles (when the option is defined) or 1000 (otherwise).

getChangesWithRateLimit is used when DeltaSource is requested for the latest available offset.
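The rate limit amounts to taking a prefix of the (lazy) iterator of changes. The following is a hypothetical plain-Scala sketch. withRateLimit and DefaultMaxFiles are illustrative names, not Delta Lake API. The 1000 default is the value described above.

```scala
object RateLimitSketch {
  // Cap used when maxFilesPerTrigger is not defined (the default described above)
  val DefaultMaxFiles: Int = 1000

  /** Caps the number of indexed files handed over to the next micro-batch.
    * Iterator.take is lazy, so files beyond the cap are never pulled from the source iterator.
    */
  def withRateLimit[T](changes: Iterator[T], maxFilesPerTrigger: Option[Int]): Iterator[T] =
    changes.take(maxFilesPerTrigger.getOrElse(DefaultMaxFiles))
}
```

On the user side, the cap is set as a read option of the streaming query, e.g. .option("maxFilesPerTrigger", 10) before .load(...).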

Retrieving State Of Delta Table At Given Version — getSnapshotAt Internal Method

getSnapshotAt(
  version: Long): Iterator[IndexedFile]

getSnapshotAt requests the DeltaSourceSnapshot for the data files (as indexed AddFiles).

In case the DeltaSourceSnapshot hasn’t been initialized yet (null) or the requested version is different from the initialStateVersion, getSnapshotAt does the following:

  1. cleanUpSnapshotResources

  2. Requests the DeltaLog for the state (snapshot) of the delta table at the version

  3. Creates a new DeltaSourceSnapshot for the state (snapshot) as the current DeltaSourceSnapshot

  4. Changes the initialStateVersion internal registry to the requested version

getSnapshotAt is used when DeltaSource is requested to getChanges (with isStartingVersion flag enabled).
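The four steps above implement a single-entry cache keyed by version. A hypothetical plain-Scala sketch follows, not the Delta Lake code. SnapshotCache is an illustrative name. The loadSnapshot function stands in for requesting the DeltaLog for the snapshot at a version, and close stands in for cleanUpSnapshotResources. The sketch returns the cached snapshot itself instead of its indexed AddFiles.

```scala
object SnapshotCacheSketch {
  /** Single-entry cache modeled on initialState and initialStateVersion.
    * loadSnapshot: hypothetical stand-in for DeltaLog's snapshot at a version
    * close: hypothetical stand-in for cleanUpSnapshotResources
    */
  final class SnapshotCache[S](loadSnapshot: Long => S, close: S => Unit) {
    private var initialState: Option[S] = None // null in the description above
    private var initialStateVersion: Long = -1L

    def snapshotAt(version: Long): S = {
      if (initialState.isEmpty || version != initialStateVersion) {
        initialState.foreach(close)          // 1. clean up the previous snapshot
        initialState = Some(loadSnapshot(version)) // 2. and 3. load and keep a new snapshot
        initialStateVersion = version        // 4. remember its version
      }
      initialState.get
    }
  }
}
```

Repeated requests for the same version reuse the cached snapshot. A request for another version releases the old one first.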

verifyStreamHygieneAndFilterAddFiles Internal Method

verifyStreamHygieneAndFilterAddFiles(
  actions: Seq[Action]): Seq[Action]

verifyStreamHygieneAndFilterAddFiles…FIXME

verifyStreamHygieneAndFilterAddFiles is used when DeltaSource is requested to getChanges.

cleanUpSnapshotResources Internal Method

cleanUpSnapshotResources(): Unit

cleanUpSnapshotResources…FIXME

cleanUpSnapshotResources is used when DeltaSource is requested to getSnapshotAt, getBatch and stop.

Retrieving Last Element From Iterator — iteratorLast Internal Method

iteratorLast[T](
  iter: Iterator[T]): Option[T]

iteratorLast returns the last element of the given Iterator (consuming it), or None when the Iterator is empty.

iteratorLast is used when DeltaSource is requested to getStartingOffset and getOffset.
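A minimal plain-Scala sketch of such a helper, written as an illustration rather than copied from Delta Lake. It walks the iterator once and remembers the most recent element, so it never materializes the (possibly long) iterator of file additions.

```scala
object IteratorLastSketch {
  /** Drains the iterator and returns its last element, or None when it is empty. */
  def iteratorLast[T](iter: Iterator[T]): Option[T] = {
    var last: Option[T] = None
    while (iter.hasNext) {
      last = Some(iter.next()) // keep only the most recent element
    }
    last
  }
}
```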

Internal Properties

initialState: the current DeltaSourceSnapshot

  • Initially uninitialized (null)

  • Changes (along with the initialStateVersion) when DeltaSource is requested for the snapshot at a given version (only when the versions are different)

  • Used when DeltaSource is requested for the snapshot at a given version

  • Closed and dereferenced (null) when DeltaSource is requested to cleanUpSnapshotResources

initialStateVersion: version of the delta table (of the initialState)

  • Initially -1L

  • Changes (along with the initialState) to the requested version when DeltaSource is requested for the snapshot at a given version (only when the versions are different)

  • Used when DeltaSource is requested for the snapshot at a given version (to compare with the requested version) and to cleanUpSnapshotResources (and unpersist the current snapshot)

previousOffset: ending DeltaSourceOffset of the latest micro-batch

  • Initially uninitialized (null)

  • Used when DeltaSource is requested for the latest available offset

tableId: table ID (aka reservoirId)

  • Used when DeltaSource is requested for a micro-batch (getBatch) and the starting offset (getStartingOffset) to create a DeltaSourceOffset