MergeIntoCommandBase¶
MergeIntoCommandBase
is a DeltaCommand and an extension of the LeafRunnableCommand
(Spark SQL) abstraction for merge delta commands.
MergeIntoCommandBase
is a MergeIntoMaterializeSource.
Contract (Subset)¶
Merge Condition¶
condition: Expression
The "join" condition of this merge
See:
Used when:
MergeIntoCommandBase
is requested to collectMergeStats and getTargetOnlyPredicatesClassicMergeExecutor
is requested to findTouchedFiles and writeAllChangesInsertOnlyMergeExecutor
is requested to writeOnlyInserts (withfilterMatchedRows
enabled to trim the source table)
WHEN MATCHED Clauses¶
matchedClauses: Seq[DeltaMergeIntoMatchedClause]
See:
Used when:
MergeIntoCommandBase
is requested to isMatchedOnly, isInsertOnly, collectMergeStats, isOnlyOneUnconditionalDelete, getTargetOnlyPredicatesClassicMergeExecutor
is requested to findTouchedFiles, writeAllChanges
WHEN NOT MATCHED Clauses¶
notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause]
DeltaMergeIntoNotMatchedClauses
See:
Used when:
InsertOnlyMergeExecutor
is requested to writeOnlyInsertsMergeIntoCommandBase
is requested to isMatchedOnly, isInsertOnly, includesInserts, collectMergeStatsClassicMergeExecutor
is requested to writeAllChangesInsertOnlyMergeExecutor
is requested to writeOnlyInserts, generateInsertsOnlyOutputDF, generateOneInsertOutputCols
WHEN NOT MATCHED BY SOURCE Clauses¶
notMatchedBySourceClauses: Seq[DeltaMergeIntoNotMatchedBySourceClause]
DeltaMergeIntoNotMatchedBySourceClauses
See:
Used when:
MergeIntoCommandBase
is requested to isMatchedOnly, isInsertOnly, collectMergeStatsClassicMergeExecutor
is requested to findTouchedFiles, writeAllChanges
Running Merge¶
runMerge(
spark: SparkSession): Seq[Row]
See:
Used when:
MergeIntoCommandBase
is requested to run
Source Data¶
source: LogicalPlan
A LogicalPlan
(Spark SQL) of the source data to merge from (internally referred to as source)
See:
Used when:
MergeIntoCommandBase
is requested to run
Implementations¶
Performance Metrics¶
Name | web UI |
---|---|
numSourceRows | number of source rows |
numSourceRowsInSecondScan | number of source rows (during repeated scan) |
numTargetRowsCopied | number of target rows rewritten unmodified |
numTargetRowsInserted | number of inserted rows |
numTargetRowsUpdated | number of updated rows |
numTargetRowsDeleted | number of deleted rows |
numTargetFilesBeforeSkipping | number of target files before skipping |
numTargetFilesAfterSkipping | number of target files after skipping |
numTargetFilesRemoved | number of files removed to target |
numTargetFilesAdded | number of files added to target |
numTargetChangeFilesAdded | number of change data capture files generated |
numTargetChangeFileBytes | total size of change data capture files generated |
numTargetBytesBeforeSkipping | number of target bytes before skipping |
numTargetBytesAfterSkipping | number of target bytes after skipping |
numTargetBytesRemoved | number of target bytes removed |
numTargetBytesAdded | number of target bytes added |
numTargetPartitionsAfterSkipping | number of target partitions after skipping |
numTargetPartitionsRemovedFrom | number of target partitions from which files were removed |
numTargetPartitionsAddedTo | number of target partitions to which files were added |
numTargetRowsMatchedDeleted | number of rows deleted by a matched clause |
numTargetRowsNotMatchedBySourceDeleted | number of rows deleted by a not matched by source clause |
numTargetRowsMatchedUpdated | number of rows updated by a matched clause |
numTargetRowsNotMatchedBySourceUpdated | number of rows updated by a not matched by source clause |
executionTimeMs | time taken to execute the entire operation |
scanTimeMs | time taken to scan the files for matches |
rewriteTimeMs | time taken to rewrite the matched files |
number of deleted rows¶
number of inserted rows¶
number of rows deleted by a matched clause¶
number of rows deleted by a not matched by source clause¶
number of rows updated by a matched clause¶
number of rows updated by a not matched by source clause¶
number of source rows (during repeated scan)¶
number of target rows rewritten unmodified¶
numTargetRowsCopied
performance metric (like the other metrics) is turned into a non-deterministic user-defined function (UDF).
numTargetRowsCopied
becomes incrNoopCountExpr
UDF.
incrNoopCountExpr
UDF is resolved on a joined plan and used to create a JoinedRowProcessor for processing partitions of the joined plan Dataset
.
number of updated rows¶
time taken to execute the entire operation¶
time taken to rewrite the matched files¶
time taken to scan the files for matches¶
Building Target (Logical) Plan Spanned Over Fewer Files¶
buildTargetPlanWithFiles(
spark: SparkSession,
deltaTxn: OptimisticTransaction,
files: Seq[AddFile],
columnsToDrop: Seq[String]): LogicalPlan
columnsToDrop
Argument
columnsToDrop
is always empty (Nil
) but for ClassicMergeExecutor
to findTouchedFiles.
buildTargetPlanWithFiles
creates a TahoeBatchFileIndex for the given AddFiles (files
) only.
Note
buildTargetPlanWithFiles
creates a LogicalPlan
of a delta table that is possibly smaller parquet data files (spanning over a smaller number of files) than the "source".
In the end, buildTargetPlanWithFiles
buildTargetPlanWithIndex for the TahoeBatchFileIndex
and the given columnsToDrop
column names.
buildTargetPlanWithFiles
is used when:
ClassicMergeExecutor
is requested to findTouchedFiles and writeAllChangesInsertOnlyMergeExecutor
is requested to writeOnlyInserts
buildTargetPlanWithIndex¶
buildTargetPlanWithIndex(
spark: SparkSession,
deltaTxn: OptimisticTransaction,
fileIndex: TahoeFileIndex,
columnsToDrop: Seq[String]): LogicalPlan
buildTargetPlanWithIndex
gets the references to columns in the target dataframe (AttributeReference
s) and null
s for new columns that are added to the target table (as part of this OptimisticTransaction).
In the end, buildTargetPlanWithIndex
creates a Project
logical operator with new columns (with null
s) after the existing ones.
Special Columns¶
_row_dropped_¶
_row_dropped_
column name is used when:
ClassicMergeExecutor
is requested to writeAllChangesInsertOnlyMergeExecutor
is requested to generateInsertsOnlyOutputDF and generateInsertsOnlyOutputColsMergeOutputGeneration
is requested to generateCdcAndOutputRows
CDF-Aware Output Generation
It appears that ClassicMergeExecutor
and InsertOnlyMergeExecutor
use _row_dropped_
column to include the rows with false
value in output dataframes followed by dropping the column immediately.
An "exception" to the behaviour is MergeOutputGeneration for CDF-aware output generation.
Executing Command¶
RunnableCommand
run(
spark: SparkSession): Seq[Row]
run
is part of the RunnableCommand
(Spark SQL) abstraction.
run
is a transactional operation that is made up of the following steps:
- The metrics are reset
- Check if the source table should be materialized
- Runs this merge when no materialization or runWithMaterializedSourceLostRetries
Reset Metrics¶
The following metrics are reset (set to 0
):
- time taken to execute the entire operation
- time taken to scan the files for matches
- time taken to rewrite the matched files
isInsertOnly¶
isInsertOnly: Boolean
isInsertOnly
is positive for an insert-only merge (i.e. when there are WHEN NOT MATCHED clauses only in this MERGE command).
In other words, isInsertOnly
is enabled (true
) when all the following hold:
- There are neither WHEN MATCHED nor WHEN NOT MATCHED BY SOURCE clauses
- There are WHEN NOT MATCHED clauses
isInsertOnly
is used when:
MergeIntoCommand
is requested to run a merge (and prepareSourceDFAndReturnMaterializeReason)MergeIntoCommandBase
is requested to run (to shouldMaterializeSource)
Matched-Only Merge¶
isMatchedOnly: Boolean
isMatchedOnly
is enabled (true
) when this merge is WHEN MATCHED-only:
- There is at least one WHEN MATCHED clause
- There are no other clause types (neither WHEN NOT MATCHED nor WHEN NOT MATCHED BY SOURCE)
isMatchedOnly
is used when:
MergeIntoCommandBase
is requested to shouldOptimizeMatchedOnlyMerge and getTargetOnlyPredicatesClassicMergeExecutor
is requested to find files to rewrite
shouldOptimizeMatchedOnlyMerge¶
shouldOptimizeMatchedOnlyMerge(
spark: SparkSession): Boolean
shouldOptimizeMatchedOnlyMerge
is enabled (true
) when the following all hold:
- This merge is matched-only
- merge.optimizeMatchedOnlyMerge.enabled is enabled
shouldOptimizeMatchedOnlyMerge
is used when:
ClassicMergeExecutor
is requested to write out merge changes
Finding Target-Only Predicates (Among Merge and Clause Conditions)¶
getTargetOnlyPredicates(
spark: SparkSession): Seq[Expression]
getTargetOnlyPredicates
determines the target-only predicates in the merge condition first (targetOnlyPredicatesOnCondition
). getTargetOnlyPredicates
splits conjunctive predicates (And
s) in the merge condition and leaves only the ones with the column references to the columns in the target table only.
getTargetOnlyPredicates
branches off based on whether this merge is matched-only or not.
For a non-matched-only merge, getTargetOnlyPredicates
returns the target-only condition predicates.
Otherwise, getTargetOnlyPredicates
does the same (what it has just done with the merge condition) with conditional WHEN MATCHED clauses (the ones with a condition specified). getTargetOnlyPredicates
splits conjunctive predicates (And
s) in the conditions of the matchedClauses
and leaves only the ones with the column references to the target.
In the end, getTargetOnlyPredicates
returns the target-only condition predicates (from the merge condition and all the conditions from the conditional WHEN MATCHED clauses).
getTargetOnlyPredicates
is used when:
ClassicMergeExecutor
is requested to find files to rewrite
throwErrorOnMultipleMatches¶
throwErrorOnMultipleMatches(
hasMultipleMatches: Boolean,
spark: SparkSession): Unit
Procedure
throwErrorOnMultipleMatches
is a procedure (returns Unit
) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).
throwErrorOnMultipleMatches
throws a DeltaUnsupportedOperationException
when the given hasMultipleMatches
is enabled (true
) and isOnlyOneUnconditionalDelete is disabled (false
).
throwErrorOnMultipleMatches
is used when:
ClassicMergeExecutor
is requested to findTouchedFiles
isOnlyOneUnconditionalDelete¶
isOnlyOneUnconditionalDelete: Boolean
isOnlyOneUnconditionalDelete
is positive (true
) when there is only one when matched clause that is a DELETE
with no condition.
In other words, isOnlyOneUnconditionalDelete
is true
for the following:
- matchedClauses is exactly a DeltaMergeIntoMatchedDeleteClause with no condition (hence the name unconditional delete)
Recording Merge Operation¶
recordMergeOperation[A](
extraOpType: String = "",
status: String = null,
sqlMetricName: String = null)(
thunk: => A): A
recordMergeOperation
creates a operation type (changedOpType
) based on the given extraOpType
identifier as follows:
delta.dml.merge.[extraOpType]
recordMergeOperation
appends the given status
to the existing Spark job description (as spark.job.description
local property), if any.
In the end, recordMergeOperation
executes the given thunk
code block:
- Records the start time
- Sets a human readable description of the current job (Spark Core) as the prefixed
status
- Adds the time taken to the given
sqlMetricName
- Restores the job description to the previous one, if any
MergeOutputGeneration | extraOpType | status | sqlMetricName |
---|---|---|---|
ClassicMergeExecutor | findTouchedFiles | MERGE operation - scanning files for matches | scanTimeMs |
ClassicMergeExecutor | writeAllUpdatesAndDeletes or writeAllChanges | MERGE operation - Rewriting n files | rewriteTimeMs |
InsertOnlyMergeExecutor | writeInsertsOnlyWhenNoMatchedClauses or writeInsertsOnlyWhenNoMatches | MERGE operation - writing new files for only inserts | rewriteTimeMs |
recordMergeOperation
is used when:
ClassicMergeExecutor
is requested to findTouchedFiles, writeAllChangesInsertOnlyMergeExecutor
is requested to writeOnlyInserts
Is WHEN NOT MATCHED THEN INSERT Clause Used¶
includesInserts: Boolean
includesInserts
is positive (true
) when there are WHEN NOT MATCHED clauses in this merge (with WHEN NOT MATCHED THEN INSERTs possible and hence the name of this method).
includesInserts
is used when:
ClassicMergeExecutor
is requested to findTouchedFiles (to create a DeduplicateCDFDeletes)InsertOnlyMergeExecutor
is requested to writeOnlyInserts (and stop early)
Auto Schema Merging¶
ImplicitMetadataOperation
canMergeSchema: Boolean
canMergeSchema
is part of the ImplicitMetadataOperation abstraction.
canMergeSchema
creates a DeltaOptions for the canMergeSchema (based on the SQLConf only).
Options always empty
Delta options cannot be passed to MERGE INTO, so they will always be empty and canMergeSchema
relies on the SQLConf only.
Create IncrementMetric Expression¶
incrementMetricAndReturnBool(
name: String,
valueToReturn: Boolean): Expression
incrementMetricAndReturnBool
creates a IncrementMetric
unary expression with the following:
IncrementMetric | Value |
---|---|
Child Expression | A Literal with the given valueToReturn (when executed) |
SQLMetric | The metric by the given name |
IncrementMetric
The IncrementMetric
presents itself under increment_metric
name in query plans.
When executed, IncrementMetric
increments the metric (by the given name
) and returns the given valueToReturn
.
Usage | Metric Name | valueToReturn |
---|---|---|
ClassicMergeExecutor to find files to rewrite | numSourceRows | true |
ClassicMergeExecutor to write out merge changes | numSourceRowsInSecondScan | true |
numTargetRowsCopied | false | |
InsertOnlyMergeExecutor to write out inserts | numSourceRows or numSourceRowsInSecondScan | true |
InsertOnlyMergeExecutor to generateInsertsOnlyOutputCols | numTargetRowsInserted | false |
MergeOutputGeneration to generateAllActionExprs | numTargetRowsUpdated | false |
numTargetRowsDeleted | true | |
numTargetRowsInserted | false |
Writing Data(Frame) Out to Delta Table¶
writeFiles(
spark: SparkSession,
txn: OptimisticTransaction,
outputDF: DataFrame): Seq[FileAction]
Fun Fact: The Name
The writeFiles
name stems from the (fun?) fact that writing out to a delta table is actually creating new files.
You can also look at the given DataFrame
that is usually another delta table that is nothing but a collection of AddFiles.
I found it a little funny, et toi? 😉
If the target table is partitioned and merge.repartitionBeforeWrite.enabled is enabled, writeFiles
repartitions the given outputDF
dataframe (using Dataset.repartition
operator) before writing it out.
In the end, writeFiles
requests the given OptimisticTransaction to write the data(frame) out.
writeFiles
is used when MergeIntoCommand
is requested to run a merge and does one of the following:
ClassicMergeExecutor
is requested to write out merge changesInsertOnlyMergeExecutor
is requested to write out inserts
isCdcEnabled¶
isCdcEnabled(
deltaTxn: OptimisticTransaction): Boolean
isCdcEnabled
is the value of the enableChangeDataFeed table property (from the metadata of a delta table).
isCdcEnabled
is used when:
ClassicMergeExecutor
is requested to findTouchedFiles, writeAllChanges
shouldWritePersistentDeletionVectors¶
shouldWritePersistentDeletionVectors(
spark: SparkSession,
txn: OptimisticTransaction): Boolean
shouldWritePersistentDeletionVectors
is enabled (true
) when the following all hold:
- spark.databricks.delta.merge.deletionVectors.persistent configuration property is enabled (
true
) - Protocol and table configuration support deletion vectors feature
shouldWritePersistentDeletionVectors
is used when:
MergeIntoCommand
is requested to run a merge