= DeltaSource -- Streaming Source of Delta Data Source
`DeltaSource` is the streaming source of the delta data source (`DeltaDataSource`) for Spark Structured Streaming.
TIP: Read up on https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-Source.html[Streaming Source] in https://bit.ly/spark-structured-streaming[The Internals of Spark Structured Streaming] online book.
`DeltaSource` is <<creating-instance, created>> when `DeltaDataSource` is requested for a streaming source.
[source, scala]
----
val q = spark
  .readStream               // Creating a streaming query
  .format("delta")          // Using delta data source
  .load("/tmp/delta/users") // Over data in a delta table
  .writeStream
  .format("memory")
  .queryName("demo")        // The memory sink requires a query name
  .start

import org.apache.spark.sql.execution.streaming.{MicroBatchExecution, StreamingQueryWrapper}
val plan = q.asInstanceOf[StreamingQueryWrapper]
  .streamingQuery
  .asInstanceOf[MicroBatchExecution]
  .logicalPlan

import org.apache.spark.sql.execution.streaming.StreamingExecutionRelation
val relation = plan.collect { case r: StreamingExecutionRelation => r }.head

import org.apache.spark.sql.delta.sources.DeltaSource
// isInstanceOf (not asInstanceOf) gives the Boolean that assert expects
assert(relation.source.isInstanceOf[DeltaSource])

println(relation.source)
// DeltaSource[file:/tmp/delta/users]
----
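[[maxFilesPerTrigger]] `DeltaSource` uses the `maxFilesPerTrigger` option to limit the number of files to process per micro-batch when requested for the <<getChangesWithRateLimit, file additions (with rate limit)>>.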
[[logging]]
[TIP]
====
Enable `ALL` logging level for `org.apache.spark.sql.delta.sources.DeltaSource` logger to see what happens inside.

Add the following line to `conf/log4j.properties`:

----
log4j.logger.org.apache.spark.sql.delta.sources.DeltaSource=ALL
----

Refer to Logging.
====
== [[creating-instance]] Creating DeltaSource Instance
`DeltaSource` takes the following to be created:

- [[spark]] `SparkSession`
- [[deltaLog]] `DeltaLog` of the delta table to read data (as < >) from
- [[options]] `DeltaOptions`
- [[filters]] Filter expressions (default: no filters)
`DeltaSource` initializes the <<internal-properties, internal properties>>.
== [[getBatch]] Micro-Batch With Data Between Start And End Offsets (Streaming DataFrame) -- getBatch Method

[source, scala]
----
getBatch(
  start: Option[Offset],
  end: Offset): DataFrame
----

NOTE: `getBatch` is part of the `Source` contract (https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-Source.html[Spark Structured Streaming]) for a streaming `DataFrame` with data between the start and end offsets.
`getBatch` creates a `DeltaSourceOffset` for the given `end` offset.

`getBatch` <
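The section title describes `getBatch` as giving the data between the start and end offsets. A minimal sketch of that selection in plain Scala, assuming an offset is just a (version, index) pair, that the start offset is exclusive and the end offset inclusive, and that the files arrive ordered by version and index. `IndexedFile`, `Offset` and `filesBetween` are hypothetical stand-ins (not Delta Lake's classes), and the sketch ignores the `isStartingVersion` handling.

```scala
// Hypothetical stand-ins (not Delta Lake's classes): a file is identified by
// the table version that added it and its index within that version.
final case class IndexedFile(version: Long, index: Long, path: String)
final case class Offset(version: Long, index: Long)

// Lexicographic "at or before" on (version, index) pairs
def atOrBefore(f: IndexedFile, o: Offset): Boolean =
  f.version < o.version || (f.version == o.version && f.index <= o.index)

// Files of a micro-batch: after `start` (exclusive, if given) up to `end` (inclusive).
// takeWhile relies on the input being ordered by (version, index).
def filesBetween(
    files: Iterator[IndexedFile],
    start: Option[Offset],
    end: Offset): List[IndexedFile] =
  files
    .filter(f => start.forall(s => !atOrBefore(f, s)))
    .takeWhile(f => atOrBefore(f, end))
    .toList
```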
== [[getOffset]] Latest Available Offset -- getOffset Method

[source, scala]
----
getOffset: Option[Offset]
----

NOTE: `getOffset` is part of the `Source` abstraction (https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-Source.html[Spark Structured Streaming]) for the latest available offset of this streaming source.
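[[getOffset-currentOffset]] `getOffset` calculates the latest offset (that a streaming query can use for the data of the next micro-batch) based on the <<previousOffset, previousOffset>> internal registry.

For no `previousOffset` (`null`), `getOffset` simply <<getStartingOffset, retrieves the starting offset>>.

When the `previousOffset` is defined (which is when `DeltaSource` is requested for another micro-batch), `getOffset` takes the <<iteratorLast, last>> indexed `AddFile` from the <<getChangesWithRateLimit, file additions (with rate limit)>> for the version, index and `isStartingVersion` flag of the `previousOffset`. If there is none (no new files), `getOffset` returns the `previousOffset`.

With the last indexed `AddFile`, `getOffset` creates a new `DeltaSourceOffset` with the version, the index and the `isLast` flag of the last indexed `AddFile`.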
[NOTE]
====
`isStartingVersion` local value is enabled (`true`) when the following holds:

* Version of the last indexed `AddFile` is equal to the version of the `previousOffset`
* `isStartingVersion` flag of the `previousOffset` is enabled (`true`)
====

In the end, `getOffset` prints out the following DEBUG message to the logs (with the `previousOffset` and the new current offset):

----
previousOffset -> currentOffset: [previousOffset] -> [currentOffset]
----
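The decision `getOffset` makes can be sketched as a pure function. This is a simplified model, not Delta Lake's code: `SourceOffset`, `IndexedFile` and `latestOffset` are hypothetical names, a missing `previousOffset` is modeled as `None` rather than `null`, the last indexed file is passed in instead of being computed from `getChangesWithRateLimit`, and the `isLast` flag handling is left out.

```scala
// Hypothetical, simplified stand-ins for DeltaSourceOffset and IndexedFile
final case class SourceOffset(version: Long, index: Long, isStartingVersion: Boolean)
final case class IndexedFile(version: Long, index: Long)

def latestOffset(
    previous: Option[SourceOffset],         // previousOffset (None for null)
    startingOffset: => Option[SourceOffset], // evaluated only without a previous offset
    lastFile: Option[IndexedFile]): Option[SourceOffset] =
  previous match {
    case None => startingOffset // no micro-batch yet: use the starting offset
    case Some(prev) =>
      lastFile match {
        case None => Some(prev) // no new files: stay at the previous offset
        case Some(f) =>
          // enabled only for the same version as the previous offset
          // and when the previous offset was for the starting version
          val isStartingVersion = f.version == prev.version && prev.isStartingVersion
          Some(SourceOffset(f.version, f.index, isStartingVersion))
      }
  }
```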
== [[stop]] Stopping -- stop Method

[source, scala]
----
stop(): Unit
----

NOTE: `stop` is part of the streaming `Source` contract (https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-Source.html[Spark Structured Streaming]) to stop this source and free up any resources allocated.
`stop` simply <<cleanUpSnapshotResources, cleans up the snapshot resources>>.
== [[getStartingOffset]] Retrieving Starting Offset -- getStartingOffset Internal Method

[source, scala]
----
getStartingOffset(): Option[Offset]
----
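`getStartingOffset` requests the <<deltaLog, DeltaLog>> for the version of the delta table.

`getStartingOffset` then takes the <<iteratorLast, last>> of the <<getChangesWithRateLimit, file additions (with rate limit)>> for the version, with `-1L` as the `fromIndex` and the `isStartingVersion` flag enabled (`true`).

`getStartingOffset` returns a new `DeltaSourceOffset` for the last file added.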
`getStartingOffset` returns `None` (offset not available) when either happens:

- the version of the delta table is negative (below `0`)
- no files were added in the version
`getStartingOffset` throws an `AssertionError` when the version of the last file added is smaller than the delta table's version:

----
assertion failed: getChangesWithRateLimit returns an invalid version: [v] (expected: >= [version])
----
NOTE: `getStartingOffset` is used exclusively when `DeltaSource` is requested for the <<getOffset, latest available offset>>.
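A minimal sketch of the `getStartingOffset` rules above (no table version, no files added, invalid version). The names are hypothetical: `lastFileAdded` stands in for taking the last of the rate-limited changes of a version, and enabling the offset's `isStartingVersion` flag only when the file is still in the table's version is an assumption of this sketch. Scala's `assert` prefixes its message with `assertion failed: `, which matches the shape of the message above.

```scala
// Hypothetical, simplified stand-ins (not Delta Lake's classes)
final case class IndexedFile(version: Long, index: Long)
final case class SourceOffset(version: Long, index: Long, isStartingVersion: Boolean)

def startingOffset(
    tableVersion: Long,                        // version of the delta table
    lastFileAdded: Long => Option[IndexedFile] // last rate-limited file for a version
  ): Option[SourceOffset] =
  if (tableVersion < 0) None // no version of the delta table yet
  else lastFileAdded(tableVersion).map { f => // None when no files were added
    assert(f.version >= tableVersion,
      s"getChangesWithRateLimit returns an invalid version: ${f.version} (expected: >= $tableVersion)")
    // Assumption: still the starting version only while in the table's version
    SourceOffset(f.version, f.index, isStartingVersion = f.version == tableVersion)
  }
```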
== [[getChanges]] getChanges Internal Method

[source, scala]
----
getChanges(
  fromVersion: Long,
  fromIndex: Long,
  isStartingVersion: Boolean): Iterator[IndexedFile]
----
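`getChanges` branches per the given `isStartingVersion` flag (enabled or not):

- For `isStartingVersion` flag enabled (`true`), `getChanges` <<getSnapshotAt, gets the state (snapshot)>> for the given `fromVersion` followed by <<getChanges-filterAndIndexDeltaLogs, filterAndIndexDeltaLogs>> for the next version after the given `fromVersion`
- For `isStartingVersion` flag disabled (`false`), `getChanges` simply gives <<getChanges-filterAndIndexDeltaLogs, filterAndIndexDeltaLogs>> for the given `fromVersion`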
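[NOTE]
====
`isStartingVersion` flag simply adds the <<getSnapshotAt, state (snapshot) of the delta table>> at the given `fromVersion` when enabled (`true`).

`isStartingVersion` flag is enabled when `DeltaSource` is requested for the following:

* <<getBatch, Micro-batch with data between start and end offsets>> and the start offset is not given or is for the starting version
* <<getOffset, Latest available offset>> with no <<previousOffset, previousOffset>> or the `previousOffset` for the starting version
====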
In the end, `getChanges` filters out (excludes) the indexed `AddFile`s unless their version is later than the given `fromVersion` or their index is greater than the given `fromIndex`.
NOTE: `getChanges` is used when `DeltaSource` is requested for the <
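The branching and the final filter of `getChanges` can be modeled with two hypothetical functions: `snapshotAt` for the indexed files of the table state at a version, and `logsFrom` for the indexed file additions of the delta log from a version onwards. A sketch under those assumptions, not Delta Lake's implementation:

```scala
// Hypothetical stand-in (not Delta Lake's class)
final case class IndexedFile(version: Long, index: Long, path: String)

def changes(
    fromVersion: Long,
    fromIndex: Long,
    isStartingVersion: Boolean,
    snapshotAt: Long => Iterator[IndexedFile], // table state at a version
    logsFrom: Long => Iterator[IndexedFile]    // file additions from a version on
  ): Iterator[IndexedFile] = {
  val all =
    if (isStartingVersion) snapshotAt(fromVersion) ++ logsFrom(fromVersion + 1)
    else logsFrom(fromVersion)
  // Keep files from later versions, or with an index past fromIndex
  all.filter(f => f.version > fromVersion || f.index > fromIndex)
}
```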
=== [[getChanges-filterAndIndexDeltaLogs]] filterAndIndexDeltaLogs Internal Method

[source, scala]
----
filterAndIndexDeltaLogs(
  startVersion: Long): Iterator[IndexedFile]
----

`filterAndIndexDeltaLogs` ...FIXME
== [[getChangesWithRateLimit]] Retrieving File Additions (With Rate Limit) -- getChangesWithRateLimit Internal Method

[source, scala]
----
getChangesWithRateLimit(
  fromVersion: Long,
  fromIndex: Long,
  isStartingVersion: Boolean): Iterator[IndexedFile]
----
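`getChangesWithRateLimit` <<getChanges, retrieves the changes>> (as indexed `AddFile`s) for the given `fromVersion`, `fromIndex`, and `isStartingVersion` flag.

`getChangesWithRateLimit` takes the configured number of `AddFile`s (up to the <<maxFilesPerTrigger, maxFilesPerTrigger>> option, if defined).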
NOTE: `getChangesWithRateLimit` is used when `DeltaSource` is requested for the <
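Rate limiting boils down to taking at most `maxFilesPerTrigger` elements from the iterator of changes. A generic sketch, with the hypothetical name `withRateLimit` and the option modeled as an `Option[Int]`:

```scala
// `maxFilesPerTrigger` mirrors the option of the same name (None when not set)
def withRateLimit[A](changes: Iterator[A], maxFilesPerTrigger: Option[Int]): Iterator[A] =
  maxFilesPerTrigger match {
    case Some(max) => changes.take(max) // at most `max` files for the next micro-batch
    case None      => changes           // no limit: everything available
  }
```

On the user side, the limit comes from the `maxFilesPerTrigger` option of a streaming read, e.g. `spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/tmp/delta/users")`. Since `Iterator.take` is lazy, no changes beyond the limit are consumed.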
== [[getSnapshotAt]] Retrieving State Of Delta Table At Given Version -- getSnapshotAt Internal Method

[source, scala]
----
getSnapshotAt(
  version: Long): Iterator[IndexedFile]
----
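`getSnapshotAt` requests the <<initialState, initialState>> for the indexed files of the delta table at the given version.

In case the `initialState` is not initialized (`null`) or the requested version is different from the <<initialStateVersion, initialStateVersion>>, `getSnapshotAt` does the following:

. <<cleanUpSnapshotResources, Cleans up the snapshot resources>>
. Requests the <<deltaLog, DeltaLog>> for the state (snapshot) of the delta table at the version
. Creates a new `DeltaSourceSnapshot` for the state (snapshot) as the current <<initialState, initialState>>
. Changes the <<initialStateVersion, initialStateVersion>> internal registry to the requested version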
NOTE: `getSnapshotAt` is used when `DeltaSource` is requested to <<getChanges, getChanges>> (with the `isStartingVersion` flag enabled).
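The caching of the table state in `getSnapshotAt` (together with `cleanUpSnapshotResources`) follows a simple pattern: keep one state for one version and, when a different version is requested, close the old state before loading the new one. A sketch of that pattern with hypothetical `Snapshot` and `SnapshotCache` classes (the real `initialState` is a Delta Lake class, not this one):

```scala
// Hypothetical stand-in for a table state that holds resources until closed
final class Snapshot(val version: Long, files: Seq[String]) extends AutoCloseable {
  var closed = false
  def iterator: Iterator[String] = files.iterator
  override def close(): Unit = { closed = true }
}

// Caches the state of one version, like the initialState / initialStateVersion pair
final class SnapshotCache(load: Long => Snapshot) {
  private var initialState: Snapshot = null
  private var initialStateVersion: Long = -1L

  def snapshotAt(version: Long): Iterator[String] = {
    if (initialState == null || version != initialStateVersion) {
      cleanUp()                    // release the previous state, if any
      initialState = load(version) // load the state at the requested version
      initialStateVersion = version
    }
    initialState.iterator
  }

  // Closes and dereferences the cached state
  def cleanUp(): Unit = {
    if (initialState != null) {
      initialState.close()
      initialState = null
    }
  }
}
```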
== [[verifyStreamHygieneAndFilterAddFiles]] verifyStreamHygieneAndFilterAddFiles Internal Method

[source, scala]
----
verifyStreamHygieneAndFilterAddFiles(
  actions: Seq[Action]): Seq[Action]
----

`verifyStreamHygieneAndFilterAddFiles` ...FIXME
NOTE: `verifyStreamHygieneAndFilterAddFiles` is used when `DeltaSource` is requested to <
== [[cleanUpSnapshotResources]] cleanUpSnapshotResources Internal Method

[source, scala]
----
cleanUpSnapshotResources(): Unit
----

`cleanUpSnapshotResources` ...FIXME
NOTE: `cleanUpSnapshotResources` is used when `DeltaSource` is requested to <
== [[iteratorLast]] Retrieving Last Element From Iterator -- iteratorLast Internal Method

[source, scala]
----
iteratorLast[T](
  iter: Iterator[T]): Option[T]
----
`iteratorLast` simply returns the last element in the given `Iterator` or `None`.
NOTE: `iteratorLast` is used when `DeltaSource` is requested to <
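A minimal sketch of `iteratorLast` for a plain Scala `Iterator`. It is an assumption that Delta Lake's own version looks like this (it may, for example, also have to close the underlying iterator). Finding the last element consumes the whole iterator.

```scala
// Walks the iterator to the end, remembering the last element seen
def iteratorLast[T](iter: Iterator[T]): Option[T] = {
  var last: Option[T] = None
  while (iter.hasNext) last = Some(iter.next())
  last // None for an empty iterator
}
```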
== [[internal-properties]] Internal Properties
[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description
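
| initialState
a| [[initialState]] `DeltaSourceSnapshot` of the delta table at the <<initialStateVersion, initialStateVersion>>

Initially uninitialized (`null`).

Changes (along with the <<initialStateVersion, initialStateVersion>>) when `DeltaSource` is requested for the <<getSnapshotAt, state of the delta table at a given version>>.

Used when `DeltaSource` is requested for the <<getSnapshotAt, state of the delta table at a given version>>.

Closed and dereferenced (`null`) when `DeltaSource` is requested to <<cleanUpSnapshotResources, cleanUpSnapshotResources>>.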
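
| initialStateVersion
a| [[initialStateVersion]] Version of the <<initialState, initialState>>

Initially `-1L` and changes (along with the <<initialState, initialState>>) when `DeltaSource` is requested for the <<getSnapshotAt, state of the delta table at a given version>>.

Used when `DeltaSource` is requested to <<getSnapshotAt, getSnapshotAt>>.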

| previousOffset
a| [[previousOffset]] Ending `DeltaSourceOffset` of the latest micro-batch

Starts uninitialized (`null`).

Used when `DeltaSource` is requested for the <<getOffset, latest available offset>>.
| tableId a| [[tableId]] Table ID
Used when...FIXME
|===