MergeOutputGeneration

MergeOutputGeneration is an extension of the MergeIntoCommandBase abstraction for merge output generators with logic to transform the merge clauses into expressions that can be evaluated to obtain the (possibly optimized) output of the MERGE INTO command.

Implementations

Appending Precomputed Clause Conditions to Source DataFrame

generatePrecomputedConditionsAndDF(
  sourceDF: DataFrame,
  clauses: Seq[DeltaMergeIntoClause]): (DataFrame, Seq[DeltaMergeIntoClause])

generatePrecomputedConditionsAndDF rewrites conditional clauses of all the given DeltaMergeIntoClauses.

rewriteCondition

rewriteCondition populates an internal preComputedClauseConditions registry of pairs of a generated column name (e.g., _update_condition_0_) and a rewritten condition for every conditional clause (i.e., DeltaMergeIntoClauses with a condition).

generatePrecomputedConditionsAndDF adds the generated columns (of the conditional clauses) to the given sourceDF (to precompute clause conditions).

In the end, generatePrecomputedConditionsAndDF returns a pair of the following:

  1. The given sourceDF with the generated columns
  2. The given clauses with rewritten conditions

generatePrecomputedConditionsAndDF is used when:

Rewriting Conditional Clause

rewriteCondition[T <: DeltaMergeIntoClause](
  clause: T): T

rewriteCondition rewrites the condition of the given DeltaMergeIntoClause to use a column name of the following pattern (with the clauseType):

_[clauseType]_condition_[index]_

rewriteCondition adds a pair of the new name and the condition in a local preComputedClauseConditions registry (of the owning generatePrecomputedConditionsAndDF).
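
The naming pattern and the registry can be illustrated with a minimal, Spark-free sketch. Everything in it is a simplified stand-in rather than the actual Delta Lake code: Clause and precompute are hypothetical names, conditions are plain strings instead of Catalyst expressions, and the index is assumed to be the number of conditions registered so far.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for DeltaMergeIntoClause:
// conditions are plain strings rather than Catalyst expressions.
object PrecomputedConditionsSketch {
  final case class Clause(clauseType: String, condition: Option[String])

  // Returns the (generated column name, original condition) pairs and
  // the clauses with their conditions replaced by the generated column names.
  def precompute(clauses: Seq[Clause]): (Seq[(String, String)], Seq[Clause]) = {
    val registry = ArrayBuffer.empty[(String, String)]

    def rewriteCondition(clause: Clause): Clause = clause.condition match {
      case Some(cond) =>
        // Assumption: the index is the number of conditions registered so far.
        val colName = s"_${clause.clauseType}_condition_${registry.length}_"
        registry += ((colName, cond))
        clause.copy(condition = Some(colName))
      case None =>
        clause // unconditional clauses are left untouched
    }

    val rewritten = clauses.map(rewriteCondition)
    (registry.toSeq, rewritten)
  }
}
```

In this sketch, an update clause with a condition followed by an insert clause with a condition yields the generated names _update_condition_0_ and _insert_condition_1_.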

generateWriteAllChangesOutputCols

generateWriteAllChangesOutputCols(
  targetOutputCols: Seq[Expression],
  outputColNames: Seq[String],
  noopCopyExprs: Seq[Expression],
  clausesWithPrecompConditions: Seq[DeltaMergeIntoClause],
  cdcEnabled: Boolean,
  shouldCountDeletedRows: Boolean = true): IndexedSeq[Column]

generateWriteAllChangesOutputCols generates CaseWhen expressions for every column in the given outputColNames.

generateWriteAllChangesOutputCols generates expressions to use in CaseWhens using generateAllActionExprs followed by generateClauseOutputExprs.

generateWriteAllChangesOutputCols runs generateAllActionExprs followed by generateClauseOutputExprs separately for each of the following kinds of DeltaMergeIntoClauses (among the given clausesWithPrecompConditions):

  • DeltaMergeIntoMatchedClauses (to produce matchedExprs)
  • DeltaMergeIntoNotMatchedClauses (to produce notMatchedExprs)
  • DeltaMergeIntoNotMatchedBySourceClauses (to produce notMatchedBySourceExprs)

generateWriteAllChangesOutputCols creates the following two expressions:

  1. ifSourceRowNull that is true when _source_row_present_ is null
  2. ifTargetRowNull that is true when _target_row_present_ is null

generateWriteAllChangesOutputCols creates a CaseWhen expression for every column in the given outputColNames as follows:

  • When ifSourceRowNull is true, use notMatchedBySourceExprs
  • When ifTargetRowNull is true, use notMatchedExprs
  • Otherwise, use matchedExprs
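
The branch order above can be sketched in plain Scala. This is only a model of the decision, not the Catalyst CaseWhen itself: JoinedRow and chooseExprs are hypothetical names, and a null flag is modeled as None.

```scala
// Simplified model: each joined row carries two nullable presence flags.
object WriteAllChangesBranchSketch {
  final case class JoinedRow(
    sourceRowPresent: Option[Boolean], // models _source_row_present_
    targetRowPresent: Option[Boolean]) // models _target_row_present_

  // Mirrors the order of the CaseWhen branches described above.
  def chooseExprs(row: JoinedRow): String = {
    val ifSourceRowNull = row.sourceRowPresent.isEmpty
    val ifTargetRowNull = row.targetRowPresent.isEmpty
    if (ifSourceRowNull) "notMatchedBySourceExprs"
    else if (ifTargetRowNull) "notMatchedExprs"
    else "matchedExprs"
  }
}
```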

In the end, generateWriteAllChangesOutputCols prints out the following DEBUG message to the logs:

writeAllChanges: join output expressions
  [outputCols1]
  [outputCols2]
  ...

generateWriteAllChangesOutputCols is used when:

generateAllActionExprs

generateAllActionExprs(
  targetOutputCols: Seq[Expression],
  clausesWithPrecompConditions: Seq[DeltaMergeIntoClause],
  cdcEnabled: Boolean,
  shouldCountDeletedRows: Boolean): Seq[ProcessedClause]

cdcEnabled is enableChangeDataFeed Table Property

The given cdcEnabled flag is controlled by the enableChangeDataFeed table property.

shouldCountDeletedRows is Always Enabled (true)

The given shouldCountDeletedRows flag is always true (comes from the shouldCountDeletedRows flag of generateWriteAllChangesOutputCols and defaults to true).

generateAllActionExprs converts (translates) the given DeltaMergeIntoClauses (clausesWithPrecompConditions) into ProcessedClauses.

ProcessedClause is a pair of the condition expression and actions (based on the type of DeltaMergeIntoClause) as follows:

Actions per DeltaMergeIntoClause:

WHEN MATCHED THEN UPDATE
  1. All DeltaMergeActions
  2. Expression to increment metrics (returns a false literal)
  3. update_postimage literal (when the given cdcEnabled flag is enabled) or none

WHEN NOT MATCHED BY SOURCE THEN UPDATE
  1. All DeltaMergeActions
  2. Expression to increment metrics (returns a false literal)
  3. update_postimage literal (when the given cdcEnabled flag is enabled) or none

WHEN MATCHED THEN DELETE
  1. All DeltaMergeActions
  2. Expression to increment metrics (when shouldCountDeletedRows is enabled) or a true literal (returns a true literal)
  3. delete literal (when the given cdcEnabled flag is enabled) or none

WHEN NOT MATCHED BY SOURCE THEN DELETE
  1. All DeltaMergeActions
  2. Expression to increment metrics (when shouldCountDeletedRows is enabled) or a true literal (returns a true literal)
  3. delete literal (when the given cdcEnabled flag is enabled) or none

WHEN NOT MATCHED THEN INSERT
  1. All DeltaMergeActions
  2. Expression to increment metrics (returns a false literal)
  3. insert literal (when the given cdcEnabled flag is enabled) or none
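
The layout of the actions can be illustrated with a small sketch that treats expressions as plain strings. It is an approximation under stated assumptions: actionsOf is a hypothetical helper, clause kinds are reduced to "update", "delete" and "insert", and incrementMetricsAndReturn(...) is only a label for the metrics expression, not a real function.

```scala
// Hypothetical model of the actions of a ProcessedClause (expressions are strings).
object ProcessedClauseSketch {
  def actionsOf(
      clauseKind: String, // "update", "delete" or "insert"
      mergeActions: Seq[String],
      cdcEnabled: Boolean,
      shouldCountDeletedRows: Boolean = true): Seq[String] = {
    // 2nd group: the expression that increments metrics and says whether the row is dropped
    val rowDropped = clauseKind match {
      case "delete" =>
        if (shouldCountDeletedRows) "incrementMetricsAndReturn(true)" else "true"
      case _ => "incrementMetricsAndReturn(false)"
    }
    // 3rd group: the CDC change type literal, present only with CDC enabled
    val cdcType = clauseKind match {
      case "update" => "update_postimage"
      case other    => other // "delete" or "insert"
    }
    (mergeActions :+ rowDropped) ++ (if (cdcEnabled) Seq(cdcType) else Nil)
  }
}
```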

generateClauseOutputExprs

generateClauseOutputExprs(
  numOutputCols: Int,
  clauses: Seq[ProcessedClause],
  noopExprs: Seq[Expression]): Seq[Expression]

generateClauseOutputExprs considers the following cases based on the given clauses:

ProcessedClauses: Expressions Returned
  • No clauses: The given noopExprs
  • An unconditional clause being the first: The actions of the unconditional clause
  • One clause: If expressions for every action of this clause
  • Many clauses: CaseWhen expressions

In essence, generateClauseOutputExprs converts (translates) the given clauses into Catalyst Expressions (Spark SQL).


When there is nothing to update or delete (the given clauses are empty), generateClauseOutputExprs does nothing and returns the given noopExprs expressions.

Otherwise, generateClauseOutputExprs checks how many clauses there are (and some other conditions) to generate the following expressions:

  • The action expressions of the single clause (in clauses) when it has no condition
  • For a single clause with a condition (in clauses), as many If expressions as there are actions
  • For many clauses, CaseWhen expressions

Note

It is assumed that when the first clause (in clauses) is unconditional (no condition), it is the only clause.
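
The four cases can be sketched with a tiny expression model instead of Catalyst. The Expr types below are hypothetical stand-ins for the Spark SQL classes of the same names, and two details are assumptions not stated above: the If and CaseWhen expressions fall back to the matching noop expression, and an unconditional clause among many is treated as always true.

```scala
object ClauseOutputExprsSketch {
  // Hypothetical, minimal stand-ins for Catalyst expressions.
  sealed trait Expr
  final case class Ref(name: String) extends Expr
  final case class If(cond: Expr, whenTrue: Expr, whenFalse: Expr) extends Expr
  final case class CaseWhen(branches: Seq[(Expr, Expr)], orElse: Expr) extends Expr

  final case class ProcessedClause(condition: Option[Expr], actions: Seq[Expr])

  def generate(clauses: Seq[ProcessedClause], noopExprs: Seq[Expr]): Seq[Expr] =
    clauses match {
      // No clauses: nothing to do, copy the row as-is
      case Seq() => noopExprs
      // An unconditional first clause is assumed to be the only one
      case first +: _ if first.condition.isEmpty => first.actions
      // A single conditional clause: one If per action (assumed fallback: noop)
      case Seq(only) =>
        only.actions.zip(noopExprs).map { case (action, noop) =>
          If(only.condition.get, action, noop)
        }
      // Many clauses: one CaseWhen per output column (assumed fallback: noop)
      case many =>
        noopExprs.indices.map { i =>
          val branches = many.map(c => (c.condition.getOrElse(Ref("true")), c.actions(i)))
          CaseWhen(branches, noopExprs(i))
        }
    }
}
```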

In the end, generateClauseOutputExprs prints out the following DEBUG message to the logs:

writeAllChanges: expressions
  [clauseExprs]

generateCdcAndOutputRows

generateCdcAndOutputRows(
  sourceDf: DataFrame,
  outputCols: Seq[Column],
  outputColNames: Seq[String],
  noopCopyExprs: Seq[Expression],
  deduplicateDeletes: DeduplicateCDFDeletes): DataFrame

generateCdcAndOutputRows is used by ClassicMergeExecutor to generate preOutputDF dataframe that is written out when requested to write out merge changes with Change Data Feed enabled. The dataframe to write out is as follows:

Very Position-Sensitive

generateCdcAndOutputRows makes hard assumptions on which columns are on given positions (and so there are a lot of magic numbers floating around).

noopCopyExprs

noopCopyExprs is a collection of the following expressions:

Hence, noopCopyExprs.dropRight(2) gives the target output expressions (i.e., the target output expressions followed by any new expressions due to schema evolution).

outputCols

outputCols is an indexed collection that is generated using generateWriteAllChangesOutputCols. The names of outputCols can be displayed in the logs at DEBUG level.

  • outputCols.dropRight(1)
  • outputCols(outputCols.length - 2)

outputColNames

  • outputColNames.dropRight(1)

generateCdcAndOutputRows drops the last column from the given outputCols and adds _change_type column with a special sentinel value (null).

FIXME What's at the last position?

FIXME What's at the second last position?

generateCdcAndOutputRows creates a new _row_dropped_ column that negates the values of the column just before the _change_type column (the second from the end of the given outputCols). The new column replaces the former second-from-the-end column.

generateCdcAndOutputRows replaces the last two columns of the given noopCopyExprs with the following new columns (in that order):

  1. A false literal
  2. A literal with update_preimage value

generateCdcAndOutputRows aliases the resulting columns with the names from the given outputColNames (updatePreimageCdcOutput).
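
The position-based rewrites described so far can be sketched with columns modeled as SQL-like strings. The helper names mainDataOutput and deleteCdcOutput are hypothetical labels for illustration (only updatePreimageCdcOutput is named above), and the sketch assumes that the last two positions hold the row-dropped flag and the change type.

```scala
// Simplified model: a column is just its SQL-like text.
object CdcOutputLayoutSketch {
  // Swap the last column for a null _change_type (the "not CDC" sentinel).
  def mainDataOutput(outputCols: Seq[String]): Seq[String] =
    outputCols.dropRight(1) :+ "null AS _change_type"

  // Negate the second-from-the-end column and keep it at the same position.
  def deleteCdcOutput(outputCols: Seq[String]): Seq[String] = {
    val i = outputCols.length - 2
    outputCols.updated(i, s"NOT (${outputCols(i)}) AS _row_dropped_")
  }

  // Replace the last two noop expressions and alias everything by position.
  def updatePreimageCdcOutput(
      noopCopyExprs: Seq[String],
      outputColNames: Seq[String]): Seq[String] =
    (noopCopyExprs.dropRight(2) ++ Seq("false", "'update_preimage'"))
      .zip(outputColNames)
      .map { case (expr, name) => s"$expr AS $name" }
}
```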

generateCdcAndOutputRows creates a new column (cdcArray) with a CaseWhen expression based on the last column of the given outputCols (cdcTypeCol). Every CaseWhen case creates an array for the value of the last column (in the given outputCols):

generateCdcAndOutputRows creates a new column (cdcToMainDataArray) with an If expression based on the value of packedCdc._change_type column. If the value is one of the following literals, the If expression gives an array with packedCdc column and something extra:

Otherwise, the If expression gives an array with packedCdc column alone.

In the end, generateCdcAndOutputRows runs deduplicateCDFDeletes if deduplicateDeletes is enabled. Otherwise, generateCdcAndOutputRows runs packAndExplodeCDCOutput.


generateCdcAndOutputRows is used when:

packAndExplodeCDCOutput

packAndExplodeCDCOutput(
  sourceDf: DataFrame,
  cdcArray: Column,
  cdcToMainDataArray: Column,
  outputColNames: Seq[String],
  dedupColumns: Seq[Column]): DataFrame

In essence, packAndExplodeCDCOutput explodes projectedCDC and cdcToMainDataArray columns, and extracts the given outputColNames columns from the packedData struct.


packAndExplodeCDCOutput does the following with the given sourceDf dataframe (in that order):

  1. Selects the given cdcArray column under projectedCDC alias and all the given dedupColumns columns
  2. Explodes the projectedCDC column (which is the given cdcArray column) under packedCdc alias alongside all the given dedupColumns columns
  3. Explodes the given cdcToMainDataArray column under packedData alias alongside all the given dedupColumns columns
  4. Selects the given outputColNames columns from the packedData struct (flattens them using packedData.[name] selector) alongside all the given dedupColumns columns
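
The two explode steps and the final flattening can be modeled with plain Scala collections. This is a sketch only: SourceRow and Packed are hypothetical types, a single Long stands in for the dedupColumns, and cdcToMainDataArray is modeled as a function of packedCdc because it is defined in terms of packedCdc._change_type.

```scala
object PackAndExplodeSketch {
  // A packed struct is modeled as a map from column name to value.
  type Packed = Map[String, Any]
  final case class SourceRow(cdcArray: Seq[Packed], dedupKey: Long)

  def packAndExplode(
      rows: Seq[SourceRow],
      cdcToMainDataArray: Packed => Seq[Packed],
      outputColNames: Seq[String]): Seq[(Seq[Any], Long)] =
    for {
      row        <- rows
      packedCdc  <- row.cdcArray                  // explode projectedCDC as packedCdc
      packedData <- cdcToMainDataArray(packedCdc) // explode cdcToMainDataArray as packedData
    } yield (outputColNames.map(packedData), row.dedupKey) // select packedData.[name]
}
```

Every step carries the dedup key along, which mirrors how the dedupColumns are selected alongside the exploded columns at each stage.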

deduplicateCDFDeletes

deduplicateCDFDeletes(
  deduplicateDeletes: DeduplicateCDFDeletes,
  df: DataFrame,
  cdcArray: Column,
  cdcToMainDataArray: Column,
  outputColNames: Seq[String]): DataFrame

WHEN NOT MATCHED THEN INSERT Sensitivity

deduplicateCDFDeletes is sensitive (behaves differently) to merges with WHEN NOT MATCHED THEN INSERT clauses (based on the given DeduplicateCDFDeletes).

deduplicateCDFDeletes finds out the deduplication columns (dedupColumns) that include the following:

deduplicateCDFDeletes runs packAndExplodeCDCOutput (which gives a new cdcDf dataframe).

With WHEN NOT MATCHED THEN INSERT clauses, deduplicateCDFDeletes overwrites _target_row_index_ column (in the cdcDf dataframe) to be the value of _source_row_index column for rows with nulls.

deduplicateCDFDeletes deduplicates rows based on _target_row_index_ and _change_type columns.

In the end, deduplicateCDFDeletes drops _target_row_index_ and _source_row_index columns.
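
The deduplication steps can be sketched with plain Scala collections. CdcRow, deduplicate and the includesInserts parameter are hypothetical names for this illustration, and the sketch keeps the first row per key, whereas dropping duplicates in Spark gives no guarantee about which duplicate survives.

```scala
import scala.collection.mutable.LinkedHashMap

object DeduplicateCdfDeletesSketch {
  final case class CdcRow(
    value: String,
    changeType: String,           // models _change_type
    targetRowIndex: Option[Long], // models _target_row_index_ (None is null)
    sourceRowIndex: Option[Long]) // models _source_row_index (None is null)

  // Returns (value, changeType) pairs, i.e. the rows without the two index columns.
  def deduplicate(rows: Seq[CdcRow], includesInserts: Boolean): Seq[(String, String)] = {
    // With inserts, fill null target indices with the source row index.
    val filled =
      if (includesInserts)
        rows.map(r => r.copy(targetRowIndex = r.targetRowIndex.orElse(r.sourceRowIndex)))
      else rows

    // Keep one row per (target row index, change type) pair.
    val seen = LinkedHashMap.empty[(Option[Long], String), CdcRow]
    filled.foreach(r => seen.getOrElseUpdate((r.targetRowIndex, r.changeType), r))

    // Drop both index columns from the result.
    seen.values.toSeq.map(r => (r.value, r.changeType))
  }
}
```

Without the fill step, all inserted rows would share a null target row index and collapse into a single row per change type, which is why the insert case needs the source row index.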

Logging

MergeOutputGeneration is an abstract class and logging is configured using the logger of the implementations (that boils down to MergeIntoCommand).