CDCReader¶
CDCReader utility is the key class for CDF and CDC in Delta Lake (per this comment).
getCDCRelation¶
getCDCRelation(
spark: SparkSession,
deltaLog: DeltaLog,
snapshotToUse: Snapshot,
partitionFilters: Seq[Expression],
conf: SQLConf,
options: CaseInsensitiveStringMap): BaseRelation
Note
partitionFilters argument is not used.
getCDCRelation uses getVersionForCDC (with the startingVersion and startingTimestamp for the version and timestamp keys, respectively).
getCDCRelation ...FIXME
getCDCRelation is used when:
- DeltaLog is requested to create a relation
Resolving Version¶
getVersionForCDC(
spark: SparkSession,
deltaLog: DeltaLog,
conf: SQLConf,
options: CaseInsensitiveStringMap,
versionKey: String,
timestampKey: String): Option[Long]
getVersionForCDC uses the given options map to get the value of the given versionKey key, if available.
When versionKey and timestampKey are specified
versionKey and timestampKey are specified in the given options argument that is passed down through getCDCRelation unmodified when DeltaLog is requested to create a relation with non-empty cdcOptions.
Otherwise, getVersionForCDC uses the given options map to get the value of the given timestampKey key, if available. getVersionForCDC ...FIXME
If neither the given versionKey nor the given timestampKey is available in the options map, getVersionForCDC returns None (undefined value).
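The lookup order described above (version key first, then timestamp key, otherwise None) can be sketched in plain Scala. This is only an illustration under stated assumptions: the options are modeled as a Map[String, String] rather than a CaseInsensitiveStringMap, and resolveTimestamp is a hypothetical stand-in for the timestamp-to-version step that this page leaves as FIXME.

```scala
object CdcVersionSketch {
  // Simplified model of getVersionForCDC's lookup order.
  // `resolveTimestamp` is a hypothetical parameter: how a timestamp is
  // turned into a table version is not described in the text above.
  def resolveVersion(
      options: Map[String, String],
      versionKey: String,
      timestampKey: String,
      resolveTimestamp: String => Long): Option[Long] = {
    options.get(versionKey)
      .map(_.toLong) // assumes a numeric version; throws NumberFormatException otherwise
      .orElse(options.get(timestampKey).map(resolveTimestamp))
  }
}
```

In this sketch, an options map that carries both keys resolves through the version key, and the timestamp resolver is never called.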
_change_data Directory¶
CDCReader defines _change_data as the name of the directory (under the data directory) where data changes of a delta table are written out (using DelayedCommitProtocol). This directory may contain partition directories.
Used when:
- DelayedCommitProtocol is requested for the newTaskTempFile
CDF Virtual Columns¶
CDC_COLUMNS_IN_DATA: Seq[String]
CDCReader defines a CDC_COLUMNS_IN_DATA collection with __is_cdc and _change_type CDF-specific column names.
__is_cdc Partition Column¶
CDCReader defines __is_cdc as the name of the column to partition on with Change Data Feed enabled.
__is_cdc column is added when TransactionalWrite is requested to performCDCPartition with CDF enabled on a delta table (and _change_type among the columns).
If added, __is_cdc column becomes the first partitioning column. It is then "consumed" by DelayedCommitProtocol (to write changes to files prefixed with cdc- rather than part-).
__is_cdc is one of the CDF Virtual Columns.
Used when:
- DelayedCommitProtocol is requested to getFileName and buildActionFromAddedFile
_change_type Column¶
CDCReader defines _change_type as the name of the column that represents a change type.
_change_type is one of the CDF Virtual Columns and among the columns in the CDF-aware read schema.
CDC_TYPE_COLUMN_NAME is used when:
- DeleteCommand is requested to performDelete (and then rewriteFiles)
- MergeIntoCommand is requested to writeAllChanges (to matchedClauseOutput and notMatchedClauseOutput)
- UpdateCommand is requested to withUpdatedColumns
- WriteIntoDelta is requested to write
- CdcAddFileIndex is requested to matchingFiles
- TahoeRemoveFileIndex is requested to matchingFiles
- TransactionalWrite is requested to performCDCPartition
- SchemaUtils utility is used to normalizeColumnNames
CDC_TYPE_NOT_CDC Literal¶
CDC_TYPE_NOT_CDC: Literal
CDCReader creates CDC_TYPE_NOT_CDC value to be a Literal expression with null value (of StringType type).
CDC_TYPE_NOT_CDC is used as a special sentinel value for rows that are part of the main table rather than change data.
CDC_TYPE_NOT_CDC is used by DML commands when executed with Change Data Feed enabled:
All but DeleteCommand commands use CDC_TYPE_NOT_CDC with _change_type as follows:
Column(CDC_TYPE_NOT_CDC).as("_change_type")
DeleteCommand uses CDC_TYPE_NOT_CDC as follows:
.withColumn(
"_change_type",
Column(If(filterCondition, CDC_TYPE_NOT_CDC, Literal("delete")))
)
CDC_TYPE_NOT_CDC is used when (with Change Data Feed enabled):
- DeleteCommand is requested to rewriteFiles
- MergeIntoCommand is requested to run a merge (for a non-insert-only merge or with merge.optimizeInsertOnlyMerge.enabled disabled that uses ClassicMergeExecutor to write out merge changes with generateWriteAllChangesOutputCols and generateCdcAndOutputRows)
- UpdateCommand is requested to withUpdatedColumns
- WriteIntoDelta is requested to write
CDC-Aware Table Scan (CDC Read)¶
isCDCRead(
options: CaseInsensitiveStringMap): Boolean
isCDCRead is true when one of the following options is specified (in the given options):
- readChangeFeed with true value
- (legacy) readChangeData with true value
Otherwise, isCDCRead is false.
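A minimal sketch of this check, assuming the options are modeled as a plain Map[String, String] whose keys are lower-cased to imitate case-insensitive lookup, and assuming the value must be exactly the string true. The object name is hypothetical and this is not the actual CDCReader code.

```scala
import java.util.Locale

object IsCdcReadSketch {
  // Option names taken from the list above.
  private val cdcOptionKeys = Seq("readChangeFeed", "readChangeData")

  def isCDCRead(options: Map[String, String]): Boolean = {
    // Lower-case the keys to imitate a case-insensitive options map.
    val normalized = options.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }
    cdcOptionKeys.exists { key =>
      normalized.get(key.toLowerCase(Locale.ROOT)).contains("true")
    }
  }
}
```

On the reader side, these are the option names a user would pass through the DataFrameReader option method when loading a delta table.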
isCDCRead is used when:
- DeltaRelation utility is used to fromV2Relation
- DeltaTableV2 is requested to withOptions
- DeltaDataSource is requested for the streaming source schema and to create a BaseRelation
CDF-Aware Read Schema (Adding CDF Columns)¶
cdcReadSchema(
deltaSchema: StructType): StructType
cdcReadSchema adds the CDF columns to the given deltaSchema.
Name | Type |
---|---|
_change_type | StringType |
_commit_version | LongType |
_commit_timestamp | TimestampType |
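To illustrate the effect of cdcReadSchema without pulling in Spark, the sketch below models a schema as a sequence of (name, type) fields. Field is a hypothetical stand-in for a Spark SQL StructField, and appending the CDF columns at the end of the schema is an assumption made for the example.

```scala
object CdcReadSchemaSketch {
  // Hypothetical, simplified stand-in for a Spark SQL StructField.
  final case class Field(name: String, dataType: String)

  // The three CDF columns from the table above.
  val cdcColumns: Seq[Field] = Seq(
    Field("_change_type", "StringType"),
    Field("_commit_version", "LongType"),
    Field("_commit_timestamp", "TimestampType"))

  // Assumption: the CDF columns are appended after the table's own columns.
  def cdcReadSchema(deltaSchema: Seq[Field]): Seq[Field] =
    deltaSchema ++ cdcColumns
}
```

With a single-column table schema (say, an id column), the resulting schema in this model has four fields, with the table's own column first.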
cdcReadSchema is used when:
- CDCReader utility is used to getCDCRelation and scanIndex
- DeltaRelation utility is used to fromV2Relation
- OptimisticTransactionImpl is requested to performCdcMetadataCheck
- CdcAddFileIndex is requested for the partitionSchema
- TahoeRemoveFileIndex is requested for the partitionSchema
- DeltaDataSource is requested for the sourceSchema
- DeltaSourceBase is requested for the schema
- DeltaSourceCDCSupport is requested to filterCDCActions
changesToDF¶
changesToDF(
deltaLog: DeltaLog,
start: Long,
end: Long,
changes: Iterator[(Long, Seq[Action])],
spark: SparkSession,
isStreaming: Boolean = false): CDCVersionDiffInfo
changesToDF ...FIXME
changesToDF is used when:
- CDCReader is requested to changesToBatchDF
- DeltaSourceCDCSupport is requested to getCDCFileChangesAndCreateDataFrame
DeltaUnsupportedOperationException¶
changesToDF makes sure that the DeltaColumnMappingMode is NoMapping or throws a DeltaUnsupportedOperationException:
Change data feed (CDF) reads are currently not supported on tables with column mapping enabled.
scanIndex¶
scanIndex(
spark: SparkSession,
index: TahoeFileIndex,
metadata: Metadata,
isStreaming: Boolean = false): DataFrame
scanIndex creates a LogicalRelation (Spark SQL) with a HadoopFsRelation (Spark SQL) (with the given TahoeFileIndex, cdcReadSchema, no bucketing, DeltaParquetFileFormat).
In the end, scanIndex wraps it up as a DataFrame.
isCDCEnabledOnTable¶
isCDCEnabledOnTable(
metadata: Metadata): Boolean
isCDCEnabledOnTable is the value of the delta.enableChangeDataFeed table property.
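As a rough sketch, the check can be pictured as a lookup in the table's configuration map. The example assumes the table properties are available as a Map[String, String] and that a missing property means the feed is disabled; the object name is hypothetical.

```scala
object CdcEnabledSketch {
  val EnableChangeDataFeed = "delta.enableChangeDataFeed"

  // Assumption: an absent property is treated as disabled (false).
  // String.toBoolean throws IllegalArgumentException for values
  // other than "true" or "false" (case-insensitive).
  def isCDCEnabledOnTable(configuration: Map[String, String]): Boolean =
    configuration.get(EnableChangeDataFeed).exists(_.toBoolean)
}
```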
isCDCEnabledOnTable is used when:
- OptimisticTransactionImpl is requested to performCdcMetadataCheck and performCdcColumnMappingCheck
- WriteIntoDelta is requested to write
- CDCReader is requested to changesToDF
- TransactionalWrite is requested to performCDCPartition
insert Change Type¶
CDCReader defines insert as the value of the _change_type column in the following:
- notMatchedClauseOutput with cdcEnabled (when writeAllChanges)
- WriteIntoDelta is requested to write data out (with isCDCEnabledOnTable)
- CdcAddFileIndex is requested to matchingFiles