DeltaSource¶
DeltaSource
is a DeltaSourceBase of the Delta Connector for streaming queries.
DeltaSource
is a DeltaSourceCDCSupport.
Creating Instance¶
DeltaSource
takes the following to be created:
- SparkSession (Spark SQL)
- DeltaLog
- DeltaOptions
- SnapshotDescriptor
- Metadata Path
- DeltaSourceMetadataTrackingLog
- Filter Expressions (default: empty)
DeltaSource
is created when:
DeltaDataSource
is requested for a streaming source
DeltaSourceMetadataTrackingLog¶
DeltaSource
can be given a DeltaSourceMetadataTrackingLog when created. It is undefined (None) by default.
DeltaSourceMetadataTrackingLog
is given when DeltaDataSource
is requested for a MetadataTrackingLogForDeltaSource.
Streaming Micro-Batch DataFrame¶
Source
getBatch(
start: Option[Offset],
end: Offset): DataFrame
getBatch
is part of the Source
(Spark Structured Streaming) abstraction.
getBatch
creates a DeltaSourceOffset for the tableId (aka reservoirId) and the given end
offset.
getBatch
determines the startVersion
, startIndex
, isStartingVersion
and startSourceVersion
based on the given start offset (startOffsetOption):
If undefined, getBatch calls getStartingVersion and does some computation.
If specified, getBatch creates a DeltaSourceOffset from it. Unless the DeltaSourceOffset is for the starting version (isStartingVersion), getBatch calls cleanUpSnapshotResources. getBatch then uses the DeltaSourceOffset for the versions and the index.
getBatch
prints out the following DEBUG message to the logs:
start: [startOffsetOption] end: [end]
In the end, getBatch
createDataFrameBetweenOffsets (for the startVersion
, startIndex
, isStartingVersion
and endOffset
).
Latest Available Streaming Offset¶
SupportsAdmissionControl
latestOffset(
startOffset: streaming.Offset,
limit: ReadLimit): streaming.Offset
latestOffset
is part of the SupportsAdmissionControl
(Spark Structured Streaming) abstraction.
latestOffset
determines the latest offset (currentOffset) based on whether the previousOffset internal registry is initialized or not.
latestOffset
prints out the following DEBUG message to the logs (using the previousOffset internal registry):
previousOffset -> currentOffset: [previousOffset] -> [currentOffset]
In the end, latestOffset returns the currentOffset if defined or null.
No previousOffset¶
When the previousOffset is not defined, latestOffset retrieves the starting offset (with a new AdmissionLimits for the given ReadLimit).
previousOffset Available¶
When the previousOffset is defined (which is when the DeltaSource
is requested for another micro-batch), latestOffset
gets the changes as an indexed AddFiles (with the previousOffset and a new AdmissionLimits for the given ReadLimit
).
latestOffset
takes the last AddFile if available.
With no AddFile
, latestOffset
returns the previousOffset.
With the previousOffset and the last indexed AddFile both available, latestOffset
creates a new DeltaSourceOffset for the version, index, and isLast
flag from the last indexed AddFile.
Note
isStartingVersion local value is enabled (true) when both of the following hold:
- Version of the last indexed AddFile is equal to the reservoirVersion of the previous ending offset
- isStartingVersion flag of the previous ending offset is enabled (true)
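The decision flow above can be sketched in plain Scala. This is a minimal sketch, not Delta Lake's code: IndexedFile and DeltaSourceOffset below are simplified stand-ins defined only for this example, nextOffset is a hypothetical helper name, and how the isLast flag influences the resulting offset is not modeled.

```scala
object LatestOffsetSketch {
  // Simplified stand-ins (assumptions for illustration, not Delta Lake's classes)
  final case class IndexedFile(version: Long, index: Long, isLast: Boolean)
  final case class DeltaSourceOffset(
      reservoirVersion: Long,
      index: Long,
      isStartingVersion: Boolean)

  // Hypothetical helper: computes the next offset from the previous ending
  // offset and the changes (indexed files) that came after it.
  def nextOffset(
      previous: DeltaSourceOffset,
      changes: Iterator[IndexedFile]): DeltaSourceOffset = {
    // Take the last indexed file, if there is one
    val last = changes.foldLeft(Option.empty[IndexedFile])((_, f) => Some(f))
    last match {
      // No new files: stay at the previous offset
      case None => previous
      case Some(f) =>
        // Still the starting version only if the version did not change
        // and the previous offset was for the starting version
        val isStartingVersion =
          f.version == previous.reservoirVersion && previous.isStartingVersion
        DeltaSourceOffset(f.version, f.index, isStartingVersion)
    }
  }
}
```

With a previous offset at version 5, a last file in version 5 keeps isStartingVersion as it was, while a last file in version 6 always turns it off.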
getStartingOffset¶
getStartingOffset(
limits: Option[AdmissionLimits]): Option[Offset]
getStartingOffset
...FIXME (review me)
getStartingOffset
requests the DeltaLog for the version of the delta table (by requesting for the current state (snapshot) and then for the version).
getStartingOffset
takes the last file from the files added (with rate limit) for the version of the delta table, -1L
as the fromIndex
, and the isStartingVersion
flag enabled (true
).
getStartingOffset
returns a new DeltaSourceOffset for the tableId, the version and the index of the last file added, and whether the last file added is the last file of its version.
getStartingOffset
returns None
(offset not available) when either happens:
- the version of the delta table is negative (below 0)
- no files were added in the version
getStartingOffset
throws an AssertionError
when the version of the last file added is smaller than the delta table's version:
assertion failed: getChangesWithRateLimit returns an invalid version: [v] (expected: >= [version])
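The preconditions above (when no offset is available and when the assertion fires) can be sketched as follows. This is a minimal sketch under assumptions: IndexedFile is a simplified stand-in, lastFileAdded is a hypothetical helper name, and the construction of the DeltaSourceOffset from the last file is left out.

```scala
object StartingOffsetSketch {
  // Simplified stand-in (assumption for illustration, not Delta Lake's class)
  final case class IndexedFile(version: Long, index: Long, isLast: Boolean)

  // Hypothetical helper: returns the last file added for the table version,
  // or None when no starting offset can be determined.
  def lastFileAdded(
      tableVersion: Long,
      filesAdded: Long => Iterator[IndexedFile]): Option[IndexedFile] = {
    if (tableVersion < 0) {
      None // negative table version: offset not available
    } else {
      // Walk the iterator and remember only the last element (if any)
      val last = filesAdded(tableVersion)
        .foldLeft(Option.empty[IndexedFile])((_, f) => Some(f))
      last.foreach { f =>
        // Scala's assert prefixes the message with "assertion failed: "
        assert(
          f.version >= tableVersion,
          s"getChangesWithRateLimit returns an invalid version: ${f.version} " +
            s"(expected: >= $tableVersion)")
      }
      last // None when no files were added in the version
    }
  }
}
```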
getChangesWithRateLimit¶
getChangesWithRateLimit(
fromVersion: Long,
fromIndex: Long,
isStartingVersion: Boolean): Iterator[IndexedFile]
getChangesWithRateLimit
gets the changes (as indexed AddFiles) for the given fromVersion
, fromIndex
, and isStartingVersion
flag.
getOffset¶
getOffset: Option[Offset]
getOffset
is part of the Source
(Spark Structured Streaming) abstraction.
getOffset
has been replaced by the newer latestOffset and so throws an UnsupportedOperationException
when called:
latestOffset(Offset, ReadLimit) should be called instead of this method
Snapshot Management¶
DeltaSource
uses internal registries for the DeltaSourceSnapshot and the version to avoid requesting the DeltaLog for getSnapshotAt.
Snapshot¶
DeltaSource
uses initialState
internal registry for the DeltaSourceSnapshot of the state of the delta table at the initialStateVersion.
DeltaSourceSnapshot
is used for AddFiles of the delta table at a given version.
Initially uninitialized (null
).
DeltaSourceSnapshot
is created (initialized) when uninitialized or the version requested is different from the current one.
DeltaSourceSnapshot
is closed and dereferenced (null
) when DeltaSource
is requested to cleanUpSnapshotResources (due to version change, another micro-batch or stop).
Version¶
DeltaSource
uses initialStateVersion
internal registry to keep track of the version of DeltaSourceSnapshot (when requested for AddFiles of the delta table at a given version).
initialStateVersion changes (alongside the initialState) to the version requested when DeltaSource is requested for the snapshot at a given version (only when the versions are different).
Used when:
DeltaSource
is requested for AddFiles of the delta table at a given version and to cleanUpSnapshotResources (and unpersist the current snapshot)
Stopping¶
stop
simply calls cleanUpSnapshotResources.
Previous Offset¶
Ending DeltaSourceOffset of the latest micro-batch
Starts uninitialized (null
).
Used when DeltaSource
is requested for the latest available offset.
AddFiles of Delta Table at Given Version¶
getSnapshotAt(
version: Long): Iterator[IndexedFile]
getSnapshotAt
requests the DeltaSourceSnapshot for the data files (as indexed AddFiles).
In case the DeltaSourceSnapshot hasn't been initialized yet (null
) or the requested version is different from the initialStateVersion, getSnapshotAt
does the following:
-
Requests the DeltaLog for the state (snapshot) of the delta table at the version
-
Creates a new DeltaSourceSnapshot for the state (snapshot) as the current DeltaSourceSnapshot
-
Changes the initialStateVersion internal registry to the requested version
getSnapshotAt
is used when:
DeltaSource
is requested to getChanges (with isStartingVersion flag enabled)
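The version-keyed caching described above, together with the clean-up it relies on, can be sketched in plain Scala. This is a minimal sketch, not Delta Lake's code: SourceSnapshot and SnapshotCache are hypothetical stand-ins, and the load function stands in for requesting the DeltaLog for a snapshot at a version.

```scala
object SnapshotCacheSketch {
  // Hypothetical stand-in for DeltaSourceSnapshot
  final class SourceSnapshot(val version: Long) {
    var closed = false
    def close(): Unit = { closed = true }
  }

  // Hypothetical cache keeping at most one snapshot, keyed by its version
  final class SnapshotCache(load: Long => SourceSnapshot) {
    private var initialState: SourceSnapshot = null // uninitialized at first
    private var initialStateVersion: Long = -1L

    def snapshotAt(version: Long): SourceSnapshot = {
      if (initialState == null || version != initialStateVersion) {
        cleanUp()                     // release the snapshot of the old version
        initialState = load(version)  // create the snapshot for the new version
        initialStateVersion = version // remember which version is cached
      }
      initialState
    }

    // Closes and dereferences the cached snapshot; does nothing when empty
    def cleanUp(): Unit = {
      if (initialState != null) {
        initialState.close()
        initialState = null
      }
    }
  }
}
```

Requesting the same version twice reuses the cached snapshot, so the loader runs only when the version changes or after a clean-up.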
getChanges¶
getChanges(
fromVersion: Long,
fromIndex: Long,
isStartingVersion: Boolean): Iterator[IndexedFile]
getChanges
branches based on isStartingVersion
flag (enabled or not):
- For isStartingVersion flag enabled (true), getChanges gets the state (snapshot) for the given fromVersion followed by (filtered out) indexed AddFiles for the next version after the given fromVersion
- For isStartingVersion flag disabled (false), getChanges simply gives (filtered out) indexed AddFiles for the given fromVersion
Note
isStartingVersion
flag simply adds the state (snapshot) before (filtered out) indexed AddFiles when enabled (true
).
isStartingVersion
flag is enabled when DeltaSource
is requested for the following:
- Micro-batch with data between start and end offsets and the start offset is not given or is for the starting version
- Latest available offset with no end offset of the latest micro-batch or the end offset of the latest micro-batch for the starting version
In the end, getChanges keeps only the indexed AddFiles with the version later than the given fromVersion or the index greater than the given fromIndex (all the others are filtered out).
getChanges
is used when:
DeltaSource
is requested for the latest available offset (when requested for the files added (with rate limit)) and getBatch
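The branching and the final filter can be sketched in plain Scala. This is a minimal sketch under assumptions: IndexedFile is a simplified stand-in, and the snapshotAt and logsFrom function parameters are hypothetical placeholders for getSnapshotAt and filterAndIndexDeltaLogs.

```scala
object GetChangesSketch {
  // Simplified stand-in (assumption for illustration, not Delta Lake's class)
  final case class IndexedFile(version: Long, index: Long)

  def changes(fromVersion: Long, fromIndex: Long, isStartingVersion: Boolean)(
      snapshotAt: Long => Iterator[IndexedFile],
      logsFrom: Long => Iterator[IndexedFile]): Iterator[IndexedFile] = {
    val all =
      if (isStartingVersion) {
        // Snapshot of fromVersion first, then the changes of later versions
        snapshotAt(fromVersion) ++ logsFrom(fromVersion + 1)
      } else {
        logsFrom(fromVersion)
      }
    // Keep files of a later version or with a greater index
    all.filter(f => f.version > fromVersion || f.index > fromIndex)
  }
}
```

For example, with fromVersion 5 and fromIndex 0 on the starting version, the file at (5, 0) is dropped while (5, 1) and every file of version 6 and later are kept.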
filterAndIndexDeltaLogs¶
filterAndIndexDeltaLogs(
startVersion: Long): Iterator[IndexedFile]
filterAndIndexDeltaLogs
...FIXME
verifyStreamHygieneAndFilterAddFiles¶
verifyStreamHygieneAndFilterAddFiles(
actions: Seq[Action],
version: Long): Seq[Action]
verifyStreamHygieneAndFilterAddFiles
...FIXME
cleanUpSnapshotResources¶
cleanUpSnapshotResources(): Unit
cleanUpSnapshotResources
does the following when the initial DeltaSourceSnapshot internal registry is not empty:
- Requests the DeltaSourceSnapshot to close (with the unpersistSnapshot flag based on whether the initialStateVersion is earlier than the snapshot version)
- Dereferences (nullifies) the DeltaSourceSnapshot
Otherwise, cleanUpSnapshotResources
does nothing.
cleanUpSnapshotResources
is used when:
DeltaSource
is requested to getSnapshotAt, getBatch and stop
ReadLimit¶
SupportsAdmissionControl
getDefaultReadLimit: ReadLimit
getDefaultReadLimit
is part of the SupportsAdmissionControl
(Spark Structured Streaming) abstraction.
getDefaultReadLimit
creates an AdmissionLimits and requests it for a corresponding ReadLimit.
Retrieving Last Element From Iterator¶
iteratorLast[T](
iter: Iterator[T]): Option[T]
iteratorLast
simply returns the last element of the given Iterator
(Scala) or None
.
iteratorLast
is used when:
DeltaSource
is requested to getStartingOffset and for the latest available offset (latestOffset)
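A helper with this contract could look as follows. This is a minimal sketch of one way to write it (consuming the iterator and remembering the most recent element), not necessarily how Delta Lake implements it.

```scala
object IteratorLastSketch {
  // Returns the last element of the iterator, or None when it is empty.
  // The iterator is fully consumed in the process.
  def iteratorLast[T](iter: Iterator[T]): Option[T] = {
    var last: Option[T] = None
    while (iter.hasNext) {
      last = Some(iter.next())
    }
    last
  }
}
```

Walking the iterator keeps memory use constant, which matters when the iterator is backed by a large list of files.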
excludeRegex Option¶
excludeRegex: Option[Regex]
excludeRegex
requests the DeltaOptions for the value of excludeRegex option.
Refactor It
excludeRegex
should not really be part of DeltaSource
(more of DeltaSourceBase) since it's used elsewhere anyway.
excludeRegex
is used when:
- DeltaSourceBase is requested to getFileChangesAndCreateDataFrame
- IndexedChangeFileSeq (of DeltaSourceCDCSupport) is requested to isValidIndexedFile
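How an optional regular expression like this is typically applied to file paths can be sketched in plain Scala. This is a minimal sketch under assumptions: isIncluded is a hypothetical helper name, and the semantics shown (a path is skipped when the pattern matches anywhere in it) is an assumption about how the option is used, not something this page states.

```scala
import scala.util.matching.Regex

object ExcludeRegexSketch {
  // Hypothetical helper: a path is included when no pattern is defined
  // or when the pattern does not match any part of the path.
  def isIncluded(path: String, excludeRegex: Option[Regex]): Boolean =
    excludeRegex.forall(_.findFirstIn(path).isEmpty)
}
```

With no pattern defined every path is included, since forall on an empty Option is always true.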
Demo¶
val q = spark
  .readStream         // Creating a streaming query
  .format("delta")    // Using delta data source
  .load("/tmp/users") // Over data in a delta table
  .writeStream
  .format("memory")
  .option("queryName", "demo")
  .start
import org.apache.spark.sql.execution.streaming.{MicroBatchExecution, StreamingQueryWrapper}
val plan = q.asInstanceOf[StreamingQueryWrapper]
  .streamingQuery
  .asInstanceOf[MicroBatchExecution]
  .logicalPlan
import org.apache.spark.sql.execution.streaming.StreamingExecutionRelation
val relation = plan.collect { case r: StreamingExecutionRelation => r }.head
import org.apache.spark.sql.delta.sources.DeltaSource
// isInstanceOf gives the Boolean that assert expects (asInstanceOf does not)
assert(relation.source.isInstanceOf[DeltaSource])
scala> println(relation.source)
DeltaSource[file:/tmp/users]
Logging¶
Enable ALL
logging level for org.apache.spark.sql.delta.sources.DeltaSource
logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.DeltaSource.name = org.apache.spark.sql.delta.sources.DeltaSource
logger.DeltaSource.level = all
Refer to Logging.