
ClassicMergeExecutor

ClassicMergeExecutor is an extension of the MergeOutputGeneration abstraction for "classic" execution of the MERGE command (when requested to run a merge) when one of the following holds:

InsertOnlyMergeExecutor

When none of the above requirements is met, InsertOnlyMergeExecutor is used instead.

With ClassicMergeExecutor chosen, MergeIntoCommand starts by finding the data files to rewrite and, only when any AddFiles are found, requests ClassicMergeExecutor to write out merge changes to a target delta table.

Finding (Add)Files to Rewrite

findTouchedFiles(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction): (Seq[AddFile], DeduplicateCDFDeletes)

findTouchedFiles is used when:

Important

findTouchedFiles is such a fine piece of art (a Delta gem). It uses a custom accumulator, a UDF (that uses this accumulator to record the names of touched files) and the input_file_name() standard function for the names of the files read.

It is always worth keeping in mind that Delta Lake uses files for data storage and that is why input_file_name() standard function works. It would not work for non-file-based data sources.

Example 1: Understanding the Internals of findTouchedFiles

The following snippet writes out a 10-element dataset using the default parquet data source to the /tmp/parquet directory:

val target = "/tmp/parquet"
spark.range(10).write.save(target)

The number of parquet part files varies based on the number of partitions (which depends on the number of CPU cores).

The following query loads the parquet dataset back and adds a column with the input_file_name() standard function to mimic findTouchedFiles's behaviour.

val FILE_NAME_COL = "_file_name_"
val dataFiles = spark.read.parquet(target).withColumn(FILE_NAME_COL, input_file_name())
scala> dataFiles.show(truncate = false)
+---+---------------------------------------------------------------------------------------+
|id |_file_name_                                                                            |
+---+---------------------------------------------------------------------------------------+
|4  |file:///tmp/parquet/part-00007-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|0  |file:///tmp/parquet/part-00001-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|3  |file:///tmp/parquet/part-00006-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|6  |file:///tmp/parquet/part-00011-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|1  |file:///tmp/parquet/part-00003-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|8  |file:///tmp/parquet/part-00014-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|2  |file:///tmp/parquet/part-00004-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|7  |file:///tmp/parquet/part-00012-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|5  |file:///tmp/parquet/part-00009-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|9  |file:///tmp/parquet/part-00015-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
+---+---------------------------------------------------------------------------------------+

As you may have guessed, not all part files contain data, so some are not included in the dataset. That is where findTouchedFiles uses the groupBy operator and a count aggregation to calculate match frequency.

val counts = dataFiles.groupBy(FILE_NAME_COL).count()
scala> counts.show(truncate = false)
+---------------------------------------------------------------------------------------+-----+
|_file_name_                                                                            |count|
+---------------------------------------------------------------------------------------+-----+
|file:///tmp/parquet/part-00015-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00007-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00003-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00011-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00012-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00006-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00001-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00004-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00009-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
|file:///tmp/parquet/part-00014-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1    |
+---------------------------------------------------------------------------------------+-----+

Let's load all the part files in the /tmp/parquet directory and find which file(s) have no data.

import scala.sys.process._
val cmd = (s"ls $target" #| "grep .parquet").lineStream
val allFiles = cmd.toArray.toSeq.toDF(FILE_NAME_COL)
  .select(concat(lit(s"file://$target/"), col(FILE_NAME_COL)) as FILE_NAME_COL)
val joinType = "left_anti" // MergeIntoCommand uses inner as it wants data files
val noDataFiles = allFiles.join(dataFiles, Seq(FILE_NAME_COL), joinType)

Mind that your split of data vs no-data files may be different, but that should not interfere with the main reasoning flow.

scala> noDataFiles.show(truncate = false)
+---------------------------------------------------------------------------------------+
|_file_name_                                                                            |
+---------------------------------------------------------------------------------------+
|file:///tmp/parquet/part-00000-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
+---------------------------------------------------------------------------------------+

Fun Fact: Synonyms

The phrases "touched files" and "files to rewrite" are synonyms.

findTouchedFiles records this merge operation with the following:

  • extraOpType: findTouchedFiles
  • status: MERGE operation - scanning files for matches
  • sqlMetricName: scanTimeMs

findTouchedFiles registers an internal SetAccumulator with internal.metrics.MergeIntoDelta.touchedFiles name.

Note

The name of the accumulator starts with internal.metrics prefix so it won't be displayed in the web UI (Spark Core).

findTouchedFiles creates a non-deterministic UDF that records the names of touched files (adds them to the accumulator).

Example 2: Understanding the Internals of findTouchedFiles

val TOUCHED_FILES_ACCUM_NAME = "MergeIntoDelta.touchedFiles"
val touchedFilesAccum = spark.sparkContext.collectionAccumulator[String](TOUCHED_FILES_ACCUM_NAME)
val recordTouchedFileName = udf { (fileName: String) => {
  touchedFilesAccum.add(fileName)
  1
}}.asNondeterministic()
val target = "/tmp/parquet"
spark.range(10).write.mode("overwrite").save(target) // overwrite in case the directory already exists (e.g. from Example 1)
val FILE_NAME_COL = "_file_name_"
val dataFiles = spark.read.parquet(target).withColumn(FILE_NAME_COL, input_file_name())
val collectTouchedFiles = dataFiles.select(col(FILE_NAME_COL), recordTouchedFileName(col(FILE_NAME_COL)).as("one"))
scala> collectTouchedFiles.show(truncate = false)
+---------------------------------------------------------------------------------------+---+
|_file_name_                                                                            |one|
+---------------------------------------------------------------------------------------+---+
|file:///tmp/parquet/part-00007-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00001-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00006-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00011-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00003-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00014-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00004-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00012-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00009-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
|file:///tmp/parquet/part-00015-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1  |
+---------------------------------------------------------------------------------------+---+
import scala.collection.JavaConverters._
val touchedFileNames = touchedFilesAccum.value.asScala.toSeq

Use the Stages tab in web UI to review the accumulator values.

findTouchedFiles determines the AddFiles to scan and prunes non-matching files (dataSkippedFiles). With no WHEN NOT MATCHED BY SOURCE clauses, findTouchedFiles requests the given OptimisticTransaction for the AddFiles matching getTargetOnlyPredicates. Otherwise, findTouchedFiles requests all the AddFiles (an accept-all predicate).

findTouchedFiles determines the join type (joinType). With no WHEN NOT MATCHED BY SOURCE clauses, findTouchedFiles uses the INNER join type. Otherwise, it uses a RIGHT_OUTER join.

Inner vs Right Outer Joins

Learn more about inner and (right) outer joins on Wikipedia.

findTouchedFiles determines the matched predicate (matchedPredicate). When isMatchedOnly, findTouchedFiles converts the WHEN MATCHED clauses to their conditions (falling back to an accept-all predicate for clauses with no condition) and then reduces them with Or expressions. Otherwise, findTouchedFiles uses an accept-all predicate as the matched predicate.
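
A minimal sketch of this reduction, with isMatchedOnly and the clauses' optional conditions as hypothetical inputs:

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, Literal, Or}

// Hypothetical inputs: a matched-only merge with two WHEN MATCHED clauses,
// one conditional and one unconditional
val isMatchedOnly = true
val matchedConditions: Seq[Option[Expression]] = Seq(
  Some(EqualTo(UnresolvedAttribute("t.status"), Literal("obsolete"))),
  None)

val matchedPredicate: Expression =
  if (isMatchedOnly) {
    matchedConditions
      .map(_.getOrElse(Literal.TrueLiteral)) // a clause with no condition becomes accept-all
      .reduce(Or(_, _))                      // OR the clause conditions together
  } else {
    Literal.TrueLiteral                      // accept-all predicate
  }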

findTouchedFiles creates a Catalyst expression (incrSourceRowCountExpr) that increments the numSourceRows metric (and returns true value).

findTouchedFiles gets the source DataFrame and adds an extra _source_row_present_ column with the incrSourceRowCountExpr expression that the DataFrame is filtered by (and that, more importantly, counts the number of source rows as a side effect).

findTouchedFiles builds a logical plan (targetPlan) with the dataSkippedFiles files (and columnsToDrop to be dropped).

findTouchedFiles creates a target DataFrame (targetDF) from the target plan with two extra columns (sketched below):

  • _row_id_: monotonically_increasing_id() standard function
  • _file_name_: input_file_name() standard function
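
A minimal sketch of adding those two extra columns, with a plain parquet read of the /tmp/parquet dataset from the examples above standing in for the target plan:

import org.apache.spark.sql.functions.{input_file_name, monotonically_increasing_id}

// Stand-in for the target plan built from the data-skipped files
val targetDF = spark.read.parquet("/tmp/parquet")
  .withColumn("_row_id_", monotonically_increasing_id())
  .withColumn("_file_name_", input_file_name())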

findTouchedFiles creates a joined DataFrame (joinToFindTouchedFiles) with the sourceDF and targetDF dataframes, the condition as the join condition, and the join type (INNER or RIGHT OUTER).

findTouchedFiles creates recordTouchedFileName UDF. recordTouchedFileName UDF does two things:

  1. Records the names of the touched files (the _file_name_ column) in the touchedFilesAccum accumulator
  2. Returns 1

findTouchedFiles takes the joinToFindTouchedFiles dataframe to select the columns:

  • _row_id_
  • one, the result of the recordTouchedFileName UDF executed on the _file_name_ column and the matchedPredicate (based on the conditional WHEN MATCHED clauses)

In other words, the collectTouchedFiles dataframe (sketched below) is made up of two columns:

  • _row_id_ (the values of monotonically_increasing_id standard function)
  • one (with the result of the recordTouchedFileName UDF)
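
Putting these pieces together, a sketch of collectTouchedFiles could look as follows, assuming a joinToFindTouchedFiles dataframe, the touchedFilesAccum accumulator from Example 2, and a matchedPredicateCol boolean column (all hypothetical names):

import org.apache.spark.sql.functions.{col, udf}

// The UDF takes the matched predicate as its second argument so that only rows
// satisfying a conditional WHEN MATCHED clause record their file name
val recordTouchedFileName = udf { (fileName: String, shouldRecord: Boolean) =>
  if (shouldRecord) touchedFilesAccum.add(fileName)
  1
}.asNondeterministic()

val collectTouchedFiles = joinToFindTouchedFiles
  .select(col("_row_id_"), recordTouchedFileName(col("_file_name_"), matchedPredicateCol).as("one"))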

findTouchedFiles calculates the frequency of matches per source row: it sums up the 1s per _row_id_ column and records the result in the count column (see the sketch below). In other words, the matchedRowCounts dataframe is made up of two columns:

  • _row_id_ (the values of monotonically_increasing_id standard function)
  • count (the total of the 1s in one column)
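
A sketch of matchedRowCounts, assuming the collectTouchedFiles dataframe from the previous step:

import org.apache.spark.sql.functions.sum

// Number of matches per _row_id_
val matchedRowCounts = collectTouchedFiles
  .groupBy("_row_id_")
  .agg(sum("one").alias("count"))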

Note

No Spark job has been submitted yet. findTouchedFiles is still in "query preparation" mode.

findTouchedFiles counts the number of rows in the matchedRowCounts dataset with count above 1 (multipleMatchCount) and the total of their count values (multipleMatchSum). If there are no such rows, both values are 0.
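
A sketch of that computation, assuming the matchedRowCounts dataframe above and a spark-shell session (where spark.implicits._ is in scope). This is where the collect action gets triggered:

import org.apache.spark.sql.functions.{coalesce, count, lit, sum}

val (multipleMatchCount, multipleMatchSum) = matchedRowCounts
  .filter("count > 1")
  .select(coalesce(count(lit(1)), lit(0)), coalesce(sum("count"), lit(0)))
  .as[(Long, Long)]
  .collect()
  .head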

Note

Since findTouchedFiles triggers collect action, there should be a Spark SQL query reported (and possibly Spark jobs) in web UI.

findTouchedFiles makes a sanity check (based on multipleMatchCount).

When multiple matches occurred (and are allowed), findTouchedFiles stores the difference between multipleMatchSum and multipleMatchCount as multipleMatchDeleteOnlyOvercount. Multiple matches are only allowed for delete-only queries.

findTouchedFiles requests the touchedFilesAccum accumulator for the touched file names (touchedFileNames).

findTouchedFiles prints out the following TRACE message to the logs (with touchedFileNames from the touchedFilesAccum accumulator):

findTouchedFiles: matched files:
  [touchedFileNames]

Finding Matched Files as Distributed Computation

There are a couple of very fundamental Spark "things" in play here:

  1. The touchedFilesAccum accumulator
  2. The recordTouchedFileName UDF that uses the accumulator
  3. The collectTouchedFiles dataframe with input_file_name standard function (as _file_name_ column)
  4. Calculating multipleMatchCount and multipleMatchSum values in a Spark job

Altogether, this allows findTouchedFiles to run a distributed computation (a Spark job) to collect (accumulate) the matched files.

findTouchedFiles generateCandidateFileMap for the files that match the target-only predicates (dataSkippedFiles). findTouchedFiles then uses the candidate file map to convert the matched file names (touchedFileNames) into AddFiles (getTouchedFile).

findTouchedFiles updates the following performance metrics:

In the end, findTouchedFiles returns the following:

  1. touched files (as AddFiles)
  2. DeduplicateCDFDeletes

Data-Skipped Files

With no notMatchedBySourceClauses, findTouchedFiles splits the conjunctive predicates (And expressions) in the merge condition and determines the so-called target-only predicates (predicates that reference the target columns only).

findTouchedFiles requests the given OptimisticTransaction for the data files based on notMatchedBySourceClauses. For no notMatchedBySourceClauses, findTouchedFiles requests only the ones matching the target-only predicates. Otherwise, findTouchedFiles requests all the data files.
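
A conceptual sketch of extracting target-only predicates from a merge condition. The condition here is hypothetical, and the "target-only" check is simplified to a column-name prefix test (the real implementation checks expression references against the target's output attributes):

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Expression, GreaterThan, Literal}

// Split a conjunctive condition (a AND b AND c) into individual predicates
def splitConjunctivePredicates(e: Expression): Seq[Expression] = e match {
  case And(left, right) => splitConjunctivePredicates(left) ++ splitConjunctivePredicates(right)
  case other => Seq(other)
}

// Hypothetical merge condition: t.id = s.id AND t.date > '2023-01-01'
val condition: Expression = And(
  EqualTo(UnresolvedAttribute("t.id"), UnresolvedAttribute("s.id")),
  GreaterThan(UnresolvedAttribute("t.date"), Literal("2023-01-01")))

// Keep only the predicates that reference target (t-prefixed) columns exclusively
val targetOnlyPredicates = splitConjunctivePredicates(condition).filter { p =>
  p.collect { case a: UnresolvedAttribute => a }.forall(_.name.startsWith("t."))
}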

Merge Condition and Data Skipping

This is the moment when the merge condition of this MergeIntoCommand participates in Data Skipping.

Writing Out All Merge Changes (to Delta Table)

writeAllChanges(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction,
  filesToRewrite: Seq[AddFile],
  deduplicateCDFDeletes: DeduplicateCDFDeletes,
  writeUnmodifiedRows: Boolean): Seq[FileAction]

Change Data Feed

writeAllChanges behaves differently depending on whether Change Data Feed is enabled or not.

Deletion Vectors

The writeUnmodifiedRows input flag is disabled (false) to indicate that Deletion Vectors should be used (with shouldWritePersistentDeletionVectors enabled).

In that case, the unmodified rows do not have to be written out and writeAllChanges can perform stricter joins.

writeAllChanges records this merge operation with the following:

  • extraOpType: writeAllUpdatesAndDeletes or writeAllChanges (based on shouldOptimizeMatchedOnlyMerge)
  • status: MERGE operation - Rewriting [filesToRewrite] files
  • sqlMetricName: rewriteTimeMs

CDF Generation

writeAllChanges asserts that one of the following holds:

  1. CDF generation is disabled (based on the given DeduplicateCDFDeletes flag)
  2. isCdcEnabled is enabled

Otherwise, writeAllChanges reports an IllegalArgumentException:

CDF delete duplication is enabled but overall the CDF generation is disabled
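
A sketch of that assertion, with deduplicateCDFDeletes and isCdcEnabled as hypothetical stand-ins for the real flags (require throws an IllegalArgumentException when the check fails):

// Hypothetical stand-ins for the real DeduplicateCDFDeletes flag and the CDC configuration
case class DeduplicateCDFDeletes(enabled: Boolean, includesInserts: Boolean)
val deduplicateCDFDeletes = DeduplicateCDFDeletes(enabled = true, includesInserts = false)
val isCdcEnabled = true

require(!deduplicateCDFDeletes.enabled || isCdcEnabled,
  "CDF delete duplication is enabled but overall the CDF generation is disabled")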

writeAllChanges creates a DataFrame for the target plan with the given AddFiles to rewrite (filesToRewrite) (and no columnsToDrop).

writeAllChanges determines the join type (see the sketch after the lists below). With writeUnmodifiedRows enabled (true), the join type is as follows:

  1. rightOuter for shouldOptimizeMatchedOnlyMerge enabled
  2. fullOuter otherwise

With writeUnmodifiedRows disabled (false), the join type is as follows (in that order):

  1. inner for isMatchedOnly enabled
  2. leftOuter for no notMatchedBySourceClauses
  3. rightOuter for no notMatchedClauses
  4. fullOuter otherwise
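
The join-type selection above can be sketched as follows (all the flags and clause collections are hypothetical placeholders):

// Hypothetical inputs mirroring the decision described above
val writeUnmodifiedRows = true
val shouldOptimizeMatchedOnlyMerge = false
val isMatchedOnly = false
val notMatchedClauses, notMatchedBySourceClauses = Seq.empty[String]

val joinType =
  if (writeUnmodifiedRows) {
    if (shouldOptimizeMatchedOnlyMerge) "rightOuter" else "fullOuter"
  } else if (isMatchedOnly) "inner"
  else if (notMatchedBySourceClauses.isEmpty) "leftOuter"
  else if (notMatchedClauses.isEmpty) "rightOuter"
  else "fullOuter"
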
shouldOptimizeMatchedOnlyMerge Used Twice

shouldOptimizeMatchedOnlyMerge is used twice for the following:

  1. extraOpType to record this merge operation
  2. The join type
  • shouldOptimizeMatchedOnlyMerge enabled (true): extraOpType is writeAllUpdatesAndDeletes, joinType is RIGHT OUTER
  • shouldOptimizeMatchedOnlyMerge disabled (false): extraOpType is writeAllChanges, joinType is FULL OUTER

writeAllChanges prints out the following DEBUG message to the logs:

writeAllChanges using [joinType] join:
  source.output: [source]
  target.output: [target]
  condition: [condition]
  newTarget.output: [baseTargetDF]

writeAllChanges creates Catalyst expressions to increment SQL metrics:

  • numSourceRowsInSecondScan (valueToReturn: true)
  • numTargetRowsCopied (valueToReturn: false)

writeAllChanges creates a joinedDF DataFrame based on the source dataframe and the target plan built with the files to rewrite (buildTargetPlanWithFiles), used as the left and the right side of the join, respectively.

For the source/left side of the join, writeAllChanges adds the following columns:

  • _source_row_present_: increments the numSourceRowsInSecondScan metric and returns the true literal
  • _source_row_index (only when the given DeduplicateCDFDeletes is enabled and has includesInserts enabled): monotonically_increasing_id() standard function

For the target/right side of the join, writeAllChanges adds the following columns:

  • _target_row_present_: the true literal
  • _target_row_index_ (only when the given DeduplicateCDFDeletes is enabled): monotonically_increasing_id() standard function

In the end, writeAllChanges joins the DataFrames (using DataFrame.join operator) with the merge condition as the join condition and the determined join type (rightOuter or fullOuter).
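
A simplified sketch of building the joined DataFrame, assuming sourceDF, targetDF, a joinCondition Column and the joinType from the steps above (the metric-incrementing expressions are replaced with plain literals):

import org.apache.spark.sql.functions.{lit, monotonically_increasing_id}

val left = sourceDF
  .withColumn("_source_row_present_", lit(true)) // the real expression also increments numSourceRowsInSecondScan
val right = targetDF
  .withColumn("_target_row_present_", lit(true))
  .withColumn("_target_row_index_", monotonically_increasing_id()) // only with CDF delete de-duplication enabled

val joinedDF = left.join(right, joinCondition, joinType)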

writeAllChanges generatePrecomputedConditionsAndDF with the joined DataFrame and the given merge clauses (matchedClauses, notMatchedClauses, notMatchedBySourceClauses).

writeAllChanges generates the output columns (outputCols):

  1. writeAllChanges determines the target output columns (targetOutputCols)
  2. writeAllChanges adds one extra _row_dropped_ column (and, if isCdcEnabled, another extra _change_type column) to the target output column names (outputColNames)
  3. writeAllChanges adds the incrNoopCountExpr metric-incrementing expression (and, if isCdcEnabled, another sentinel null expression) to the target output expressions (noopCopyExprs)
  4. In the end, writeAllChanges generateWriteAllChangesOutputCols

writeAllChanges creates an output DataFrame (outputDF) based on Change Data Feed:

  • With Change Data Feed enabled, writeAllChanges generateCdcAndOutputRows (with joinedAndPrecomputedConditionsDF as the source dataframe)
  • Otherwise, writeAllChanges selects the outputCols columns from the joinedAndPrecomputedConditionsDF dataframe (Dataset.select operator)

writeAllChanges makes sure that the output dataframe includes only rows that are not dropped (with _row_dropped_ column being false using Dataset.where operator).

writeAllChanges drops the _row_dropped_ column from the output dataframe so it does not leak to the output.
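
A sketch of the final filtering, assuming the outputDF dataframe with its _row_dropped_ column:

import org.apache.spark.sql.functions.col

val finalOutputDF = outputDF
  .where(!col("_row_dropped_")) // keep only the rows that are not dropped
  .drop("_row_dropped_")        // the helper column must not leak to the written data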

writeAllChanges prints out the following DEBUG message to the logs:

writeAllChanges: join output plan:
[outputDF]

writeAllChanges writes out the outputDF DataFrame (that gives FileActions).

In the end, writeAllChanges updates the metrics.


writeAllChanges is used when:

Logging

ClassicMergeExecutor is an abstract class and logging is configured using the logger of the MergeIntoCommand.