ClassicMergeExecutor¶
ClassicMergeExecutor is an extension of the MergeOutputGeneration abstraction for "classic" execution of the MERGE command (when requested to run a merge) when one of the following holds:

- MERGE is not insert-only (so there are WHEN MATCHED or WHEN NOT MATCHED BY SOURCE clauses)
- spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled is disabled

InsertOnlyMergeExecutor

When neither of the above holds (i.e., the merge is insert-only and the optimization is enabled), InsertOnlyMergeExecutor is used instead.
With ClassicMergeExecutor chosen, MergeIntoCommand starts by finding data files to rewrite and, only when there are any AddFiles found, requests ClassicMergeExecutor to write out merge changes to a target delta table.
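The choice between the two executors boils down to a simple condition. The following is a minimal sketch (not the actual MergeIntoCommand code; isInsertOnly and optimizeInsertOnlyMergeEnabled are assumed flags):

val chosenExecutor =
  if (isInsertOnly && optimizeInsertOnlyMergeEnabled) "InsertOnlyMergeExecutor"
  else "ClassicMergeExecutor"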
Finding (Add)Files to Rewrite¶
findTouchedFiles(
spark: SparkSession,
deltaTxn: OptimisticTransaction): (Seq[AddFile], DeduplicateCDFDeletes)
findTouchedFiles is used when:

- MergeIntoCommand is requested to run a merge (for a non-insert-only merge or with merge.optimizeInsertOnlyMerge.enabled disabled)
Important
findTouchedFiles is such a fine piece of art (a Delta gem). It uses a custom accumulator, a UDF (that uses the accumulator to record touched file names) and the input_file_name() standard function for the names of the files read.
It is always worth keeping in mind that Delta Lake uses files for data storage, which is why the input_file_name() standard function works. It would not work for non-file-based data sources.
Example 1: Understanding the Internals of findTouchedFiles
The following query writes out a 10-element dataset using the default parquet data source to the /tmp/parquet directory:
val target = "/tmp/parquet"
spark.range(10).write.save(target)
The number of parquet part files varies based on the number of partitions (which depends on the number of CPU cores).
The following query loads the parquet dataset back, using the input_file_name() standard function to mimic findTouchedFiles's behaviour.

import org.apache.spark.sql.functions._

val FILE_NAME_COL = "_file_name_"
val dataFiles = spark.read.parquet(target).withColumn(FILE_NAME_COL, input_file_name())
scala> dataFiles.show(truncate = false)
+---+---------------------------------------------------------------------------------------+
|id |_file_name_ |
+---+---------------------------------------------------------------------------------------+
|4 |file:///tmp/parquet/part-00007-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|0 |file:///tmp/parquet/part-00001-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|3 |file:///tmp/parquet/part-00006-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|6 |file:///tmp/parquet/part-00011-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|1 |file:///tmp/parquet/part-00003-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|8 |file:///tmp/parquet/part-00014-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|2 |file:///tmp/parquet/part-00004-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|7 |file:///tmp/parquet/part-00012-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|5 |file:///tmp/parquet/part-00009-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|9 |file:///tmp/parquet/part-00015-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
+---+---------------------------------------------------------------------------------------+
As you may have guessed, not all part files contain data, so they are not included in the dataset. That is where findTouchedFiles uses the groupBy operator followed by count to calculate the match frequency.
val counts = dataFiles.groupBy(FILE_NAME_COL).count()
scala> counts.show(truncate = false)
+---------------------------------------------------------------------------------------+-----+
|_file_name_ |count|
+---------------------------------------------------------------------------------------+-----+
|file:///tmp/parquet/part-00015-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00007-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00003-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00011-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00012-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00006-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00001-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00004-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00009-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00014-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
+---------------------------------------------------------------------------------------+-----+
Let's load all the part files in the /tmp/parquet directory and find which file(s) have no data.

import scala.sys.process._
val cmd = (s"ls $target" #| "grep .parquet").lineStream
val allFiles = cmd.toArray.toSeq.toDF(FILE_NAME_COL)
  .select(concat(lit(s"file://$target/"), col(FILE_NAME_COL)) as FILE_NAME_COL)

val joinType = "left_anti" // MergeIntoCommand uses inner as it wants data files
val noDataFiles = allFiles.join(dataFiles, Seq(FILE_NAME_COL), joinType)

Note that your file names and the exact split between data and no-data files may differ, but that does not affect the main reasoning.
scala> noDataFiles.show(truncate = false)
+---------------------------------------------------------------------------------------+
|_file_name_ |
+---------------------------------------------------------------------------------------+
|file:///tmp/parquet/part-00000-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
+---------------------------------------------------------------------------------------+
Fun Fact: Synonyms
The phrases "touched files" and "files to rewrite" are synonyms.
findTouchedFiles records this merge operation with the following:
Property | Value |
---|---|
extraOpType | findTouchedFiles |
status | MERGE operation - scanning files for matches |
sqlMetricName | scanTimeMs |
findTouchedFiles registers an internal SetAccumulator under the name internal.metrics.MergeIntoDelta.touchedFiles.
Note
The name of the accumulator starts with the internal.metrics prefix, so it is not displayed in the web UI (Spark Core).
findTouchedFiles creates a non-deterministic UDF that records the names of touched files (adds them to the accumulator).
Example 2: Understanding the Internals of findTouchedFiles
import org.apache.spark.sql.functions._

val TOUCHED_FILES_ACCUM_NAME = "MergeIntoDelta.touchedFiles"
val touchedFilesAccum = spark.sparkContext.collectionAccumulator[String](TOUCHED_FILES_ACCUM_NAME)

val recordTouchedFileName = udf { (fileName: String) => {
  touchedFilesAccum.add(fileName)
  1
}}.asNondeterministic()

val target = "/tmp/parquet"
// overwrite in case /tmp/parquet already exists (e.g. from Example 1)
spark.range(10).write.mode("overwrite").save(target)

val FILE_NAME_COL = "_file_name_"
val dataFiles = spark.read.parquet(target).withColumn(FILE_NAME_COL, input_file_name())
val collectTouchedFiles = dataFiles.select(col(FILE_NAME_COL), recordTouchedFileName(col(FILE_NAME_COL)).as("one"))
scala> collectTouchedFiles.show(truncate = false)
+---------------------------------------------------------------------------------------+---+
|_file_name_ |one|
+---------------------------------------------------------------------------------------+---+
|file:///tmp/parquet/part-00007-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00001-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00006-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00011-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00003-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00014-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00004-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00012-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00009-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00015-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
+---------------------------------------------------------------------------------------+---+
import scala.collection.JavaConverters._
val touchedFileNames = touchedFilesAccum.value.asScala.toSeq
Use the Stages tab in web UI to review the accumulator values.
findTouchedFiles determines the AddFiles and prunes non-matching files (dataSkippedFiles). With no WHEN NOT MATCHED BY SOURCE clauses, findTouchedFiles requests the given OptimisticTransaction for the AddFiles matching getTargetOnlyPredicates. Otherwise, findTouchedFiles requests all the AddFiles (an accept-all predicate).
findTouchedFiles determines the join type (joinType). With no WHEN NOT MATCHED BY SOURCE clauses, findTouchedFiles uses the INNER join type. Otherwise, it is a RIGHT_OUTER join.
findTouchedFiles determines the matched predicate (matchedPredicate). When isMatchedOnly, findTouchedFiles converts the WHEN MATCHED clauses to their conditions (falling back to an accept-all predicate for clauses with no condition) and reduces them into a single Or expression. Otherwise, findTouchedFiles uses an accept-all predicate for the matched predicate (as sketched below).
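The following is a minimal sketch (not the actual Delta Lake code) of how the matched predicate could be assembled; matchedClauseConditions stands for the optional conditions of the WHEN MATCHED clauses, and Literal.TrueLiteral plays the role of the accept-all predicate:

import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, Or}

def matchedPredicate(
    isMatchedOnly: Boolean,
    matchedClauseConditions: Seq[Option[Expression]]): Expression =
  if (isMatchedOnly) {
    // every WHEN MATCHED clause contributes its condition;
    // clauses with no condition fall back to the accept-all predicate
    matchedClauseConditions
      .map(_.getOrElse(Literal.TrueLiteral))
      .reduce(Or)
  } else {
    Literal.TrueLiteral
  }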
findTouchedFiles creates a Catalyst expression (incrSourceRowCountExpr) that increments the numSourceRows metric (and returns true).
findTouchedFiles gets the source DataFrame and adds an extra _source_row_present_ column with the incrSourceRowCountExpr expression, which is then used in DataFrame.filter (and, more importantly, counts the number of source rows as a side effect).
findTouchedFiles builds a logical plan (targetPlan) with the dataSkippedFiles files (and columnsToDrop to be dropped).
findTouchedFiles creates a target DataFrame (targetDF) from the target plan with two extra columns (see the sketch after the table):
Column Name | Expression |
---|---|
_row_id_ | monotonically_increasing_id() standard function |
_file_name_ | input_file_name() standard function |
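A minimal sketch of the two extra columns, assuming targetDF stands for the DataFrame over the target plan:

import org.apache.spark.sql.functions.{input_file_name, monotonically_increasing_id}

val targetDFWithExtras = targetDF
  .withColumn("_row_id_", monotonically_increasing_id())
  .withColumn("_file_name_", input_file_name())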
findTouchedFiles creates a joined DataFrame (joinToFindTouchedFiles) from the sourceDF and targetDF dataframes, with the merge condition as the join condition and the join type determined above (INNER or RIGHT OUTER).
findTouchedFiles creates the recordTouchedFileName UDF. The UDF does two things:

- Records the names of the touched files (the _file_name_ column) in the touchedFilesAccum accumulator
- Returns 1
findTouchedFiles takes the joinToFindTouchedFiles dataframe and selects the following columns:

- _row_id_
- one, which is the recordTouchedFileName UDF executed on the _file_name_ column and the matchedPredicate (based on the conditional WHEN MATCHED clauses)

In other words, the collectTouchedFiles dataframe is made up of two columns:

- _row_id_ (the values of the monotonically_increasing_id standard function)
- one (the result of the recordTouchedFileName UDF)
findTouchedFiles calculates the frequency of matches per source row: the total of the 1s per _row_id_ column, recorded in the count column. In other words, the matchedRowCounts dataframe (sketched after this list) is made up of two columns:

- _row_id_ (the values of the monotonically_increasing_id standard function)
- count (the total of the 1s in the one column)
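A minimal sketch of the frequency calculation, assuming collectTouchedFiles carries the _row_id_ and one columns described above:

import org.apache.spark.sql.functions.sum

val matchedRowCounts = collectTouchedFiles
  .groupBy("_row_id_")
  .agg(sum("one").as("count"))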
Note
No Spark job has been submitted yet. findTouchedFiles is still in "query preparation" mode.
findTouchedFiles counts the number of rows in the matchedRowCounts dataset with count above 1 (multipleMatchCount) and sums their count values (multipleMatchSum). If there are no such rows, both values are 0.
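A minimal sketch of the multiple-match statistics, assuming matchedRowCounts from the previous step (head() is what triggers the Spark job mentioned in the note that follows):

import org.apache.spark.sql.functions.{col, count, lit, sum}

val multipleMatches = matchedRowCounts
  .filter(col("count") > 1)
  .select(count(lit(1)).as("multipleMatchCount"), sum(col("count")).as("multipleMatchSum"))
  .na.fill(0)  // no rows with multiple matches => both values are 0
  .head()      // triggers the distributed computation
val multipleMatchCount = multipleMatches.getLong(0)
val multipleMatchSum = multipleMatches.getLong(1)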
Note
Since findTouchedFiles triggers a collect action at this point, there should be a Spark SQL query (and possibly Spark jobs) reported in the web UI.
findTouchedFiles performs a sanity check (based on multipleMatchCount).

When multiple matches occurred (and are allowed), findTouchedFiles stores the difference between multipleMatchSum and multipleMatchCount in multipleMatchDeleteOnlyOvercount. Multiple matches are only allowed for delete-only queries.
findTouchedFiles requests the touchedFilesAccum accumulator for the touched file names (touchedFileNames).

findTouchedFiles prints out the following TRACE message to the logs (with the touchedFileNames from the touchedFilesAccum accumulator):
findTouchedFiles: matched files:
[touchedFileNames]
Finding Matched Files as Distributed Computation
There are a couple of very fundamental Spark "things" in play here:

- The touchedFilesAccum accumulator
- The recordTouchedFileName UDF that uses the accumulator
- The collectTouchedFiles dataframe with the input_file_name standard function (as the _file_name_ column)
- Calculating the multipleMatchCount and multipleMatchSum values in a Spark job

Altogether, this allows findTouchedFiles to run a distributed computation (a Spark job) to collect (accumulate) the matched files.
findTouchedFiles generates a candidate file map (generateCandidateFileMap) for the files that match the target-only predicates (dataSkippedFiles). findTouchedFiles uses the candidate file map to convert the matched file names (touchedFileNames) back into AddFiles (getTouchedFile).
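A minimal sketch (with simplified path handling) of how the touched file names can be resolved back to AddFiles; Delta's generateCandidateFileMap and getTouchedFile additionally resolve relative paths against the table root, which is omitted here:

import org.apache.spark.sql.delta.actions.AddFile

val candidateFileMap: Map[String, AddFile] =
  dataSkippedFiles.map(addFile => addFile.path -> addFile).toMap
val touchedAddFiles: Seq[AddFile] =
  touchedFileNames.distinct.map(candidateFileMap)  // throws if a name cannot be resolved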
findTouchedFiles updates the following performance metrics:
- numTargetFilesBeforeSkipping
- numTargetBytesBeforeSkipping
- numTargetFilesAfterSkipping
- numTargetBytesAfterSkipping
- numTargetPartitionsAfterSkipping
- numTargetFilesRemoved
- numTargetBytesRemoved
- numTargetPartitionsRemovedFrom
In the end, findTouchedFiles returns the following:
- touched files (as AddFiles)
- DeduplicateCDFDeletes
Data-Skipped Files¶
When there are no notMatchedBySourceClauses, findTouchedFiles splits the conjunctive predicates (And expressions) in the merge condition and determines the so-called target-only predicates (predicates that reference the target columns only).

findTouchedFiles requests the given OptimisticTransaction for the data files based on notMatchedBySourceClauses: with no notMatchedBySourceClauses, findTouchedFiles requests only the files matching the target-only predicates; otherwise, it requests all the data files.
Merge Condition and Data Skipping
This is the moment when the merge condition of this MergeIntoCommand participates in Data Skipping.
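A minimal sketch of deriving the target-only predicates from the merge condition, assuming condition is the merge condition (a Catalyst Expression) and targetPlan is the target's logical plan:

import org.apache.spark.sql.catalyst.expressions.{And, Expression}

// split the merge condition on And into its conjuncts
def splitConjunctivePredicates(e: Expression): Seq[Expression] = e match {
  case And(left, right) => splitConjunctivePredicates(left) ++ splitConjunctivePredicates(right)
  case other => Seq(other)
}

// keep only the predicates that reference target columns exclusively
val targetOnlyPredicates: Seq[Expression] =
  splitConjunctivePredicates(condition)
    .filter(_.references.subsetOf(targetPlan.outputSet))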
Writing Out All Merge Changes (to Delta Table)¶
writeAllChanges(
spark: SparkSession,
deltaTxn: OptimisticTransaction,
filesToRewrite: Seq[AddFile],
deduplicateCDFDeletes: DeduplicateCDFDeletes,
writeUnmodifiedRows: Boolean): Seq[FileAction]
Change Data Feed
writeAllChanges acts differently depending on whether Change Data Feed is enabled or not.
Deletion Vectors
The writeUnmodifiedRows input flag is disabled (false) to indicate that Deletion Vectors should be used (with shouldWritePersistentDeletionVectors enabled).
Unmodified rows then do not have to be written out and writeAllChanges can perform stricter joins.
writeAllChanges records this merge operation with the following:

Property | Value |
---|---|
extraOpType | writeAllUpdatesAndDeletes or writeAllChanges (based on shouldOptimizeMatchedOnlyMerge; see below) |
status | MERGE operation - Rewriting [filesToRewrite] files |
sqlMetricName | rewriteTimeMs |
CDF Generation
writeAllChanges asserts that one of the following holds:

- CDF generation is disabled (based on the given DeduplicateCDFDeletes flag)
- isCdcEnabled is enabled

Otherwise, writeAllChanges reports an IllegalArgumentException:
CDF delete duplication is enabled but overall the CDF generation is disabled
writeAllChanges creates a DataFrame for the target plan with the given AddFiles to rewrite (filesToRewrite) (and no columnsToDrop).
writeAllChanges determines the join type (see the sketch after these lists). With writeUnmodifiedRows enabled (true), the join type is as follows:

- rightOuter for shouldOptimizeMatchedOnlyMerge enabled
- fullOuter otherwise

With writeUnmodifiedRows disabled (false), the join type is as follows (in that order):

- inner for isMatchedOnly enabled
- leftOuter for no notMatchedBySourceClauses
- rightOuter for no notMatchedClauses
- fullOuter otherwise
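A minimal sketch of the selection logic above (string join-type names as accepted by DataFrame.join; the flags and clause sequences are assumed to mirror the fields described in this section):

val joinType =
  if (writeUnmodifiedRows) {
    if (shouldOptimizeMatchedOnlyMerge) "rightOuter" else "fullOuter"
  } else if (isMatchedOnly) {
    "inner"
  } else if (notMatchedBySourceClauses.isEmpty) {
    "leftOuter"
  } else if (notMatchedClauses.isEmpty) {
    "rightOuter"
  } else {
    "fullOuter"
  }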
shouldOptimizeMatchedOnlyMerge Used Twice

shouldOptimizeMatchedOnlyMerge is used twice, for the following:

- extraOpType to record this merge operation
- The join type
shouldOptimizeMatchedOnlyMerge | extraOpType | joinType |
---|---|---|
true | writeAllUpdatesAndDeletes | RIGHT OUTER |
false | writeAllChanges | FULL OUTER |
writeAllChanges prints out the following DEBUG message to the logs:
writeAllChanges using [joinType] join:
source.output: [source]
target.output: [target]
condition: [condition]
newTarget.output: [baseTargetDF]
writeAllChanges creates Catalyst expressions to increment the following SQL metrics:
Metric Name | valueToReturn |
---|---|
numSourceRowsInSecondScan | true |
numTargetRowsCopied | false |
writeAllChanges creates the joinedDF DataFrame based on the source dataframe and buildTargetPlanWithFiles, for the left and right side of the join, respectively.

For the source/left side of the join, writeAllChanges adds the following columns:
Column Name | Expression |
---|---|
_source_row_present_ | Increment the numSourceRowsInSecondScan metric and return true literal |
_source_row_index (only when the given DeduplicateCDFDeletes is enabled and has includesInserts enabled) | monotonically_increasing_id() standard function |
For the target/right side of the join, writeAllChanges adds the following columns:
Column Name | Expression |
---|---|
_target_row_present_ | true literal |
_target_row_index_ (only when the given DeduplicateCDFDeletes is enabled) | monotonically_increasing_id() standard function |
In the end, writeAllChanges joins the DataFrames (using the DataFrame.join operator) with the merge condition as the join condition and the join type determined earlier.
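A minimal sketch of the join, assuming sourceDF and baseTargetDF from the previous steps, condition as a Column with the merge condition, and joinType as determined above (the metric-incrementing expressions are modeled with plain literals here):

import org.apache.spark.sql.functions.lit

val joinedDF = sourceDF
  .withColumn("_source_row_present_", lit(true))  // real code: increments numSourceRowsInSecondScan
  .join(
    baseTargetDF.withColumn("_target_row_present_", lit(true)),
    condition,
    joinType)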
writeAllChanges executes generatePrecomputedConditionsAndDF with the joined DataFrame and the given merge clauses (matchedClauses, notMatchedClauses, notMatchedBySourceClauses).
writeAllChanges generates the output columns (outputCols):

- writeAllChanges determines the target output columns (targetOutputCols)
- writeAllChanges adds an extra _row_dropped_ column, possibly with another _change_type extra column if isCdcEnabled, to the target output column names (outputColNames)
- writeAllChanges adds the metric-incrementing incrNoopCountExpr expression, possibly with another sentinel null expression if isCdcEnabled, to the no-op copy expressions (noopCopyExprs)
- In the end, writeAllChanges generateWriteAllChangesOutputCols
writeAllChanges creates an output DataFrame (outputDF) based on Change Data Feed:

- With Change Data Feed enabled, writeAllChanges generateCdcAndOutputRows (with joinedAndPrecomputedConditionsDF as the source dataframe)
- Otherwise, writeAllChanges selects the outputCols columns from the joinedAndPrecomputedConditionsDF dataframe (Dataset.select operator)
writeAllChanges makes sure that the output dataframe includes only rows that are not dropped (the _row_dropped_ column being false, using the Dataset.where operator).

writeAllChanges then drops the _row_dropped_ column from the output dataframe so it does not leak to the output.
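A minimal sketch of these two steps, assuming outputDF from the previous step:

import org.apache.spark.sql.functions.col

val cleanedOutputDF = outputDF
  .where(col("_row_dropped_") === false)
  .drop("_row_dropped_")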
writeAllChanges prints out the following DEBUG message to the logs:
writeAllChanges: join output plan:
[outputDF]
writeAllChanges writes out the outputDF DataFrame (which gives FileActions).

In the end, writeAllChanges updates the metrics.
writeAllChanges is used when:

- MergeIntoCommand is requested to run a merge (for a non-insert-only merge or with merge.optimizeInsertOnlyMerge.enabled disabled)
Logging¶
ClassicMergeExecutor is an abstract class and logging is configured using the logger of MergeIntoCommand.