MergeIntoCommandBase is a DeltaCommand and an extension of the LeafRunnableCommand (Spark SQL) abstraction for merge delta commands.

MergeIntoCommandBase is a MergeIntoMaterializeSource.

Contract (Subset)

Merge Condition

condition: Expression

The "join" condition of this merge


Used when:


matchedClauses: Seq[DeltaMergeIntoMatchedClause]



Used when:


notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause]



Used when:


notMatchedBySourceClauses: Seq[DeltaMergeIntoNotMatchedBySourceClause]



Used when:

Running Merge

  spark: SparkSession): Seq[Row]


Used when:

  • MergeIntoCommandBase is requested to run

Source Data

source: LogicalPlan

A LogicalPlan (Spark SQL) of the source data to merge from (internally referred to as source)


Used when:

  • MergeIntoCommandBase is requested to run


Performance Metrics

| Name | web UI |
|------|--------|
| numSourceRows | number of source rows |
| numSourceRowsInSecondScan | number of source rows (during repeated scan) |
| numTargetRowsCopied | number of target rows rewritten unmodified |
| numTargetRowsInserted | number of inserted rows |
| numTargetRowsUpdated | number of updated rows |
| numTargetRowsDeleted | number of deleted rows |
| numTargetFilesBeforeSkipping | number of target files before skipping |
| numTargetFilesAfterSkipping | number of target files after skipping |
| numTargetFilesRemoved | number of files removed to target |
| numTargetFilesAdded | number of files added to target |
| numTargetChangeFilesAdded | number of change data capture files generated |
| numTargetChangeFileBytes | total size of change data capture files generated |
| numTargetBytesBeforeSkipping | number of target bytes before skipping |
| numTargetBytesAfterSkipping | number of target bytes after skipping |
| numTargetBytesRemoved | number of target bytes removed |
| numTargetBytesAdded | number of target bytes added |
| numTargetPartitionsAfterSkipping | number of target partitions after skipping |
| numTargetPartitionsRemovedFrom | number of target partitions from which files were removed |
| numTargetPartitionsAddedTo | number of target partitions to which files were added |
| numTargetRowsMatchedDeleted | number of rows deleted by a matched clause |
| numTargetRowsNotMatchedBySourceDeleted | number of rows deleted by a not matched by source clause |
| numTargetRowsMatchedUpdated | number of rows updated by a matched clause |
| numTargetRowsNotMatchedBySourceUpdated | number of rows updated by a not matched by source clause |
| executionTimeMs | time taken to execute the entire operation |
| scanTimeMs | time taken to scan the files for matches |
| rewriteTimeMs | time taken to rewrite the matched files |

number of target rows rewritten unmodified

numTargetRowsCopied performance metric (like the other metrics) is turned into a non-deterministic user-defined function (UDF).

numTargetRowsCopied becomes incrNoopCountExpr UDF.

incrNoopCountExpr UDF is resolved on a joined plan and used to create a JoinedRowProcessor for processing partitions of the joined plan Dataset.

Building Target (Logical) Plan Spanned Over Fewer Files

  spark: SparkSession,
  deltaTxn: OptimisticTransaction,
  files: Seq[AddFile],
  columnsToDrop: Seq[String]): LogicalPlan
columnsToDrop Argument

columnsToDrop is always empty (Nil) except when ClassicMergeExecutor is requested to findTouchedFiles.

buildTargetPlanWithFiles creates a TahoeBatchFileIndex for the given AddFiles (files) only.


buildTargetPlanWithFiles creates a LogicalPlan of a delta table that possibly spans a smaller number of parquet data files than the "source".

In the end, buildTargetPlanWithFiles calls buildTargetPlanWithIndex with the TahoeBatchFileIndex and the given columnsToDrop column names.

buildTargetPlanWithFiles is used when:


  spark: SparkSession,
  deltaTxn: OptimisticTransaction,
  fileIndex: TahoeFileIndex,
  columnsToDrop: Seq[String]): LogicalPlan

buildTargetPlanWithIndex gets the references to columns in the target dataframe (AttributeReferences) and nulls for new columns that are added to the target table (as part of this OptimisticTransaction).

In the end, buildTargetPlanWithIndex creates a Project logical operator with new columns (with nulls) after the existing ones.
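
The projection described above can be sketched without Spark. This is a simplified model, not the Delta Lake implementation: Ref, NullLiteral and projectTarget are hypothetical stand-ins for AttributeReferences, null literals and the Project operator, and matching column names case-insensitively is an assumption.

```scala
// Simplified model (not the Delta Lake code): existing target columns are kept
// as references, columns added to the schema by the transaction become nulls.
object TargetProjectionSketch {
  sealed trait OutputExpr { def name: String }
  // Stand-in for an AttributeReference to an existing target column
  final case class Ref(name: String) extends OutputExpr
  // Stand-in for a null literal aliased to the name of a newly added column
  final case class NullLiteral(name: String) extends OutputExpr

  /** existingColumns: columns readable through the file index;
    * txnSchema: column names of the transaction's (possibly evolved) schema. */
  def projectTarget(
      existingColumns: Seq[String],
      txnSchema: Seq[String]): Seq[OutputExpr] = {
    val existing: Seq[OutputExpr] = existingColumns.map(c => Ref(c))
    val added: Seq[OutputExpr] = txnSchema
      .filterNot(c => existingColumns.exists(_.equalsIgnoreCase(c)))
      .map(c => NullLiteral(c))
    existing ++ added // new columns (with nulls) go after the existing ones
  }
}
```

With existing columns id and name and a transaction schema that adds age, the sketch yields references to id and name followed by a null for age.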

Special Columns


_row_dropped_ column name is used when:

CDF-Aware Output Generation

It appears that ClassicMergeExecutor and InsertOnlyMergeExecutor use the _row_dropped_ column to keep only the rows with the false value in output dataframes, and then drop the column immediately.

An "exception" to the behaviour is MergeOutputGeneration for CDF-aware output generation.

Executing Command

  spark: SparkSession): Seq[Row]

run is part of the RunnableCommand (Spark SQL) abstraction.

run is a transactional operation that is made up of the following steps:

  1. Begin Transaction
    1. schema.autoMerge.enabled
    2. FileActions
    3. Register Metrics
  2. Commit Transaction
  3. Re-Cache Target Delta Table
  4. Post Metric Updates

FIXME Review the sections

Begin Transaction

run starts a new transaction (on the target delta table).


Only when configuration property is enabled, run updates the metadata (of the transaction) with the following:

  • migratedSchema (if defined) or the schema of the target
  • isOverwriteMode flag off
  • rearrangeOnly flag off


run determines FileActions.

Single Insert-Only Merge

For a single insert-only merge with configuration property enabled, run calls writeInsertsOnlyWhenNoMatchedClauses.

Other Merges

Otherwise, run finds the files to rewrite (i.e., AddFiles with the rows that satisfy the merge condition) and uses them to write out merge changes.

The AddFiles are converted into RemoveFiles.

run gives the RemoveFiles and the written-out FileActions.

Register Metrics

run registers the SQL metrics (with the current transaction).

Commit Transaction

run commits the current transaction (with the FileActions and MERGE operation).

Re-Cache Target Delta Table

run requests the CacheManager to re-cache the target plan.

Post Metric Updates

In the end, run posts the SQL metric updates (as a SparkListenerDriverAccumUpdates (Apache Spark) Spark event) to SparkListeners (incl. Spark UI).


Use SparkListener (Apache Spark) to intercept SparkListenerDriverAccumUpdates events.
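
A minimal listener for these events could look as follows. DriverAccumUpdatesListener is a hypothetical name; SparkListener, onOtherEvent and SparkListenerDriverAccumUpdates are Spark developer APIs, so the sketch assumes Spark SQL is on the classpath.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates

// Hypothetical listener that collects driver-side accumulator updates
// (SQL metric updates) as they are posted to the listener bus.
class DriverAccumUpdatesListener extends SparkListener {
  // Each pair is (accumulator ID, value)
  @volatile var updates: Seq[(Long, Long)] = Seq.empty

  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerDriverAccumUpdates =>
      updates = updates ++ e.accumUpdates
    case _ => // ignore all other events
  }
}
```

Register it with spark.sparkContext.addSparkListener(new DriverAccumUpdatesListener) before running the MERGE command. Note that the pairs carry accumulator IDs, not metric names.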

Building Target Logical Query Plan for AddFiles

  deltaTxn: OptimisticTransaction,
  files: Seq[AddFile]): LogicalPlan

buildTargetPlanWithFiles creates a DataFrame to represent the given AddFiles to access the analyzed logical query plan. buildTargetPlanWithFiles requests the given OptimisticTransaction for the DeltaLog to create a DataFrame (for the Snapshot and the given AddFiles).

In the end, buildTargetPlanWithFiles creates a Project logical operator with Alias expressions so the output columns of the analyzed logical query plan (of the DataFrame of the AddFiles) reference the target's output columns (by name).


The output columns of the target delta table are associated with an OptimisticTransaction as the Metadata.



run throws an AnalysisException when the target schema is different than the delta table's (has changed after analysis phase):

The schema of your Delta table has changed in an incompatible way since your DataFrame or DeltaTable object was created. Please redefine your DataFrame or DeltaTable object. Changes:
This check can be turned off by setting the session configuration key to false.


isInsertOnly: Boolean

isInsertOnly is positive for an insert-only merge (i.e. when there are WHEN NOT MATCHED clauses only in this MERGE command).

In other words, isInsertOnly is enabled (true) when all the following hold:

  1. There are neither WHEN MATCHED nor WHEN NOT MATCHED BY SOURCE clauses
  2. There are WHEN NOT MATCHED clauses
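
The two conditions can be modelled in a few lines. Clauses is a hypothetical stand-in that represents each clause as a plain String rather than the DeltaMergeInto*Clause types.

```scala
// Simplified model (not the Delta Lake code) of the insert-only check
object MergeKindSketch {
  final case class Clauses(
      matched: Seq[String],            // WHEN MATCHED clauses
      notMatched: Seq[String],         // WHEN NOT MATCHED clauses
      notMatchedBySource: Seq[String]  // WHEN NOT MATCHED BY SOURCE clauses
  ) {
    // Insert-only: no WHEN MATCHED, no WHEN NOT MATCHED BY SOURCE,
    // and at least one WHEN NOT MATCHED clause
    def isInsertOnly: Boolean =
      matched.isEmpty && notMatchedBySource.isEmpty && notMatched.nonEmpty
  }
}
```

A merge with no clauses at all is not insert-only in this model, because the second condition requires at least one WHEN NOT MATCHED clause.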

isInsertOnly is used when:

Matched-Only Merge

isMatchedOnly: Boolean

isMatchedOnly is enabled (true) when this merge is WHEN MATCHED-only:

isMatchedOnly is used when:


  spark: SparkSession): Boolean

shouldOptimizeMatchedOnlyMerge is enabled (true) when the following all hold:

shouldOptimizeMatchedOnlyMerge is used when:

Finding Target-Only Predicates (Among Merge and Clause Conditions)

  spark: SparkSession): Seq[Expression]

getTargetOnlyPredicates determines the target-only predicates in the merge condition first (targetOnlyPredicatesOnCondition). getTargetOnlyPredicates splits conjunctive predicates (Ands) in the merge condition and leaves only the ones with the column references to the columns in the target table only.

getTargetOnlyPredicates branches off based on whether this merge is matched-only or not.

For a non-matched-only merge, getTargetOnlyPredicates returns the target-only condition predicates.

Otherwise, getTargetOnlyPredicates does the same (what it has just done with the merge condition) with conditional WHEN MATCHED clauses (the ones with a condition specified). getTargetOnlyPredicates splits conjunctive predicates (Ands) in the conditions of the matchedClauses and leaves only the ones with the column references to the target.

In the end, getTargetOnlyPredicates returns the target-only condition predicates (from the merge condition and all the conditions from the conditional WHEN MATCHED clauses).
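
The splitting and filtering can be illustrated with a tiny expression model. This is a sketch, not the Delta Lake code: Expr, Pred and And are hypothetical stand-ins for Catalyst expressions, and the extra parameters (clause conditions, the matched-only flag, target column names) replace state that the real method reads from the command itself.

```scala
// Simplified model (not the Delta Lake code) of finding target-only predicates
object TargetOnlyPredicatesSketch {
  sealed trait Expr { def references: Set[String] }
  final case class Pred(sql: String, references: Set[String]) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
  }

  // Flattens nested Ands into the list of their conjuncts
  def splitConjunctivePredicates(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjunctivePredicates(l) ++ splitConjunctivePredicates(r)
    case other     => Seq(other)
  }

  // Keeps only the conjuncts that reference target columns exclusively
  def targetOnly(e: Expr, targetCols: Set[String]): Seq[Expr] =
    splitConjunctivePredicates(e).filter(_.references.subsetOf(targetCols))

  def getTargetOnlyPredicates(
      condition: Expr,
      matchedClauseConditions: Seq[Option[Expr]], // None = unconditional clause
      isMatchedOnly: Boolean,
      targetCols: Set[String]): Seq[Expr] = {
    val onCondition = targetOnly(condition, targetCols)
    if (!isMatchedOnly) onCondition
    else onCondition ++ matchedClauseConditions.flatten.flatMap(targetOnly(_, targetCols))
  }
}
```

For the merge condition t.id = s.id AND t.date > '2024-01-01', only the second conjunct is target-only, since the first one also references the source.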

getTargetOnlyPredicates is used when:


  hasMultipleMatches: Boolean,
  spark: SparkSession): Unit

throwErrorOnMultipleMatches is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).

throwErrorOnMultipleMatches throws a DeltaUnsupportedOperationException when the given hasMultipleMatches is enabled (true) and isOnlyOneUnconditionalDelete is disabled (false).

throwErrorOnMultipleMatches is used when:


isOnlyOneUnconditionalDelete: Boolean

isOnlyOneUnconditionalDelete is positive (true) when there is only one when matched clause that is a DELETE with no condition.
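
Both checks can be sketched together. The clause types are hypothetical stand-ins, UnsupportedOperationException stands in for DeltaUnsupportedOperationException, the error message is a placeholder, and the matched clauses are passed in explicitly for illustration.

```scala
// Simplified model (not the Delta Lake code) of the multiple-matches check
object MultipleMatchesSketch {
  sealed trait MatchedClause { def condition: Option[String] }
  final case class Update(condition: Option[String]) extends MatchedClause
  final case class Delete(condition: Option[String]) extends MatchedClause

  // True only for exactly one WHEN MATCHED clause that is a DELETE without a condition
  def isOnlyOneUnconditionalDelete(matched: Seq[MatchedClause]): Boolean =
    matched match {
      case Seq(Delete(None)) => true
      case _                 => false
    }

  def throwErrorOnMultipleMatches(
      hasMultipleMatches: Boolean,
      matched: Seq[MatchedClause]): Unit =
    if (hasMultipleMatches && !isOnlyOneUnconditionalDelete(matched)) {
      // Placeholder message, not the actual Delta Lake error text
      throw new UnsupportedOperationException(
        "multiple source rows matched the same target row")
    }
}
```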

In other words, isOnlyOneUnconditionalDelete is true for the following:

Recording Merge Operation

  extraOpType: String = "",
  status: String = null,
  sqlMetricName: String = null)(
  thunk: => A): A

recordMergeOperation creates an operation type (changedOpType) based on the given extraOpType identifier as follows:


recordMergeOperation appends the given status to the existing Spark job description (as spark.job.description local property), if any.

In the end, recordMergeOperation executes the given thunk code block:

  1. Records the start time
  2. Sets a human readable description of the current job (Spark Core) as the prefixed status
  3. Adds the time taken to the given sqlMetricName
  4. Restores the job description to the previous one, if any
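
The four steps can be modelled without Spark. In this sketch, localProperties and metrics are hypothetical in-memory stand-ins for the SparkContext local properties and the SQL metrics, the " - " separator is an assumption, and the extraOpType handling is left out.

```scala
import scala.collection.mutable

// Simplified model (not the Delta Lake code) of timing a code block while
// temporarily changing the job description.
object RecordMergeOperationSketch {
  val JobDescriptionKey = "spark.job.description"
  val localProperties: mutable.Map[String, String] = mutable.Map.empty
  val metrics: mutable.Map[String, Long] = mutable.Map.empty

  def recordMergeOperation[A](
      status: String = null,
      sqlMetricName: String = null)(thunk: => A): A = {
    val previous = localProperties.get(JobDescriptionKey)
    // Append the status to the existing description, if any
    val newDescription = Option(status).map { s =>
      previous.filter(_.nonEmpty).map(_ + " - ").getOrElse("") + s
    }
    val startNs = System.nanoTime()                         // 1. record the start time
    newDescription.foreach(d => localProperties.update(JobDescriptionKey, d)) // 2.
    try thunk
    finally {
      val elapsedMs = (System.nanoTime() - startNs) / 1000000L
      Option(sqlMetricName).foreach { name =>               // 3. add the time taken
        metrics.update(name, metrics.getOrElse(name, 0L) + elapsedMs)
      }
      previous match {                                      // 4. restore the description
        case Some(d) => localProperties.update(JobDescriptionKey, d)
        case None    => localProperties.remove(JobDescriptionKey)
      }
    }
  }
}
```

The try/finally makes sure the previous description is restored and the time is recorded even when the code block throws.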

| MergeOutputGeneration | extraOpType | status | sqlMetricName |
|-----------------------|-------------|--------|---------------|
| ClassicMergeExecutor | findTouchedFiles | MERGE operation - scanning files for matches | scanTimeMs |
| ClassicMergeExecutor | writeAllUpdatesAndDeletes or writeAllChanges | MERGE operation - Rewriting n files | rewriteTimeMs |
| InsertOnlyMergeExecutor | writeInsertsOnlyWhenNoMatchedClauses or writeInsertsOnlyWhenNoMatches | MERGE operation - writing new files for only inserts | rewriteTimeMs |

recordMergeOperation is used when:


includesInserts: Boolean

includesInserts is positive (true) when there are WHEN NOT MATCHED clauses in this merge (with WHEN NOT MATCHED THEN INSERTs possible and hence the name of this method).

includesInserts is used when:

Auto Schema Merging

canMergeSchema: Boolean

canMergeSchema is part of the ImplicitMetadataOperation abstraction.

canMergeSchema creates a DeltaOptions to determine the canMergeSchema flag (based on the SQLConf only).

Options always empty

Delta options cannot be passed to MERGE INTO, so they will always be empty and canMergeSchema relies on the SQLConf only.

Create IncrementMetric Expression

  name: String,
  valueToReturn: Boolean): Expression

incrementMetricAndReturnBool creates an IncrementMetric unary expression with the following:

| IncrementMetric | Value |
|-----------------|-------|
| Child Expression | A Literal with the given valueToReturn (when executed) |
| SQLMetric | The metric by the given name |


The IncrementMetric presents itself under increment_metric name in query plans.

When executed, IncrementMetric increments the metric (by the given name) and returns the given valueToReturn.
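
The behaviour can be modelled in plain Scala. Metric and this IncrementMetric are hypothetical stand-ins for SQLMetric and the Catalyst expression, and the metrics map parameter is added only for illustration.

```scala
import java.util.concurrent.atomic.AtomicLong

// Simplified model (not the Delta Lake code): evaluating the expression
// increments a metric as a side effect and returns a constant boolean.
object IncrementMetricSketch {
  // Stand-in for a SQLMetric
  final class Metric(val name: String) {
    private val counter = new AtomicLong(0L)
    def add(n: Long): Unit = { counter.addAndGet(n); () }
    def value: Long = counter.get
  }

  // Stand-in for the IncrementMetric unary expression
  final case class IncrementMetric(child: () => Boolean, metric: Metric) {
    def eval(): Boolean = {
      metric.add(1L) // side effect: count the row
      child()        // result: the child's constant value
    }
  }

  def incrementMetricAndReturnBool(
      metrics: Map[String, Metric],
      name: String,
      valueToReturn: Boolean): IncrementMetric =
    IncrementMetric(() => valueToReturn, metrics(name))
}
```

Because the returned value is constant, such an expression can sit in a filter or projection and count rows without changing which rows pass through.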

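| Usage | Metric Name | valueToReturn |
|-------|-------------|---------------|
| ClassicMergeExecutor to find files to rewrite | numSourceRows | true |
| ClassicMergeExecutor to write out merge changes | numSourceRowsInSecondScan | true |
| ClassicMergeExecutor to write out merge changes | numTargetRowsCopied | false |
| InsertOnlyMergeExecutor to write out inserts | numSourceRows or numSourceRowsInSecondScan | true |
| InsertOnlyMergeExecutor to generateInsertsOnlyOutputCols | numTargetRowsInserted | false |
| MergeOutputGeneration to generateAllActionExprs | numTargetRowsUpdated | false |
| MergeOutputGeneration to generateAllActionExprs | numTargetRowsDeleted | true |
| MergeOutputGeneration to generateAllActionExprs | numTargetRowsInserted | false |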

Writing Data(Frame) Out to Delta Table

  spark: SparkSession,
  txn: OptimisticTransaction,
  outputDF: DataFrame): Seq[FileAction]

Fun Fact: The Name

The writeFiles name stems from the (fun?) fact that writing out to a delta table is actually creating new files.

You can also look at the given DataFrame that is usually another delta table that is nothing but a collection of AddFiles.

I found it a little funny, et toi? 😉

If the target table is partitioned and merge.repartitionBeforeWrite.enabled is enabled, writeFiles repartitions the given outputDF dataframe (using Dataset.repartition operator) before writing it out.

In the end, writeFiles requests the given OptimisticTransaction to write the data(frame) out.
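
The conditional repartitioning can be sketched with the public Dataset API alone. repartitionIfNeeded is a hypothetical helper that takes the partition columns and the flag as parameters, whereas writeFiles reads them from the transaction and the session configuration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Sketch only (not the Delta Lake code): repartition by the table's partition
// columns before writing, but only for a partitioned table with the flag on.
object RepartitionBeforeWriteSketch {
  def repartitionIfNeeded(
      outputDF: DataFrame,
      partitionColumns: Seq[String],
      repartitionBeforeWrite: Boolean): DataFrame =
    if (partitionColumns.nonEmpty && repartitionBeforeWrite) {
      outputDF.repartition(partitionColumns.map(col): _*)
    } else {
      outputDF // unpartitioned table or flag disabled: write as-is
    }
}
```

Repartitioning by the partition columns brings the rows of one table partition together before the write, at the cost of a shuffle.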

writeFiles is used when MergeIntoCommand is requested to run a merge and does one of the following:


  deltaTxn: OptimisticTransaction): Boolean

isCdcEnabled is the value of the enableChangeDataFeed table property (from the metadata of a delta table).
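
As a rough model, the check is a boolean lookup in the table's configuration. The sketch assumes the full property key is delta.enableChangeDataFeed and that a missing property means disabled. A plain Map stands in for the table metadata.

```scala
// Simplified model (not the Delta Lake code) of reading the table property
object CdcEnabledSketch {
  val EnableChangeDataFeedKey = "delta.enableChangeDataFeed"

  // A missing property is treated as disabled (assumed default)
  def isCdcEnabled(tableConfiguration: Map[String, String]): Boolean =
    tableConfiguration.get(EnableChangeDataFeedKey).exists(_.toBoolean)
}
```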

isCdcEnabled is used when: