ClassicMergeExecutor¶
ClassicMergeExecutor is an extension of the MergeOutputGeneration abstraction for "classic" execution of the MERGE command (when requested to run a merge) when one of the following holds:
- MERGE is not insert-only (there are WHEN MATCHED or WHEN NOT MATCHED BY SOURCE clauses)
- spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled is disabled (were it enabled for an insert-only merge, InsertOnlyMergeExecutor would be used instead)
InsertOnlyMergeExecutor
When neither of the above holds (i.e., the merge is insert-only and the optimization is enabled), InsertOnlyMergeExecutor is used instead.
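The choice boils down to a simple boolean condition. Here is a minimal sketch (not the actual Delta source) with hypothetical flags standing in for the merge shape and the configuration property:
// Hypothetical stand-ins for the merge shape and the configuration property
val isInsertOnly = false                  // false: there are WHEN MATCHED or WHEN NOT MATCHED BY SOURCE clauses
val optimizeInsertOnlyMergeEnabled = true // spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled

// ClassicMergeExecutor handles every merge the insert-only optimization cannot
val useClassicMergeExecutor = !isInsertOnly || !optimizeInsertOnlyMergeEnabled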
With ClassicMergeExecutor chosen, MergeIntoCommand starts by finding the data files to rewrite and, only when any AddFiles are found, requests ClassicMergeExecutor to write out the merge changes to the target delta table.
Finding (Add)Files to Rewrite¶
findTouchedFiles(
spark: SparkSession,
deltaTxn: OptimisticTransaction): (Seq[AddFile], DeduplicateCDFDeletes)
findTouchedFiles is used when:
- MergeIntoCommand is requested to run a merge (with a non-insert-only merge or merge.optimizeInsertOnlyMerge.enabled disabled)
Important
findTouchedFiles is such a fine piece of art (a Delta gem). It uses a custom accumulator, a UDF (that uses this accumulator to record the touched file names) and the input_file_name() standard function for the names of the files read.
It is always worth keeping in mind that Delta Lake uses files for data storage and that is why input_file_name() standard function works. It would not work for non-file-based data sources.
Example 1: Understanding the Internals of findTouchedFiles
The following query writes out a 10-element dataset using the default parquet data source to the /tmp/parquet directory:
val target = "/tmp/parquet"
spark.range(10).write.save(target)
The number of parquet part files varies based on the number of partitions (which depends on the number of CPU cores).
The following query loads the parquet dataset back and adds a column with the input_file_name() standard function to mimic findTouchedFiles's behaviour.
val FILE_NAME_COL = "_file_name_"
val dataFiles = spark.read.parquet(target).withColumn(FILE_NAME_COL, input_file_name())
scala> dataFiles.show(truncate = false)
+---+---------------------------------------------------------------------------------------+
|id |_file_name_ |
+---+---------------------------------------------------------------------------------------+
|4 |file:///tmp/parquet/part-00007-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|0 |file:///tmp/parquet/part-00001-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|3 |file:///tmp/parquet/part-00006-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|6 |file:///tmp/parquet/part-00011-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|1 |file:///tmp/parquet/part-00003-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|8 |file:///tmp/parquet/part-00014-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|2 |file:///tmp/parquet/part-00004-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|7 |file:///tmp/parquet/part-00012-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|5 |file:///tmp/parquet/part-00009-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|9 |file:///tmp/parquet/part-00015-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
+---+---------------------------------------------------------------------------------------+
As you may have noticed, not all part files got any data, so they are not included in the dataset. That is why findTouchedFiles uses the groupBy operator with a count aggregation to calculate the match frequency per file.
val counts = dataFiles.groupBy(FILE_NAME_COL).count()
scala> counts.show(truncate = false)
+---------------------------------------------------------------------------------------+-----+
|_file_name_ |count|
+---------------------------------------------------------------------------------------+-----+
|file:///tmp/parquet/part-00015-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00007-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00003-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00011-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00012-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00006-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00001-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00004-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00009-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00014-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
+---------------------------------------------------------------------------------------+-----+
Let's load all the part files in the /tmp/parquet directory and find which file(s) have no data.
import scala.sys.process._
val cmd = (s"ls $target" #| "grep .parquet").lineStream
val allFiles = cmd.toArray.toSeq.toDF(FILE_NAME_COL)
.select(concat(lit(s"file://$target/"), col(FILE_NAME_COL)) as FILE_NAME_COL)
val joinType = "left_anti" // MergeIntoCommand uses inner as it wants data files
val noDataFiles = allFiles.join(dataFiles, Seq(FILE_NAME_COL), joinType)
Mind that your split of data vs no-data files may differ, but that does not change the main reasoning.
scala> noDataFiles.show(truncate = false)
+---------------------------------------------------------------------------------------+
|_file_name_ |
+---------------------------------------------------------------------------------------+
|file:///tmp/parquet/part-00000-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
+---------------------------------------------------------------------------------------+
Fun Fact: Synonyms
The phrases "touched files" and "files to rewrite" are synonyms.
findTouchedFiles records this merge operation with the following:
| Property | Value |
|---|---|
| extraOpType | findTouchedFiles |
| status | MERGE operation - scanning files for matches |
| sqlMetricName | scanTimeMs |
findTouchedFiles registers an internal SetAccumulator under the name internal.metrics.MergeIntoDelta.touchedFiles.
Note
The name of the accumulator starts with internal.metrics prefix so it won't be displayed in the web UI (Spark Core).
findTouchedFiles creates a non-deterministic UDF that records the names of touched files (adds them to the accumulator).
Example 2: Understanding the Internals of findTouchedFiles
val TOUCHED_FILES_ACCUM_NAME = "MergeIntoDelta.touchedFiles"
val touchedFilesAccum = spark.sparkContext.collectionAccumulator[String](TOUCHED_FILES_ACCUM_NAME)
val recordTouchedFileName = udf { (fileName: String) => {
touchedFilesAccum.add(fileName)
1
}}.asNondeterministic()
val target = "/tmp/parquet"
spark.range(10).write.save(target)
val FILE_NAME_COL = "_file_name_"
val dataFiles = spark.read.parquet(target).withColumn(FILE_NAME_COL, input_file_name())
val collectTouchedFiles = dataFiles.select(col(FILE_NAME_COL), recordTouchedFileName(col(FILE_NAME_COL)).as("one"))
scala> collectTouchedFiles.show(truncate = false)
+---------------------------------------------------------------------------------------+---+
|_file_name_ |one|
+---------------------------------------------------------------------------------------+---+
|file:///tmp/parquet/part-00007-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00001-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00006-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00011-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00003-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00014-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00004-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00012-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00009-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00015-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
+---------------------------------------------------------------------------------------+---+
import scala.collection.JavaConverters._
val touchedFileNames = touchedFilesAccum.value.asScala.toSeq
Use the Stages tab in the web UI to review the accumulator values.
findTouchedFiles determines the AddFiles and prunes non-matching files (dataSkippedFiles). With no WHEN NOT MATCHED BY SOURCE clauses, findTouchedFiles requests the given OptimisticTransaction for the AddFiles matching getTargetOnlyPredicates. Otherwise, findTouchedFiles requests all the AddFiles (an accept-all predicate).
findTouchedFiles determines the join type (joinType). With no WHEN NOT MATCHED BY SOURCE clauses, findTouchedFiles uses the INNER join type. Otherwise, it uses a RIGHT_OUTER join.
findTouchedFiles determines the matched predicate (matchedPredicate). When isMatchedOnly, findTouchedFiles converts the WHEN MATCHED clauses to their conditions (falling back to an accept-all predicate for clauses with no condition) and reduces them with Or expressions. Otherwise, findTouchedFiles uses an accept-all predicate for the matched predicate.
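As a minimal sketch (not the actual Delta source), conditional WHEN MATCHED clauses can be reduced to a single matched predicate like this (the clause conditions below are hypothetical):
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit}

// Hypothetical WHEN MATCHED clause conditions; None models a clause with no condition
val matchedClauseConditions: Seq[Option[Column]] =
  Seq(Some(col("source.status") === "active"), None)

// Fall back to an accept-all predicate for unconditioned clauses and OR them together
val matchedPredicate: Column =
  matchedClauseConditions.map(_.getOrElse(lit(true))).reduce(_ || _)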
findTouchedFiles creates a Catalyst expression (incrSourceRowCountExpr) that increments the numSourceRows metric (and returns true).
findTouchedFiles gets the source DataFrame and adds an extra column _source_row_present_ with the incrSourceRowCountExpr expression that is used with DataFrame.filter (and, more importantly, as a side effect counts the number of source rows).
findTouchedFiles builds a logical plan (targetPlan) with the dataSkippedFiles files (and columnsToDrop to be dropped).
findTouchedFiles creates a target DataFrame (targetDF) from the target plan with two extra columns:
| Column Name | Expression |
|---|---|
| _row_id_ | monotonically_increasing_id() standard function |
| _file_name_ | input_file_name() standard function |
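The following spark-shell sketch mimics these two steps with plain DataFrames, reusing the /tmp/parquet dataset from Example 1 (it is not Delta's internal code and the numSourceRows metric is replaced with a plain true literal):
import org.apache.spark.sql.functions.{col, input_file_name, lit, monotonically_increasing_id}

// Source side: the extra column that would normally increment the numSourceRows metric
val sourceDF = spark.range(5).toDF("id")
  .withColumn("_source_row_present_", lit(true))
  .filter(col("_source_row_present_"))

// Target side: row identifiers and the names of the files the rows come from
val targetDF = spark.read.parquet("/tmp/parquet")
  .withColumn("_row_id_", monotonically_increasing_id())
  .withColumn("_file_name_", input_file_name())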
findTouchedFiles creates a joined DataFrame (joinToFindTouchedFiles) with the sourceDF and targetDF dataframes, the condition as the join condition, and the join type (INNER or RIGHT OUTER).
findTouchedFiles creates the recordTouchedFileName UDF that does two things:
- Records the names of the touched files (the _file_name_ column) in the touchedFilesAccum accumulator
- Returns 1
findTouchedFiles takes the joinToFindTouchedFiles dataframe and selects the following columns:
- _row_id_
- one, which is the recordTouchedFileName UDF executed on the _file_name_ column and the matchedPredicate (based on the conditional WHEN MATCHED clauses)
In other words, the collectTouchedFiles dataframe is made up of two columns:
- _row_id_ (the values of the monotonically_increasing_id standard function)
- one (the result of the recordTouchedFileName UDF)
findTouchedFiles calculates the frequency of matches per source row by summing the 1s per _row_id_; the result is recorded in the count column. In other words, the matchedRowCounts dataframe is made up of two columns:
- _row_id_ (the values of the monotonically_increasing_id standard function)
- count (the total of the 1s in the one column)
Note
No Spark job has been submitted yet. findTouchedFiles is still in "query preparation" mode.
findTouchedFiles counts the rows in the matchedRowCounts dataset with count above 1 (multipleMatchCount) and sums their count values (multipleMatchSum). If there are no such rows, both values are 0.
Note
Since findTouchedFiles triggers a collect action, there should be a Spark SQL query (and possibly Spark jobs) reported in the web UI.
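The following self-contained spark-shell sketch (not the Delta source) shows the shape of this computation, using a hypothetical collectTouchedFiles where the row with _row_id_ 1 matched twice:
import org.apache.spark.sql.functions.{coalesce, count, lit, sum}
import spark.implicits._

// Hypothetical collectTouchedFiles with the _row_id_ and one columns described above
val collectTouchedFiles = Seq((0L, 1), (1L, 1), (1L, 1), (2L, 1)).toDF("_row_id_", "one")

// Frequency of matches per _row_id_
val matchedRowCounts = collectTouchedFiles.groupBy("_row_id_").agg(sum("one").as("count"))

// Rows matched more than once and their total match count (both 0 when there are none)
val (multipleMatchCount, multipleMatchSum) = matchedRowCounts
  .filter("count > 1")
  .select(coalesce(count(lit(1)), lit(0)), coalesce(sum("count"), lit(0)))
  .as[(Long, Long)]
  .head()
// multipleMatchCount = 1, multipleMatchSum = 2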
findTouchedFiles performs a sanity check (based on multipleMatchCount).
When multiple matches occurred (and are allowed), findTouchedFiles stores the difference between multipleMatchSum and multipleMatchCount as multipleMatchDeleteOnlyOvercount. Multiple matches are only allowed for delete-only queries.
findTouchedFiles requests the touchedFilesAccum accumulator for the touched file names (touchedFileNames).
findTouchedFiles prints out the following TRACE message to the logs (with touchedFileNames from the touchedFilesAccum accumulator):
findTouchedFiles: matched files:
[touchedFileNames]
Finding Matched Files as Distributed Computation
There are a couple of very fundamental Spark "things" in play here:
- The touchedFilesAccum accumulator
- The recordTouchedFileName UDF that uses the accumulator
- The collectTouchedFiles dataframe with the input_file_name standard function (as the _file_name_ column)
- Calculating the multipleMatchCount and multipleMatchSum values in a Spark job
All together, this allows findTouchedFiles to run a distributed computation (a Spark job) to collect (accumulate) the matched files.
findTouchedFiles generateCandidateFileMap for the files that match the target-only predicates (dataSkippedFiles) and uses this candidate file map to convert the matched file names (touchedFileNames) back to AddFiles (getTouchedFile).
findTouchedFiles updates the following performance metrics:
- numTargetFilesBeforeSkipping
- numTargetBytesBeforeSkipping
- numTargetFilesAfterSkipping
- numTargetBytesAfterSkipping
- numTargetPartitionsAfterSkipping
- numTargetFilesRemoved
- numTargetBytesRemoved
- numTargetPartitionsRemovedFrom
In the end, findTouchedFiles returns the following:
- touched files (as AddFiles)
- DeduplicateCDFDeletes
Data-Skipped Files¶
For no notMatchedBySourceClauses, findTouchedFiles splits the conjunctive predicates (And expressions) in the merge condition and determines the so-called target-only predicates (predicates that reference the target columns only).
findTouchedFiles requests the given OptimisticTransaction for the data files based on notMatchedBySourceClauses. For no notMatchedBySourceClauses, findTouchedFiles requests only the ones matching the target-only predicates. Otherwise, findTouchedFiles requests all the data files.
Merge Condition and Data Skipping
This is the moment when the merge condition of this MergeIntoCommand participates in Data Skipping.
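A simplified sketch of the predicate splitting follows (Spark's PredicateHelper offers an equivalent splitConjunctivePredicates; the targetOnlyPredicates filtering is shown with an assumed mergeCondition and set of target attributes):
import org.apache.spark.sql.catalyst.expressions.{And, Expression}

// Split a condition into its conjuncts (a AND b AND c => Seq(a, b, c))
def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
  case And(left, right) => splitConjunctivePredicates(left) ++ splitConjunctivePredicates(right)
  case other => Seq(other)
}

// Keep only the conjuncts that reference target columns
// (mergeCondition and targetAttributes are assumed to be available)
// val targetOnlyPredicates =
//   splitConjunctivePredicates(mergeCondition).filter(_.references.subsetOf(targetAttributes))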
Writing Out All Merge Changes (to Delta Table)¶
writeAllChanges(
spark: SparkSession,
deltaTxn: OptimisticTransaction,
filesToRewrite: Seq[AddFile],
deduplicateCDFDeletes: DeduplicateCDFDeletes,
writeUnmodifiedRows: Boolean): Seq[FileAction]
Change Data Feed
writeAllChanges acts differently depending on whether Change Data Feed is enabled.
Deletion Vectors
The writeUnmodifiedRows input flag is disabled (false) to indicate that Deletion Vectors should be used (with shouldWritePersistentDeletionVectors enabled).
Unmodified rows then do not have to be written out, so writeAllChanges can use stricter join types.
writeAllChanges records this merge operation with the following:
| Property | Value |
|---|---|
| extraOpType | writeAllUpdatesAndDeletes or writeAllChanges (based on shouldOptimizeMatchedOnlyMerge) |
| status | MERGE operation - Rewriting [filesToRewrite] files |
| sqlMetricName | rewriteTimeMs |
CDF Generation
writeAllChanges asserts that one of the following holds:
- CDF generation is disabled (based on the given DeduplicateCDFDeletes flag)
- isCdcEnabled is enabled
Otherwise, writeAllChanges reports an IllegalArgumentException:
CDF delete duplication is enabled but overall the CDF generation is disabled
writeAllChanges creates a DataFrame for the target plan with the given AddFiles to rewrite (filesToRewrite) (and no columnsToDrop).
writeAllChanges determines the join type. With writeUnmodifiedRows enabled (true), the join type is as follows:
- rightOuter for shouldOptimizeMatchedOnlyMerge enabled
- fullOuter otherwise
With writeUnmodifiedRows disabled (false), the join type is as follows (in that order):
- inner for isMatchedOnly enabled
- leftOuter for no notMatchedBySourceClauses
- rightOuter for no notMatchedClauses
- fullOuter otherwise
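The decision can be sketched in plain Scala as follows (the flags are hypothetical stand-ins for the state described above; this is not the actual Delta source):
// Hypothetical stand-ins for the state described above
val writeUnmodifiedRows = true
val shouldOptimizeMatchedOnlyMerge = false
val isMatchedOnly = false
val hasNotMatchedBySourceClauses = false
val hasNotMatchedClauses = true

val joinType: String =
  if (writeUnmodifiedRows) {
    if (shouldOptimizeMatchedOnlyMerge) "rightOuter" else "fullOuter"
  } else {
    // Deletion Vectors: unmodified rows are not rewritten, so stricter joins are possible
    if (isMatchedOnly) "inner"
    else if (!hasNotMatchedBySourceClauses) "leftOuter"
    else if (!hasNotMatchedClauses) "rightOuter"
    else "fullOuter"
  }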
shouldOptimizeMatchedOnlyMerge Used Twice
shouldOptimizeMatchedOnlyMerge is used twice for the following:
- extraOpType to record this merge operation
- The join type
| shouldOptimizeMatchedOnlyMerge | extraOpType | joinType |
|---|---|---|
| true | writeAllUpdatesAndDeletes | RIGHT OUTER |
| false | writeAllChanges | FULL OUTER |
writeAllChanges prints out the following DEBUG message to the logs:
writeAllChanges using [joinType] join:
source.output: [source]
target.output: [target]
condition: [condition]
newTarget.output: [baseTargetDF]
writeAllChanges creates Catalyst expressions to increment SQL metrics:
| Metric Name | valueToReturn |
|---|---|
| numSourceRowsInSecondScan | true |
| numTargetRowsCopied | false |
writeAllChanges creates the joinedDF DataFrame from the source dataframe (the left side of the join) and buildTargetPlanWithFiles (the right side).
For the source/left side of the join, writeAllChanges adds the following columns:
| Column Name | Expression |
|---|---|
| _source_row_present_ | Increment the numSourceRowsInSecondScan metric and return a true literal |
| _source_row_index (only when the given DeduplicateCDFDeletes is enabled and has includesInserts enabled) | monotonically_increasing_id() standard function |
For the target/right side of the join, writeAllChanges adds the following columns:
| Column Name | Expression |
|---|---|
| _target_row_present_ | true literal |
| _target_row_index_ (only when the given DeduplicateCDFDeletes is enabled) | monotonically_increasing_id() standard function |
In the end, writeAllChanges joins the DataFrames (using the DataFrame.join operator) with the merge condition as the join condition and the join type determined earlier.
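In DataFrame terms, the join can be sketched as follows (sourceDF, baseTargetDF, condition and joinType are assumed from the steps above, and the metric-incrementing expressions are replaced with plain literals):
import org.apache.spark.sql.functions.lit

// Left side: mark every source row as present (stands in for incrementing numSourceRowsInSecondScan)
// Right side: mark every target row as present
val joinedDF = sourceDF
  .withColumn("_source_row_present_", lit(true))
  .join(
    baseTargetDF.withColumn("_target_row_present_", lit(true)),
    condition, // a Column with the merge condition
    joinType)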
writeAllChanges generatePrecomputedConditionsAndDF with the joined DataFrame and the given merge clauses (matchedClauses, notMatchedClauses, notMatchedBySourceClauses).
writeAllChanges generates the output columns (outputCols):
- writeAllChanges determines the target output columns (targetOutputCols)
- writeAllChanges adds one extra _row_dropped_ column, possibly with another extra _change_type column if isCdcEnabled, to the target output columns (outputColNames)
- writeAllChanges adds the expression to increment the incrNoopCountExpr metric, possibly with another sentinel null expression if isCdcEnabled, to the target output columns (noopCopyExprs)
- In the end, writeAllChanges generateWriteAllChangesOutputCols
writeAllChanges creates an output DataFrame (outputDF) based on Change Data Feed:
- With Change Data Feed enabled, writeAllChanges generateCdcAndOutputRows (with joinedAndPrecomputedConditionsDF as the source dataframe)
- Otherwise, writeAllChanges selects the outputCols columns from the joinedAndPrecomputedConditionsDF dataframe (Dataset.select operator)
writeAllChanges makes sure that the output dataframe includes only rows that are not dropped (the _row_dropped_ column is false, using the Dataset.where operator).
writeAllChanges drops the _row_dropped_ column from the output dataframe so it does not leak to the output.
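In DataFrame terms (a sketch, with outputDF assumed from the step above and finalOutputDF being a hypothetical name):
import org.apache.spark.sql.functions.col

// Keep only rows that were not dropped by the merge clauses, then remove the helper column
val finalOutputDF = outputDF
  .where(col("_row_dropped_") === false)
  .drop("_row_dropped_")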
writeAllChanges prints out the following DEBUG message to the logs:
writeAllChanges: join output plan:
[outputDF]
writeAllChanges writes out the outputDF DataFrame (that gives FileActions).
In the end, writeAllChanges updates the metrics.
writeAllChanges is used when:
- MergeIntoCommand is requested to run a merge (with a non-insert-only merge or merge.optimizeInsertOnlyMerge.enabled disabled)
Logging¶
ClassicMergeExecutor is an abstract class, and logging is configured using the logger of MergeIntoCommand.