InsertOnlyMergeExecutor

InsertOnlyMergeExecutor is an extension of the MergeOutputGeneration abstraction for optimized execution of MERGE commands that only insert new data (when MergeIntoCommand is requested to run a merge).

InsertOnlyMergeExecutor optimizes insert-only merges only when spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled is enabled.

ClassicMergeExecutor

When a MERGE query is not insert-only or spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled is disabled, ClassicMergeExecutor is used to run the merge.

Writing Out Inserts

writeOnlyInserts(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction,
  filterMatchedRows: Boolean,
  numSourceRowsMetric: String): Seq[FileAction]

filterMatchedRows Argument

writeOnlyInserts is given the filterMatchedRows flag when a merge is run, based on the following conditions:

filterMatchedRows  Condition
true               An insert-only merge with merge.optimizeInsertOnlyMerge.enabled enabled
false              No files to rewrite (AddFiles) found by findTouchedFiles
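
The two conditions in the table can be sketched as a small decision function. This is a minimal Scala model under stated assumptions, not Delta Lake code: MergePathChoice, its cases and the choose method are hypothetical names, and touchedFiles stands in for the number of files (AddFiles) that findTouchedFiles would return.

```scala
// Hypothetical model of which merge path runs; not Delta Lake's actual code.
object MergePathChoice {
  sealed trait MergePath
  // writeOnlyInserts with filterMatchedRows = true
  case object InsertOnlyFilterMatched extends MergePath
  // writeOnlyInserts with filterMatchedRows = false
  case object InsertsWithNoMatches extends MergePath
  // the classic path with files to rewrite
  case object RewriteTouchedFiles extends MergePath

  def choose(
      isInsertOnly: Boolean,
      optimizeInsertOnlyMergeEnabled: Boolean,
      touchedFiles: Int): MergePath =
    if (isInsertOnly && optimizeInsertOnlyMergeEnabled) InsertOnlyFilterMatched
    else if (touchedFiles == 0) InsertsWithNoMatches // findTouchedFiles found no AddFiles
    else RewriteTouchedFiles
}
```

Note that in this model the touched-files check only matters on the non-optimized path, which mirrors the order of the two rows in the table.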

writeOnlyInserts records this merge operation with the following:

Property       Value
extraOpType    writeInsertsOnlyWhenNoMatchedClauses with the given filterMatchedRows enabled (see the note above 👆), writeInsertsOnlyWhenNoMatches otherwise
status         MERGE operation - writing new files for only inserts
sqlMetricName  rewriteTimeMs

Early Stop Possible

With no WHEN NOT MATCHED THEN INSERT clauses used and the given filterMatchedRows flag disabled (see the above note 👆), writeOnlyInserts has nothing to do (nothing to insert and so no new files to write).

The numSourceRowsInSecondScan metric is set to -1.

writeOnlyInserts returns no FileActions.
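
A minimal sketch of the early stop and the extraOpType choice, assuming a plain Scala model: FileAction is a hypothetical stand-in for Delta's FileAction, the mutable map stands in for the SQL metrics, and the hasInsertClauses and writeFiles parameters are illustrative only.

```scala
import scala.collection.mutable

// Hypothetical model of writeOnlyInserts' early stop; names are illustrative only.
object WriteOnlyInsertsEarlyStop {
  // Stand-in for Delta's FileAction
  final case class FileAction(path: String)

  // extraOpType recorded for the merge operation
  def extraOpType(filterMatchedRows: Boolean): String =
    if (filterMatchedRows) "writeInsertsOnlyWhenNoMatchedClauses"
    else "writeInsertsOnlyWhenNoMatches"

  def writeOnlyInserts(
      hasInsertClauses: Boolean,
      filterMatchedRows: Boolean,
      metrics: mutable.Map[String, Long],
      writeFiles: () => Seq[FileAction]): Seq[FileAction] =
    if (!hasInsertClauses && !filterMatchedRows) {
      // Nothing to insert, so no new files to write
      metrics("numSourceRowsInSecondScan") = -1L
      Seq.empty
    } else {
      writeFiles()
    }
}
```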

Unless it stops early, writeOnlyInserts creates an Expression that increments the metric (with the given numSourceRowsMetric name) and returns a true literal. The expression is used to count the number of rows in the source dataframe.

What a trick!

writeOnlyInserts creates a custom Catalyst Expression that is a predicate (it always returns true) and so can be, and is, used with the Dataset.filter operator.

At execution time, the filter operator evaluates the custom expression for every row (i.e., updates the metric and returns true). The filter accepts all the rows and, as a side effect, counts them. What a clever trick!
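
The counting-filter trick can be mimicked with plain Scala collections. In this sketch, an AtomicLong stands in for the SQL metric named by numSourceRowsMetric and countWhileFiltering is a hypothetical helper; unlike Spark, which evaluates the expression per row at execution time, Seq.filter runs eagerly.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical model of the "increment metric and return true" filter trick.
object CountingFilterTrick {
  // Keeps every row while counting the rows in `metric` (a stand-in for a SQL metric)
  def countWhileFiltering[A](rows: Seq[A], metric: AtomicLong): Seq[A] =
    rows.filter { _ =>
      metric.incrementAndGet() // side effect: count the row
      true                     // always accept the row
    }
}
```

Filtering Seq("a", "b", "c") this way returns all three rows and leaves the counter at 3.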

For a merge with just a single WHEN NOT MATCHED THEN INSERT clause with a condition (a conditional insert), writeOnlyInserts adds a Dataset.filter with the condition to the source dataframe.

writeOnlyInserts builds a preparedSourceDF DataFrame based on the given filterMatchedRows flag.

With the given filterMatchedRows flag enabled (an insert-only merge with merge.optimizeInsertOnlyMerge.enabled enabled), writeOnlyInserts uses a LEFT ANTI join on the source to find the rows to insert:

  1. writeOnlyInserts splits the conjunctive predicates of the merge condition to find the target-only ones (that reference the target columns only)
  2. writeOnlyInserts requests the given OptimisticTransaction for the files matching the target-only predicates
  3. writeOnlyInserts creates a targetDF dataframe for the target plan with these files
  4. In the end, writeOnlyInserts performs a LEFT ANTI join of the sourceDF and targetDF dataframes with the merge condition as the join condition
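
The LEFT ANTI join and the target-only pruning can be modelled with in-memory rows. This sketch assumes a merge condition of s.key = t.key AND t.part = 'p1'; filtering the target rows on the target-only conjunct stands in for asking the OptimisticTransaction for the matching files, and all type and method names are hypothetical.

```scala
// Hypothetical in-memory model of the insert-only LEFT ANTI join; not Delta Lake code.
object InsertOnlyAntiJoin {
  final case class SourceRow(key: Int, value: String)
  final case class TargetRow(key: Int, part: String)

  // LEFT ANTI join: keep source rows with no target row satisfying the join condition
  def leftAntiJoin[S, T](source: Seq[S], target: Seq[T])(cond: (S, T) => Boolean): Seq[S] =
    source.filterNot(s => target.exists(t => cond(s, t)))

  def rowsToInsert(source: Seq[SourceRow], target: Seq[TargetRow]): Seq[SourceRow] = {
    // Assumed merge condition: s.key = t.key AND t.part = 'p1'
    // The target-only conjunct is applied to the target first,
    // standing in for reading only the files that match it
    val targetOnly: TargetRow => Boolean = _.part == "p1"
    val prunedTarget = target.filter(targetOnly)
    // The full merge condition is the join condition
    leftAntiJoin(source, prunedTarget)((s, t) => s.key == t.key && targetOnly(t))
  }
}
```

For example, with source keys 1, 2 and 3 and target rows (1, p1) and (2, p2), only the source row with key 1 has a match, so the rows with keys 2 and 3 are the ones to insert.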

With filterMatchedRows disabled, writeOnlyInserts leaves the sourceDF unchanged.

writeOnlyInserts calls generateInsertsOnlyOutputDF with the preparedSourceDF to create an outputDF dataframe.

writeOnlyInserts prints out the following DEBUG message to the logs:

[extraOpType]: output plan:
[outputDF]

writeOnlyInserts calls writeFiles with the outputDF to write the new files and gets their FileActions.

In the end, writeOnlyInserts updates the performance metrics.

writeOnlyInserts is used when MergeIntoCommand is requested to run a merge.

Generating Output DataFrame for Insert-Only Merge

generateInsertsOnlyOutputDF(
  preparedSourceDF: DataFrame,
  deltaTxn: OptimisticTransaction): DataFrame

generateInsertsOnlyOutputDF generates the final output dataframe to be written out to the target delta table.

generateInsertsOnlyOutputDF (as part of InsertOnlyMergeExecutor) is used for insert-only merges and distinguishes between merges with a single and with multiple WHEN NOT MATCHED THEN INSERT clauses.

generateInsertsOnlyOutputDF gets the column names (targetOutputColNames) of the target table (from the metadata of the given OptimisticTransaction).

Optimization: Single WHEN NOT MATCHED THEN INSERT Merges

For a merge with just a single WHEN NOT MATCHED THEN INSERT clause, generateInsertsOnlyOutputDF uses generateOneInsertOutputCols (with the target output columns) to create the columns to project the given preparedSourceDF on (using the Dataset.select operator) and returns the projected dataframe. generateInsertsOnlyOutputDF finishes early.
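
A sketch of why a single clause needs no CaseWhen: once the clause's condition is applied as a filter (as writeOnlyInserts does for a conditional insert), every remaining row is inserted and the output is a plain projection. Rows are modelled as Map[String, Any], all names are hypothetical, and the sketch combines the filter and the projection into one method for brevity.

```scala
// Hypothetical model of the single-clause optimization; not Delta Lake code.
object SingleInsertProjection {
  type Row = Map[String, Any]

  def output(
      source: Seq[Row],
      condition: Option[Row => Boolean],
      targetOutputColNames: Seq[String],
      actions: Seq[Row => Any]): Seq[Row] =
    source
      // Dataset.filter with the clause condition (no condition keeps every row)
      .filter(condition.getOrElse((_: Row) => true))
      // Dataset.select: one value per target column, no _row_dropped_ column needed
      .map(row => targetOutputColNames.zip(actions.map(action => action(row))).toMap)
}
```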

Otherwise, generateInsertsOnlyOutputDF appends the precomputed conditions of the WHEN NOT MATCHED clauses to the given preparedSourceDF dataframe (sourceWithPrecompConditions).

Note

At this point, we know there is more than one WHEN NOT MATCHED clause in this merge.

generateInsertsOnlyOutputDF uses generateInsertsOnlyOutputCols to generate the output columns (outputCols) for this insert-only merge, based on the target output columns and the DeltaMergeIntoNotMatchedInsertClauses with precomputed conditions.

Next, generateInsertsOnlyOutputDF does "column and filter pruning" of the sourceWithPrecompConditions dataframe:

  • Keeps the outputCols columns only (using the Dataset.select operator)
  • Keeps only the rows with the _row_dropped_ column set to false (using the Dataset.filter operator)

In the end, generateInsertsOnlyOutputDF drops the _row_dropped_ column.
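
The filter-and-drop part of the pruning can be sketched with rows modelled as Map[String, Any] (a hypothetical representation, not Spark's Row); the column selection step is omitted here.

```scala
// Hypothetical model of dropping unmatched rows and the helper column.
object DropUnmatchedRows {
  type Row = Map[String, Any]

  def prune(rows: Seq[Row]): Seq[Row] =
    rows
      .filter(row => row("_row_dropped_") == false) // Dataset.filter: keep inserted rows only
      .map(row => row - "_row_dropped_")            // drop the helper column
}
```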

generateInsertsOnlyOutputCols

generateInsertsOnlyOutputCols(
  targetOutputColNames: Seq[String],
  insertClausesWithPrecompConditions: Seq[DeltaMergeIntoNotMatchedClause]): Seq[Column]

generateInsertsOnlyOutputCols uses the given targetOutputColNames column names with one extra _row_dropped_ column (outputColNames).

For every clause in insertClausesWithPrecompConditions, generateInsertsOnlyOutputCols creates a collection with the expressions of the clause's DeltaMergeActions and one more expression to increment the numTargetRowsInserted metric (allInsertExprs).

generateInsertsOnlyOutputCols creates an allInsertConditions collection with the condition of every clause in insertClausesWithPrecompConditions (or true for a clause with no condition).

generateInsertsOnlyOutputCols uses the allInsertConditions and allInsertExprs to generate a collection of CaseWhen expressions (outputExprs) with an elseValue based on dropSourceRowExprs.
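
A worked model of the CaseWhen logic may help. In this sketch (hypothetical names, not Delta Lake code), each clause's condition and actions are Scala functions over a row modelled as Map[String, Any]. The first clause whose condition holds provides the values and _row_dropped_ = false, and increments a counter standing in for numTargetRowsInserted; when no clause matches, the else branch yields nulls and _row_dropped_ = true, which is assumed to mirror dropSourceRowExprs.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical model of the CaseWhen output expressions; not Delta Lake's actual code.
object NotMatchedInsertCaseWhen {
  type Row = Map[String, Any]

  // Stand-in for a WHEN NOT MATCHED [AND condition] THEN INSERT clause
  final case class InsertClause(condition: Option[Row => Boolean], actions: Seq[Row => Any])

  def outputRow(
      targetOutputColNames: Seq[String],
      clauses: Seq[InsertClause],
      numTargetRowsInserted: AtomicLong)(source: Row): Row = {
    val outputColNames = targetOutputColNames :+ "_row_dropped_"
    // A missing condition is treated as true
    val allInsertConditions: Seq[Row => Boolean] =
      clauses.map(clause => clause.condition.getOrElse((_: Row) => true))
    // CASE WHEN cond1 THEN exprs1 WHEN cond2 THEN exprs2 ... ELSE dropped END
    val values: Seq[Any] = allInsertConditions.indexWhere(cond => cond(source)) match {
      case -1 =>
        // Else branch: null values and _row_dropped_ = true
        targetOutputColNames.map(_ => null) :+ true
      case i =>
        // First matching clause wins; count the inserted row
        numTargetRowsInserted.incrementAndGet()
        clauses(i).actions.map(action => action(source)) :+ false
    }
    outputColNames.zip(values).toMap
  }
}
```

For example, with the clauses WHEN NOT MATCHED AND id > 10 THEN INSERT (id, 'big') and WHEN NOT MATCHED AND id > 5 THEN INSERT (id, 'medium'), a source row with id 20 satisfies both conditions but takes the first clause's values, while a row with id 1 satisfies neither and is marked as dropped.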

generateInsertsOnlyOutputCols prints out the following DEBUG message to the logs (with the generated CaseWhens):

prepareInsertsOnlyOutputDF: not matched expressions
    [outputExprs]

In the end, generateInsertsOnlyOutputCols takes the outputExprs and the outputColNames to create Columns.

Logging

InsertOnlyMergeExecutor is an abstract class and logging is configured using the logger of the MergeIntoCommand.