DeltaSource — Streaming Source of Delta Data Source

DeltaSource is the streaming source of <> for streaming queries in Spark Structured Streaming.

TIP: Read up on https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-Source.html[Streaming Source] in https://bit.ly/spark-structured-streaming[The Internals of Spark Structured Streaming] online book.

DeltaSource is <> when DeltaDataSource is requested for a <>.

val q = spark
  .readStream               // Creating a streaming query
  .format("delta")          // Using delta data source
  .load("/tmp/delta/users") // Over data in a delta table
  .writeStream
  .format("memory")
  .option("queryName", "demo")
  .start
import org.apache.spark.sql.execution.streaming.{MicroBatchExecution, StreamingQueryWrapper}
val plan = q.asInstanceOf[StreamingQueryWrapper]
  .streamingQuery
  .asInstanceOf[MicroBatchExecution]
  .logicalPlan
import org.apache.spark.sql.execution.streaming.StreamingExecutionRelation
val relation = plan.collect { case r: StreamingExecutionRelation => r }.head

import org.apache.spark.sql.delta.sources.DeltaSource
assert(relation.source.isInstanceOf[DeltaSource])

scala> println(relation.source)
DeltaSource[file:/tmp/delta/users]

[[maxFilesPerTrigger]] DeltaSource uses <> option to limit the number of files to process when requested for the <>.

[[logging]]
[TIP]

Enable ALL logging level for org.apache.spark.sql.delta.sources.DeltaSource logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.delta.sources.DeltaSource=ALL

Refer to Logging.

== [[creating-instance]] Creating DeltaSource Instance

DeltaSource takes the following to be created:

  • [[spark]] SparkSession
  • [[deltaLog]] <> of the delta table to read data (as <>) from
  • [[options]] <>
  • [[filters]] Filter expressions (default: no filters)

DeltaSource initializes the <>.

== [[getBatch]] Micro-Batch With Data Between Start And End Offsets (Streaming DataFrame) -- getBatch Method

[source, scala]

getBatch(
  start: Option[Offset],
  end: Offset): DataFrame


NOTE: getBatch is part of the Source contract (https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-Source.html[Spark Structured Streaming]) for a streaming DataFrame with data between the start and end offsets.

getBatch creates an <> for the <> (aka <>) and the given end offset.

getBatch <> as follows...FIXME

== [[getOffset]] Latest Available Offset -- getOffset Method

[source, scala]

getOffset: Option[Offset]

NOTE: getOffset is part of the Source abstraction (https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-Source.html[Spark Structured Streaming]) for the latest available offset of this streaming source.

[[getOffset-currentOffset]] getOffset calculates the latest offset (that a streaming query can use for the data of the next micro-batch) based on the <> internal registry.

For no <>, getOffset simply <> (based on the latest version of the delta table).

When the <> is defined (which means that the DeltaSource is requested for another micro-batch), getOffset takes the <> from <> for the <>. getOffset returns the <> when the last element was not available.

With the <> and the <> both available, getOffset creates a new <> for the version, index, and isLast flag from the last indexed <>.

[NOTE]

isStartingVersion local value is enabled (true) when both of the following hold:

  • Version of the last indexed <> is equal to the <> of the <>

  • <> flag of the <> is enabled (true)

In the end, getOffset prints out the following DEBUG message to the logs (using the <> internal registry):

previousOffset -> currentOffset: [previousOffset] -> [currentOffset]
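The branching that getOffset goes through can be sketched in plain Scala. This is a simplified model and not the actual implementation: IndexedFile and Offset are stripped-down stand-ins (the real classes carry more fields), and startingOffset and changesAfter are hypothetical function parameters that stand for the lookups described above.

```scala
object GetOffsetSketch {
  // Simplified stand-ins (assumptions): only the fields this sketch needs
  final case class IndexedFile(version: Long, index: Long, isLast: Boolean)
  final case class Offset(version: Long, index: Long, isLast: Boolean)

  def nextOffset(
      previousOffset: Option[Offset],
      startingOffset: () => Option[Offset],              // hypothetical: offset for the very first micro-batch
      changesAfter: Offset => Iterator[IndexedFile]      // hypothetical: rate-limited changes after an offset
  ): Option[Offset] =
    previousOffset match {
      // No previous offset: fall back to the starting offset
      case None => startingOffset()
      case Some(prev) =>
        // Walk the changes and remember only the last file (consumes the iterator)
        val last = changesAfter(prev).foldLeft(Option.empty[IndexedFile])((_, f) => Some(f))
        last match {
          // Nothing new: the previous offset is still the latest one
          case None => Some(prev)
          // New files: build the offset from the last indexed file
          case Some(f) => Some(Offset(f.version, f.index, f.isLast))
        }
    }
}
```

Passing the lookups as functions keeps the sketch free of any Spark or Delta Lake dependency, so the three outcomes (starting offset, unchanged offset, new offset) can be exercised in isolation.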

== [[stop]] Stopping -- stop Method

[source, scala]

stop(): Unit

NOTE: stop is part of the streaming Source contract (https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-Source.html[Spark Structured Streaming]) to stop this source and free up any resources allocated.

stop simply <>.

== [[getStartingOffset]] Retrieving Starting Offset -- getStartingOffset Internal Method

[source, scala]

getStartingOffset(): Option[Offset]

getStartingOffset requests the <> for the version of the delta table (by requesting for the <> and then for the <>).

getStartingOffset <> from the <> for the version of the delta table, -1L as the fromIndex, and the isStartingVersion flag enabled (true).

getStartingOffset returns a new <> for the <>, the version and the index of the last file added, and whether the last file added is the last file of its version.

getStartingOffset returns None (offset not available) when either happens:

  • the version of the delta table is negative (below 0)

  • no files were added in the version

getStartingOffset throws an AssertionError when the version of the last file added is smaller than the delta table's version:

assertion failed: getChangesWithRateLimit returns an invalid version: [v] (expected: >= [version])
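A minimal sketch of the rules above, assuming simplified stand-ins for the indexed file and the offset (the real classes carry more fields). The changesWithRateLimit parameter is a hypothetical function that plays the role of getChangesWithRateLimit, and the table version is passed in directly instead of being read from the delta table.

```scala
object StartingOffsetSketch {
  // Simplified stand-ins (assumptions): only the fields this sketch needs
  final case class IndexedFile(version: Long, index: Long, isLast: Boolean)
  final case class StartingOffset(version: Long, index: Long, isLast: Boolean)

  def getStartingOffset(
      tableVersion: Long,
      changesWithRateLimit: (Long, Long, Boolean) => Iterator[IndexedFile] // hypothetical stand-in
  ): Option[StartingOffset] = {
    if (tableVersion < 0) {
      None // negative version: no offset available
    } else {
      // Scan from the table version with fromIndex = -1 and isStartingVersion = true
      val files = changesWithRateLimit(tableVersion, -1L, true)
      // Keep only the last file added (consumes the iterator); None when no files were added
      val last = files.foldLeft(Option.empty[IndexedFile])((_, f) => Some(f))
      last.map { f =>
        // Throws an AssertionError when the last file is older than the table version
        assert(f.version >= tableVersion,
          s"getChangesWithRateLimit returns an invalid version: ${f.version} (expected: >= $tableVersion)")
        StartingOffset(f.version, f.index, f.isLast)
      }
    }
  }
}
```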

NOTE: getStartingOffset is used exclusively when DeltaSource is requested for the <>.

== [[getChanges]] getChanges Internal Method

[source, scala]

getChanges(
  fromVersion: Long,
  fromIndex: Long,
  isStartingVersion: Boolean): Iterator[IndexedFile]


getChanges branches per the given isStartingVersion flag (enabled or not):

  • For isStartingVersion flag enabled (true), getChanges <> for the given fromVersion followed by <> for the next version after the given fromVersion

  • For isStartingVersion flag disabled (false), getChanges simply gives <> for the given fromVersion

[NOTE]

isStartingVersion flag simply adds <> before <> when enabled (true).

isStartingVersion flag is enabled when DeltaSource is requested for the following:

  • <> and the start offset is not given or is for the <>

  • <> with no <> or the <> for the <>

In the end, getChanges keeps only the indexed <> with a version later than the given fromVersion or an index greater than the given fromIndex (all other files are filtered out).
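The two branches and the final filter can be modelled in plain Scala as follows. This is a sketch under assumptions: IndexedFile is reduced to a version and an index, and snapshotAt and changesFrom are hypothetical function parameters that stand for the two sources of files that getChanges combines.

```scala
object GetChangesSketch {
  // Simplified stand-in (assumption): only the fields the filter needs
  final case class IndexedFile(version: Long, index: Long)

  def getChanges(
      fromVersion: Long,
      fromIndex: Long,
      isStartingVersion: Boolean,
      snapshotAt: Long => Iterator[IndexedFile],  // hypothetical: files of the table state at a version
      changesFrom: Long => Iterator[IndexedFile]  // hypothetical: files added from a version onwards
  ): Iterator[IndexedFile] = {
    val files =
      if (isStartingVersion) {
        // State at fromVersion first, then the changes of the versions that follow
        snapshotAt(fromVersion) ++ changesFrom(fromVersion + 1)
      } else {
        changesFrom(fromVersion)
      }
    // Keep files of a later version, or with a greater index
    files.filter(f => f.version > fromVersion || f.index > fromIndex)
  }
}
```

For example, with fromVersion = 5 and fromIndex = 0, a file at (version 5, index 0) is dropped while files at (5, 1) and (6, 0) are kept.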

NOTE: getChanges is used when DeltaSource is requested for the <> (when requested for the <>) and <>.

=== [[getChanges-filterAndIndexDeltaLogs]] filterAndIndexDeltaLogs Internal Method

[source, scala]

filterAndIndexDeltaLogs(
  startVersion: Long): Iterator[IndexedFile]


filterAndIndexDeltaLogs...FIXME

== [[getChangesWithRateLimit]] Retrieving File Additions (With Rate Limit) -- getChangesWithRateLimit Internal Method

[source, scala]

getChangesWithRateLimit(
  fromVersion: Long,
  fromIndex: Long,
  isStartingVersion: Boolean): Iterator[IndexedFile]


getChangesWithRateLimit <> (as indexed <>) for the given fromVersion, fromIndex, and isStartingVersion flag.

getChangesWithRateLimit takes the configured number of AddFiles (up to the <> option (if defined) or <>).
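The rate limit boils down to taking a prefix of the iterator of changes. The sketch below is a generic illustration, not the actual implementation: withRateLimit is a hypothetical helper, the option is modelled as an Option[Int], and defaultLimit is a placeholder parameter for the fallback value.

```scala
object RateLimitSketch {
  // Takes at most maxFilesPerTrigger elements, or defaultLimit when the option is not defined.
  // Both parameter names are assumptions made for this sketch.
  def withRateLimit[A](
      changes: Iterator[A],
      maxFilesPerTrigger: Option[Int],
      defaultLimit: Int): Iterator[A] =
    changes.take(maxFilesPerTrigger.getOrElse(defaultLimit))
}
```

Since Iterator.take is lazy, files beyond the limit are never pulled from the underlying iterator.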

NOTE: getChangesWithRateLimit is used when DeltaSource is requested for the <>.

== [[getSnapshotAt]] Retrieving State Of Delta Table At Given Version -- getSnapshotAt Internal Method

[source, scala]

getSnapshotAt(
  version: Long): Iterator[IndexedFile]


getSnapshotAt requests the <> for the <> (as indexed <>).

In case the <> hasn't been initialized yet (null) or the requested version is different from the <>, getSnapshotAt does the following:

. <>

. Requests the <> for the <> at the version

. Creates a new <> for the state (snapshot) as the current <>

. Changes the <> internal registry to the requested version
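The steps above amount to a single-entry cache keyed by version. A minimal sketch in plain Scala, assuming a hypothetical loadFilesAt function in place of the real snapshot lookup and a CachedState class in place of the real state holder (the initialState and initialStateVersion names follow the internal properties of DeltaSource):

```scala
object SnapshotCacheSketch {
  // Hypothetical stand-in for the cached table state
  final class CachedState[A](val version: Long, val files: Seq[A]) {
    var closed = false
    def close(): Unit = { closed = true }
  }

  final class SnapshotCache[A](loadFilesAt: Long => Seq[A]) {
    private var initialState: CachedState[A] = null // starts uninitialized
    private var initialStateVersion: Long = -1L

    def getSnapshotAt(version: Long): Iterator[A] = {
      if (initialState == null || version != initialStateVersion) {
        cleanUp()                                                     // release the previous state first
        initialState = new CachedState(version, loadFilesAt(version)) // load the state at the version
        initialStateVersion = version                                 // remember which version is cached
      }
      initialState.files.iterator
    }

    // Closes and dereferences the cached state (if any)
    def cleanUp(): Unit =
      if (initialState != null) {
        initialState.close()
        initialState = null
      }
  }
}
```

Repeated requests for the same version reuse the cached state, while a request for a different version replaces it.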

NOTE: getSnapshotAt is used when DeltaSource is requested to <> (with isStartingVersion flag enabled).

== [[verifyStreamHygieneAndFilterAddFiles]] verifyStreamHygieneAndFilterAddFiles Internal Method

[source, scala]

verifyStreamHygieneAndFilterAddFiles(
  actions: Seq[Action]): Seq[Action]


verifyStreamHygieneAndFilterAddFiles...FIXME

NOTE: verifyStreamHygieneAndFilterAddFiles is used when DeltaSource is requested to <>.

== [[cleanUpSnapshotResources]] cleanUpSnapshotResources Internal Method

[source, scala]

cleanUpSnapshotResources(): Unit

cleanUpSnapshotResources...FIXME

NOTE: cleanUpSnapshotResources is used when DeltaSource is requested to <>, <> and <>.

== [[iteratorLast]] Retrieving Last Element From Iterator -- iteratorLast Internal Method

[source, scala]

iteratorLast[T](
  iter: Iterator[T]): Option[T]


iteratorLast simply returns the last element in the given Iterator or None.
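A helper with this behaviour can be written in a few lines of plain Scala. The sketch below is one possible implementation (wrapped in a hypothetical IteratorLastSketch object to keep it self-contained); note that it consumes the iterator.

```scala
object IteratorLastSketch {
  // Returns the last element of the iterator, or None when the iterator is empty.
  // The iterator is fully consumed and must not be reused afterwards.
  def iteratorLast[T](iter: Iterator[T]): Option[T] = {
    var last: Option[T] = None
    while (iter.hasNext) {
      last = Some(iter.next())
    }
    last
  }
}
```

Only one element is held at a time, so the helper works on iterators that are too large to materialize as a collection.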

NOTE: iteratorLast is used when DeltaSource is requested to <> and <>.

== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| initialState a| [[initialState]] <>

Initially uninitialized (null).

Changes (along with the <>) when DeltaSource is requested for the <> (only when the versions are different)

Used when DeltaSource is requested for the <>

Closed and dereferenced (null) when DeltaSource is requested to <>

| initialStateVersion a| [[initialStateVersion]] Version of the <>

Initially -1L and changes (along with the <>) to the version requested when DeltaSource is requested for the <> (only when the versions are different)

Used when DeltaSource is requested to <> (and unpersist the current snapshot)

| previousOffset a| [[previousOffset]] Ending <> of the latest <>

Starts uninitialized (null).

Used when DeltaSource is requested for the <>.

| tableId a| [[tableId]] Table ID

Used when...FIXME

|===


Last update: 2020-09-27