CDCReaderImpl¶
CDCReaderImpl is a marker abstraction of Change Data Feed-aware Readers.
Fun Fact
Despite the suffix (Impl), CDCReaderImpl is a trait, not an implementation (class).
Implementations¶
Creating CDF-Aware Relation¶
getCDCRelation(
spark: SparkSession,
snapshotToUse: Snapshot,
isTimeTravelQuery: Boolean,
conf: SQLConf,
options: CaseInsensitiveStringMap): BaseRelation
getCDCRelation resolves the starting version (using getVersionForCDC) with the following:
- startingVersion for the version key
- startingTimestamp for the timestamp key
getCDCRelation calls getBatchSchemaModeForTable.
getCDCRelation resolves the ending version (using getVersionForCDC) with the following:
- endingVersion for the version key
- endingTimestamp for the timestamp key
getCDCRelation prints out the following INFO message to the logs:
startingVersion: [startingVersion], endingVersion: [endingVersionOpt]
In the end, getCDCRelation creates a DeltaCDFRelation.
getCDCRelation is used when:
- DeltaTableV2 is requested for the CDC-aware relation
Resolving Version¶
getVersionForCDC(
spark: SparkSession,
deltaLog: DeltaLog,
conf: SQLConf,
options: CaseInsensitiveStringMap,
versionKey: String,
timestampKey: String): Option[ResolvedCDFVersion]
FIXME Review Me
getVersionForCDC uses the given options map to get the value of the given versionKey key, if available.
Otherwise, getVersionForCDC uses the given options map to get the value of the given timestampKey key, if available. getVersionForCDC...FIXME
If neither the given versionKey nor the timestampKey key is available in the options map, getVersionForCDC returns None (undefined value).
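The lookup order described above (version key first, timestamp key as a fallback, None otherwise) can be sketched in plain Scala. This is a minimal illustration only: `Boundary`, `ByVersion`, `ByTimestamp` and `resolveBoundary` are hypothetical names, a regular `Map` stands in for `CaseInsensitiveStringMap`, and the timestamp is left unresolved because the source does not describe how it is turned into a version.

```scala
object CdfBoundarySketch {
  // Hypothetical stand-ins for a resolved boundary; these are not Delta Lake types.
  sealed trait Boundary
  final case class ByVersion(version: Long) extends Boundary
  final case class ByTimestamp(timestamp: String) extends Boundary

  // The version key takes precedence, the timestamp key is the fallback,
  // and None is returned when neither key is present.
  def resolveBoundary(
      options: Map[String, String],
      versionKey: String,
      timestampKey: String): Option[Boundary] = {
    val byVersion: Option[Boundary] =
      options.get(versionKey).map(v => ByVersion(v.toLong))
    // Assumption: the timestamp is kept as a raw string (no version lookup here).
    val byTimestamp: Option[Boundary] =
      options.get(timestampKey).map(ts => ByTimestamp(ts))
    byVersion.orElse(byTimestamp)
  }
}
```

Calling `resolveBoundary` with the `startingVersion` and `startingTimestamp` keys mirrors the first lookup in getCDCRelation; the `endingVersion` and `endingTimestamp` keys mirror the second.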
Creating Batch DataFrame of Changes¶
changesToBatchDF(
deltaLog: DeltaLog,
start: Long,
end: Long,
spark: SparkSession,
readSchemaSnapshot: Option[Snapshot] = None,
useCoarseGrainedCDC: Boolean = false): DataFrame
changesToBatchDF requests the given DeltaLog for the changes from the given start version (inclusive) until the given end version.
In the end, changesToBatchDF creates a DataFrame of changes (with the changes per version and isStreaming flag disabled).
changesToBatchDF is used when:
- DeltaCDFRelation is requested to build a scan
Creating DataFrame of Changes¶
changesToDF(
readSchemaSnapshot: SnapshotDescriptor,
start: Long,
end: Long,
changes: Iterator[(Long, Seq[Action])],
spark: SparkSession,
isStreaming: Boolean = false,
useCoarseGrainedCDC: Boolean = false): CDCVersionDiffInfo
isStreaming Input Argument
| Value | Caller |
|---|---|
| false | CDCReaderImpl is requested to changesToBatchDF |
| true | DeltaSourceCDCSupport is requested to getCDCFileChangesAndCreateDataFrame |
useCoarseGrainedCDC Input Argument
useCoarseGrainedCDC is disabled (false) by default and for all the other known use cases.
changesToDF calls getTimestampsByVersion.
changesToDF requests the DeltaLog (of the given SnapshotDescriptor) for the Snapshot at the given start.
changesToDF asserts that one of the following is enabled (or throws a DeltaAnalysisException):
- The given useCoarseGrainedCDC flag
- isCDCEnabledOnTable
changesToDF reads spark.databricks.delta.changeDataFeed.unsafeBatchReadOnIncompatibleSchemaChanges.enabled configuration property (that can potentially block batch reads, when the given isStreaming flag is disabled).
changesToDF...FIXME
In the end, changesToDF creates a new CDCVersionDiffInfo (with the DataFrame of the changes).
changesToDF is used when:
- CDCReaderImpl is requested for a batch DataFrame of changes
- DeltaSourceCDCSupport is requested for a streaming DataFrame of changes
getDeletedAndAddedRows¶
getDeletedAndAddedRows(
addFileSpecs: Seq[CDCDataSpec[AddFile]],
removeFileSpecs: Seq[CDCDataSpec[RemoveFile]],
deltaLog: DeltaLog,
snapshot: SnapshotDescriptor,
isStreaming: Boolean,
spark: SparkSession): Seq[DataFrame]
getDeletedAndAddedRows...FIXME
buildCDCDataSpecSeq¶
buildCDCDataSpecSeq[T <: FileAction](
actionsByVersion: MutableMap[TableVersion, ListBuffer[T]],
versionToCommitInfo: MutableMap[Long, CommitInfo]): Seq[CDCDataSpec[T]]
buildCDCDataSpecSeq converts the given actionsByVersion into CDCDataSpecs (with CommitInfos from the given versionToCommitInfo mapping).
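The conversion can be pictured as a per-version mapping. The following is a rough sketch with simplified, hypothetical stand-ins (`CommitInfo`, `FileAction`, `DataSpec` and their fields are assumptions, and a plain `Long` replaces `TableVersion`). Sorting by version is added here only to make the output deterministic and is not claimed to be part of the original.

```scala
import scala.collection.mutable

object CdcDataSpecSketch {
  // Simplified, hypothetical stand-ins for the Delta Lake types.
  final case class CommitInfo(timestamp: Long)
  final case class FileAction(path: String)
  final case class DataSpec(
      version: Long,
      actions: Seq[FileAction],
      commitInfo: Option[CommitInfo])

  // One DataSpec per version, paired with that version's CommitInfo when known.
  def buildSpecs(
      actionsByVersion: mutable.Map[Long, mutable.ListBuffer[FileAction]],
      versionToCommitInfo: mutable.Map[Long, CommitInfo]): Seq[DataSpec] =
    actionsByVersion.toSeq
      .sortBy { case (version, _) => version } // assumption: ordering added for determinism
      .map { case (version, actions) =>
        DataSpec(version, actions.toList, versionToCommitInfo.get(version))
      }
}
```

A version with no entry in `versionToCommitInfo` simply gets an empty `commitInfo` in this sketch.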
processDeletionVectorActions¶
processDeletionVectorActions(
addFilesMap: Map[FilePathWithTableVersion, AddFile],
removeFilesMap: Map[FilePathWithTableVersion, RemoveFile],
versionToCommitInfo: Map[Long, CommitInfo],
deltaLog: DeltaLog,
snapshot: SnapshotDescriptor,
isStreaming: Boolean,
spark: SparkSession): Seq[DataFrame]
processDeletionVectorActions...FIXME
generateFileActionsWithInlineDv¶
generateFileActionsWithInlineDv(
add: AddFile,
remove: RemoveFile,
dvStore: DeletionVectorStore,
deltaLog: DeltaLog): Seq[FileAction]
generateFileActionsWithInlineDv...FIXME
Creating DataFrame over Delta-Aware FileIndex¶
scanIndex(
spark: SparkSession,
index: TahoeFileIndexWithSnapshotDescriptor,
isStreaming: Boolean = false): DataFrame
HadoopFsRelation
To understand scanIndex, it helps to first understand the role of HadoopFsRelation.
scanIndex creates a HadoopFsRelation (Spark SQL) based on the given TahoeFileIndexWithSnapshotDescriptor as follows:
| Property | Value |
|---|---|
| location | The given TahoeFileIndexWithSnapshotDescriptor |
| partitionSchema | The partitionSchema of the given TahoeFileIndexWithSnapshotDescriptor |
| dataSchema | The CDF-aware read schema based on the schema of the given TahoeFileIndexWithSnapshotDescriptor |
| bucketSpec | Undefined (None) |
| fileFormat | A new DeltaParquetFileFormat |
| options | The options of the DeltaLog of the given TahoeFileIndexWithSnapshotDescriptor |
scanIndex creates a LogicalRelation (Spark SQL) for the HadoopFsRelation (and the given isStreaming flag).
In the end, scanIndex creates a DataFrame for the LogicalRelation.
scanIndex is used when:
- CDCReaderImpl is requested to changesToDF, getDeletedAndAddedRows, processDeletionVectorActions
CDF-Aware Read Schema (Adding CDF Columns)¶
cdcReadSchema(
deltaSchema: StructType): StructType
cdcReadSchema makes the given schema (StructType) of a delta table CDF-aware by appending the following CDF metadata fields:
| Column Name | Data Type |
|---|---|
| _change_type | StringType |
| _commit_version | LongType |
| _commit_timestamp | TimestampType |
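Appending the three fields from the table above can be modelled without Spark on the classpath. In this minimal sketch, `Field` is a hypothetical stand-in for a Spark SQL `StructField` (name and type name only) and a `Seq[Field]` stands in for `StructType`; the column names and types are taken from the table.

```scala
object CdcReadSchemaSketch {
  // Hypothetical, simplified stand-in for a Spark SQL StructField.
  final case class Field(name: String, dataType: String)

  // The CDF metadata fields listed in the table above.
  val cdfFields: Seq[Field] = Seq(
    Field("_change_type", "StringType"),
    Field("_commit_version", "LongType"),
    Field("_commit_timestamp", "TimestampType"))

  // The table's own fields come first, followed by the CDF metadata fields.
  def cdcReadSchema(deltaSchema: Seq[Field]): Seq[Field] =
    deltaSchema ++ cdfFields
}
```

For a table with a single `id` column, the sketch yields four fields, with `id` first and the three CDF columns after it.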
cdcReadSchema is used when:
- OptimisticTransactionImpl is requested to performCdcMetadataCheck
- DeltaCDFRelation is requested for the schema
- CDCReaderImpl is requested to changesToDF, scanIndex
- CdcAddFileIndex is requested for the partition schema
- TahoeRemoveFileIndex is requested for the partition schema
- DeltaDataSource is requested for the sourceSchema
- DeltaSourceBase is requested to checkReadIncompatibleSchemaChanges and for the schema
CDC-Aware Table Scan (CDC Read)¶
isCDCRead(
options: CaseInsensitiveStringMap): Boolean
isCDCRead is true when one of the following options is specified (in the given options) with the value true (case-insensitive):
- readChangeFeed
- (legacy) readChangeData
Otherwise, isCDCRead is false.
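The check can be sketched over a plain `Map`. This is an illustration under assumptions: keys are lower-cased to imitate `CaseInsensitiveStringMap`, and the value is also compared case-insensitively, which is one reading of "case-insensitive" in the text above rather than confirmed behaviour.

```scala
import java.util.Locale

object IsCdcReadSketch {
  // The two option names from the list above, lower-cased for key matching.
  private val cdcOptionKeys = Set("readchangefeed", "readchangedata")

  def isCDCRead(options: Map[String, String]): Boolean =
    options.exists { case (key, value) =>
      // Assumption: both the key and the "true" value are matched case-insensitively.
      cdcOptionKeys.contains(key.toLowerCase(Locale.ROOT)) &&
        value.equalsIgnoreCase("true")
    }
}
```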
isCDCRead is used when:
- DeltaRelation utility is used to fromV2Relation
- DeltaTableV2 is requested for the cdcRelation, initialSnapshot, withOptions
- DeltaDataSource is requested for the streaming source schema and for a relation
isCDCEnabledOnTable¶
isCDCEnabledOnTable(
metadata: Metadata,
spark: SparkSession): Boolean
isCDCEnabledOnTable checks if the given metadata requires the Change Data Feed feature to be enabled (based on delta.enableChangeDataFeed table property).
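As a rough model of this check, the table properties can be treated as a plain `Map[String, String]` in place of `Metadata`. The parameter shape and the `EnableCdfProperty` constant name are assumptions for illustration; a missing property is treated as disabled.

```scala
object CdcEnabledSketch {
  // The table property named in the text above.
  val EnableCdfProperty = "delta.enableChangeDataFeed"

  // Enabled only when the property is present and set to "true".
  // Assumption: the value is compared case-insensitively.
  def isCDCEnabledOnTable(tableProperties: Map[String, String]): Boolean =
    tableProperties.get(EnableCdfProperty).exists(_.equalsIgnoreCase("true"))
}
```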
isCDCEnabledOnTable is used when:
- OptimisticTransactionImpl is requested to performCdcColumnMappingCheck and performCdcMetadataCheck
- WriteIntoDelta is requested to write data out
- CDCReaderImpl is requested to create a DataFrame of changes
- TransactionalWrite is requested to performCDCPartition
Logging¶
CDCReaderImpl is a trait, so logging is configured using the logger of the implementations.