Skip to content

MergeIntoCommand

MergeIntoCommand is a transactional DeltaCommand that represents a DeltaMergeInto logical command at execution.

Performance Metrics

Name web UI
numSourceRows number of source rows
numSourceRowsInSecondScan number of source rows (during repeated scan)
numTargetRowsCopied number of target rows rewritten unmodified
numTargetRowsInserted number of inserted rows
numTargetRowsUpdated number of updated rows
numTargetRowsDeleted number of deleted rows
numTargetFilesBeforeSkipping number of target files before skipping
numTargetFilesAfterSkipping number of target files after skipping
numTargetFilesRemoved number of files removed to target
numTargetFilesAdded number of files added to target
numTargetChangeFilesAdded number of change data capture files generated
numTargetChangeFileBytes total size of change data capture files generated
numTargetBytesBeforeSkipping number of target bytes before skipping
numTargetBytesAfterSkipping number of target bytes after skipping
numTargetBytesRemoved number of target bytes removed
numTargetBytesAdded number of target bytes added
numTargetPartitionsAfterSkipping number of target partitions after skipping
numTargetPartitionsRemovedFrom number of target partitions from which files were removed
numTargetPartitionsAddedTo number of target partitions to which files were added
executionTimeMs time taken to execute the entire operation
scanTimeMs time taken to scan the files for matches
rewriteTimeMs time taken to rewrite the matched files

number of target rows rewritten unmodified

numTargetRowsCopied performance metric (like the other metrics) is turned into a non-deterministic user-defined function (UDF).

numTargetRowsCopied becomes incrNoopCountExpr UDF.

incrNoopCountExpr UDF is resolved on a joined plan and used to create a JoinedRowProcessor for processing partitions of the joined plan Dataset.

Creating Instance

MergeIntoCommand takes the following to be created:

MergeIntoCommand is created when:

Source Data

source: LogicalPlan

When created, MergeIntoCommand is given a LogicalPlan (Spark SQL) for the source data to merge from (internally referred to as source).

The source is used twice:

Tip

Enable DEBUG logging level for org.apache.spark.sql.delta.commands.MergeIntoCommand logger to see the inner-workings of writeAllChanges.

Migrated Schema

migratedSchema: Option[StructType]

MergeIntoCommand can be given a migratedSchema (Spark SQL).

Target Delta Table

targetDeltaLog: DeltaLog

targetDeltaLog is the DeltaLog (of the TahoeFileIndex) of the target delta table.

targetDeltaLog is used for the following:

Lazy Value

targetDeltaLog is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and cached afterwards.

Executing Command

run(
  spark: SparkSession): Seq[Row]

run is part of the RunnableCommand (Spark SQL) abstraction.


run is a transactional operation that is made up of the following steps:

  1. Begin Transaction
    1. schema.autoMerge.enabled
    2. FileActions
    3. Register Metrics
  2. Commit Transaction
  3. Re-Cache Target Delta Table
  4. Post Metric Updates

Begin Transaction

run starts a new transaction (on the target delta table).

schema.autoMerge.enabled

Only when spark.databricks.delta.schema.autoMerge.enabled configuration property is enabled, run updates the metadata (of the transaction) with the following:

  • migratedSchema (if defined) or the schema of the target
  • isOverwriteMode flag off
  • rearrangeOnly flag off

FileActions

run determines FileActions.

Single Insert-Only Merge

For a single insert-only merge with spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled configuration property enabled, run writeInsertsOnlyWhenNoMatchedClauses.

Other Merges

Otherwise, run finds the files to rewrite (i.e., AddFiles with the rows that satisfy the merge condition) and uses them to write out merge changes.

The AddFiles are converted into RemoveFiles.

run gives the RemoveFiles and the written-out FileActions.

Register Metrics

run registers the SQL metrics (with the current transaction).

Commit Transaction

run commits the current transaction (with the FileActions and MERGE operation).

Re-Cache Target Delta Table

run requests the CacheManager to re-cache the target plan.

Post Metric Updates

In the end, run posts the SQL metric updates (as a SparkListenerDriverAccumUpdates (Apache Spark) Spark event) to SparkListeners (incl. Spark UI).

Note

Use SparkListener (Apache Spark) to intercept SparkListenerDriverAccumUpdates events.

Building Target Logical Query Plan for AddFiles

buildTargetPlanWithFiles(
  deltaTxn: OptimisticTransaction,
  files: Seq[AddFile]): LogicalPlan

buildTargetPlanWithFiles creates a DataFrame to represent the given AddFiles to access the analyzed logical query plan. buildTargetPlanWithFiles requests the given OptimisticTransaction for the DeltaLog to create a DataFrame (for the Snapshot and the given AddFiles).

In the end, buildTargetPlanWithFiles creates a Project logical operator with Alias expressions so the output columns of the analyzed logical query plan (of the DataFrame of the AddFiles) reference the target's output columns (by name).

Note

The output columns of the target delta table are associated with a OptimisticTransaction as the Metadata.

deltaTxn.metadata.schema

Exceptions

run throws an AnalysisException when the target schema is different than the delta table's (has changed after analysis phase):

The schema of your Delta table has changed in an incompatible way since your DataFrame or DeltaTable object was created. Please redefine your DataFrame or DeltaTable object. Changes:
[schemaDiff]
This check can be turned off by setting the session configuration key spark.databricks.delta.checkLatestSchemaOnRead to false.

Writing Out Single Insert-Only Merge

writeInsertsOnlyWhenNoMatchedClauses(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction): Seq[FileAction]

In the end, writeInsertsOnlyWhenNoMatchedClauses returns the FileActions (from writing out data).

writeInsertsOnlyWhenNoMatchedClauses is used when:

Target Output Columns

writeInsertsOnlyWhenNoMatchedClauses gets the names of the output (target) columns.

writeInsertsOnlyWhenNoMatchedClauses creates a collection of output columns with the target names and the resolved DeltaMergeActions of a single DeltaMergeIntoInsertClause (as Alias expressions).

Source DataFrame

writeInsertsOnlyWhenNoMatchedClauses creates a UDF to update numSourceRows metric.

writeInsertsOnlyWhenNoMatchedClauses creates a source DataFrame for the source data with Dataset.filters with the UDF and the condition of the DeltaMergeIntoInsertClause (if defined) or Literal.TrueLiteral.

Use condition for filter pushdown optimization

The condition of this single DeltaMergeIntoInsertClause is pushed down to the source when Spark SQL optimizes the query.

Data-Skipped AddFiles

writeInsertsOnlyWhenNoMatchedClauses splits conjunctive predicates (And expressions) in the merge condition and determines a so-called targetOnlyPredicates (predicates with the target columns only). writeInsertsOnlyWhenNoMatchedClauses requests the given OptimisticTransaction to filterFiles matching the target-only predicates (that gives AddFiles).

Merge Condition and Data Skipping

The merge condition of this MergeIntoCommand is used for Data Skipping.

Target DataFrame

writeInsertsOnlyWhenNoMatchedClauses creates a target DataFrame for the data-skipped AddFiles.

Insert DataFrame

writeInsertsOnlyWhenNoMatchedClauses creates a UDF to update numTargetRowsInserted metric.

writeInsertsOnlyWhenNoMatchedClauses left-anti joins the source DataFrame with the target DataFrame on the merge condition. writeInsertsOnlyWhenNoMatchedClauses selects the output columns and uses Dataset.filter with the UDF.

Demo: Left-Anti Join
val source = Seq(0, 1, 2, 3).toDF("id") // (1)!
val target = Seq(3, 4, 5).toDF("id") // (2)!
val usingColumns = Seq("id")
val q = source.join(target, usingColumns, "leftanti")
  1. Equivalent to spark.range(4)

    +---+
    | id|
    +---+
    |  0|
    |  1|
    |  2|
    |  3|
    +---+
    
  2. Equivalent to spark.range(3, 6)

    +---+
    | id|
    +---+
    |  3|
    |  4|
    |  5|
    +---+
    
scala> q.show
+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+

writeFiles

writeInsertsOnlyWhenNoMatchedClauses requests the given OptimisticTransaction to write out the insertDf (possibly repartitionIfNeeded on the partitionColumns of the target delta table).

Note

This step triggers a Spark write job (and this active transaction is marked as hasWritten).

