MergeOutputGeneration
MergeOutputGeneration is an extension of the MergeIntoCommandBase abstraction for merge output generators, with logic to transform the merge clauses into expressions that can be evaluated to obtain the (possibly optimized) output of the MERGE INTO command.
Implementations
Appending Precomputed Clause Conditions to Source DataFrame
generatePrecomputedConditionsAndDF(
sourceDF: DataFrame,
clauses: Seq[DeltaMergeIntoClause]): (DataFrame, Seq[DeltaMergeIntoClause])
generatePrecomputedConditionsAndDF rewrites the conditions of all the given DeltaMergeIntoClauses (using rewriteCondition).
rewriteCondition
rewriteCondition populates an internal preComputedClauseConditions registry of pairs of a generated column name (e.g., _update_condition_0_) and the clause condition, for every conditional clause (i.e., a DeltaMergeIntoClause with a condition).
generatePrecomputedConditionsAndDF adds the generated columns (of the conditional clauses) to the given sourceDF to precompute the clause conditions.
In the end, generatePrecomputedConditionsAndDF returns a pair of the following:
- The given sourceDF with the generated columns
- The given clauses with rewritten conditions
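The column-appending step can be pictured without Spark. This is a minimal sketch that assumes a DataFrame is modeled as a sequence of rows (`Map[String, Any]`) and a precomputed condition as a `Row => Boolean` function. `appendPrecomputedConditions` is a hypothetical helper, not a Delta Lake API. It only mimics selecting `*` plus one extra Boolean column per registered condition.

```scala
// Toy model (assumption): a "DataFrame" is a Seq of rows, each row a Map of column name to value
type Row = Map[String, Any]

// Hypothetical stand-in for selecting all source columns plus one column per
// (generatedColumnName, condition) pair from the preComputedClauseConditions registry
def appendPrecomputedConditions(
    sourceRows: Seq[Row],
    preComputed: Seq[(String, Row => Boolean)]): Seq[Row] =
  sourceRows.map { row =>
    // Keep every existing column ("*") and add the evaluated condition under its generated name
    row ++ preComputed.map { case (name, cond) => name -> cond(row) }
  }
```

Evaluating each condition once per source row, as a plain column, is what lets the rewritten clauses refer to the result by name later on.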
generatePrecomputedConditionsAndDF is used when:
- ClassicMergeExecutor is requested to writeAllChanges
- InsertOnlyMergeExecutor is requested to writeOnlyInserts (to generateInsertsOnlyOutputDF)
Rewriting Conditional Clause
rewriteCondition[T <: DeltaMergeIntoClause](
clause: T): T
rewriteCondition rewrites the condition of the given DeltaMergeIntoClause to use a column name of the following pattern (with the clauseType):
_[clauseType]_condition_[index]_
rewriteCondition adds a pair of the new name and the condition to a local preComputedClauseConditions registry (of the owning generatePrecomputedConditionsAndDF).
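Here is a minimal sketch of the naming and registry bookkeeping, under a few assumptions. Conditions are plain SQL strings instead of Catalyst expressions, `Clause` and `rewriteAll` are hypothetical names, and `[index]` is taken to be the number of conditions registered so far. The lowercase clause types follow the `_update_condition_0_` example above.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy clause (assumption): a condition is a SQL string; after rewriting it is
// just a reference to the generated column name
final case class Clause(clauseType: String, condition: Option[String], actions: Seq[String])

// Hypothetical sketch: the registry is local to one call, mirroring how
// preComputedClauseConditions belongs to generatePrecomputedConditionsAndDF
def rewriteAll(clauses: Seq[Clause]): (Seq[Clause], Seq[(String, String)]) = {
  val preComputedClauseConditions = ArrayBuffer.empty[(String, String)]

  def rewriteCondition(clause: Clause): Clause = clause.condition match {
    case Some(cond) =>
      // Assumed index: the count of conditions registered before this one
      val colName = s"_${clause.clauseType}_condition_${preComputedClauseConditions.length}_"
      preComputedClauseConditions += (colName -> cond)
      clause.copy(condition = Some(colName))
    case None =>
      // Unconditional clauses are left as they are
      clause
  }

  val rewritten = clauses.map(rewriteCondition)
  (rewritten, preComputedClauseConditions.toSeq)
}
```

Because the clauses are mapped in order, the generated indices follow the order of the conditional clauses in the MERGE statement.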
generateWriteAllChangesOutputCols
generateWriteAllChangesOutputCols(
targetOutputCols: Seq[Expression],
outputColNames: Seq[String],
noopCopyExprs: Seq[Expression],
clausesWithPrecompConditions: Seq[DeltaMergeIntoClause],
cdcEnabled: Boolean,
shouldCountDeletedRows: Boolean = true): IndexedSeq[Column]
generateWriteAllChangesOutputCols generates CaseWhen expressions for every column in the given outputColNames.
generateWriteAllChangesOutputCols generates the expressions to use in the CaseWhens using generateAllActionExprs followed by generateClauseOutputExprs.
generateWriteAllChangesOutputCols uses generateAllActionExprs for the following DeltaMergeIntoClauses separately (among the given clausesWithPrecompConditions):
- DeltaMergeIntoMatchedClauses
- DeltaMergeIntoNotMatchedClauses
- DeltaMergeIntoNotMatchedBySourceClauses
generateWriteAllChangesOutputCols uses generateAllActionExprs for the DeltaMergeIntoMatchedClauses only (among the given clausesWithPrecompConditions), followed by generateClauseOutputExprs, to generate matchedExprs.
generateWriteAllChangesOutputCols uses generateAllActionExprs for the DeltaMergeIntoNotMatchedClauses only (among the given clausesWithPrecompConditions), followed by generateClauseOutputExprs, to generate notMatchedExprs.
generateWriteAllChangesOutputCols uses generateAllActionExprs for the DeltaMergeIntoNotMatchedBySourceClauses only (among the given clausesWithPrecompConditions), followed by generateClauseOutputExprs, to generate notMatchedBySourceExprs.
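Splitting the clauses by kind while keeping their relative order can be sketched with `collect`. The clause hierarchy below is a hypothetical stand-in for the three DeltaMergeIntoClause families, and `splitByKind` is not a Delta Lake method. Order matters because, within one kind, the first clause whose condition holds wins.

```scala
// Toy clause hierarchy (assumption): stand-ins for the three DeltaMergeIntoClause families
sealed trait MergeClause { def condition: Option[String] }
final case class MatchedClause(condition: Option[String]) extends MergeClause
final case class NotMatchedClause(condition: Option[String]) extends MergeClause
final case class NotMatchedBySourceClause(condition: Option[String]) extends MergeClause

final case class ClausesByKind(
    matched: Seq[MatchedClause],
    notMatched: Seq[NotMatchedClause],
    notMatchedBySource: Seq[NotMatchedBySourceClause])

// Split the clauses by kind, preserving their original order within each kind
def splitByKind(clauses: Seq[MergeClause]): ClausesByKind =
  ClausesByKind(
    clauses.collect { case c: MatchedClause => c },
    clauses.collect { case c: NotMatchedClause => c },
    clauses.collect { case c: NotMatchedBySourceClause => c })
```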
generateWriteAllChangesOutputCols creates the following two expressions:
- ifSourceRowNull that is true when _source_row_present_ is null
- ifTargetRowNull that is true when _target_row_present_ is null
generateWriteAllChangesOutputCols creates a CaseWhen expression for every column in the given outputColNames as follows:
- If ifSourceRowNull is true, use notMatchedBySourceExprs
- If ifTargetRowNull is true, use notMatchedExprs
- Otherwise, use matchedExprs
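The per-column choice can be simulated in plain Scala. This sketch assumes the two presence markers of a joined row are modeled as `Option[Boolean]`, with `None` standing in for SQL NULL after the outer join. `JoinedRow` and `pickOutput` are hypothetical names, and the sketch picks values instead of building Catalyst expressions.

```scala
// Toy joined row (assumption): None plays the role of a NULL presence marker
final case class JoinedRow(sourceRowPresent: Option[Boolean], targetRowPresent: Option[Boolean])

// For every output column i, mirrors:
//   CASE WHEN ifSourceRowNull THEN notMatchedBySourceExprs(i)
//        WHEN ifTargetRowNull THEN notMatchedExprs(i)
//        ELSE matchedExprs(i)
def pickOutput[A](
    row: JoinedRow,
    matched: Seq[A],
    notMatched: Seq[A],
    notMatchedBySource: Seq[A]): Seq[A] =
  matched.indices.map { i =>
    if (row.sourceRowPresent.isEmpty) notMatchedBySource(i) // only a target row
    else if (row.targetRowPresent.isEmpty) notMatched(i)    // only a source row
    else matched(i)                                          // both rows present
  }
```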
In the end, generateWriteAllChangesOutputCols prints out the following DEBUG message to the logs:
writeAllChanges: join output expressions
[outputCols1]
[outputCols2]
...
generateWriteAllChangesOutputCols is used when:
- ClassicMergeExecutor is requested to write out merge changes
generateAllActionExprs
generateAllActionExprs(
targetOutputCols: Seq[Expression],
clausesWithPrecompConditions: Seq[DeltaMergeIntoClause],
cdcEnabled: Boolean,
shouldCountDeletedRows: Boolean): Seq[ProcessedClause]
cdcEnabled is enableChangeDataFeed Table Property
The given cdcEnabled flag is controlled by the enableChangeDataFeed table property.
shouldCountDeletedRows is Always Enabled (true)
The given shouldCountDeletedRows flag is always true (it comes from the shouldCountDeletedRows flag of generateWriteAllChangesOutputCols, which defaults to true).
generateAllActionExprs converts (translates) the given DeltaMergeIntoClauses (clausesWithPrecompConditions) into ProcessedClauses.
A ProcessedClause is a pair of the condition expression and actions (based on the type of DeltaMergeIntoClause) as follows:
DeltaMergeIntoClause | Actions |
---|---|
WHEN MATCHED THEN UPDATE | |
WHEN NOT MATCHED BY SOURCE THEN UPDATE | |
WHEN MATCHED THEN DELETE | |
WHEN NOT MATCHED BY SOURCE THEN DELETE | |
WHEN NOT MATCHED THEN INSERT | |
generateClauseOutputExprs
generateClauseOutputExprs(
numOutputCols: Int,
clauses: Seq[ProcessedClause],
noopExprs: Seq[Expression]): Seq[Expression]
generateClauseOutputExprs considers the following cases based on the given clauses:
ProcessedClauses | Expressions Returned |
---|---|
No clauses | The given noopExprs |
An unconditional clause being the first | The actions of the unconditional clause |
One clause | If expressions for every action of this clause |
Many clauses | CaseWhen expressions |
In essence, generateClauseOutputExprs converts (translates) the given clauses into Catalyst Expressions (Spark SQL).
When there is nothing to update or delete (the given clauses are empty), generateClauseOutputExprs does nothing and returns the given noopExprs expressions.
Otherwise, generateClauseOutputExprs checks how many clauses there are (and a few other conditions) to generate the following expressions:
- The actions expressions of the first clause in clauses, when that clause has no condition
- For a single (conditional) clause in clauses, as many If expressions as there are actions
- For many clauses, CaseWhen expressions (one per output column)
Note
It is assumed that when the first clause (in clauses) is unconditional (has no condition), it is the only clause.
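The four cases can be reproduced on a toy expression tree. This is a minimal sketch, not the Delta Lake code: `Expr`, `Ref`, `Lit`, `IfExpr`, `CaseWhenExpr`, `Processed` and `clauseOutputExprs` are hypothetical stand-ins for Catalyst's expressions, ProcessedClause and generateClauseOutputExprs. One further assumption: in the many-clauses case, an unconditional clause gets an always-true branch.

```scala
// Toy expression tree (assumption): stand-ins for Catalyst's attribute, Literal, If and CaseWhen
sealed trait Expr
final case class Ref(name: String) extends Expr
final case class Lit(value: Any) extends Expr
final case class IfExpr(cond: Expr, whenTrue: Expr, whenFalse: Expr) extends Expr
final case class CaseWhenExpr(branches: Seq[(Expr, Expr)], elseValue: Expr) extends Expr

// ProcessedClause-like pair: an optional condition and one action expression per output column
final case class Processed(condition: Option[Expr], actions: Seq[Expr])

def clauseOutputExprs(numOutputCols: Int, clauses: Seq[Processed], noopExprs: Seq[Expr]): Seq[Expr] = {
  val exprs: Seq[Expr] =
    if (clauses.isEmpty) noopExprs // nothing to update or delete
    else if (clauses.head.condition.isEmpty) clauses.head.actions // assumed to be the only clause
    else if (clauses.length == 1) {
      // One conditional clause: IF <condition> THEN <action i> ELSE <noop i>
      val cond = clauses.head.condition.get
      clauses.head.actions.zip(noopExprs).map { case (a, noop) => IfExpr(cond, a, noop) }
    } else {
      // Many clauses: one CASE WHEN per output column, noop copy as the ELSE branch
      (0 until numOutputCols).map { i =>
        val branches = clauses.map(c => c.condition.getOrElse(Lit(true)) -> c.actions(i))
        CaseWhenExpr(branches, noopExprs(i))
      }
    }
  require(exprs.size == numOutputCols, s"expected $numOutputCols expressions, got ${exprs.size}")
  exprs
}
```

Emitting If instead of a one-branch CASE WHEN for a single conditional clause keeps the generated expressions smaller.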
In the end, generateClauseOutputExprs prints out the following DEBUG message to the logs:
writeAllChanges: expressions
[clauseExprs]
generateCdcAndOutputRows
generateCdcAndOutputRows(
sourceDf: DataFrame,
outputCols: Seq[Column],
outputColNames: Seq[String],
noopCopyExprs: Seq[Expression],
deduplicateDeletes: DeduplicateCDFDeletes): DataFrame
generateCdcAndOutputRows is used by ClassicMergeExecutor to generate the preOutputDF dataframe that is written out when requested to write out merge changes with Change Data Feed enabled. The dataframe to write out is as follows:
- Only rows with the _row_dropped_ column set to false
- No _row_dropped_ column (it is dropped from the output)
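These two rules amount to "filter, then drop the flag column". Here is a minimal plain-Scala sketch, assuming rows are modeled as `Map[String, Any]`. `rowsToWrite` is a hypothetical helper, not part of ClassicMergeExecutor.

```scala
// Toy model (assumption): a row is a Map of column name to value
type Row = Map[String, Any]

// Keep only rows whose _row_dropped_ flag is false, then remove the flag column itself
def rowsToWrite(preOutput: Seq[Row]): Seq[Row] =
  preOutput
    .filter(_.get("_row_dropped_").contains(false))
    .map(_ - "_row_dropped_")
```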
Very Position-Sensitive
generateCdcAndOutputRows makes hard assumptions about which columns are at given positions (so there are a lot of magic numbers floating around).
noopCopyExprs
noopCopyExprs is a collection of the following expressions:
- Target output expressions (i.e., the target output expressions followed by any new expressions due to schema evolution)
- An expression to increment the numTargetRowsCopied metric
- CDC_TYPE_NOT_CDC literal (with null value)
Hence, noopCopyExprs.dropRight(2) gives the target output expressions (i.e., the target output expressions followed by any new expressions due to schema evolution).
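The positional layout can be illustrated with expressions rendered as strings. This is only a sketch: the string renderings of the metric increment and of the CDC_TYPE_NOT_CDC sentinel are hypothetical placeholders, not real Catalyst output.

```scala
// Toy layout (assumption): expressions are rendered as strings only to show their positions
val targetOutput: Seq[String] = Seq("t.id", "t.qty") // target columns (plus any evolved ones)

val noopCopyExprs: Seq[String] =
  targetOutput ++ Seq(
    "incrMetric(numTargetRowsCopied)",   // hypothetical rendering of the metric increment
    "CAST(NULL AS STRING)")              // hypothetical rendering of CDC_TYPE_NOT_CDC

// The last two positions are bookkeeping; dropping them recovers the target output expressions
val recoveredTargetOutput: Seq[String] = noopCopyExprs.dropRight(2)
```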
outputCols
outputCols is an indexed collection that is generated using generateWriteAllChangesOutputCols. The names of outputCols can be displayed in the logs at DEBUG level.
outputCols.dropRight(1)
outputCols(outputCols.length - 2)
outputColNames
outputColNames.dropRight(1)
generateCdcAndOutputRows drops the last column from the given outputCols and adds a _change_type column with a special sentinel value (null).
FIXME What's at the last position?
FIXME What's at the second last position?
generateCdcAndOutputRows creates a new _row_dropped_ column that negates the values in the column just before the _change_type column (the second from the end of the given outputCols). The new column becomes the new second from the end (replacing the former one).
generateCdcAndOutputRows replaces the last two columns in the given noopCopyExprs with the following new columns (in that order):
- A false literal
- A literal with the update_preimage value
generateCdcAndOutputRows aliases these columns with the names from the given outputColNames (updatePreimageCdcOutput).
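The position arithmetic described above can be sketched on columns modeled as `(expression, alias)` string pairs. `Col`, `withNullChangeType`, `withRowDropped` and `updatePreimage` are hypothetical names. The sketch assumes the last entry of outputCols is the CDC type column and that outputColNames lines up one-to-one with noopCopyExprs.

```scala
// Toy column (assumption): an expression and its alias, both rendered as strings
final case class Col(expr: String, alias: String)

// Drop the last column (the CDC type) and add _change_type with a NULL sentinel
def withNullChangeType(outputCols: Seq[Col]): Seq[Col] =
  outputCols.dropRight(1) :+ Col("NULL", "_change_type")

// Negate the second-from-the-end column and put it back in place as _row_dropped_
def withRowDropped(cols: Seq[Col]): Seq[Col] = {
  val i = cols.length - 2
  cols.updated(i, Col(s"NOT (${cols(i).expr})", "_row_dropped_"))
}

// updatePreimageCdcOutput: replace the last two noopCopyExprs with false and
// 'update_preimage', then alias every expression with the matching output column name
def updatePreimage(noopCopyExprs: Seq[String], outputColNames: Seq[String]): Seq[Col] =
  (noopCopyExprs.dropRight(2) ++ Seq("false", "'update_preimage'"))
    .zip(outputColNames)
    .map { case (e, name) => Col(e, name) }
```

Hard-coded offsets such as `length - 2` are exactly the kind of magic numbers the position-sensitivity note warns about.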
generateCdcAndOutputRows creates a new column (cdcArray) with a CaseWhen expression based on the last column of the given outputCols (cdcTypeCol). Every CaseWhen case creates an array for the value of the last column (in the given outputCols):
generateCdcAndOutputRows creates a new column (cdcToMainDataArray) with an If expression based on the value of the packedCdc._change_type column. If the value is one of the following literals, the If expression gives an array with the packedCdc column and something extra:
Otherwise, the If expression gives an array with the packedCdc column alone.
In the end, generateCdcAndOutputRows uses deduplicateCDFDeletes if deduplicateDeletes is enabled. Otherwise, generateCdcAndOutputRows uses packAndExplodeCDCOutput.
generateCdcAndOutputRows is used when:
- ClassicMergeExecutor is requested to write out merge changes (with Change Data Feed enabled)
packAndExplodeCDCOutput
packAndExplodeCDCOutput(
sourceDf: DataFrame,
cdcArray: Column,
cdcToMainDataArray: Column,
outputColNames: Seq[String],
dedupColumns: Seq[Column]): DataFrame
In essence, packAndExplodeCDCOutput explodes the projectedCDC and cdcToMainDataArray columns, and extracts the given outputColNames columns from the packedData struct.
packAndExplodeCDCOutput does the following with the given sourceDf dataframe (in order):
- Selects the given cdcArray column under the projectedCDC alias and all the given dedupColumns columns
- Explodes the projectedCDC column (which is the given cdcArray column) under the packedCdc alias alongside all the given dedupColumns columns
- Explodes the given cdcToMainDataArray column under the packedData alias alongside all the given dedupColumns columns
- Selects the given outputColNames columns from the packedData struct (flattening them using the packedData.[name] selector) alongside all the given dedupColumns columns
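The two explodes and the final flattening map naturally onto a `for` comprehension over sequences. This is a minimal sketch under these assumptions: structs are `Map[String, Any]`, each source row carries its cdcArray value plus its dedup column values, and cdcToMainDataArray is a function of packedCdc (it is computed from packedCdc._change_type). `SourceRow` and `packAndExplode` are hypothetical names.

```scala
// Toy model (assumption): structs are Maps; a source row carries cdcArray and dedup column values
type Struct = Map[String, Any]
final case class SourceRow(cdcArray: Seq[Struct], dedup: Struct)

def packAndExplode(
    rows: Seq[SourceRow],
    cdcToMainDataArray: Struct => Seq[Struct], // stand-in for the column computed from packedCdc
    outputColNames: Seq[String]): Seq[Struct] =
  for {
    row        <- rows                          // cdcArray AS projectedCDC, plus dedup columns
    packedCdc  <- row.cdcArray                  // explode(projectedCDC) AS packedCdc
    packedData <- cdcToMainDataArray(packedCdc) // explode(cdcToMainDataArray) AS packedData
  } yield outputColNames.map(n => n -> packedData(n)).toMap ++ row.dedup // packedData.[name] + dedup
```

Every exploded row keeps the dedup column values of the row it came from, which is what lets a later deduplication step group rows by them.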
deduplicateCDFDeletes
deduplicateCDFDeletes(
deduplicateDeletes: DeduplicateCDFDeletes,
df: DataFrame,
cdcArray: Column,
cdcToMainDataArray: Column,
outputColNames: Seq[String]): DataFrame
WHEN NOT MATCHED THEN INSERT Sensitivity
deduplicateCDFDeletes is sensitive to (behaves differently for) merges with WHEN NOT MATCHED THEN INSERT clauses (based on the given DeduplicateCDFDeletes).
deduplicateCDFDeletes finds out the deduplication columns (dedupColumns) that include the following:
- _target_row_index_
- _source_row_index (only when this merge includes WHEN NOT MATCHED THEN INSERT clauses)
deduplicateCDFDeletes uses packAndExplodeCDCOutput (to create a new cdcDf dataframe).
With WHEN NOT MATCHED THEN INSERT clauses, deduplicateCDFDeletes overwrites the _target_row_index_ column (in the cdcDf dataframe) with the value of the _source_row_index column for rows where _target_row_index_ is null.
deduplicateCDFDeletes deduplicates rows based on the _target_row_index_ and _change_type columns.
In the end, deduplicateCDFDeletes drops the _target_row_index_ and _source_row_index columns.
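The deduplication steps can be simulated on a handful of toy CDC rows. `CdcRow`, `OutRow` and `dedupCdfDeletes` are hypothetical names, and the sketch makes two assumptions. First, the null-replacement step is modeled as a coalesce (`orElse`). Second, this version keeps the first duplicate, whereas a Spark deduplication (such as `dropDuplicates`) makes no promise about which duplicate survives.

```scala
// Toy CDC row (assumption): only the fields the deduplication looks at, plus a payload
final case class CdcRow(
    targetRowIndex: Option[Long],
    sourceRowIndex: Option[Long],
    changeType: String,
    payload: String)

// Output row after the index columns are dropped
final case class OutRow(changeType: String, payload: String)

def dedupCdfDeletes(rows: Seq[CdcRow], includesNotMatchedInserts: Boolean): Seq[OutRow] = {
  // With WHEN NOT MATCHED THEN INSERT, rows without a target index take the source index
  val keyed =
    if (includesNotMatchedInserts)
      rows.map(r => r.copy(targetRowIndex = r.targetRowIndex.orElse(r.sourceRowIndex)))
    else rows
  // One row per (_target_row_index_, _change_type); this sketch keeps the first occurrence
  val deduped = keyed.distinctBy(r => (r.targetRowIndex, r.changeType))
  // Finally drop the index columns
  deduped.map(r => OutRow(r.changeType, r.payload))
}
```

Without the index replacement, every inserted row would share a null target index and all but one insert per change type would be lost. That is why the behavior depends on WHEN NOT MATCHED THEN INSERT clauses.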
Logging
MergeOutputGeneration is an abstract class, and logging is configured using the logger of the implementations (which boils down to MergeIntoCommand).