MergeOutputGeneration¶
MergeOutputGeneration is an extension of the MergeIntoCommandBase abstraction for merge output generators with logic to transform the merge clauses into expressions that can be evaluated to obtain the (possibly optimized) output of the MERGE INTO command.
Implementations¶
Appending Precomputed Clause Conditions to Source DataFrame¶
generatePrecomputedConditionsAndDF(
sourceDF: DataFrame,
clauses: Seq[DeltaMergeIntoClause]): (DataFrame, Seq[DeltaMergeIntoClause])
generatePrecomputedConditionsAndDF rewrites conditional clauses of all the given DeltaMergeIntoClauses.
rewriteCondition
rewriteCondition populates an internal preComputedClauseConditions registry of pairs of a generated column name (e.g., _update_condition_0_) and a rewritten condition for every conditional clause (i.e., DeltaMergeIntoClauses with a condition).
generatePrecomputedConditionsAndDF adds the generated columns (of the conditional clauses) to the given sourceDF (to precompute clause conditions).
In the end, generatePrecomputedConditionsAndDF returns a pair of the following:
- The given sourceDF with the generated columns
- The given clauses with rewritten conditions
generatePrecomputedConditionsAndDF is used when:
- ClassicMergeExecutor is requested to writeAllChanges
- InsertOnlyMergeExecutor is requested to writeOnlyInserts (to generateInsertsOnlyOutputDF)
Rewriting Conditional Clause¶
rewriteCondition[T <: DeltaMergeIntoClause](
clause: T): T
rewriteCondition rewrites the condition of the given DeltaMergeIntoClause to use a column name of the following pattern (with the clauseType):
_[clauseType]_condition_[index]_
rewriteCondition adds a pair of the new name and the condition in a local preComputedClauseConditions registry (of the owning generatePrecomputedConditionsAndDF).
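The naming and registry bookkeeping described above can be sketched in plain Scala, without Spark. This is a simplified model, not Delta Lake's code: `Clause`, `precompute`, and the string-typed conditions are hypothetical stand-ins, and the sketch assumes that the index in the generated name is the size of the registry at the time the clause is rewritten.

```scala
import scala.collection.mutable.ArrayBuffer

object PrecomputedConditionsSketch {
  // Hypothetical stand-in for DeltaMergeIntoClause: a clause type and an optional condition.
  final case class Clause(clauseType: String, condition: Option[String])

  // Returns the registry of (generated column name, original condition) pairs
  // together with the clauses rewritten to reference the generated columns.
  def precompute(clauses: Seq[Clause]): (Seq[(String, String)], Seq[Clause]) = {
    val registry = ArrayBuffer.empty[(String, String)]
    val rewritten = clauses.map { clause =>
      clause.condition match {
        case Some(cond) =>
          // Assumption: the index is the current size of the registry.
          val colName = s"_${clause.clauseType}_condition_${registry.length}_"
          registry += ((colName, cond))
          clause.copy(condition = Some(colName))
        case None =>
          clause // unconditional clauses are left untouched
      }
    }
    (registry.toList, rewritten)
  }
}
```

In this model, the registry entries correspond to the columns that would be added to sourceDF, and the rewritten clauses only refer to those columns by name.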
generateWriteAllChangesOutputCols¶
generateWriteAllChangesOutputCols(
targetOutputCols: Seq[Expression],
outputColNames: Seq[String],
noopCopyExprs: Seq[Expression],
clausesWithPrecompConditions: Seq[DeltaMergeIntoClause],
cdcEnabled: Boolean,
shouldCountDeletedRows: Boolean = true): IndexedSeq[Column]
generateWriteAllChangesOutputCols generates CaseWhen expressions for every column in the given outputColNames.
generateWriteAllChangesOutputCols generates expressions to use in CaseWhens using generateAllActionExprs followed by generateClauseOutputExprs.
generateWriteAllChangesOutputCols runs generateAllActionExprs separately for each of the following kinds of DeltaMergeIntoClauses (among the given clausesWithPrecompConditions) and passes every result to generateClauseOutputExprs:
- DeltaMergeIntoMatchedClauses (to produce matchedExprs)
- DeltaMergeIntoNotMatchedClauses (to produce notMatchedExprs)
- DeltaMergeIntoNotMatchedBySourceClauses (to produce notMatchedBySourceExprs)
generateWriteAllChangesOutputCols creates the following two expressions:
- ifSourceRowNull that is true when _source_row_present_ is null
- ifTargetRowNull that is true when _target_row_present_ is null
generateWriteAllChangesOutputCols creates a CaseWhen expression for every column in the given outputColNames as follows:
- When the ifSourceRowNull expression is true, use notMatchedBySourceExprs
- When the ifTargetRowNull expression is true, use notMatchedExprs
- Otherwise, use matchedExprs
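The branch order above can be modeled in plain Scala. This is only an illustrative sketch: `chooseExprs` is a hypothetical helper, and `Option[Boolean]` stands in for the nullable _source_row_present_ and _target_row_present_ columns (`None` models null).

```scala
object RowPresenceSketch {
  // Picks the set of expressions in the same order as the CaseWhen branches:
  // source row missing first, then target row missing, then the matched case.
  def chooseExprs[A](
      sourceRowPresent: Option[Boolean],
      targetRowPresent: Option[Boolean],
      matchedExprs: A,
      notMatchedExprs: A,
      notMatchedBySourceExprs: A): A = {
    val ifSourceRowNull = sourceRowPresent.isEmpty
    val ifTargetRowNull = targetRowPresent.isEmpty
    if (ifSourceRowNull) notMatchedBySourceExprs
    else if (ifTargetRowNull) notMatchedExprs
    else matchedExprs
  }
}
```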
In the end, generateWriteAllChangesOutputCols prints out the following DEBUG message to the logs:
writeAllChanges: join output expressions
[outputCols1]
[outputCols2]
...
generateWriteAllChangesOutputCols is used when:
- ClassicMergeExecutor is requested to write out merge changes
generateAllActionExprs¶
generateAllActionExprs(
targetOutputCols: Seq[Expression],
clausesWithPrecompConditions: Seq[DeltaMergeIntoClause],
cdcEnabled: Boolean,
shouldCountDeletedRows: Boolean): Seq[ProcessedClause]
cdcEnabled is enableChangeDataFeed Table Property
The given cdcEnabled flag is controlled by the enableChangeDataFeed table property.
shouldCountDeletedRows is Always Enabled (true)
The given shouldCountDeletedRows flag is always true (comes from the shouldCountDeletedRows flag of generateWriteAllChangesOutputCols and defaults to true).
generateAllActionExprs converts (translates) the given DeltaMergeIntoClauses (clausesWithPrecompConditions) into ProcessedClauses.
ProcessedClause is a pair of the condition expression and actions (based on the type of DeltaMergeIntoClause) as follows:
| DeltaMergeIntoClause | Actions |
|---|---|
| WHEN MATCHED THEN UPDATE | |
| WHEN NOT MATCHED BY SOURCE THEN UPDATE | |
| WHEN MATCHED THEN DELETE | |
| WHEN NOT MATCHED BY SOURCE THEN DELETE | |
| WHEN NOT MATCHED THEN INSERT | |
generateClauseOutputExprs¶
generateClauseOutputExprs(
numOutputCols: Int,
clauses: Seq[ProcessedClause],
noopExprs: Seq[Expression]): Seq[Expression]
generateClauseOutputExprs considers the following cases based on the given clauses:
| ProcessedClauses | Expressions Returned |
|---|---|
| No clauses | The given noopExprs |
| An unconditional clause being the first | The actions of the unconditional clause |
| One clause | If expressions for every action of this clause |
| Many clauses | CaseWhen expressions |
In essence, generateClauseOutputExprs converts (translates) the given clauses into Catalyst Expressions (Spark SQL).
When there is nothing to update or delete (no clauses are given), generateClauseOutputExprs does nothing and returns the given noopExprs expressions.
Otherwise, generateClauseOutputExprs checks how many clauses there are (and some other conditions) to generate the following expressions:
- The actions expressions of the single clause (in clauses) when it has no condition
- For a single clause (in clauses), as many If expressions as there are actions
- Many CaseWhen expressions
Note
It is assumed that when the first clause (in clauses) is unconditional (no condition), it is the only clause.
In the end, generateClauseOutputExprs prints out the following DEBUG message to the logs:
writeAllChanges: expressions
[clauseExprs]
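The four cases of generateClauseOutputExprs can be sketched with a tiny expression model in plain Scala. Everything here is a hypothetical stand-in (`Expr`, `Ref`, `If`, `CaseWhen`, `TrueLit`, and this `ProcessedClause` are not the Catalyst or Delta Lake classes). The sketch also assumes that the fallback branch of `If` and `CaseWhen` is the matching no-op expression, and that a clause without a condition counts as always true.

```scala
object ClauseOutputSketch {
  // Hypothetical stand-ins for Catalyst expressions.
  sealed trait Expr
  case object TrueLit extends Expr
  final case class Ref(name: String) extends Expr
  final case class If(cond: Expr, whenTrue: Expr, whenFalse: Expr) extends Expr
  final case class CaseWhen(branches: Seq[(Expr, Expr)], otherwise: Expr) extends Expr

  // Hypothetical stand-in for ProcessedClause: an optional condition and one action per output column.
  final case class ProcessedClause(condition: Option[Expr], actions: Seq[Expr])

  def clauseOutputExprs(clauses: Seq[ProcessedClause], noopExprs: Seq[Expr]): Seq[Expr] =
    clauses match {
      // No clauses: nothing to do, return the no-op expressions.
      case Seq() => noopExprs
      // First clause is unconditional: its actions are the output.
      case Seq(ProcessedClause(None, actions), _*) => actions
      // Exactly one conditional clause: one If per action (no-op fallback is an assumption).
      case Seq(ProcessedClause(Some(cond), actions)) =>
        actions.zip(noopExprs).map { case (action, noop) => If(cond, action, noop) }
      // Several clauses: one CaseWhen per output column.
      case many =>
        noopExprs.indices.map { i =>
          CaseWhen(many.map(c => (c.condition.getOrElse(TrueLit), c.actions(i))), noopExprs(i))
        }
    }
}
```

The result always has one expression per output column, which is what lets the caller zip the expressions with column names.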
generateCdcAndOutputRows¶
generateCdcAndOutputRows(
sourceDf: DataFrame,
outputCols: Seq[Column],
outputColNames: Seq[String],
noopCopyExprs: Seq[Expression],
deduplicateDeletes: DeduplicateCDFDeletes): DataFrame
generateCdcAndOutputRows is used by ClassicMergeExecutor to generate preOutputDF dataframe that is written out when requested to write out merge changes with Change Data Feed enabled. The dataframe to write out is as follows:
- Only _row_dropped_ rows with false value
- No _row_dropped_ column (it is dropped from the output)
Very Position-Sensitive
generateCdcAndOutputRows makes hard assumptions about which columns sit at which positions (and so there are a lot of magic numbers floating around).
noopCopyExprs¶
noopCopyExprs is a collection of the following expressions:
- Target output expressions (i.e., the target output expressions followed by any new expressions due to schema evolution)
- An expression to increment numTargetRowsCopied metric
- CDC_TYPE_NOT_CDC literal (with null value)
Hence, noopCopyExprs.dropRight(2) gives the target output expressions (i.e., the target output expressions followed by any new expressions due to schema evolution).
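The positional layout can be illustrated with plain strings. The entries below are hypothetical placeholders for the real expressions, and the two target columns are made up for the example.

```scala
object NoopCopyExprsSketch {
  // Hypothetical placeholders for the target output expressions.
  val targetOutputExprs: Seq[String] = Seq("t.id", "t.value")

  // Layout described above: target output, metric increment, CDC type literal.
  val noopCopyExprs: Seq[String] =
    targetOutputExprs ++ Seq("increment(numTargetRowsCopied)", "CDC_TYPE_NOT_CDC")

  // Dropping the two trailing entries recovers the target output expressions.
  val recovered: Seq[String] = noopCopyExprs.dropRight(2)
}
```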
outputCols¶
outputCols is an indexed collection that is generated using generateWriteAllChangesOutputCols. The names of outputCols can be displayed in the logs at DEBUG level.
- outputCols.dropRight(1)
- outputCols(outputCols.length - 2)
outputColNames¶
outputColNames.dropRight(1)
generateCdcAndOutputRows drops the last column from the given outputCols and adds _change_type column with a special sentinel value (null).
FIXME What's at the last position?
FIXME What's at the second last position?
generateCdcAndOutputRows creates a new _row_dropped_ column that negates the values in the column that is just before _change_type column (the second from the end of the given outputCols) and becomes the new second from the end (replaces the former second from the end).
generateCdcAndOutputRows replaces the last two columns in the given noopCopyExprs with the following new columns (in that order):
- A false literal
- A literal with update_preimage value
generateCdcAndOutputRows aliases these columns with the names from the given outputColNames (updatePreimageCdcOutput).
generateCdcAndOutputRows creates a new column (cdcArray) with a CaseWhen expression based on the last column of the given outputCols (cdcTypeCol). Every CaseWhen case creates an array for the value of the last column (in the given outputCols):
generateCdcAndOutputRows creates a new column (cdcToMainDataArray) with an If expression based on the value of packedCdc._change_type column. If the value is one of the following literals, the If expression gives an array with packedCdc column and something extra:
Otherwise, the If expression gives an array with packedCdc column alone.
In the end, generateCdcAndOutputRows deduplicateCDFDeletes if deduplicateDeletes is enabled. Otherwise, generateCdcAndOutputRows packAndExplodeCDCOutput.
generateCdcAndOutputRows is used when:
- ClassicMergeExecutor is requested to write out merge changes (with Change Data Feed enabled)
packAndExplodeCDCOutput¶
packAndExplodeCDCOutput(
sourceDf: DataFrame,
cdcArray: Column,
cdcToMainDataArray: Column,
outputColNames: Seq[String],
dedupColumns: Seq[Column]): DataFrame
In essence, packAndExplodeCDCOutput explodes projectedCDC and cdcToMainDataArray columns, and extracts the given outputColNames columns from the packedData struct.
packAndExplodeCDCOutput does the following with the given sourceDf dataframe (in that order):
- Selects the given cdcArray column under projectedCDC alias and all the given dedupColumns columns
- Explodes the projectedCDC column (which is the given cdcArray column) under packedCdc alias alongside all the given dedupColumns columns
- Explodes the given cdcToMainDataArray column under packedData alias alongside all the given dedupColumns columns
- Selects the given outputColNames columns from the packedData struct (flattens them using packedData.[name] selector) alongside all the given dedupColumns columns
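The two explode steps can be modeled with nested `flatMap`s over in-memory collections. This is a plain-Scala sketch, not the Spark implementation: `Packed`, `SourceRow`, and `packAndExplode` are hypothetical names, a single `Long` stands in for the dedupColumns, and cdcToMainDataArray is modeled as a function of the packedCdc value.

```scala
object PackAndExplodeSketch {
  // Hypothetical row shapes; only the aliases mentioned in the text come from the source.
  final case class Packed(changeType: Option[String], values: Map[String, Any])
  final case class SourceRow(cdcArray: Seq[Packed], dedupKey: Long)

  def packAndExplode(
      rows: Seq[SourceRow],
      cdcToMainDataArray: Packed => Seq[Packed],
      outputColNames: Seq[String]): Seq[(Seq[Any], Long)] =
    for {
      row <- rows
      packedCdc <- row.cdcArray                   // explode projectedCDC as packedCdc
      packedData <- cdcToMainDataArray(packedCdc) // explode cdcToMainDataArray as packedData
    } yield {
      // Flatten packedData.[name] for every output column and keep the dedup column.
      (outputColNames.map(packedData.values), row.dedupKey)
    }
}
```

Each source row can therefore produce several output rows: one per element of its CDC array, multiplied by however many entries cdcToMainDataArray yields for that element.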
deduplicateCDFDeletes¶
deduplicateCDFDeletes(
deduplicateDeletes: DeduplicateCDFDeletes,
df: DataFrame,
cdcArray: Column,
cdcToMainDataArray: Column,
outputColNames: Seq[String]): DataFrame
WHEN NOT MATCHED THEN INSERT Sensitivity
deduplicateCDFDeletes is sensitive (behaves differently) to merges with WHEN NOT MATCHED THEN INSERT clauses (based on the given DeduplicateCDFDeletes).
deduplicateCDFDeletes finds out the deduplication columns (dedupColumns) that include the following:
- _target_row_index_
- _source_row_index only when this merge includes WHEN NOT MATCHED THEN INSERT clauses
deduplicateCDFDeletes packAndExplodeCDCOutput (and creates a new cdcDf dataframe).
With WHEN NOT MATCHED THEN INSERT clauses, deduplicateCDFDeletes overwrites _target_row_index_ column (in the cdcDf dataframe) with the value of _source_row_index column for rows where _target_row_index_ is null.
deduplicateCDFDeletes deduplicates rows based on _target_row_index_ and _change_type columns.
In the end, deduplicateCDFDeletes drops _target_row_index_ and _source_row_index columns.
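The deduplication steps can be sketched over an in-memory collection. This is a simplified model under stated assumptions: `CdcRow` and `dedup` are hypothetical, "rows with nulls" is read as rows whose _target_row_index_ is null, and the first row of every duplicate group is kept (a real DataFrame deduplication gives no such ordering guarantee).

```scala
import scala.collection.mutable.LinkedHashMap

object DedupCdfDeletesSketch {
  // Hypothetical row shape for the exploded CDC output.
  final case class CdcRow(
      targetRowIndex: Option[Long],
      sourceRowIndex: Option[Long],
      changeType: Option[String],
      payload: String)

  def dedup(rows: Seq[CdcRow], includesInserts: Boolean): Seq[(Option[String], String)] = {
    // Assumption: with insert clauses, a null target row index falls back to the source row index.
    val keyed =
      if (includesInserts) {
        rows.map(r => r.copy(targetRowIndex = r.targetRowIndex.orElse(r.sourceRowIndex)))
      } else rows

    // Keep one row per (_target_row_index_, _change_type) pair, preserving first-seen order.
    val seen = LinkedHashMap.empty[(Option[Long], Option[String]), CdcRow]
    keyed.foreach(r => seen.getOrElseUpdate((r.targetRowIndex, r.changeType), r))

    // Drop both index columns from the output.
    seen.values.toList.map(r => (r.changeType, r.payload))
  }
}
```

Without the fallback, all inserted rows would share a null target row index and collapse into a single row per change type, which is why the source row index is needed in that case.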
Logging¶
MergeOutputGeneration is an abstract class and logging is configured using the logger of the implementations (that boils down to MergeIntoCommand).