Update Metrics

In the end, writeInsertsOnlyWhenNoMatchedClauses updates the metrics.

Writing Out Merged Data

writeAllChanges(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction,
  filesToRewrite: Seq[AddFile]): Seq[FileAction]

In the end, writeAllChanges returns the FileActions (from writing out data).


writeAllChanges is used when:

targetOutputCols

writeAllChanges builds the target output schema (possibly with some nulls for the target columns that are not in the current schema).

newTarget Query Plan

writeAllChanges builds a target logical query plan for the given filesToRewrite AddFiles.

Join Type

writeAllChanges determines the join type to use:

writeAllChanges prints out the following DEBUG message to the logs:

writeAllChanges using [joinType] join:
source.output: [outputSet]
target.output: [outputSet]
condition: [condition]
newTarget.output: [outputSet]

Source DataFrame

writeAllChanges creates a source DataFrame for the source query plan and an extra column:

New Target DataFrame

writeAllChanges creates a target DataFrame for the newTarget query plan and an extra column:

  • _target_row_present_ with true literal

Joined DataFrame

writeAllChanges creates a joined DataFrame that is a Dataset.join of the source and new target DataFrames with the given join condition and the join type.

Output Schema

writeAllChanges creates an output schema based on cdcEnabled:

  • FIXME

Create JoinedRowProcessor

writeAllChanges creates a JoinedRowProcessor (that is then used to map over partitions of the joined DataFrame).

Output DataFrame

writeAllChanges creates a DataFrame for the joinedPlan and uses Dataset.mapPartitions operator to let the JoinedRowProcessor to processPartition.

writeAllChanges drops row_dropped and incr_row_count columns.

writeAllChanges prints out the following DEBUG message to the logs:

writeAllChanges: join output plan:
[outputDF.queryExecution]

writeFiles

writeAllChanges requests the given OptimisticTransaction to write out the insertDf (possibly repartitioning if needed on the partition columns).

Note

This step triggers a Spark write job (and this active transaction is marked as hasWritten).

Update Metrics

In the end, writeAllChanges updates the metrics.

Single Insert-Only Merge

isSingleInsertOnly: Boolean

isSingleInsertOnly holds true when this MERGE command is a single WHEN NOT MATCHED THEN INSERT:

  1. No MATCHED clauses
  2. There is just a single notMatchedClauses
Example: Single Insert-Only Merge
MERGE INTO merge_demo to
USING merge_demo_source from
ON to.id = from.id
WHEN NOT MATCHED THEN INSERT *;

Matched-Only Merge

isMatchedOnly: Boolean

isMatchedOnly holds true when this MERGE command is WHEN MATCHED THEN-only:

  1. No NOT MATCHED clauses
  2. MATCHED clauses only
MERGE INTO merge_demo to
USING merge_demo_source from
ON to.id = from.id
WHEN MATCHED AND to.id < 3 THEN DELETE
WHEN MATCHED THEN UPDATE SET *;

Creating Metric Update UDF

makeMetricUpdateUDF(
  name: String): Expression

makeMetricUpdateUDF looks up the performance metric (by name) in the metrics.

In the end, makeMetricUpdateUDF defines a non-deterministic UDF to increment the metric (when executed).

makeMetricUpdateUDF is used when:

notMatchedClauseOutput

notMatchedClauseOutput(
  clause: DeltaMergeIntoInsertClause): Seq[Seq[Expression]]

notMatchedClauseOutput returns the main data output followed by the CDF data output when cdcEnabled:

(mainDataOutput)

or

(mainDataOutput, insertCdcOutput)

notMatchedClauseOutput creates the output (resolved expressions) of the main data based on the following:

  1. The Expressions from the DeltaMergeActions of the given DeltaMergeIntoInsertClause
  2. FalseLiteral
  3. The UDF to increment the numTargetRowsInserted metric
  4. null literal (to indicate the main data not CDF's)

With cdcEnabled, notMatchedClauseOutput creates the output (resolved expressions) of the CDF data based on the following:

  1. The Expressions from the DeltaMergeActions of the given DeltaMergeIntoInsertClause
  2. FalseLiteral
  3. TrueLiteral
  4. insert literal

Note

The first two expressions are the same in the main data and CDF data outputs.


notMatchedClauseOutput is used when:

Finding Files to Rewrite

findTouchedFiles(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction): Seq[AddFile]

Important

findTouchedFiles is such a fine piece of art (a Delta gem). It uses a custom accumulator, a UDF (to use this accumulator to record touched file names) and input_file_name() standard function for the names of the files read.

It is always worth keeping in mind that Delta Lake uses files for data storage and that is why input_file_name() standard function works. It would not work for non-file-based data sources.

Example 1: Understanding the Internals of findTouchedFiles

The following query writes out a 10-element dataset using the default parquet data source to /tmp/parquet directory:

val target = "/tmp/parquet"
spark.range(10).write.save(target)

The number of parquet part files varies based on the number of partitions (CPU cores).

The following query loads the parquet dataset back alongside input_file_name() standard function to mimic findTouchedFiles's behaviour.

val FILE_NAME_COL = "_file_name_"
val dataFiles = spark.read.parquet(target).withColumn(FILE_NAME_COL, input_file_name())
scala> dataFiles.show(truncate = false)
+---+---------------------------------------------------------------------------------------+
|id |_file_name_                                                                            |
+---+---------------------------------------------------------------------------------------+
|4  |file:///tmp/parquet/part-00007-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|0  |file:///tmp/parquet/part-00001-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|3  |file:///tmp/parquet/part-00006-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|6  |file:///tmp/parquet/part-00011-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|1  |file:///tmp/parquet/part-00003-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|8  |file:///tmp/parquet/part-00014-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|2  |file:///tmp/parquet/part-00004-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|7  |file:///tmp/parquet/part-00012-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|5  |file:///tmp/parquet/part-00009-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|9  |file:///tmp/parquet/part-00015-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
+---+---------------------------------------------------------------------------------------+

As you may have thought, not all part files have got data and so they are not included in the dataset. That is when findTouchedFiles uses groupBy operator and count action to calculate match frequency.

val counts = dataFiles.groupBy(FILE_NAME_COL).count()
scala> counts.show(truncate = false)
+---------------------------------------------------------------------------------------+-----+
|_file_name_                                                                            |count|
+---------------------------------------------------------------------------------------+-----+
|file:///tmp/parquet/part-00015-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00007-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00003-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00011-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00012-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00006-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00001-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00004-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00009-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00014-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
+---------------------------------------------------------------------------------------+-----+

Let's load all the part files in the /tmp/parquet directory and find which file(s) have no data.

import scala.sys.process._
val cmd = (s"ls $target" #| "grep .parquet").lineStream
val allFiles = cmd.toArray.toSeq.toDF(FILE_NAME_COL)
  .select(concat(lit(s"file://$target/"), col(FILE_NAME_COL)) as FILE_NAME_COL)
val joinType = "left_anti" // MergeIntoCommand uses inner as it wants data file
val noDataFiles = allFiles.join(dataFiles, Seq(FILE_NAME_COL), joinType)

Mind that the data vs non-data datasets could be different, but that should not "interfere" with the main reasoning flow.

scala> noDataFiles.show(truncate = false)
+---------------------------------------------------------------------------------------+
|_file_name_                                                                            |
+---------------------------------------------------------------------------------------+
|file:///tmp/parquet/part-00000-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
+---------------------------------------------------------------------------------------+

findTouchedFiles registers an accumulator to collect all the distinct files that need to be rewritten (touched files).

Note

The name of the accumulator is internal.metrics.MergeIntoDelta.touchedFiles and internal.metrics part is supposed to hide it from web UI as potentially large (set of file names to be rewritten).

recordTouchedFileName

findTouchedFiles defines a nondeterministic UDF that adds the file names to the accumulator (recordTouchedFileName).

dataSkippedFiles

findTouchedFiles splits conjunctive predicates (And binary expressions) in the condition expression and collects the predicates that use the target's columns (targetOnlyPredicates). findTouchedFiles requests the given OptimisticTransaction for the files that match the target-only predicates (and creates a dataSkippedFiles collection of AddFiles).

Note

This step looks similar to filter predicate pushdown.

findTouchedFiles creates one DataFrame for the source data (using Spark SQL utility).

findTouchedFiles builds a logical query plan for the files (matching the predicates) and creates another DataFrame for the target data. findTouchedFiles adds two columns to the target dataframe:

  1. _row_id_ for monotonically_increasing_id() standard function
  2. _file_name_ for input_file_name() standard function

findTouchedFiles creates (a DataFrame that is) an INNER JOIN of the source and target DataFrames using the condition expression.

findTouchedFiles takes the joined dataframe and selects _row_id_ column and the recordTouchedFileName UDF on the _file_name_ column as one. The DataFrame is internally known as collectTouchedFiles.

findTouchedFiles uses groupBy operator on _row_id_ to calculate a sum of all the values in the one column (as count column) in the two-column collectTouchedFiles dataset. The DataFrame is internally known as matchedRowCounts.

Note

No Spark job has been submitted yet. findTouchedFiles is still in "query preparation" mode.

findTouchedFiles uses filter on the count column (in the matchedRowCounts dataset) with values greater than 1. If there are any, findTouchedFiles throws an UnsupportedOperationException exception:

Cannot perform MERGE as multiple source rows matched and attempted to update the same
target row in the Delta table. By SQL semantics of merge, when multiple source rows match
on the same target row, the update operation is ambiguous as it is unclear which source
should be used to update the matching target row.
You can preprocess the source table to eliminate the possibility of multiple matches.

Note

Since findTouchedFiles uses count action there should be a Spark SQL query reported (and possibly Spark jobs) in web UI.

findTouchedFiles requests the touchedFilesAccum accumulator for the touched file names.

Example 2: Understanding the Internals of findTouchedFiles
val TOUCHED_FILES_ACCUM_NAME = "MergeIntoDelta.touchedFiles"
val touchedFilesAccum = spark.sparkContext.collectionAccumulator[String](TOUCHED_FILES_ACCUM_NAME)
val recordTouchedFileName = udf { (fileName: String) => {
  touchedFilesAccum.add(fileName)
  1
}}.asNondeterministic()
val target = "/tmp/parquet"
spark.range(10).write.save(target)
val FILE_NAME_COL = "_file_name_"
val dataFiles = spark.read.parquet(target).withColumn(FILE_NAME_COL, input_file_name())
val collectTouchedFiles = dataFiles.select(col(FILE_NAME_COL), recordTouchedFileName(col(FILE_NAME_COL)).as("one"))
scala> collectTouchedFiles.show(truncate = false)
+---------------------------------------------------------------------------------------+---+
|_file_name_                                                                            |one|
+---------------------------------------------------------------------------------------+---+
|file:///tmp/parquet/part-00007-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00001-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00006-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00011-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00003-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00014-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00004-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00012-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00009-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00015-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
+---------------------------------------------------------------------------------------+---+    
import scala.collection.JavaConverters._
val touchedFileNames = touchedFilesAccum.value.asScala.toSeq

Use the Stages tab in web UI to review the accumulator values.

findTouchedFiles prints out the following TRACE message to the logs:

findTouchedFiles: matched files:
  [touchedFileNames]

findTouchedFiles generateCandidateFileMap for the files that match the target-only predicates.

findTouchedFiles getTouchedFile for every touched file name.

findTouchedFiles updates the following performance metrics:

In the end, findTouchedFiles gives the touched files (as AddFiles).

repartitionIfNeeded

repartitionIfNeeded(
  spark: SparkSession,
  df: DataFrame,
  partitionColumns: Seq[String]): DataFrame

repartitionIfNeeded repartitions the given DataFrame by the partitionColumns (using Dataset.repartition operation) when all the following hold:

  1. There is at least one partition column (among the given partitionColumns)
  2. spark.databricks.delta.merge.repartitionBeforeWrite.enabled configuration property is true

repartitionIfNeeded is used when:

LeafRunnableCommand

MergeIntoCommand is a LeafRunnableCommand (Spark SQL) logical operator.

Demo

Demo: Merge Operation.

Logging

Enable ALL logging level for org.apache.spark.sql.delta.commands.MergeIntoCommand logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.delta.commands.MergeIntoCommand=ALL

Refer to Logging.