MergeIntoCommandBase¶
MergeIntoCommandBase is a DeltaCommand and an extension of the LeafRunnableCommand (Spark SQL) abstraction for merge delta commands.
MergeIntoCommandBase is a MergeIntoMaterializeSource.
Contract (Subset)¶
Merge Condition¶
condition: Expression
The "join" condition of this merge
See:
Used when:
- MergeIntoCommandBase is requested to collectMergeStats and getTargetOnlyPredicates
- ClassicMergeExecutor is requested to findTouchedFiles and writeAllChanges
- InsertOnlyMergeExecutor is requested to writeOnlyInserts (with filterMatchedRows enabled to trim the source table)
WHEN MATCHED Clauses¶
matchedClauses: Seq[DeltaMergeIntoMatchedClause]
See:
Used when:
- MergeIntoCommandBase is requested to isMatchedOnly, isInsertOnly, collectMergeStats, isOnlyOneUnconditionalDelete, getTargetOnlyPredicates
- ClassicMergeExecutor is requested to findTouchedFiles, writeAllChanges
WHEN NOT MATCHED Clauses¶
notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause]
DeltaMergeIntoNotMatchedClauses
See:
Used when:
- InsertOnlyMergeExecutor is requested to writeOnlyInserts, generateInsertsOnlyOutputDF, generateOneInsertOutputCols
- MergeIntoCommandBase is requested to isMatchedOnly, isInsertOnly, includesInserts, collectMergeStats
- ClassicMergeExecutor is requested to writeAllChanges
WHEN NOT MATCHED BY SOURCE Clauses¶
notMatchedBySourceClauses: Seq[DeltaMergeIntoNotMatchedBySourceClause]
DeltaMergeIntoNotMatchedBySourceClauses
See:
Used when:
- MergeIntoCommandBase is requested to isMatchedOnly, isInsertOnly, collectMergeStats
- ClassicMergeExecutor is requested to findTouchedFiles, writeAllChanges
Running Merge¶
runMerge(
spark: SparkSession): Seq[Row]
See:
Used when:
- MergeIntoCommandBase is requested to run
Source Data¶
source: LogicalPlan
A LogicalPlan (Spark SQL) of the source data to merge from (internally referred to as source)
See:
Used when:
- MergeIntoCommandBase is requested to run
checkIdentityColumnHighWaterMarks¶
checkIdentityColumnHighWaterMarks(
deltaTxn: OptimisticTransaction): Unit
checkIdentityColumnHighWaterMarks...FIXME
Implementations¶
Performance Metrics¶
| Name | web UI |
|---|---|
| numSourceRows | number of source rows |
| numSourceRowsInSecondScan | number of source rows (during repeated scan) |
| numTargetRowsCopied | number of target rows rewritten unmodified |
| numTargetRowsInserted | number of inserted rows |
| numTargetRowsUpdated | number of updated rows |
| numTargetRowsDeleted | number of deleted rows |
| numTargetFilesBeforeSkipping | number of target files before skipping |
| numTargetFilesAfterSkipping | number of target files after skipping |
| numTargetFilesRemoved | number of files removed to target |
| numTargetFilesAdded | number of files added to target |
| numTargetChangeFilesAdded | number of change data capture files generated |
| numTargetChangeFileBytes | total size of change data capture files generated |
| numTargetBytesBeforeSkipping | number of target bytes before skipping |
| numTargetBytesAfterSkipping | number of target bytes after skipping |
| numTargetBytesRemoved | number of target bytes removed |
| numTargetBytesAdded | number of target bytes added |
| numTargetPartitionsAfterSkipping | number of target partitions after skipping |
| numTargetPartitionsRemovedFrom | number of target partitions from which files were removed |
| numTargetPartitionsAddedTo | number of target partitions to which files were added |
| numTargetRowsMatchedDeleted | number of rows deleted by a matched clause |
| numTargetRowsNotMatchedBySourceDeleted | number of rows deleted by a not matched by source clause |
| numTargetRowsMatchedUpdated | number of rows updated by a matched clause |
| numTargetRowsNotMatchedBySourceUpdated | number of rows updated by a not matched by source clause |
| executionTimeMs | time taken to execute the entire operation |
| scanTimeMs | time taken to scan the files for matches |
| rewriteTimeMs | time taken to rewrite the matched files |
number of deleted rows¶
number of inserted rows¶
number of rows deleted by a matched clause¶
number of rows deleted by a not matched by source clause¶
number of rows updated by a matched clause¶
number of rows updated by a not matched by source clause¶
number of source rows (during repeated scan)¶
number of target rows rewritten unmodified¶
The numTargetRowsCopied performance metric (like the other metrics) is turned into a non-deterministic user-defined function (UDF).
numTargetRowsCopied becomes the incrNoopCountExpr UDF.
The incrNoopCountExpr UDF is resolved on a joined plan and used to create a JoinedRowProcessor for processing partitions of the joined plan Dataset.
number of updated rows¶
time taken to execute the entire operation¶
time taken to rewrite the matched files¶
time taken to scan the files for matches¶
Building Target (Logical) Plan Spanned Over Fewer Files¶
buildTargetPlanWithFiles(
spark: SparkSession,
deltaTxn: OptimisticTransaction,
files: Seq[AddFile],
columnsToDrop: Seq[String]): LogicalPlan
columnsToDrop Argument
columnsToDrop is always empty (Nil) except when ClassicMergeExecutor is requested to findTouchedFiles.
buildTargetPlanWithFiles creates a TahoeBatchFileIndex for the given AddFiles (files) only.
Note
buildTargetPlanWithFiles creates a LogicalPlan of the delta table that (possibly) spans a smaller number of parquet data files than the entire target table.
In the end, buildTargetPlanWithFiles builds the target plan (buildTargetPlanWithIndex) for the TahoeBatchFileIndex and the given columnsToDrop column names.
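The following is a minimal Scala sketch of the "spanning fewer files" idea, i.e. indexing only the given AddFiles so the target scan is limited to those files. The TahoeBatchFileIndex constructor arguments shown are an approximation, not the actual source.

```scala
// A sketch only: index the given AddFiles so the target scan spans just those files.
// Constructor argument order/labels are approximations of the actual TahoeBatchFileIndex.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.OptimisticTransaction
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.files.TahoeBatchFileIndex

def fileIndexOverFiles(
    spark: SparkSession,
    deltaTxn: OptimisticTransaction,
    files: Seq[AddFile]): TahoeBatchFileIndex =
  new TahoeBatchFileIndex(
    spark,
    "merge",                    // action type (a label only)
    files,                      // only these files are visible to the scan
    deltaTxn.deltaLog,
    deltaTxn.deltaLog.dataPath, // table root path
    deltaTxn.snapshot)
```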
buildTargetPlanWithFiles is used when:
- ClassicMergeExecutor is requested to findTouchedFiles and writeAllChanges
- InsertOnlyMergeExecutor is requested to writeOnlyInserts
buildTargetPlanWithIndex¶
buildTargetPlanWithIndex(
spark: SparkSession,
deltaTxn: OptimisticTransaction,
fileIndex: TahoeFileIndex,
columnsToDrop: Seq[String]): LogicalPlan
buildTargetPlanWithIndex gets the references to columns in the target dataframe (AttributeReferences) and nulls for new columns that are added to the target table (as part of this OptimisticTransaction).
In the end, buildTargetPlanWithIndex creates a Project logical operator with new columns (with nulls) after the existing ones.
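A sketch of the "nulls for new columns" step, assuming the new columns are those in the transaction's schema that the scanned plan does not produce (the helper name and lookup are illustrative):

```scala
// A sketch only: keep the existing target attributes and append NULL literals
// for columns added to the table in this transaction but absent from the scanned plan.
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.types.StructType

def withNullsForNewColumns(plan: LogicalPlan, txnSchema: StructType): LogicalPlan = {
  val existing = plan.output
  val existingNames = existing.map(_.name).toSet
  val newColumns = txnSchema
    .filterNot(f => existingNames.contains(f.name))        // columns not produced by the scan
    .map(f => Alias(Literal(null, f.dataType), f.name)())  // appear as nulls
  Project(existing ++ newColumns, plan) // new columns come after the existing ones
}
```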
Special Columns¶
_row_dropped_¶
_row_dropped_ column name is used when:
ClassicMergeExecutoris requested to writeAllChangesInsertOnlyMergeExecutoris requested to generateInsertsOnlyOutputDF and generateInsertsOnlyOutputColsMergeOutputGenerationis requested to generateCdcAndOutputRows
CDF-Aware Output Generation
It appears that ClassicMergeExecutor and InsertOnlyMergeExecutor use _row_dropped_ column to include the rows with false value in output dataframes followed by dropping the column immediately.
An "exception" to the behaviour is MergeOutputGeneration for CDF-aware output generation.
Executing Command¶
RunnableCommand
run(
spark: SparkSession): Seq[Row]
run is part of the RunnableCommand (Spark SQL) abstraction.
run is a transactional operation that is made up of the following steps:
- The metrics are reset
- Check if the source table should be materialized
- Runs this merge directly (when the source is not materialized) or with runWithMaterializedSourceLostRetries (otherwise)
Reset Metrics¶
The following metrics are reset (set to 0):
- time taken to execute the entire operation
- time taken to scan the files for matches
- time taken to rewrite the matched files
isInsertOnly¶
isInsertOnly: Boolean
isInsertOnly is positive for an insert-only merge (i.e. when there are WHEN NOT MATCHED clauses only in this MERGE command).
In other words, isInsertOnly is enabled (true) when all the following hold:
- There are neither WHEN MATCHED nor WHEN NOT MATCHED BY SOURCE clauses
- There are WHEN NOT MATCHED clauses
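For example, the following merge (with illustrative table and column names) has only WHEN NOT MATCHED clauses, so isInsertOnly is true:

```scala
// Illustrative table and column names
spark.sql("""
  MERGE INTO target t
  USING source s
  ON t.id = s.id
  WHEN NOT MATCHED THEN INSERT *
""")
```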
isInsertOnly is used when:
- MergeIntoCommand is requested to run a merge (and prepareSourceDFAndReturnMaterializeReason)
- MergeIntoCommandBase is requested to run (to shouldMaterializeSource)
Matched-Only Merge¶
isMatchedOnly: Boolean
isMatchedOnly is enabled (true) when this merge is WHEN MATCHED-only:
- There is at least one WHEN MATCHED clause
- There are no other clause types (neither WHEN NOT MATCHED nor WHEN NOT MATCHED BY SOURCE)
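For example, the following merge (with illustrative table and column names) has only WHEN MATCHED clauses, so isMatchedOnly is true:

```scala
// Illustrative table and column names
spark.sql("""
  MERGE INTO target t
  USING source s
  ON t.id = s.id
  WHEN MATCHED AND s.op = 'delete' THEN DELETE
  WHEN MATCHED THEN UPDATE SET *
""")
```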
isMatchedOnly is used when:
- MergeIntoCommandBase is requested to shouldOptimizeMatchedOnlyMerge and getTargetOnlyPredicates
- ClassicMergeExecutor is requested to find files to rewrite
shouldOptimizeMatchedOnlyMerge¶
shouldOptimizeMatchedOnlyMerge(
spark: SparkSession): Boolean
shouldOptimizeMatchedOnlyMerge is enabled (true) when the following all hold:
- This merge is matched-only
- merge.optimizeMatchedOnlyMerge.enabled is enabled
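A sketch of the two conditions, assuming the configuration is exposed as DeltaSQLConf.MERGE_MATCHED_ONLY_ENABLED (the constant name is an assumption):

```scala
// A sketch only: matched-only merge plus the configuration flag.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.sources.DeltaSQLConf

def shouldOptimizeMatchedOnlyMergeSketch(spark: SparkSession, isMatchedOnly: Boolean): Boolean =
  isMatchedOnly &&
    spark.sessionState.conf.getConf(DeltaSQLConf.MERGE_MATCHED_ONLY_ENABLED)
```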
shouldOptimizeMatchedOnlyMerge is used when:
- ClassicMergeExecutor is requested to write out merge changes
Finding Target-Only Predicates (Among Merge and Clause Conditions)¶
getTargetOnlyPredicates(
spark: SparkSession): Seq[Expression]
getTargetOnlyPredicates determines the target-only predicates in the merge condition first (targetOnlyPredicatesOnCondition). getTargetOnlyPredicates splits conjunctive predicates (Ands) in the merge condition and leaves only the ones with the column references to the columns in the target table only.
getTargetOnlyPredicates branches off based on whether this merge is matched-only or not.
For a non-matched-only merge, getTargetOnlyPredicates returns the target-only condition predicates.
Otherwise, getTargetOnlyPredicates does the same (what it has just done with the merge condition) with conditional WHEN MATCHED clauses (the ones with a condition specified). getTargetOnlyPredicates splits conjunctive predicates (Ands) in the conditions of the matchedClauses and leaves only the ones with the column references to the target.
In the end, getTargetOnlyPredicates returns the target-only condition predicates (from the merge condition and all the conditions from the conditional WHEN MATCHED clauses).
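A sketch of the predicate splitting described above, using Catalyst's PredicateHelper (how the merge condition and the target plan are obtained is left out):

```scala
// A sketch only: split top-level ANDs and keep predicates that reference target columns only.
import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object TargetOnlyPredicatesSketch extends PredicateHelper {
  def targetOnlyPredicates(condition: Expression, target: LogicalPlan): Seq[Expression] =
    splitConjunctivePredicates(condition)              // break ANDs into single predicates
      .filter(_.references.subsetOf(target.outputSet)) // target-only column references
}
```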
getTargetOnlyPredicates is used when:
- ClassicMergeExecutor is requested to find files to rewrite
throwErrorOnMultipleMatches¶
throwErrorOnMultipleMatches(
hasMultipleMatches: Boolean,
spark: SparkSession): Unit
Procedure
throwErrorOnMultipleMatches is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).
throwErrorOnMultipleMatches throws a DeltaUnsupportedOperationException when the given hasMultipleMatches is enabled (true) and isOnlyOneUnconditionalDelete is disabled (false).
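A sketch of the check (the actual error is a DeltaUnsupportedOperationException raised through Delta's error factories; a generic exception stands in for it here):

```scala
// A sketch only: a generic exception stands in for DeltaUnsupportedOperationException.
def throwErrorOnMultipleMatchesSketch(
    hasMultipleMatches: Boolean,
    isOnlyOneUnconditionalDelete: Boolean): Unit = {
  if (hasMultipleMatches && !isOnlyOneUnconditionalDelete) {
    throw new UnsupportedOperationException(
      "Multiple source rows matched and attempted to modify the same target row")
  }
}
```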
throwErrorOnMultipleMatches is used when:
- ClassicMergeExecutor is requested to findTouchedFiles
isOnlyOneUnconditionalDelete¶
isOnlyOneUnconditionalDelete: Boolean
isOnlyOneUnconditionalDelete is positive (true) when there is only one when matched clause that is a DELETE with no condition.
In other words, isOnlyOneUnconditionalDelete is true for the following:
- matchedClauses is exactly a DeltaMergeIntoMatchedDeleteClause with no condition (hence the name unconditional delete)
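A sketch of the check, assuming the clause classes expose an optional condition (the import path is an assumption):

```scala
// A sketch only: the import path and class shape are assumptions.
import org.apache.spark.sql.catalyst.plans.logical.{DeltaMergeIntoMatchedClause, DeltaMergeIntoMatchedDeleteClause}

def isOnlyOneUnconditionalDeleteSketch(matchedClauses: Seq[DeltaMergeIntoMatchedClause]): Boolean =
  matchedClauses match {
    // exactly one WHEN MATCHED clause, it is a DELETE, and it carries no condition
    case Seq(d: DeltaMergeIntoMatchedDeleteClause) => d.condition.isEmpty
    case _ => false
  }
```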
Recording Merge Operation¶
recordMergeOperation[A](
extraOpType: String = "",
status: String = null,
sqlMetricName: String = null)(
thunk: => A): A
recordMergeOperation creates an operation type (changedOpType) based on the given extraOpType identifier as follows:
delta.dml.merge.[extraOpType]
recordMergeOperation appends the given status to the existing Spark job description (as spark.job.description local property), if any.
In the end, recordMergeOperation executes the given thunk code block:
- Records the start time
- Sets a human-readable description of the current job (Spark Core) as the prefixed status
- Adds the time taken to the given sqlMetricName
- Restores the job description to the previous one, if any
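A sketch of the wrapper (not the actual source), assuming a metrics map keyed by metric name and the standard spark.job.description local property:

```scala
// A sketch only: set a job description, time the block, bump the metric, restore the description.
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.SQLMetric

def recordMergeOperationSketch[A](
    sc: SparkContext,
    metrics: Map[String, SQLMetric],
    status: String,
    sqlMetricName: String)(thunk: => A): A = {
  val prevDesc = sc.getLocalProperty("spark.job.description")
  val newDesc = Option(prevDesc).map(d => s"$d - $status").getOrElse(status)
  sc.setJobDescription(newDesc)    // human-readable description in the Spark UI
  val startTimeNs = System.nanoTime()
  try thunk
  finally {
    metrics(sqlMetricName) += (System.nanoTime() - startTimeNs) / 1000 / 1000 // elapsed ms
    sc.setJobDescription(prevDesc) // restore the previous description (possibly null)
  }
}
```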
| Caller | extraOpType | status | sqlMetricName |
|---|---|---|---|
| ClassicMergeExecutor | findTouchedFiles | MERGE operation - scanning files for matches | scanTimeMs |
| ClassicMergeExecutor | writeAllUpdatesAndDeletes or writeAllChanges | MERGE operation - Rewriting n files | rewriteTimeMs |
| InsertOnlyMergeExecutor | writeInsertsOnlyWhenNoMatchedClauses or writeInsertsOnlyWhenNoMatches | MERGE operation - writing new files for only inserts | rewriteTimeMs |
recordMergeOperation is used when:
- ClassicMergeExecutor is requested to findTouchedFiles, writeAllChanges
- InsertOnlyMergeExecutor is requested to writeOnlyInserts
Is WHEN NOT MATCHED THEN INSERT Clause Used¶
includesInserts: Boolean
includesInserts is positive (true) when there are WHEN NOT MATCHED clauses in this merge (with WHEN NOT MATCHED THEN INSERTs possible and hence the name of this method).
includesInserts is used when:
- ClassicMergeExecutor is requested to findTouchedFiles (to create a DeduplicateCDFDeletes)
- InsertOnlyMergeExecutor is requested to writeOnlyInserts (and stop early)
Auto Schema Merging¶
ImplicitMetadataOperation
canMergeSchema: Boolean
canMergeSchema is part of the ImplicitMetadataOperation abstraction.
canMergeSchema creates a DeltaOptions (with no options, based on the SQLConf only) and returns its canMergeSchema flag.
Options always empty
Delta options cannot be passed to MERGE INTO, so they will always be empty and canMergeSchema relies on the SQLConf only.
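A sketch of deriving the flag from the session configuration only (an empty options map, as MERGE INTO cannot pass Delta options):

```scala
// A sketch only: no options can be passed to MERGE INTO, so the options map is empty.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaOptions

def canMergeSchemaSketch(spark: SparkSession): Boolean =
  new DeltaOptions(Map.empty[String, String], spark.sessionState.conf).canMergeSchema
```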
Create IncrementMetric Expression¶
incrementMetricAndReturnBool(
name: String,
valueToReturn: Boolean): Expression
incrementMetricAndReturnBool creates an IncrementMetric unary expression with the following:
| IncrementMetric | Value |
|---|---|
| Child Expression | A Literal with the given valueToReturn (when executed) |
| SQLMetric | The metric by the given name |
IncrementMetric
The IncrementMetric presents itself under the increment_metric name in query plans.
When executed, IncrementMetric increments the metric (by the given name) and returns the given valueToReturn.
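A sketch of incrementMetricAndReturnBool, assuming a metrics map keyed by metric name (the import path of IncrementMetric is an assumption):

```scala
// A sketch only: the IncrementMetric import path is an assumption.
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.delta.metric.IncrementMetric
import org.apache.spark.sql.execution.metric.SQLMetric

def incrementMetricAndReturnBoolSketch(
    metrics: Map[String, SQLMetric],
    name: String,
    valueToReturn: Boolean): Expression =
  IncrementMetric(Literal(valueToReturn), metrics(name)) // bumps the metric, evaluates to valueToReturn
```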
| Usage | Metric Name | valueToReturn |
|---|---|---|
| ClassicMergeExecutor to find files to rewrite | numSourceRows | true |
| ClassicMergeExecutor to write out merge changes | numSourceRowsInSecondScan | true |
| | numTargetRowsCopied | false |
| InsertOnlyMergeExecutor to write out inserts | numSourceRows or numSourceRowsInSecondScan | true |
| InsertOnlyMergeExecutor to generateInsertsOnlyOutputCols | numTargetRowsInserted | false |
| MergeOutputGeneration to generateAllActionExprs | numTargetRowsUpdated | false |
| | numTargetRowsDeleted | true |
| | numTargetRowsInserted | false |
Write Data(Frame) Out to Delta Table¶
writeFiles(
spark: SparkSession,
txn: OptimisticTransaction,
outputDF: DataFrame): Seq[FileAction]
Fun Fact: The Name
The writeFiles name stems from the (fun?) fact that writing out to a delta table is actually creating new files.
You can also look at the given DataFrame, which is usually another delta table that is nothing but a collection of AddFiles.
I found it a little funny, et toi? 😉
If the target table is partitioned and merge.repartitionBeforeWrite.enabled is enabled, writeFiles repartitions the given outputDF dataframe (using Dataset.repartition operator) before writing it out.
In the end, writeFiles requests the given OptimisticTransaction to write the data(frame) out.
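A sketch of the optional repartitioning followed by the transactional write (the DeltaSQLConf constant name is an assumption):

```scala
// A sketch only: repartition by the partition columns (when enabled) and hand the data
// over to the transactional writer.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.delta.OptimisticTransaction
import org.apache.spark.sql.delta.actions.FileAction
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.functions.col

def writeFilesSketch(
    spark: SparkSession,
    txn: OptimisticTransaction,
    outputDF: DataFrame): Seq[FileAction] = {
  val partitionColumns = txn.metadata.partitionColumns
  val repartitionBeforeWrite = partitionColumns.nonEmpty &&
    spark.sessionState.conf.getConf(DeltaSQLConf.MERGE_REPARTITION_BEFORE_WRITE)
  val dataToWrite =
    if (repartitionBeforeWrite) outputDF.repartition(partitionColumns.map(col): _*)
    else outputDF
  txn.writeFiles(dataToWrite) // "writing" means creating new files (AddFiles et al.)
}
```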
writeFiles is used (while MergeIntoCommand is requested to run a merge) when:
- ClassicMergeExecutor is requested to write out merge changes
- InsertOnlyMergeExecutor is requested to write out inserts
isCdcEnabled¶
isCdcEnabled(
deltaTxn: OptimisticTransaction): Boolean
isCdcEnabled is the value of the enableChangeDataFeed table property (from the metadata of a delta table).
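A sketch of reading the flag from the table metadata with DeltaConfigs.CHANGE_DATA_FEED:

```scala
// A sketch only: the CDF flag comes from the table metadata of this transaction.
import org.apache.spark.sql.delta.{DeltaConfigs, OptimisticTransaction}

def isCdcEnabledSketch(deltaTxn: OptimisticTransaction): Boolean =
  DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(deltaTxn.metadata)
```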
isCdcEnabled is used when:
- ClassicMergeExecutor is requested to findTouchedFiles, writeAllChanges
shouldWritePersistentDeletionVectors¶
shouldWritePersistentDeletionVectors(
spark: SparkSession,
txn: OptimisticTransaction): Boolean
shouldWritePersistentDeletionVectors is enabled (true) when the following all hold:
- spark.databricks.delta.merge.deletionVectors.persistent configuration property is enabled (true)
- Protocol and table configuration support the deletion vectors table feature
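A sketch of the two conditions (the configuration constant and the deletion-vector helper names are assumptions):

```scala
// A sketch only: the configuration constant and the helper are assumptions.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.OptimisticTransaction
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.DeletionVectorUtils

def shouldWritePersistentDVsSketch(spark: SparkSession, txn: OptimisticTransaction): Boolean =
  spark.sessionState.conf.getConf(DeltaSQLConf.MERGE_USE_PERSISTENT_DELETION_VECTORS) &&
    DeletionVectorUtils.deletionVectorsWritable(txn.snapshot)
```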
shouldWritePersistentDeletionVectors is used when:
- MergeIntoCommand is requested to run a merge