InsertOnlyMergeExecutor
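InsertOnlyMergeExecutor is an extension of the MergeOutputGeneration abstraction for optimized execution of MERGE commands that only insert new data (when MergeIntoCommand is requested to run a merge).
InsertOnlyMergeExecutor is used for insert-only merges only when spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled is enabled. Its writeOnlyInserts is also used for merges with no files to rewrite.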
ClassicMergeExecutor
When a MERGE query is not insert-only, or spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled is disabled, ClassicMergeExecutor is used to run the merge.
Writing Out Inserts
writeOnlyInserts(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction,
  filterMatchedRows: Boolean,
  numSourceRowsMetric: String): Seq[FileAction]
filterMatchedRows Argument
writeOnlyInserts is given the filterMatchedRows flag when running a merge under the following conditions:
filterMatchedRows Flag | Condition |
---|---|
true | An insert-only merge with spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled enabled |
false | No files to rewrite (AddFiles) found by findTouchedFiles |
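The two rows of the table can be sketched as a small Scala model of the dispatch in MergeIntoCommand. This is a hypothetical sketch, not Delta Lake code. MergeDispatch, Plan, WriteOnlyInserts and WriteAllChanges are made-up names. touchedFiles stands in for the result of findTouchedFiles. The metric name used on the optimized insert-only path (numSourceRows) is an assumption.

```scala
// Hypothetical model of how a MERGE picks its write path.
// None of these names are Delta Lake APIs; they only mirror the filterMatchedRows table.
object MergeDispatch {
  sealed trait Plan
  // Stands for writeOnlyInserts(spark, deltaTxn, filterMatchedRows, numSourceRowsMetric)
  final case class WriteOnlyInserts(filterMatchedRows: Boolean, numSourceRowsMetric: String) extends Plan
  // Stands for the classic path that rewrites the touched files
  case object WriteAllChanges extends Plan

  // touchedFiles is by-name: it is only evaluated when the optimized insert-only path is not taken
  def plan(isInsertOnly: Boolean, optimizeInsertOnlyMergeEnabled: Boolean, touchedFiles: => Seq[String]): Plan =
    if (isInsertOnly && optimizeInsertOnlyMergeEnabled)
      WriteOnlyInserts(filterMatchedRows = true, numSourceRowsMetric = "numSourceRows") // assumed metric name
    else if (touchedFiles.isEmpty)
      WriteOnlyInserts(filterMatchedRows = false, numSourceRowsMetric = "numSourceRowsInSecondScan") // assumed
    else
      WriteAllChanges
}
```

Passing touchedFiles by name means the touched files are never computed on the optimized path. This is an assumption consistent with the table, where findTouchedFiles only matters for the false case.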
writeOnlyInserts records this merge operation with the following:
Property | Value |
---|---|
extraOpType | |
status | MERGE operation - writing new files for only inserts |
sqlMetricName | rewriteTimeMs |
Early Stop Possible
With no WHEN NOT MATCHED THEN INSERT clauses used and the given filterMatchedRows flag disabled (see the above note 👆), writeOnlyInserts has nothing to do (nothing to insert and so no new files to write).
The numSourceRowsInSecondScan metric is set to -1.
writeOnlyInserts returns no FileActions.
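Below is a minimal Scala sketch of this early stop. It assumes SQL metrics can be modelled as a mutable map from metric name to value, and that FileActions can be modelled as plain strings. EarlyStop and checkEarlyStop are hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical model of the early stop in writeOnlyInserts; names are illustrative only.
object EarlyStop {
  /**
   * Returns Some(no file actions) when there is nothing to insert
   * (no WHEN NOT MATCHED THEN INSERT clauses and filterMatchedRows disabled),
   * or None when writeOnlyInserts has to carry on.
   */
  def checkEarlyStop(
      hasNotMatchedInsertClauses: Boolean,
      filterMatchedRows: Boolean,
      metrics: mutable.Map[String, Long]): Option[Seq[String]] =
    if (!hasNotMatchedInsertClauses && !filterMatchedRows) {
      metrics("numSourceRowsInSecondScan") = -1L // the metric is set to -1
      Some(Seq.empty)                            // no FileActions (modelled as file names)
    } else None
}
```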
writeOnlyInserts creates an Expression that increments the metric (named by the given numSourceRowsMetric) and returns the true literal. The expression is used to count the number of rows in the source dataframe.
What a trick!
writeOnlyInserts creates a custom Catalyst Expression that is a predicate (it always returns true), so it can be (and is) used with the Dataset.filter operator.
At execution time, the filter operator evaluates the custom expression for every row (i.e., the expression updates the metric and returns true). In the end, the filter accepts all the rows and, as a side effect, counts them. What a clever trick!
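The trick can be modelled with plain Scala collections. A predicate that bumps a counter and always answers true lets filter count rows without dropping any. In this sketch, a java.util.concurrent.atomic.LongAdder stands in for the SQL metric, and CountingPredicate is a hypothetical name. The real thing is a Catalyst expression.

```scala
import java.util.concurrent.atomic.LongAdder

// Hypothetical model of the "increment a metric and return true" predicate.
object CountingPredicate {
  // Returns a predicate that always accepts a row and, as a side effect, bumps the metric
  def incrementMetricAndReturnTrue[A](metric: LongAdder): A => Boolean = { _ =>
    metric.increment()
    true
  }
}
```

One difference worth keeping in mind: Scala's filter on a Seq runs eagerly, whereas Dataset.filter is lazy. In Spark, the metric is only updated when the query is actually executed.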
For a merge with just a single WHEN NOT MATCHED THEN INSERT clause with a condition (conditional insert), writeOnlyInserts adds Dataset.filter with the condition to the source dataframe.
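Here is a sketch of the single conditional insert case. It models rows as Map[String, Any] and a clause condition as a plain predicate. ConditionalInsertFilter and NotMatchedInsertClause are hypothetical names. Filtering early is safe here because a source row that fails the only insert condition can never be written out.

```scala
// Hypothetical model: rows are maps and a clause condition is a plain predicate.
object ConditionalInsertFilter {
  type Row = Map[String, Any]
  final case class NotMatchedInsertClause(condition: Option[Row => Boolean])

  def filterSource(source: Seq[Row], clauses: Seq[NotMatchedInsertClause]): Seq[Row] =
    clauses match {
      // Exactly one conditional insert: rows failing its condition can never be inserted
      case Seq(NotMatchedInsertClause(Some(condition))) => source.filter(condition)
      // Otherwise (unconditional insert, several clauses, ...) keep the source as is
      case _ => source
    }
}
```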
writeOnlyInserts builds a preparedSourceDF DataFrame.
When the given filterMatchedRows flag is enabled (i.e., for an insert-only merge with spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled enabled), writeOnlyInserts uses a LEFT ANTI join on the source to find the rows to insert:
- writeOnlyInserts splits conjunctive predicates in the merge condition to find the target-only ones (those that reference the target columns only)
- writeOnlyInserts requests the given OptimisticTransaction for the files matching the target-only predicates
- writeOnlyInserts creates a targetDF dataframe for the target plan with these files
- In the end, writeOnlyInserts performs a LEFT ANTI join of the sourceDF and targetDF dataframes with the merge condition as the join condition
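The first step, finding the target-only predicates, can be sketched with a tiny expression tree. Expr, Pred and And are hypothetical stand-ins for Catalyst expressions. Columns are referenced by qualified names such as t.date. The file lookup on the OptimisticTransaction is not modelled.

```scala
// Hypothetical mini expression tree; Expr, Pred and And are illustrative, not Catalyst classes.
object TargetOnlyPredicates {
  sealed trait Expr { def references: Set[String] }
  final case class Pred(sql: String, references: Set[String]) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
  }

  // Same idea as Catalyst's splitConjunctivePredicates: flatten nested ANDs into a list
  def splitConjunctivePredicates(condition: Expr): Seq[Expr] = condition match {
    case And(left, right) => splitConjunctivePredicates(left) ++ splitConjunctivePredicates(right)
    case other            => Seq(other)
  }

  // Keep the conjuncts that reference target columns only (usable to skip target files)
  def targetOnly(condition: Expr, targetColumns: Set[String]): Seq[Expr] =
    splitConjunctivePredicates(condition).filter(_.references.subsetOf(targetColumns))
}
```

For a condition such as s.id = t.id AND t.date >= '2024-01-01' AND t.region = 'EU', only the last two conjuncts reference the target alone, so only they can narrow down the target files.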
With filterMatchedRows disabled, writeOnlyInserts leaves the sourceDF unchanged (and uses it as the preparedSourceDF).
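Building preparedSourceDF can be modelled on in-memory rows. The sketch below uses a naive nested-loop LEFT ANTI join for clarity; Spark would pick a proper join strategy instead. PreparedSource is a hypothetical name.

```scala
// Hypothetical model of building preparedSourceDF; rows are maps, the join is a nested loop.
object PreparedSource {
  type Row = Map[String, Any]

  // LEFT ANTI join: keep the source rows that have no matching target row
  def leftAntiJoin(source: Seq[Row], target: Seq[Row])(condition: (Row, Row) => Boolean): Seq[Row] =
    source.filterNot(s => target.exists(t => condition(s, t)))

  // target is by-name: it is only read when filterMatchedRows is enabled
  def prepareSource(source: Seq[Row], target: => Seq[Row], filterMatchedRows: Boolean)(
      condition: (Row, Row) => Boolean): Seq[Row] =
    if (filterMatchedRows) leftAntiJoin(source, target)(condition)
    else source // flag disabled: the source is used unchanged
}
```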
writeOnlyInserts calls generateInsertsOnlyOutputDF with the preparedSourceDF (which creates an outputDF dataframe).
writeOnlyInserts prints out the following DEBUG message to the logs:
[extraOpType]: output plan:
[outputDF]
writeOnlyInserts calls writeFiles with the outputDF (which gives FileActions).
In the end, writeOnlyInserts updates the performance metrics.
writeOnlyInserts is used when:
- MergeIntoCommand is requested to run a merge (for an insert-only merge with spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled enabled, or when there are no files to rewrite)
Generating Output DataFrame for Insert-Only Merge
generateInsertsOnlyOutputDF(
  preparedSourceDF: DataFrame,
  deltaTxn: OptimisticTransaction): DataFrame
generateInsertsOnlyOutputDF generates the final output dataframe to be written out to the target delta table.
generateInsertsOnlyOutputDF (as part of InsertOnlyMergeExecutor) is used for insert-only merges and distinguishes between merges with a single WHEN NOT MATCHED THEN INSERT clause and merges with many of them.
generateInsertsOnlyOutputDF gets the column names (targetOutputColNames) of the target table (from the metadata of the given OptimisticTransaction).
Optimization: Single WHEN NOT MATCHED THEN INSERT Merges
For a merge with just a single WHEN NOT MATCHED THEN INSERT clause, generateInsertsOnlyOutputDF uses generateOneInsertOutputCols (with the target output columns) to create the columns to project the given preparedSourceDF on (using the Dataset.select operator), and returns the projected dataframe. generateInsertsOnlyOutputDF finishes early.
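The single-insert projection can be sketched as follows. The sketch assumes rows are Map[String, Any] and that each target column has exactly one insert expression. SingleInsertProjection and insertExprs are hypothetical names. Since there is only one clause, every source row is simply mapped once, and this sketch needs no CASE WHEN.

```scala
// Hypothetical model of the single-insert optimization: one projection per source row.
object SingleInsertProjection {
  type Row = Map[String, Any]
  type Expr = Row => Any // stands in for an insert action's expression

  def project(source: Seq[Row], targetOutputColNames: Seq[String], insertExprs: Map[String, Expr]): Seq[Row] =
    source.map { row =>
      // One value per target column, computed from the source row
      targetOutputColNames.map(col => col -> insertExprs(col)(row)).toMap
    }
}
```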
generateInsertsOnlyOutputDF appends precomputed clause conditions (for the WHEN NOT MATCHED clauses) to the given preparedSourceDF dataframe (which gives the sourceWithPrecompConditions dataframe).
Note
At this point, there is more than one WHEN NOT MATCHED clause in this merge.
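One way to picture the precomputed conditions, offered as an interpretation and not as Delta Lake's code: every clause condition is evaluated once per source row into an extra boolean column. The clause conditions are then rewritten to just read that column. PrecomputedConditions and the _insert_condition_<i>_ column names are made up for this sketch.

```scala
// Hypothetical model of precomputing clause conditions as extra columns.
// The _insert_condition_<i>_ column naming is made up for this sketch.
object PrecomputedConditions {
  type Row = Map[String, Any]

  def precompute(source: Seq[Row], conditions: Seq[Row => Boolean]): (Seq[Row], Seq[Row => Boolean]) = {
    val condCols = conditions.indices.map(i => s"_insert_condition_${i}_")
    // Evaluate each condition once per row and store the result next to the source columns
    val extended = source.map { row =>
      row ++ condCols.zip(conditions).map { case (col, cond) => col -> cond(row) }
    }
    // The rewritten conditions only read the precomputed boolean column
    val rewritten: Seq[Row => Boolean] = condCols.map(col => (row: Row) => row(col).asInstanceOf[Boolean])
    (extended, rewritten)
  }
}
```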
generateInsertsOnlyOutputDF generates the output columns for this insert-only merge (based on the target output columns and the precomputed DeltaMergeIntoNotMatchedInsertClauses).
In the end, generateInsertsOnlyOutputDF does "column and filter pruning" of the sourceWithPrecompConditions dataframe:
- Leaves the outputCols columns only (using the Dataset.select operator)
- Leaves only the rows with false in the _row_dropped_ column (using the Dataset.filter operator)
generateInsertsOnlyOutputDF drops the _row_dropped_ column.
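The pruning steps can be sketched on in-memory rows. PruneDroppedRows is a hypothetical name. Rows are Map[String, Any], and outputCols is assumed to include _row_dropped_.

```scala
// Hypothetical model of the final "column and filter pruning" on in-memory rows.
object PruneDroppedRows {
  type Row = Map[String, Any]
  val RowDroppedCol = "_row_dropped_"

  def prune(rows: Seq[Row], outputCols: Seq[String]): Seq[Row] =
    rows
      .map(row => outputCols.map(col => col -> row(col)).toMap) // select: keep outputCols only
      .filter(row => !row(RowDroppedCol).asInstanceOf[Boolean])  // filter: keep rows not dropped
      .map(_ - RowDroppedCol)                                     // drop the marker column
}
```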
generateInsertsOnlyOutputCols
generateInsertsOnlyOutputCols(
  targetOutputColNames: Seq[String],
  insertClausesWithPrecompConditions: Seq[DeltaMergeIntoNotMatchedClause]): Seq[Column]
generateInsertsOnlyOutputCols uses the given targetOutputColNames column names with one extra _row_dropped_ column (outputColNames).
For every clause in insertClausesWithPrecompConditions, generateInsertsOnlyOutputCols creates a collection of the expressions of the clause's DeltaMergeActions plus one extra expression that increments the numTargetRowsInserted metric (allInsertExprs).
generateInsertsOnlyOutputCols uses the given insertClausesWithPrecompConditions clauses to create an allInsertConditions collection of their conditions (with true for a clause with no condition).
generateInsertsOnlyOutputCols uses allInsertConditions and allInsertExprs to generate a collection of CaseWhen expressions (outputExprs), one per output column, with an elseValue based on dropSourceRowExprs.
FIXME A few examples would make the description much easier
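The following worked example in Scala illustrates the CaseWhen generation, under stated assumptions. Rows are Map[String, Any], expressions are plain functions, and a LongAdder stands in for the numTargetRowsInserted metric. dropSourceRowExprs is assumed to be nulls for the target columns plus true for _row_dropped_. InsertsOnlyOutputCols and evaluate are hypothetical names. CaseWhen here is a simplified, lazily evaluated stand-in for Catalyst's CaseWhen.

```scala
import java.util.concurrent.atomic.LongAdder

// Hypothetical, collection-based model of generateInsertsOnlyOutputCols.
object InsertsOnlyOutputCols {
  type Row = Map[String, Any]
  type Expr = Row => Any
  val RowDroppedCol = "_row_dropped_"

  // CASE WHEN cond_1 THEN value_1 ... ELSE elseValue END; only the chosen value is evaluated
  final case class CaseWhen(branches: Seq[(Row => Boolean, Expr)], elseValue: Expr) {
    def eval(row: Row): Any = branches.find { case (cond, _) => cond(row) } match {
      case Some((_, value)) => value(row)
      case None             => elseValue(row)
    }
  }

  def generateInsertsOnlyOutputCols(
      targetOutputColNames: Seq[String],
      clauses: Seq[(Option[Row => Boolean], Seq[Expr])], // (condition, one expression per target column)
      numTargetRowsInserted: LongAdder): Seq[(String, CaseWhen)] = {
    val outputColNames = targetOutputColNames :+ RowDroppedCol

    // The extra expression (for the marker column) counts the insert and yields false: "not dropped"
    val countInsert: Expr = _ => { numTargetRowsInserted.increment(); false }
    val allInsertExprs: Seq[Seq[Expr]] = clauses.map { case (_, exprs) => exprs :+ countInsert }

    // A clause without a condition always applies
    val allInsertConditions: Seq[Row => Boolean] =
      clauses.map { case (condition, _) => condition.getOrElse((_: Row) => true) }

    // Assumption: a row matching no clause gets nulls and _row_dropped_ = true
    val nullValue: Expr = _ => null
    val dropSourceRowExprs: Seq[Expr] = targetOutputColNames.map(_ => nullValue) :+ ((_: Row) => true)

    // One CaseWhen per output column (outputExprs), paired with its column name
    outputColNames.indices.map { i =>
      outputColNames(i) -> CaseWhen(allInsertConditions.zip(allInsertExprs.map(_(i))), dropSourceRowExprs(i))
    }
  }

  // Evaluates every generated column against every source row
  def evaluate(source: Seq[Row], outputCols: Seq[(String, CaseWhen)]): Seq[Row] =
    source.map(row => outputCols.map { case (name, expr) => name -> expr.eval(row) }.toMap)
}
```

Each output column gets its own CASE WHEN, but only the _row_dropped_ column carries the metric increment. The counter therefore goes up once per inserted row and not at all for dropped rows.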
generateInsertsOnlyOutputCols prints out the following DEBUG message to the logs (with the generated CaseWhens):
prepareInsertsOnlyOutputDF: not matched expressions
[outputExprs]
In the end, generateInsertsOnlyOutputCols pairs the outputExprs with the outputColNames to create Columns.
Logging
InsertOnlyMergeExecutor is an abstract class, and logging is configured using the logger of MergeIntoCommand.