MergeIntoCommandBase

MergeIntoCommandBase is a DeltaCommand and an extension of the LeafRunnableCommand (Spark SQL) abstraction for merge delta commands.

MergeIntoCommandBase is a MergeIntoMaterializeSource.

Contract (Subset)

Merge Condition

condition: Expression

The "join" condition of this merge

See:

Used when:

WHEN MATCHED Clauses

matchedClauses: Seq[DeltaMergeIntoMatchedClause]

DeltaMergeIntoMatchedClauses

See:

Used when:

WHEN NOT MATCHED Clauses

notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause]

DeltaMergeIntoNotMatchedClauses

See:

Used when:

WHEN NOT MATCHED BY SOURCE Clauses

notMatchedBySourceClauses: Seq[DeltaMergeIntoNotMatchedBySourceClause]

DeltaMergeIntoNotMatchedBySourceClauses

See:

Used when:

Running Merge

runMerge(
  spark: SparkSession): Seq[Row]

See:

Used when:

  • MergeIntoCommandBase is requested to run

Source Data

source: LogicalPlan

A LogicalPlan (Spark SQL) of the source data to merge from (internally referred to as source)

See:

Used when:

  • MergeIntoCommandBase is requested to run

Implementations

Performance Metrics

| Name | web UI |
|------|--------|
| numSourceRows | number of source rows |
| numSourceRowsInSecondScan | number of source rows (during repeated scan) |
| numTargetRowsCopied | number of target rows rewritten unmodified |
| numTargetRowsInserted | number of inserted rows |
| numTargetRowsUpdated | number of updated rows |
| numTargetRowsDeleted | number of deleted rows |
| numTargetFilesBeforeSkipping | number of target files before skipping |
| numTargetFilesAfterSkipping | number of target files after skipping |
| numTargetFilesRemoved | number of files removed to target |
| numTargetFilesAdded | number of files added to target |
| numTargetChangeFilesAdded | number of change data capture files generated |
| numTargetChangeFileBytes | total size of change data capture files generated |
| numTargetBytesBeforeSkipping | number of target bytes before skipping |
| numTargetBytesAfterSkipping | number of target bytes after skipping |
| numTargetBytesRemoved | number of target bytes removed |
| numTargetBytesAdded | number of target bytes added |
| numTargetPartitionsAfterSkipping | number of target partitions after skipping |
| numTargetPartitionsRemovedFrom | number of target partitions from which files were removed |
| numTargetPartitionsAddedTo | number of target partitions to which files were added |
| numTargetRowsMatchedDeleted | number of rows deleted by a matched clause |
| numTargetRowsNotMatchedBySourceDeleted | number of rows deleted by a not matched by source clause |
| numTargetRowsMatchedUpdated | number of rows updated by a matched clause |
| numTargetRowsNotMatchedBySourceUpdated | number of rows updated by a not matched by source clause |
| executionTimeMs | time taken to execute the entire operation |
| scanTimeMs | time taken to scan the files for matches |
| rewriteTimeMs | time taken to rewrite the matched files |

number of deleted rows

number of inserted rows

number of rows deleted by a matched clause

number of rows deleted by a not matched by source clause

number of rows updated by a matched clause

number of rows updated by a not matched by source clause

number of source rows (during repeated scan)

number of target rows rewritten unmodified

The numTargetRowsCopied performance metric (like the other metrics) is turned into a non-deterministic user-defined function (UDF).

numTargetRowsCopied becomes the incrNoopCountExpr UDF.

The incrNoopCountExpr UDF is resolved on a joined plan and used to create a JoinedRowProcessor for processing partitions of the joined plan Dataset.
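
The following is a minimal sketch of how a metric can be turned into such a non-deterministic UDF with Spark's public udf API; the metrics map and the Boolean value the UDF returns are assumptions for illustration.

```scala
import org.apache.spark.sql.functions.udf

// Sketch: a metric-incrementing, non-deterministic UDF.
// `metrics` is assumed to be this command's name-to-SQLMetric map and the
// returned Boolean is illustrative only.
val numTargetRowsCopiedMetric = metrics("numTargetRowsCopied")
val incrNoopCountExpr = udf { () =>
  numTargetRowsCopiedMetric.add(1) // bump the metric for every row the UDF is evaluated against
  true
}.asNondeterministic() // keep the optimizer from eliminating or collapsing the call
  .apply()
  .expr
```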

number of updated rows

time taken to execute the entire operation

time taken to rewrite the matched files

time taken to scan the files for matches

Building Target (Logical) Plan Spanned Over Fewer Files

buildTargetPlanWithFiles(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction,
  files: Seq[AddFile],
  columnsToDrop: Seq[String]): LogicalPlan
columnsToDrop Argument

columnsToDrop is always empty (Nil) except when ClassicMergeExecutor is requested to findTouchedFiles.

buildTargetPlanWithFiles creates a TahoeBatchFileIndex for the given AddFiles (files) only.

Note

buildTargetPlanWithFiles creates a LogicalPlan of the target delta table that possibly spans over fewer parquet data files than the entire table.

In the end, buildTargetPlanWithFiles builds a target plan with the index (buildTargetPlanWithIndex) for the TahoeBatchFileIndex and the given columnsToDrop column names.
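
The following is a minimal sketch of that flow; the parameter list of the TahoeBatchFileIndex constructor is an assumption for illustration.

```scala
// Sketch only: the TahoeBatchFileIndex constructor arguments are assumed
val fileIndex = new TahoeBatchFileIndex(
  spark,
  "merge",                    // action type
  files,                      // span only the given AddFiles
  deltaTxn.deltaLog,
  deltaTxn.deltaLog.dataPath,
  deltaTxn.snapshot)
buildTargetPlanWithIndex(spark, deltaTxn, fileIndex, columnsToDrop)
```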


buildTargetPlanWithFiles is used when:

buildTargetPlanWithIndex

buildTargetPlanWithIndex(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction,
  fileIndex: TahoeFileIndex,
  columnsToDrop: Seq[String]): LogicalPlan

buildTargetPlanWithIndex gets the references to columns in the target dataframe (AttributeReferences) and nulls for new columns that are added to the target table (as part of this OptimisticTransaction).

In the end, buildTargetPlanWithIndex creates a Project logical operator with new columns (with nulls) after the existing ones.
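
A minimal sketch of the null-padding step; target (the analyzed target table plan), newColumns (the StructFields added in this transaction) and plan (the plan built from the file index) are assumed names for illustration.

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Project

// References to the existing target columns
val existingRefs = target.output.collect { case a: AttributeReference => a }
// Nulls (as aliased literals) for the columns newly added to the target table
val nullsForNewColumns = newColumns.map(f => Alias(Literal(null, f.dataType), f.name)())
// New columns (with nulls) go after the existing ones
Project(existingRefs ++ nullsForNewColumns, plan)
```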

Special Columns

_row_dropped_

_row_dropped_ column name is used when:

CDF-Aware Output Generation

It appears that ClassicMergeExecutor and InsertOnlyMergeExecutor use the _row_dropped_ column to keep only the rows with the false value in the output dataframes and then drop the column immediately.

An "exception" to the behaviour is MergeOutputGeneration for CDF-aware output generation.

Executing Command

RunnableCommand
run(
  spark: SparkSession): Seq[Row]

run is part of the RunnableCommand (Spark SQL) abstraction.

run is a transactional operation that is made up of the following steps:

  1. Resets the performance metrics (to 0)
  2. Checks whether the source table should be materialized
  3. Runs this merge directly (when no materialization is required) or via runWithMaterializedSourceLostRetries (as sketched below)
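
A minimal sketch of that flow; shouldMaterializeSource is a hypothetical stand-in for the materialization check, and the arguments of runWithMaterializedSourceLostRetries are assumed.

```scala
// Sketch only: the materialization helpers are referenced by intent; their exact
// names and signatures are assumptions for illustration
override def run(spark: SparkSession): Seq[Row] = {
  metrics.values.foreach(_.set(0))                        // 1. reset the performance metrics
  if (!shouldMaterializeSource(spark)) {                  // 2. hypothetical materialization check
    runMerge(spark)                                       // 3. run this merge directly
  } else {
    runWithMaterializedSourceLostRetries(spark, runMerge) // 3. or with lost-source retries (args assumed)
  }
}
```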

Reset Metrics

The following metrics are reset (set to 0):

isInsertOnly

isInsertOnly: Boolean

isInsertOnly is positive for an insert-only merge (i.e. when there are WHEN NOT MATCHED clauses only in this MERGE command).


In other words, isInsertOnly is enabled (true) when all the following hold:

  1. There are neither WHEN MATCHED nor WHEN NOT MATCHED BY SOURCE clauses
  2. There are WHEN NOT MATCHED clauses
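
A minimal sketch of the check, built directly from the two conditions above and the clause collections of the contract.

```scala
// Insert-only: no WHEN MATCHED and no WHEN NOT MATCHED BY SOURCE clauses,
// but at least one WHEN NOT MATCHED clause
protected lazy val isInsertOnly: Boolean =
  matchedClauses.isEmpty && notMatchedBySourceClauses.isEmpty && notMatchedClauses.nonEmpty
```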

isInsertOnly is used when:

Matched-Only Merge

isMatchedOnly: Boolean

isMatchedOnly is enabled (true) when this merge is WHEN MATCHED-only, i.e. there is at least one WHEN MATCHED clause and there are neither WHEN NOT MATCHED nor WHEN NOT MATCHED BY SOURCE clauses.


isMatchedOnly is used when:

shouldOptimizeMatchedOnlyMerge

shouldOptimizeMatchedOnlyMerge(
  spark: SparkSession): Boolean

shouldOptimizeMatchedOnlyMerge is enabled (true) when the following all hold:


shouldOptimizeMatchedOnlyMerge is used when:

Finding Target-Only Predicates (Among Merge and Clause Conditions)

getTargetOnlyPredicates(
  spark: SparkSession): Seq[Expression]

getTargetOnlyPredicates determines the target-only predicates in the merge condition first (targetOnlyPredicatesOnCondition). getTargetOnlyPredicates splits the conjunctive predicates (Ands) of the merge condition and keeps only the ones that reference columns of the target table exclusively.

getTargetOnlyPredicates branches off based on whether this merge is matched-only or not.

For a non-matched-only merge, getTargetOnlyPredicates returns the target-only condition predicates.

Otherwise, getTargetOnlyPredicates does the same (what it has just done with the merge condition) with conditional WHEN MATCHED clauses (the ones with a condition specified). getTargetOnlyPredicates splits conjunctive predicates (Ands) in the conditions of the matchedClauses and leaves only the ones with the column references to the target.

In the end, getTargetOnlyPredicates returns the target-only condition predicates (from the merge condition and all the conditions from the conditional WHEN MATCHED clauses).
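
A minimal sketch of the splitting-and-filtering step; target is assumed to be the analyzed target table plan.

```scala
import org.apache.spark.sql.catalyst.expressions.{And, Expression}

// Split a conjunction (a AND b AND c) into its individual predicates
def splitConjunctivePredicates(cond: Expression): Seq[Expression] = cond match {
  case And(left, right) => splitConjunctivePredicates(left) ++ splitConjunctivePredicates(right)
  case other => Seq(other)
}

// Keep only the predicates that reference target columns exclusively
val targetOnlyPredicatesOnCondition =
  splitConjunctivePredicates(condition).filter(_.references.subsetOf(target.outputSet))
```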


getTargetOnlyPredicates is used when:

throwErrorOnMultipleMatches

throwErrorOnMultipleMatches(
  hasMultipleMatches: Boolean,
  spark: SparkSession): Unit
Procedure

throwErrorOnMultipleMatches is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).

throwErrorOnMultipleMatches throws a DeltaUnsupportedOperationException when the given hasMultipleMatches is enabled (true) and isOnlyOneUnconditionalDelete is disabled (false).
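
A minimal sketch of the guard; the DeltaErrors helper name is an assumption for illustration.

```scala
// Fail when a source row matched multiple target rows, unless the merge is a single
// unconditional DELETE (where the result does not depend on which matching row is picked)
protected def throwErrorOnMultipleMatches(
    hasMultipleMatches: Boolean,
    spark: SparkSession): Unit = {
  if (hasMultipleMatches && !isOnlyOneUnconditionalDelete) {
    throw DeltaErrors.multipleSourceRowMatchingTargetRowInMergeException(spark) // assumed helper
  }
}
```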


throwErrorOnMultipleMatches is used when:

isOnlyOneUnconditionalDelete

isOnlyOneUnconditionalDelete: Boolean

isOnlyOneUnconditionalDelete is positive (true) when there is exactly one WHEN MATCHED clause and it is a DELETE with no condition.


In other words, isOnlyOneUnconditionalDelete is true for the following:
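
A minimal sketch of that shape, matching against the matchedClauses of the contract (the clause class name is an assumption for illustration).

```scala
// A single WHEN MATCHED clause that is an unconditional DELETE
protected lazy val isOnlyOneUnconditionalDelete: Boolean =
  matchedClauses match {
    case Seq(DeltaMergeIntoMatchedDeleteClause(None)) => true
    case _ => false
  }
```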

Recording Merge Operation

recordMergeOperation[A](
  extraOpType: String = "",
  status: String = null,
  sqlMetricName: String = null)(
  thunk: => A): A

recordMergeOperation creates an operation type (changedOpType) based on the given extraOpType identifier as follows:

delta.dml.merge.[extraOpType]

recordMergeOperation appends the given status to the existing Spark job description (as spark.job.description local property), if any.

In the end, recordMergeOperation executes the given thunk code block:

  1. Records the start time
  2. Sets a human readable description of the current job (Spark Core) as the prefixed status
  3. Adds the time taken to the given sqlMetricName
  4. Restores the job description to the previous one, if any

| Caller | extraOpType | status | sqlMetricName |
|--------|-------------|--------|---------------|
| ClassicMergeExecutor | findTouchedFiles | MERGE operation - scanning files for matches | scanTimeMs |
| ClassicMergeExecutor | writeAllUpdatesAndDeletes or writeAllChanges | MERGE operation - Rewriting n files | rewriteTimeMs |
| InsertOnlyMergeExecutor | writeInsertsOnlyWhenNoMatchedClauses or writeInsertsOnlyWhenNoMatches | MERGE operation - writing new files for only inserts | rewriteTimeMs |
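
A hypothetical usage sketch, using the values from the first row of the table above (scanFilesForMatches stands in for the timed work).

```scala
// The thunk is timed and the elapsed milliseconds are added to the scanTimeMs metric
val filesToRewrite = recordMergeOperation(
    extraOpType = "findTouchedFiles",
    status = "MERGE operation - scanning files for matches",
    sqlMetricName = "scanTimeMs") {
  scanFilesForMatches() // hypothetical helper: the work being measured
}
```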

recordMergeOperation is used when:

Is WHEN NOT MATCHED THEN INSERT Clause Used

includesInserts: Boolean

includesInserts is positive (true) when there are WHEN NOT MATCHED clauses in this merge (and so WHEN NOT MATCHED THEN INSERT actions are possible, hence the name of this method).


includesInserts is used when:

Auto Schema Merging

ImplicitMetadataOperation
canMergeSchema: Boolean

canMergeSchema is part of the ImplicitMetadataOperation abstraction.

canMergeSchema creates a DeltaOptions (with no options, based on the SQLConf only) and returns its canMergeSchema.

Options always empty

Delta options cannot be passed to MERGE INTO, so they will always be empty and canMergeSchema relies on the SQLConf only.
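
A minimal sketch, assuming DeltaOptions accepts an empty options map together with the session's SQLConf.

```scala
// No MERGE-level options can be given, so only the SQL configuration matters
override protected val canMergeSchema: Boolean =
  new DeltaOptions(Map.empty[String, String], conf).canMergeSchema
```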

Create IncrementMetric Expression

incrementMetricAndReturnBool(
  name: String,
  valueToReturn: Boolean): Expression

incrementMetricAndReturnBool creates an IncrementMetric unary expression with the following:

| IncrementMetric | Value |
|-----------------|-------|
| Child Expression | A Literal with the given valueToReturn (when executed) |
| SQLMetric | The metric by the given name |
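
A minimal sketch of the factory, assuming IncrementMetric takes the child expression and the SQLMetric in that order and that metrics is this command's metric map.

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}

// Wrap a Boolean literal so that evaluating it also bumps the named metric
protected def incrementMetricAndReturnBool(
    name: String,
    valueToReturn: Boolean): Expression =
  IncrementMetric(Literal(valueToReturn), metrics(name))
```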

IncrementMetric

The IncrementMetric expression presents itself under the increment_metric name in query plans.

When executed, IncrementMetric increments the metric (by the given name) and returns the given valueToReturn.

| Usage | Metric Name | valueToReturn |
|-------|-------------|---------------|
| ClassicMergeExecutor to find files to rewrite | numSourceRows | true |
| ClassicMergeExecutor to write out merge changes | numSourceRowsInSecondScan | true |
| | numTargetRowsCopied | false |
| InsertOnlyMergeExecutor to write out inserts | numSourceRows or numSourceRowsInSecondScan | true |
| InsertOnlyMergeExecutor to generateInsertsOnlyOutputCols | numTargetRowsInserted | false |
| MergeOutputGeneration to generateAllActionExprs | numTargetRowsUpdated | false |
| | numTargetRowsDeleted | true |
| | numTargetRowsInserted | false |

Writing Data(Frame) Out to Delta Table

writeFiles(
  spark: SparkSession,
  txn: OptimisticTransaction,
  outputDF: DataFrame): Seq[FileAction]

Fun Fact: The Name

The writeFiles name stems from the (fun?) fact that writing out to a delta table is actually creating new files.

You can also look at the given DataFrame as (usually) the data of another delta table, which is nothing but a collection of AddFiles.

I found it a little funny, et toi? 😉

If the target table is partitioned and merge.repartitionBeforeWrite.enabled is enabled, writeFiles repartitions the given outputDF dataframe (using Dataset.repartition operator) before writing it out.

In the end, writeFiles requests the given OptimisticTransaction to write the data(frame) out.
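
A minimal sketch of the write-out step; the configuration property key and the partition-column lookup are assumptions for illustration.

```scala
import org.apache.spark.sql.functions.col

// Repartition by the partition columns of the target table before writing,
// when the table is partitioned and the (assumed) property is enabled
val partitionColumns = txn.metadata.partitionColumns
val toWrite =
  if (partitionColumns.nonEmpty &&
      spark.conf.get("spark.databricks.delta.merge.repartitionBeforeWrite.enabled").toBoolean) {
    outputDF.repartition(partitionColumns.map(col): _*)
  } else {
    outputDF
  }

// Let the transaction create the new data files (AddFiles)
txn.writeFiles(toWrite)
```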


writeFiles is used when MergeIntoCommand is requested to run a merge and does one of the following:

isCdcEnabled

isCdcEnabled(
  deltaTxn: OptimisticTransaction): Boolean

isCdcEnabled is the value of the enableChangeDataFeed table property (from the metadata of a delta table).
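
A minimal sketch, assuming DeltaConfigs.CHANGE_DATA_FEED is the handle of the enableChangeDataFeed table property.

```scala
// Read the table property off the metadata of the transaction's table
protected def isCdcEnabled(deltaTxn: OptimisticTransaction): Boolean =
  DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(deltaTxn.metadata)
```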


isCdcEnabled is used when: