MergeIntoCommand

MergeIntoCommand is a DeltaCommand that represents a DeltaMergeInto logical command at execution.

MergeIntoCommand is a RunnableCommand logical operator (Spark SQL).

Tip

Learn more on the internals of MergeIntoCommand in Demo: Merge Operation.

Performance Metrics

Name                          web UI
----                          ------
numSourceRows                 number of source rows
numTargetRowsCopied           number of target rows rewritten unmodified
numTargetRowsInserted         number of inserted rows
numTargetRowsUpdated          number of updated rows
numTargetRowsDeleted          number of deleted rows
numTargetFilesBeforeSkipping  number of target files before skipping
numTargetFilesAfterSkipping   number of target files after skipping
numTargetFilesRemoved         number of files removed to target
numTargetFilesAdded           number of files added to target
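
A minimal sketch of how such performance metrics are typically declared for a Spark SQL command, using SQLMetrics.createMetric (the metric names and descriptions follow the table above; the exact Delta Lake code may differ):

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Illustrative only: declare a few of the metrics listed above
def createMergeMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
  "numSourceRows"       -> SQLMetrics.createMetric(sc, "number of source rows"),
  "numTargetRowsCopied" -> SQLMetrics.createMetric(sc, "number of target rows rewritten unmodified"),
  "numTargetFilesAdded" -> SQLMetrics.createMetric(sc, "number of files added to target"))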

number of target rows rewritten unmodified

The numTargetRowsCopied performance metric (like the other row-count metrics) is turned into a non-deterministic user-defined function (UDF).

numTargetRowsCopied becomes the incrNoopCountExpr UDF.

The incrNoopCountExpr UDF is resolved against the joined plan and used to create a JoinedRowProcessor that processes the partitions of the joined Dataset.
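
A minimal sketch of the metric-as-UDF pattern, using a plain Spark accumulator instead of Delta's internal SQLMetric (all names below are illustrative only):

import org.apache.spark.sql.functions.udf
val numTargetRowsCopied = spark.sparkContext.longAccumulator("numTargetRowsCopied")
// A no-op, non-deterministic UDF whose only side effect is bumping the metric
val incrNoopCountExpr = udf { () =>
  numTargetRowsCopied.add(1)
  true
}.asNondeterministic()
// incrNoopCountExpr() could then be evaluated once per unmodified target row of the joined plan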

Creating Instance

MergeIntoCommand takes the following to be created:

  • Source Data
  • Target Data (LogicalPlan)
  • TahoeFileIndex
  • Condition Expression
  • Matched Clauses (Seq[DeltaMergeIntoMatchedClause])
  • Optional Non-Matched Clause (Option[DeltaMergeIntoInsertClause])
  • Migrated Schema
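
The constructor parameters above correspond to a (heavily abbreviated) class declaration along these lines; the parameter names and the type of the migrated schema are chosen to match the list above and should be treated as illustrative (other mix-ins and members are omitted):

case class MergeIntoCommand(
    source: LogicalPlan,
    target: LogicalPlan,
    targetFileIndex: TahoeFileIndex,
    condition: Expression,
    matchedClauses: Seq[DeltaMergeIntoMatchedClause],
    notMatchedClause: Option[DeltaMergeIntoInsertClause],
    migratedSchema: Option[StructType]) extends RunnableCommand with DeltaCommand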

MergeIntoCommand is created when the PreprocessTableMerge logical resolution rule is executed (to resolve a DeltaMergeInto logical command).

Source Data (to Merge From)

When created, MergeIntoCommand is given a LogicalPlan for the source data to merge from (referred to internally as source).

The source LogicalPlan is used twice: to find the files to rewrite (findTouchedFiles) and to write out all the changes (writeAllChanges).

Tip

Enable DEBUG logging level for org.apache.spark.sql.delta.commands.MergeIntoCommand logger to see the inner-workings of writeAllChanges.

Target DeltaLog

targetDeltaLog: DeltaLog

targetDeltaLog is the DeltaLog of the TahoeFileIndex.

targetDeltaLog is used to start a new transaction when the command is executed (run).

Lazy Value

targetDeltaLog is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and cached afterwards.
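
A minimal sketch of the lazy-value pattern (targetFileIndex stands for the TahoeFileIndex this command is created with; the exact field names in the Delta Lake source may differ):

// Initialized on first access only, cached for all subsequent accesses
lazy val targetDeltaLog: DeltaLog = targetFileIndex.deltaLog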

Executing Command

run(
  spark: SparkSession): Seq[Row]

run is part of the RunnableCommand (Spark SQL) abstraction.

run requests the target DeltaLog to start a new transaction.

With spark.databricks.delta.schema.autoMerge.enabled configuration property enabled, run updates the metadata (of the transaction).

run determines the Delta actions to commit: RemoveFile actions for the files to rewrite and AddFile actions for the newly written files.

FIXME Describe the deltaActions part in more detail.

With spark.databricks.delta.history.metricsEnabled configuration property enabled, run requests the current transaction to register SQL metrics for the Delta operation.

run requests the current transaction to commit (with the Delta actions and Merge operation).

run records the Delta event.

run posts a SparkListenerDriverAccumUpdates Spark event (with the metrics).

In the end, run requests the CacheManager to recacheByPlan.
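
For reference, a merge like the following (using the Scala DeltaTable API) ends up planned as a DeltaMergeInto and executed by MergeIntoCommand; the table path, source data and column names are made up for this example:

import io.delta.tables.DeltaTable
val targetTable = DeltaTable.forPath(spark, "/tmp/delta/target")  // hypothetical delta table with an id column
val sourceDF = spark.range(10).withColumnRenamed("id", "key")     // hypothetical source data
targetTable.as("t")
  .merge(sourceDF.as("s"), "t.id = s.key")
  .whenMatched().updateExpr(Map("t.id" -> "s.key"))
  .whenNotMatched().insertExpr(Map("id" -> "s.key"))
  .execute()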

Finding Files to Rewrite

findTouchedFiles(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction): Seq[AddFile]

Important

findTouchedFiles is such a fine piece of art (a gem). It uses a custom accumulator, a UDF (that records the names of the touched files in this accumulator) and the input_file_name() standard function to learn the names of the files read.

It is always worth keeping in mind that Delta Lake uses files for data storage, and that is why the input_file_name() standard function works. It would not work for non-file-based data sources.

Example 1: Understanding the Internals of findTouchedFiles

The following query writes out a 10-element dataset using the default parquet data source to the /tmp/parquet directory:

val target = "/tmp/parquet"
spark.range(10).write.save(target)

The number of parquet part files varies based on the number of partitions (CPU cores).

The following query loads the parquet dataset back, adding a column with the input_file_name() standard function to mimic findTouchedFiles's behaviour.

import org.apache.spark.sql.functions.input_file_name
val FILE_NAME_COL = "_file_name_"
val dataFiles = spark.read.parquet(target).withColumn(FILE_NAME_COL, input_file_name())
scala> dataFiles.show(truncate = false)
+---+---------------------------------------------------------------------------------------+
|id |_file_name_                                                                            |
+---+---------------------------------------------------------------------------------------+
|4  |file:///tmp/parquet/part-00007-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|0  |file:///tmp/parquet/part-00001-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|3  |file:///tmp/parquet/part-00006-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|6  |file:///tmp/parquet/part-00011-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|1  |file:///tmp/parquet/part-00003-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|8  |file:///tmp/parquet/part-00014-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|2  |file:///tmp/parquet/part-00004-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|7  |file:///tmp/parquet/part-00012-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|5  |file:///tmp/parquet/part-00009-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|9  |file:///tmp/parquet/part-00015-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
+---+---------------------------------------------------------------------------------------+

As you may have guessed, not all part files contain data, and so they are not included in the dataset. That is where findTouchedFiles uses the groupBy operator and the count action to calculate the match frequency.

val counts = dataFiles.groupBy(FILE_NAME_COL).count()
scala> counts.show(truncate = false)
+---------------------------------------------------------------------------------------+-----+
|_file_name_                                                                            |count|
+---------------------------------------------------------------------------------------+-----+
|file:///tmp/parquet/part-00015-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00007-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00003-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00011-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00012-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00006-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00001-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00004-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00009-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00014-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
+---------------------------------------------------------------------------------------+-----+

Let's load all the part files in the /tmp/parquet directory and find which file(s) have no data.

import scala.sys.process._
import org.apache.spark.sql.functions.{col, concat, lit}
import spark.implicits._
val cmd = (s"ls $target" #| "grep .parquet").lineStream
val allFiles = cmd.toSeq.toDF(FILE_NAME_COL)
  .select(concat(lit(s"file://$target/"), col(FILE_NAME_COL)) as FILE_NAME_COL)
val joinType = "left_anti" // MergeIntoCommand uses an inner join as it wants the data files
val noDataFiles = allFiles.join(dataFiles, Seq(FILE_NAME_COL), joinType)

Mind that the exact split between data and no-data files could be different on your machine, but that should not interfere with the main reasoning flow.

scala> noDataFiles.show(truncate = false)
+---------------------------------------------------------------------------------------+
|_file_name_                                                                            |
+---------------------------------------------------------------------------------------+
|file:///tmp/parquet/part-00000-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
+---------------------------------------------------------------------------------------+

findTouchedFiles registers an accumulator to collect all the distinct files that need to be rewritten (touched files).

Note

The name of the accumulator is internal.metrics.MergeIntoDelta.touchedFiles. The internal.metrics prefix is supposed to hide it from the web UI, since the set of file names to be rewritten could potentially be large.

findTouchedFiles defines a nondeterministic UDF that adds the file names to the accumulator (recordTouchedFileName).

findTouchedFiles splits conjunctive predicates (And binary expressions) of the condition expression and collects the predicates that reference the target's columns only (targetOnlyPredicates). findTouchedFiles requests the given OptimisticTransaction for the files that match the target-only predicates (and so creates a dataSkippedFiles collection of AddFiles).

Note

This step looks similar to filter predicate pushdown.
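
A hedged sketch of the split-and-filter step (PredicateHelper-style; target stands for the target LogicalPlan and condition for the merge condition expression):

import org.apache.spark.sql.catalyst.expressions.{And, Expression}
// Split a conjunctive condition into its individual predicates
def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
  case And(left, right) => splitConjunctivePredicates(left) ++ splitConjunctivePredicates(right)
  case other => Seq(other)
}
// Keep only the predicates that reference the target's columns exclusively
val targetOnlyPredicates = splitConjunctivePredicates(condition)
  .filter(_.references.subsetOf(target.outputSet))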

findTouchedFiles creates one DataFrame for the source data (using Dataset.ofRows utility).

Tip

Learn more about Dataset.ofRows utility in The Internals of Spark SQL online book.

findTouchedFiles builds a logical query plan for the files (matching the predicates) and creates another DataFrame for the target data. findTouchedFiles adds two columns to the target DataFrame (see the sketch after this list):

  1. _row_id_ for monotonically_increasing_id() standard function
  2. _file_name_ for input_file_name() standard function
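
A minimal sketch of adding the two helper columns (targetDF stands for the DataFrame created for the target files; the column names match the list above):

import org.apache.spark.sql.functions.{input_file_name, monotonically_increasing_id}
val targetWithIds = targetDF
  .withColumn("_row_id_", monotonically_increasing_id())
  .withColumn("_file_name_", input_file_name())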

findTouchedFiles creates (a DataFrame that is) an INNER JOIN of the source and target DataFrames using the condition expression.

findTouchedFiles takes the joined DataFrame and selects the _row_id_ column and the recordTouchedFileName UDF applied to the _file_name_ column (as the one column). This DataFrame is known internally as collectTouchedFiles.

findTouchedFiles uses the groupBy operator on _row_id_ to calculate the sum of all the values in the one column (as the count column) of the two-column collectTouchedFiles dataset. This DataFrame is known internally as matchedRowCounts.

Note

No Spark job has been submitted yet. findTouchedFiles is still in "query preparation" mode.

findTouchedFiles uses filter on the count column (of the matchedRowCounts dataset) to find values greater than 1. If there are any such rows, findTouchedFiles throws an UnsupportedOperationException:

Cannot perform MERGE as multiple source rows matched and attempted to update the same
target row in the Delta table. By SQL semantics of merge, when multiple source rows match
on the same target row, the update operation is ambiguous as it is unclear which source
should be used to update the matching target row.
You can preprocess the source table to eliminate the possibility of multiple matches.

Note

Since findTouchedFiles uses the count action, a Spark SQL query (and possibly Spark jobs) should be reported in the web UI.
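
A hedged sketch of the multiple-match check (assuming collectTouchedFiles with the _row_id_ and one columns from the steps above; the exception message is shortened):

import org.apache.spark.sql.functions.{col, sum}
val matchedRowCounts = collectTouchedFiles.groupBy("_row_id_").agg(sum("one").as("count"))
val multipleMatchCount = matchedRowCounts.filter(col("count") > 1).count()
if (multipleMatchCount > 0) {
  throw new UnsupportedOperationException(
    "Cannot perform MERGE as multiple source rows matched the same target row")
}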

findTouchedFiles requests the touchedFilesAccum accumulator for the touched file names.

Example 2: Understanding the Internals of findTouchedFiles

import org.apache.spark.sql.functions.{col, input_file_name, udf}
val TOUCHED_FILES_ACCUM_NAME = "MergeIntoDelta.touchedFiles"
val touchedFilesAccum = spark.sparkContext.collectionAccumulator[String](TOUCHED_FILES_ACCUM_NAME)
val recordTouchedFileName = udf { (fileName: String) => {
  touchedFilesAccum.add(fileName)
  1
}}.asNondeterministic()
val target = "/tmp/parquet"
spark.range(10).write.save(target)
val FILE_NAME_COL = "_file_name_"
val dataFiles = spark.read.parquet(target).withColumn(FILE_NAME_COL, input_file_name())
val collectTouchedFiles = dataFiles.select(col(FILE_NAME_COL), recordTouchedFileName(col(FILE_NAME_COL)).as("one"))
scala> collectTouchedFiles.show(truncate = false)
+---------------------------------------------------------------------------------------+---+
|_file_name_                                                                            |one|
+---------------------------------------------------------------------------------------+---+
|file:///tmp/parquet/part-00007-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00001-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00006-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00011-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00003-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00014-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00004-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00012-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00009-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00015-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
+---------------------------------------------------------------------------------------+---+    
import scala.collection.JavaConverters._
val touchedFileNames = touchedFilesAccum.value.asScala.toSeq

Use the Stages tab in web UI to review the accumulator values.

findTouchedFiles prints out the following TRACE message to the logs:

findTouchedFiles: matched files:
  [touchedFileNames]

findTouchedFiles generates a candidate file map (generateCandidateFileMap) for the files that match the target-only predicates.

findTouchedFiles resolves every touched file name to an AddFile (getTouchedFile).

findTouchedFiles updates the following performance metrics:

  • numTargetFilesBeforeSkipping
  • numTargetFilesAfterSkipping
  • numTargetFilesRemoved

In the end, findTouchedFiles gives the touched files (as AddFiles).

Writing All Changes

writeAllChanges(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction,
  filesToRewrite: Seq[AddFile]): Seq[AddFile]

writeAllChanges builds the target output columns (possibly with some nulls for the target columns that are not in the current schema).

writeAllChanges builds a target logical query plan for the AddFiles.

writeAllChanges determines a join type to use (rightOuter or fullOuter).

writeAllChanges prints out the following DEBUG message to the logs:

writeAllChanges using [joinType] join:
source.output: [outputSet]
target.output: [outputSet]
condition: [condition]
newTarget.output: [outputSet]

writeAllChanges creates a joinedDF DataFrame that is a join of the DataFrames for the source and the new target logical plans with the given join condition and the join type.
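
A hedged sketch of the join construction (sourceDF and newTargetDF stand for the DataFrames over the source and rebuilt target plans, condition is the Catalyst join condition, and joinType is either rightOuter or fullOuter as described above):

import org.apache.spark.sql.{Column, DataFrame}
val joinedDF: DataFrame = sourceDF.join(newTargetDF, new Column(condition), joinType)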

writeAllChanges creates a JoinedRowProcessor that is then used to map over partitions of the joined DataFrame.

writeAllChanges prints out the following DEBUG message to the logs:

writeAllChanges: join output plan:
[outputDF.queryExecution]

writeAllChanges requests the input OptimisticTransaction to writeFiles (possibly repartitioning by the partition columns if the table is partitioned and the spark.databricks.delta.merge.repartitionBeforeWrite.enabled configuration property is enabled).
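
A hedged sketch of the optional repartitioning before writeFiles (the configuration key is the one mentioned above; the helper and its default value are illustrative):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
def maybeRepartition(outputDF: DataFrame, partitionColumns: Seq[String]): DataFrame = {
  val repartitionEnabled = outputDF.sparkSession.conf
    .get("spark.databricks.delta.merge.repartitionBeforeWrite.enabled", "false").toBoolean
  if (partitionColumns.nonEmpty && repartitionEnabled)
    outputDF.repartition(partitionColumns.map(col): _*)
  else outputDF
}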

writeAllChanges is used when MergeIntoCommand is requested to run.

Building Target Logical Query Plan for AddFiles

buildTargetPlanWithFiles(
  deltaTxn: OptimisticTransaction,
  files: Seq[AddFile]): LogicalPlan

buildTargetPlanWithFiles creates a DataFrame that represents the given AddFiles, in order to access their analyzed logical query plan. buildTargetPlanWithFiles requests the given OptimisticTransaction for the DeltaLog to create the DataFrame (for the Snapshot and the given AddFiles).

In the end, buildTargetPlanWithFiles creates a Project logical operator with Alias expressions so that the output columns of the analyzed logical query plan (of the DataFrame of the AddFiles) reference the target's output columns (by name).

Note

The output columns of the target delta table are associated with the OptimisticTransaction as the Metadata:

deltaTxn.metadata.schema
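
A hedged sketch of the final projection (aligning the scan's output attributes with the target's column names; plan stands for the analyzed plan of the AddFiles DataFrame, and the name matching shown here is simplified to a case-insensitive comparison):

import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.Project
// For every target column, alias the matching output attribute of the scan by the target's name
val aliases = deltaTxn.metadata.schema.map { field =>
  val attr = plan.output.find(_.name.equalsIgnoreCase(field.name)).getOrElse(
    throw new IllegalStateException(s"Could not find ${field.name} among ${plan.output}"))
  Alias(attr, field.name)()
}
val targetPlan = Project(aliases, plan)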

writeInsertsOnlyWhenNoMatchedClauses

writeInsertsOnlyWhenNoMatchedClauses(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction): Seq[AddFile]

writeInsertsOnlyWhenNoMatchedClauses...FIXME

Exceptions

run throws an AnalysisException when the target schema is different from the delta table's (i.e. it has changed after the analysis phase):

The schema of your Delta table has changed in an incompatible way since your DataFrame or DeltaTable object was created. Please redefine your DataFrame or DeltaTable object. Changes:
[schemaDiff]
This check can be turned off by setting the session configuration key spark.databricks.delta.checkLatestSchemaOnRead to false.

Logging

Enable ALL logging level for org.apache.spark.sql.delta.commands.MergeIntoCommand logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.delta.commands.MergeIntoCommand=ALL

Refer to Logging.

