CDCReaderImpl

CDCReaderImpl is a marker abstraction of Change Data Feed-aware Readers.

Fun Fact

Despite the Impl suffix, CDCReaderImpl is a trait, not an implementation (class).

Implementations

Creating CDF-Aware Relation

getCDCRelation(
  spark: SparkSession,
  snapshotToUse: Snapshot,
  isTimeTravelQuery: Boolean,
  conf: SQLConf,
  options: CaseInsensitiveStringMap): BaseRelation

getCDCRelation calls getVersionForCDC with the following:

getCDCRelation calls getBatchSchemaModeForTable.

getCDCRelation calls getVersionForCDC with the following:

getCDCRelation prints out the following INFO message to the logs:

startingVersion: [startingVersion], endingVersion: [endingVersionOpt]

In the end, getCDCRelation creates a DeltaCDFRelation.


getCDCRelation is used when:

Resolving Version

getVersionForCDC(
  spark: SparkSession,
  deltaLog: DeltaLog,
  conf: SQLConf,
  options: CaseInsensitiveStringMap,
  versionKey: String,
  timestampKey: String): Option[ResolvedCDFVersion]

FIXME Review Me

getVersionForCDC uses the given options map to get the value of the given versionKey key, if available.

Otherwise, getVersionForCDC uses the given options map to get the value of the given timestampKey key, if available. getVersionForCDC...FIXME

If neither the given versionKey nor the timestampKey key is available in the options map, getVersionForCDC returns None (undefined value).
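The lookup order described above (version key first, then timestamp key, otherwise None) can be sketched in plain Scala. This is a minimal illustration only: a plain Map stands in for CaseInsensitiveStringMap, and the Requested, ByVersion and ByTimestamp types are hypothetical names that are not part of Delta Lake. The timestamp-to-version resolution is not covered here, since it is not described above.

```scala
object VersionForCdcSketch {
  // Hypothetical result type; the real method returns Option[ResolvedCDFVersion].
  sealed trait Requested
  final case class ByVersion(version: Long) extends Requested
  // The timestamp is kept as-is: how it maps to a table version is not described above.
  final case class ByTimestamp(timestamp: String) extends Requested

  // Prefers the version key, falls back to the timestamp key, otherwise None.
  // Note: toLong throws NumberFormatException for a non-numeric version value.
  def requestedVersion(
      options: Map[String, String],
      versionKey: String,
      timestampKey: String): Option[Requested] =
    options.get(versionKey).map[Requested](v => ByVersion(v.toLong))
      .orElse(options.get(timestampKey).map[Requested](ByTimestamp(_)))
}
```

When both keys are present, the version key wins, which matches the "Otherwise" wording above.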

Creating Batch DataFrame of Changes

changesToBatchDF(
  deltaLog: DeltaLog,
  start: Long,
  end: Long,
  spark: SparkSession,
  readSchemaSnapshot: Option[Snapshot] = None,
  useCoarseGrainedCDC: Boolean = false): DataFrame

changesToBatchDF requests the given DeltaLog for the changes from the given start version (inclusive) until the given end version.

In the end, changesToBatchDF creates a DataFrame of changes (with the changes per version and isStreaming flag disabled).


changesToBatchDF is used when:

Creating DataFrame of Changes

changesToDF(
  readSchemaSnapshot: SnapshotDescriptor,
  start: Long,
  end: Long,
  changes: Iterator[(Long, Seq[Action])],
  spark: SparkSession,
  isStreaming: Boolean = false,
  useCoarseGrainedCDC: Boolean = false): CDCVersionDiffInfo

isStreaming Input Argument

Value Caller
false
true DeltaSourceCDCSupport is requested to getCDCFileChangesAndCreateDataFrame

useCoarseGrainedCDC Input Argument

useCoarseGrainedCDC is disabled (false) by default and for all the other known use cases.

changesToDF calls getTimestampsByVersion.

changesToDF requests the DeltaLog (of the given SnapshotDescriptor) for the Snapshot at the given start.

changesToDF asserts that one of the following is enabled (or throws a DeltaAnalysisException):

changesToDF reads the spark.databricks.delta.changeDataFeed.unsafeBatchReadOnIncompatibleSchemaChanges.enabled configuration property (which can potentially block batch reads, i.e. when the given isStreaming flag is disabled).

changesToDF...FIXME

In the end, changesToDF creates a new CDCVersionDiffInfo (with the DataFrame of the changes).


changesToDF is used when:

getDeletedAndAddedRows

getDeletedAndAddedRows(
  addFileSpecs: Seq[CDCDataSpec[AddFile]],
  removeFileSpecs: Seq[CDCDataSpec[RemoveFile]],
  deltaLog: DeltaLog,
  snapshot: SnapshotDescriptor,
  isStreaming: Boolean,
  spark: SparkSession): Seq[DataFrame]

getDeletedAndAddedRows...FIXME

buildCDCDataSpecSeq

buildCDCDataSpecSeq[T <: FileAction](
  actionsByVersion: MutableMap[TableVersion, ListBuffer[T]],
  versionToCommitInfo: MutableMap[Long, CommitInfo]): Seq[CDCDataSpec[T]]

buildCDCDataSpecSeq converts the given actionsByVersion into CDCDataSpecs (with CommitInfos from the given versionToCommitInfo mapping).
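The conversion can be pictured as pairing each version's actions with the commit info recorded for that version. The following is a rough sketch under simplifying assumptions: CommitInfo and DataSpec are hypothetical, pared-down stand-ins for the real CommitInfo and CDCDataSpec classes, a plain Long replaces TableVersion, and sorting by version is an assumption made only to keep the output deterministic.

```scala
import scala.collection.mutable

object BuildCdcDataSpecSketch {
  // Hypothetical, simplified stand-ins for CommitInfo and CDCDataSpec.
  final case class CommitInfo(operation: String)
  final case class DataSpec[T](version: Long, actions: Seq[T], commitInfo: Option[CommitInfo])

  // Builds one spec per version, attaching that version's CommitInfo when one is known.
  def buildSpecs[T](
      actionsByVersion: mutable.Map[Long, mutable.ListBuffer[T]],
      versionToCommitInfo: mutable.Map[Long, CommitInfo]): Seq[DataSpec[T]] =
    actionsByVersion.toSeq
      .sortBy { case (version, _) => version } // ordering is an assumption of this sketch
      .map { case (version, actions) =>
        DataSpec(version, actions.toList, versionToCommitInfo.get(version))
      }
}
```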

processDeletionVectorActions

processDeletionVectorActions(
  addFilesMap: Map[FilePathWithTableVersion, AddFile],
  removeFilesMap: Map[FilePathWithTableVersion, RemoveFile],
  versionToCommitInfo: Map[Long, CommitInfo],
  deltaLog: DeltaLog,
  snapshot: SnapshotDescriptor,
  isStreaming: Boolean,
  spark: SparkSession): Seq[DataFrame]

processDeletionVectorActions...FIXME

generateFileActionsWithInlineDv

generateFileActionsWithInlineDv(
  add: AddFile,
  remove: RemoveFile,
  dvStore: DeletionVectorStore,
  deltaLog: DeltaLog): Seq[FileAction]

generateFileActionsWithInlineDv...FIXME

Creating DataFrame over Delta-Aware FileIndex

scanIndex(
  spark: SparkSession,
  index: TahoeFileIndexWithSnapshotDescriptor,
  isStreaming: Boolean = false): DataFrame

HadoopFsRelation

To understand scanIndex, it helps to first understand the role of HadoopFsRelation.

scanIndex creates a HadoopFsRelation (Spark SQL) based on the given TahoeFileIndexWithSnapshotDescriptor as follows:

Property Value
location The given TahoeFileIndexWithSnapshotDescriptor
partitionSchema The partitionSchema of the given TahoeFileIndexWithSnapshotDescriptor
dataSchema The CDF-aware read schema based on the schema of the given TahoeFileIndexWithSnapshotDescriptor
bucketSpec Undefined (None)
fileFormat A new DeltaParquetFileFormat
options The options of the DeltaLog of the given TahoeFileIndexWithSnapshotDescriptor

scanIndex creates a LogicalRelation (Spark SQL) for the HadoopFsRelation (and the given isStreaming flag).

In the end, scanIndex creates a DataFrame for the LogicalRelation.


scanIndex is used when:

CDF-Aware Read Schema (Adding CDF Columns)

cdcReadSchema(
  deltaSchema: StructType): StructType

cdcReadSchema makes the given schema (StructType) of a delta table CDF-aware by appending the following CDF metadata fields:

Column Name Data Type
_change_type StringType
_commit_version LongType
_commit_timestamp TimestampType
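As an illustration of the appending step, here is a dependency-free sketch. Field and Schema are hypothetical stand-ins for Spark SQL's StructField and StructType, and data types are modeled as plain strings, so this shows only the shape of the result, not the actual implementation.

```scala
object CdcReadSchemaSketch {
  // Hypothetical, simplified stand-ins for Spark SQL's StructField and StructType.
  final case class Field(name: String, dataType: String)
  type Schema = Seq[Field]

  // The three CDF metadata fields from the table above, in the same order.
  val cdfFields: Schema = Seq(
    Field("_change_type", "StringType"),
    Field("_commit_version", "LongType"),
    Field("_commit_timestamp", "TimestampType"))

  // Appends the CDF metadata fields after the table's own columns.
  def cdcReadSchema(deltaSchema: Schema): Schema = deltaSchema ++ cdfFields
}
```

For a table with a single id column, the resulting schema would list id followed by the three metadata fields.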

cdcReadSchema is used when:

CDC-Aware Table Scan (CDC Read)

isCDCRead(
  options: CaseInsensitiveStringMap): Boolean

isCDCRead is true when one of the following options is specified (in the given options) with the value true (case-insensitive):

  1. readChangeFeed
  2. (legacy) readChangeData

Otherwise, isCDCRead is false.
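The check can be sketched without Spark on the classpath. In this sketch a plain Map with lower-cased keys is a hypothetical stand-in for CaseInsensitiveStringMap, and case-insensitivity is applied to both the option names and the value, which is one reading of the description above rather than a statement about the actual implementation.

```scala
object IsCdcReadSketch {
  // Option names taken from the list above.
  val CdcEnabledKey = "readChangeFeed"
  val LegacyCdcEnabledKey = "readChangeData"

  private def lower(s: String): String = s.toLowerCase(java.util.Locale.ROOT)

  // Hypothetical stand-in for CaseInsensitiveStringMap: keys are lower-cased once.
  private def normalize(options: Map[String, String]): Map[String, String] =
    options.map { case (k, v) => lower(k) -> v }

  // True when either option is present with the value "true" (compared ignoring case).
  def isCDCRead(options: Map[String, String]): Boolean = {
    val opts = normalize(options)
    def enabled(key: String): Boolean =
      opts.get(lower(key)).exists(_.equalsIgnoreCase("true"))
    enabled(CdcEnabledKey) || enabled(LegacyCdcEnabledKey)
  }
}
```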


isCDCRead is used when:

isCDCEnabledOnTable

isCDCEnabledOnTable(
  metadata: Metadata,
  spark: SparkSession): Boolean

isCDCEnabledOnTable checks whether the given metadata requires the Change Data Feed feature to be enabled (based on the delta.enableChangeDataFeed table property).
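A minimal sketch of such a property check follows. TableMetadata is a hypothetical stand-in that models only the table's configuration map, the SparkSession argument is left out, and treating a missing property as disabled is an assumption of the sketch.

```scala
object IsCdcEnabledSketch {
  // Table property named in the description above.
  val EnableCdfKey = "delta.enableChangeDataFeed"

  // Hypothetical stand-in for the table metadata: only its configuration map is modeled.
  final case class TableMetadata(configuration: Map[String, String])

  // A missing property is treated as disabled (an assumption of this sketch).
  def isCDCEnabledOnTable(metadata: TableMetadata): Boolean =
    metadata.configuration.get(EnableCdfKey).exists(_.trim.equalsIgnoreCase("true"))
}
```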


isCDCEnabledOnTable is used when:

Logging

CDCReaderImpl is a trait, so logging is configured using the logger of the implementations.