CDCReaderImpl¶
CDCReaderImpl is a marker abstraction of Change Data Feed-aware readers.
Fun Fact
Despite the Impl suffix, CDCReaderImpl is a trait, not an implementation (a class).
Implementations¶
CDCReader is the default and only known CDCReaderImpl.
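A minimal sketch (an assumption about the shape, not the exact Delta Lake sources) of the trait/object split:

```scala
// The CDF-aware reader logic lives in the CDCReaderImpl trait...
trait CDCReaderImpl {
  // getCDCRelation, changesToDF, scanIndex, cdcReadSchema, ...
}

// ...while CDCReader is the concrete (singleton) implementation.
object CDCReader extends CDCReaderImpl
```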
Creating CDF-Aware Relation¶
getCDCRelation(
spark: SparkSession,
snapshotToUse: Snapshot,
isTimeTravelQuery: Boolean,
conf: SQLConf,
options: CaseInsensitiveStringMap): BaseRelation
getCDCRelation resolves the starting version with getVersionForCDC and the following:
- startingVersion for the version key
- startingTimestamp for the timestamp key
getCDCRelation determines the schema mode of the table with getBatchSchemaModeForTable.
getCDCRelation resolves the ending version with getVersionForCDC and the following:
- endingVersion for the version key
- endingTimestamp for the timestamp key
getCDCRelation
prints out the following INFO message to the logs:
startingVersion: [startingVersion], endingVersion: [endingVersionOpt]
In the end, getCDCRelation
creates a DeltaCDFRelation.
getCDCRelation is used when:
- DeltaTableV2 is requested for the CDC-aware relation
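For example, a batch CDF query (the table path is made up) makes DeltaTableV2 request the CDC-aware relation that getCDCRelation builds:

```scala
// readChangeFeed plus starting/ending boundaries trigger a CDC read.
val changes = spark.read
  .format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 5)
  .load("/tmp/delta/events") // hypothetical table path

changes.select("_change_type", "_commit_version", "_commit_timestamp").show()
```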
Resolving Version¶
getVersionForCDC(
spark: SparkSession,
deltaLog: DeltaLog,
conf: SQLConf,
options: CaseInsensitiveStringMap,
versionKey: String,
timestampKey: String): Option[ResolvedCDFVersion]
FIXME Review Me
getVersionForCDC
uses the given options
map to get the value of the given versionKey
key, if available.
Otherwise, getVersionForCDC
uses the given options
map to get the value of the given timestampKey
key, if available. getVersionForCDC
...FIXME
If neither the versionKey nor the timestampKey is available in the options map, getVersionForCDC returns None (undefined value).
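A minimal sketch of the resolution order (version key first, then timestamp key), with a plain Long standing in for ResolvedCDFVersion and the timestamp-to-version lookup elided:

```scala
import org.apache.spark.sql.util.CaseInsensitiveStringMap

def resolveVersionForCDC(
    options: CaseInsensitiveStringMap,
    versionKey: String,
    timestampKey: String): Option[Long] = {
  if (options.containsKey(versionKey)) {
    // An explicit version takes precedence.
    Some(options.get(versionKey).toLong)
  } else if (options.containsKey(timestampKey)) {
    // A timestamp would be resolved to a commit version using the
    // DeltaLog history (elided; see the FIXME above).
    ???
  } else {
    None // neither key given
  }
}
```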
Creating Batch DataFrame of Changes¶
changesToBatchDF(
deltaLog: DeltaLog,
start: Long,
end: Long,
spark: SparkSession,
readSchemaSnapshot: Option[Snapshot] = None,
useCoarseGrainedCDC: Boolean = false): DataFrame
changesToBatchDF
requests the given DeltaLog for the changes from the given start
version (inclusive) until the given end
version.
In the end, changesToBatchDF creates a DataFrame of changes (with the per-version changes and the isStreaming flag disabled).
changesToBatchDF is used when:
- DeltaCDFRelation is requested to build a scan
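For illustration only (changesToBatchDF is an internal API, normally invoked by DeltaCDFRelation when building a scan), a direct call could look as follows; the table path is made up:

```scala
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.commands.cdc.CDCReader

val deltaLog = DeltaLog.forTable(spark, "/tmp/delta/events")
// All changes committed in versions 1 to 3 (start is inclusive).
val changesDF = CDCReader.changesToBatchDF(deltaLog, 1L, 3L, spark)
changesDF.show()
```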
Creating DataFrame of Changes¶
changesToDF(
readSchemaSnapshot: SnapshotDescriptor,
start: Long,
end: Long,
changes: Iterator[(Long, Seq[Action])],
spark: SparkSession,
isStreaming: Boolean = false,
useCoarseGrainedCDC: Boolean = false): CDCVersionDiffInfo
isStreaming Input Argument

Value | Caller |
---|---|
false | CDCReaderImpl is requested for a batch DataFrame of changes (changesToBatchDF) |
true | DeltaSourceCDCSupport is requested to getCDCFileChangesAndCreateDataFrame |
useCoarseGrainedCDC Input Argument

useCoarseGrainedCDC is disabled (false) by default and in all the other known use cases.
changesToDF builds a map of commit timestamps by version with getTimestampsByVersion.
changesToDF requests the DeltaLog (of the given SnapshotDescriptor) for the Snapshot at the given start version.
changesToDF asserts that one of the following is enabled (or throws a DeltaAnalysisException):
- The given useCoarseGrainedCDC flag
- isCDCEnabledOnTable
changesToDF reads the spark.databricks.delta.changeDataFeed.unsafeBatchReadOnIncompatibleSchemaChanges.enabled configuration property (that can potentially block batch reads when the given isStreaming flag is disabled).
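The property can be overridden at the session level, e.g.:

```scala
// Allow batch CDF reads across incompatible schema changes (unsafe).
spark.conf.set(
  "spark.databricks.delta.changeDataFeed.unsafeBatchReadOnIncompatibleSchemaChanges.enabled",
  "true")
```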
changesToDF
...FIXME
In the end, changesToDF
creates a new CDCVersionDiffInfo (with the DataFrame
of the changes).
changesToDF is used when:
- CDCReaderImpl is requested for a batch DataFrame of changes
- DeltaSourceCDCSupport is requested for a streaming DataFrame of changes
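The streaming path (isStreaming enabled) is what a CDF-aware streaming query goes through; the table path is made up:

```scala
// Streaming CDF read: DeltaSourceCDCSupport ends up calling changesToDF
// with the isStreaming flag enabled.
val changesStream = spark.readStream
  .format("delta")
  .option("readChangeFeed", "true")
  .load("/tmp/delta/events") // hypothetical table path

changesStream.writeStream
  .format("console")
  .start()
```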
getDeletedAndAddedRows¶
getDeletedAndAddedRows(
addFileSpecs: Seq[CDCDataSpec[AddFile]],
removeFileSpecs: Seq[CDCDataSpec[RemoveFile]],
deltaLog: DeltaLog,
snapshot: SnapshotDescriptor,
isStreaming: Boolean,
spark: SparkSession): Seq[DataFrame]
getDeletedAndAddedRows
...FIXME
buildCDCDataSpecSeq¶
buildCDCDataSpecSeq[T <: FileAction](
actionsByVersion: MutableMap[TableVersion, ListBuffer[T]],
versionToCommitInfo: MutableMap[Long, CommitInfo]): Seq[CDCDataSpec[T]]
buildCDCDataSpecSeq
converts the given actionsByVersion
into CDCDataSpecs (with CommitInfos from the given versionToCommitInfo
mapping).
processDeletionVectorActions¶
processDeletionVectorActions(
addFilesMap: Map[FilePathWithTableVersion, AddFile],
removeFilesMap: Map[FilePathWithTableVersion, RemoveFile],
versionToCommitInfo: Map[Long, CommitInfo],
deltaLog: DeltaLog,
snapshot: SnapshotDescriptor,
isStreaming: Boolean,
spark: SparkSession): Seq[DataFrame]
processDeletionVectorActions
...FIXME
generateFileActionsWithInlineDv¶
generateFileActionsWithInlineDv(
add: AddFile,
remove: RemoveFile,
dvStore: DeletionVectorStore,
deltaLog: DeltaLog): Seq[FileAction]
generateFileActionsWithInlineDv
...FIXME
Creating DataFrame over Delta-Aware FileIndex¶
scanIndex(
spark: SparkSession,
index: TahoeFileIndexWithSnapshotDescriptor,
isStreaming: Boolean = false): DataFrame
HadoopFsRelation
To understand scanIndex, it is worth understanding the role of HadoopFsRelation (Spark SQL) first.
scanIndex
creates a HadoopFsRelation
(Spark SQL) based on the given TahoeFileIndexWithSnapshotDescriptor
as follows:
Property | Value |
---|---|
location | The given TahoeFileIndexWithSnapshotDescriptor |
partitionSchema | The partitionSchema of the given TahoeFileIndexWithSnapshotDescriptor |
dataSchema | The CDF-aware read schema based on the schema of the given TahoeFileIndexWithSnapshotDescriptor |
bucketSpec | Undefined (None ) |
fileFormat | A new DeltaParquetFileFormat |
options | The options of the DeltaLog of the given TahoeFileIndexWithSnapshotDescriptor |
scanIndex
creates a LogicalRelation
(Spark SQL) for the HadoopFsRelation
(and the given isStreaming
flag).
In the end, scanIndex creates a DataFrame for the LogicalRelation.
scanIndex is used when:
- CDCReaderImpl is requested to changesToDF, getDeletedAndAddedRows and processDeletionVectorActions
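A minimal sketch of the HadoopFsRelation-to-DataFrame chain that scanIndex follows, with plain Spark SQL types standing in for the Delta-specific ones (FileIndex for TahoeFileIndexWithSnapshotDescriptor, ParquetFileFormat for DeltaParquetFileFormat):

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType

def scanAsDataFrame(
    spark: SparkSession,
    index: FileIndex,
    dataSchema: StructType,
    isStreaming: Boolean = false): DataFrame = {
  val relation = HadoopFsRelation(
    location = index,
    partitionSchema = index.partitionSchema,
    dataSchema = dataSchema,
    bucketSpec = None,
    fileFormat = new ParquetFileFormat(),
    options = Map.empty)(spark)
  // LogicalRelation carries the isStreaming flag into the logical plan.
  // (Dataset.ofRows is package-private to org.apache.spark.sql, so this
  // line compiles only from within that package; shown for illustration.)
  Dataset.ofRows(spark, LogicalRelation(relation, isStreaming))
}
```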
CDF-Aware Read Schema (Adding CDF Columns)¶
cdcReadSchema(
deltaSchema: StructType): StructType
cdcReadSchema
makes the given schema (StructType
) of a delta table CDF-aware by appending the following CDF metadata fields:
Column Name | Data Type |
---|---|
_change_type | StringType |
_commit_version | LongType |
_commit_timestamp | TimestampType |
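A sketch of what appending these fields looks like with plain Spark SQL types:

```scala
import org.apache.spark.sql.types._

// Append the three CDF metadata columns to a Delta table's schema.
def cdcReadSchema(deltaSchema: StructType): StructType =
  deltaSchema
    .add("_change_type", StringType)
    .add("_commit_version", LongType)
    .add("_commit_timestamp", TimestampType)
```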
cdcReadSchema is used when:
- OptimisticTransactionImpl is requested to performCdcMetadataCheck
- DeltaCDFRelation is requested for the schema
- CDCReaderImpl is requested to changesToDF and scanIndex
- CdcAddFileIndex is requested for the partition schema
- TahoeRemoveFileIndex is requested for the partition schema
- DeltaDataSource is requested for the sourceSchema
- DeltaSourceBase is requested to checkReadIncompatibleSchemaChanges and for the schema
CDC-Aware Table Scan (CDC Read)¶
isCDCRead(
options: CaseInsensitiveStringMap): Boolean
isCDCRead is true when one of the following options is specified (in the given options) with a true value (case-insensitive):
- readChangeFeed
- (legacy) readChangeData

Otherwise, isCDCRead is false.
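A minimal sketch of the check (the actual implementation may differ, e.g. in how non-boolean values are handled):

```scala
import org.apache.spark.sql.util.CaseInsensitiveStringMap

def isCDCRead(options: CaseInsensitiveStringMap): Boolean =
  options.getBoolean("readChangeFeed", false) ||
    options.getBoolean("readChangeData", false)
```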
isCDCRead is used when:
- DeltaRelation utility is used to fromV2Relation
- DeltaTableV2 is requested for the cdcRelation, initialSnapshot and withOptions
- DeltaDataSource is requested for the streaming source schema and for a relation
isCDCEnabledOnTable¶
isCDCEnabledOnTable(
metadata: Metadata,
spark: SparkSession): Boolean
isCDCEnabledOnTable
checks if the given metadata requires the Change Data Feed feature to be enabled (based on delta.enableChangeDataFeed table property).
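For example, Change Data Feed is enabled on an existing table with the delta.enableChangeDataFeed table property (the table name is made up):

```scala
spark.sql("""
  ALTER TABLE events
  SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
```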
isCDCEnabledOnTable is used when:
- OptimisticTransactionImpl is requested to performCdcColumnMappingCheck and performCdcMetadataCheck
- WriteIntoDelta is requested to write data out
- CDCReaderImpl is requested to create a DataFrame of changes
- TransactionalWrite is requested to performCDCPartition
Logging¶
CDCReaderImpl is a trait and logging is configured using the logger of the implementations.