MergeIntoCommand

MergeIntoCommand is a DeltaCommand that represents a DeltaMergeInto logical command at execution time.

MergeIntoCommand is a logical command (Spark SQL's RunnableCommand).

Tip

Learn more on the internals of MergeIntoCommand in Demo: Merge Operation.
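For a quick feel for when MergeIntoCommand kicks in, the following merge uses the DeltaTable API and is ultimately executed as MergeIntoCommand (updatesDF is a hypothetical source DataFrame):

import io.delta.tables.DeltaTable

DeltaTable.forPath(spark, "/tmp/delta/target").as("t")
  .merge(updatesDF.as("s"), "t.id = s.id")
  .whenMatched.updateAll()
  .whenNotMatched.insertAll()
  .execute()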

Performance Metrics

Name                         | web UI
-----------------------------|-------------------------------------------
numSourceRows                | number of source rows
numTargetRowsCopied          | number of target rows rewritten unmodified
numTargetRowsInserted        | number of inserted rows
numTargetRowsUpdated         | number of updated rows
numTargetRowsDeleted         | number of deleted rows
numTargetFilesBeforeSkipping | number of target files before skipping
numTargetFilesAfterSkipping  | number of target files after skipping
numTargetFilesRemoved        | number of files removed to target
numTargetFilesAdded          | number of files added to target
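The metrics are regular Spark SQL SQLMetrics. A sketch of how such metrics are typically registered, with SQLMetrics.createMetric associating each metric with its web UI description (only two of the metrics above are shown):

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// two of the metrics from the table above
def metrics(sc: SparkContext): Map[String, SQLMetric] = Map(
  "numSourceRows" -> SQLMetrics.createMetric(sc, "number of source rows"),
  "numTargetRowsCopied" ->
    SQLMetrics.createMetric(sc, "number of target rows rewritten unmodified"))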

number of target rows rewritten unmodified

The numTargetRowsCopied performance metric (like the other metrics) is turned into a non-deterministic user-defined function (UDF) that increments the metric as a side effect.

For numTargetRowsCopied, that UDF is the incrNoopCountExpr expression.

The incrNoopCountExpr UDF is resolved against a joined plan and used to create a JoinedRowProcessor for processing partitions of the joined plan Dataset.
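A minimal sketch of turning a metric into such a UDF (modeled after the helper in MergeIntoCommand; the name and exact wiring are an approximation):

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.functions.udf

// A UDF that bumps the given metric as a side effect and always returns true.
// Marked non-deterministic so Catalyst never optimizes the call away.
def makeMetricUpdateUDF(metric: SQLMetric): Expression =
  udf { () =>
    metric += 1 // SQLMetric is an AccumulatorV2, so this works on executors
    true
  }.asNondeterministic().apply().expr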

Creating Instance

MergeIntoCommand takes the following to be created:

  • Source Data
  • Target Data (LogicalPlan)
  • TahoeFileIndex
  • Condition Expression
  • Matched Clauses (Seq[DeltaMergeIntoMatchedClause])
  • Optional Non-Matched Clause (Option[DeltaMergeIntoInsertClause])
  • Migrated Schema

MergeIntoCommand is created when the PreprocessTableMerge logical resolution rule is executed (on a DeltaMergeInto logical command).
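Assembled from the list above, the constructor looks roughly as follows (a sketch; exact modifiers are omitted and the Migrated Schema type is assumed to be Option[StructType]):

case class MergeIntoCommand(
    source: LogicalPlan,
    target: LogicalPlan,
    targetFileIndex: TahoeFileIndex,
    condition: Expression,
    matchedClauses: Seq[DeltaMergeIntoMatchedClause],
    notMatchedClause: Option[DeltaMergeIntoInsertClause],
    migratedSchema: Option[StructType])
  extends RunnableCommand with DeltaCommand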

Source Data to Merge From

When created, MergeIntoCommand is given a LogicalPlan for the source data to merge from.

The LogicalPlan is used twice: once while finding the files to rewrite (findTouchedFiles) and once while writing out the changes (writeAllChanges), in both cases joined with the target data.

Executing Command

run(
  spark: SparkSession): Seq[Row]

run requests the target DeltaLog to start a new transaction.

With spark.databricks.delta.schema.autoMerge.enabled configuration property enabled, run updates the metadata (of the transaction).
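For example, to enable schema auto-merging for the current session:

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")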

run determines Delta actions (RemoveFiles and AddFiles).

Caution

FIXME: Describe the deltaActions part.

With spark.databricks.delta.history.metricsEnabled configuration property enabled, run requests the current transaction to register SQL metrics for the Delta operation.

run requests the current transaction to commit (with the Delta actions and Merge operation).

run records the Delta event.

run posts a SparkListenerDriverAccumUpdates Spark event (with the metrics).

In the end, run requests the CacheManager to recacheByPlan.

run is part of the RunnableCommand (Spark SQL) abstraction.

Exceptions

run throws an AnalysisException when the target schema is different from the delta table's (i.e. it has changed after the analysis phase):

The schema of your Delta table has changed in an incompatible way since your DataFrame or DeltaTable object was created. Please redefine your DataFrame or DeltaTable object. Changes:
[schemaDiff]
This check can be turned off by setting the session configuration key spark.databricks.delta.checkLatestSchemaOnRead to false.
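As the message says, the check can be disabled for the current session:

spark.conf.set("spark.databricks.delta.checkLatestSchemaOnRead", "false")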

writeAllChanges

writeAllChanges(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction,
  filesToRewrite: Seq[AddFile]): Seq[AddFile]

writeAllChanges builds the target output columns (possibly with some nulls for the target columns that are not in the current schema).

writeAllChanges builds a target logical query plan for the AddFiles.

writeAllChanges determines a join type to use (rightOuter or fullOuter).

writeAllChanges prints out the following DEBUG message to the logs:

writeAllChanges using [joinType] join:
source.output: [outputSet]
target.output: [outputSet]
condition: [condition]
newTarget.output: [outputSet]

writeAllChanges creates a joinedDF DataFrame by joining the DataFrames of the source and the new target logical plans, using the given join condition and the join type (see the sketch below).
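A simplified sketch of the join (Dataset.ofRows is an internal Spark SQL API; the sketch skips the extra bookkeeping columns the real code adds to both sides):

import org.apache.spark.sql.{Column, Dataset}

val sourceDF = Dataset.ofRows(spark, source)       // the source logical plan
val newTargetDF = Dataset.ofRows(spark, newTarget) // the target plan for filesToRewrite
val joinedDF = sourceDF.join(newTargetDF, new Column(condition), joinType)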

writeAllChanges creates a JoinedRowProcessor that is then used to map over partitions of the joined DataFrame.

writeAllChanges prints out the following DEBUG message to the logs:

writeAllChanges: join output plan:
[outputDF.queryExecution]

writeAllChanges requests the input OptimisticTransaction to writeFiles (possibly repartitioning by the partition columns if table is partitioned and spark.databricks.delta.merge.repartitionBeforeWrite.enabled configuration property is enabled).
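A sketch of that final step (joinedRowsDF stands for the Dataset produced by the JoinedRowProcessor above, and repartitionBeforeWrite is a hypothetical flag holding the value of the spark.databricks.delta.merge.repartitionBeforeWrite.enabled property):

import org.apache.spark.sql.functions.col

val partitionColumns = deltaTxn.metadata.partitionColumns
val outputDF =
  if (repartitionBeforeWrite && partitionColumns.nonEmpty)
    joinedRowsDF.repartition(partitionColumns.map(col): _*)
  else joinedRowsDF
val newFiles = deltaTxn.writeFiles(outputDF) // the new files written out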

writeAllChanges is used when MergeIntoCommand is requested to run.

findTouchedFiles

findTouchedFiles(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction): Seq[AddFile]

findTouchedFiles registers an accumulator to collect all the distinct touched files.

Note

The name of the accumulator is internal.metrics.MergeIntoDelta.touchedFiles. The internal.metrics prefix hides it from the web UI, as the collected file names could be numerous.

findTouchedFiles defines a UDF that adds the file names to the accumulator.
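A sketch of the accumulator and the UDF (Delta uses an internal set-based accumulator; a Spark CollectionAccumulator illustrates the idea):

import org.apache.spark.sql.functions.udf
import org.apache.spark.util.CollectionAccumulator

val touchedFilesAccum = new CollectionAccumulator[String]()
spark.sparkContext.register(touchedFilesAccum, "internal.metrics.MergeIntoDelta.touchedFiles")

// records the name of the file the current row comes from;
// non-deterministic so the call is never optimized away
val recordTouchedFileName = udf { (fileName: String) =>
  touchedFilesAccum.add(fileName)
  1
}.asNondeterministic()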

findTouchedFiles analyzes the condition to find the predicates that use the target's columns only. It splits conjunctive predicates (And expressions) and collects those whose references are a subset of the target's columns (targetOnlyPredicates); see the sketch after the note below. findTouchedFiles then requests the given OptimisticTransaction for the files that match the predicates.

Note

This step looks similar to filter predicate pushdown. Please confirm.
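A sketch of the predicate handling (splitConjunctivePredicates comes from Catalyst's PredicateHelper; the recursion below shows what it does):

import org.apache.spark.sql.catalyst.expressions.{And, Expression}

// split `a AND b AND c` into Seq(a, b, c)
def splitConjunctivePredicates(condition: Expression): Seq[Expression] =
  condition match {
    case And(left, right) =>
      splitConjunctivePredicates(left) ++ splitConjunctivePredicates(right)
    case other => Seq(other)
  }

// keep only the predicates that reference the target's columns exclusively
val targetOnlyPredicates =
  splitConjunctivePredicates(condition).filter(_.references.subsetOf(target.outputSet))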

findTouchedFiles...FIXME

Building Target Logical Query Plan for AddFiles

buildTargetPlanWithFiles(
  deltaTxn: OptimisticTransaction,
  files: Seq[AddFile]): LogicalPlan

buildTargetPlanWithFiles requests the given OptimisticTransaction for the DeltaLog to create a DataFrame (for the Snapshot and the given AddFiles) and takes the DataFrame's analyzed logical query plan.

In the end, buildTargetPlanWithFiles creates a Project logical operator with Alias expressions so the output columns of the analyzed logical query plan (of the DataFrame of the AddFiles) reference the target's output columns (by name).
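A simplified sketch of the whole method (the real code also handles target columns that are missing from the scanned files' schema):

import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

// the analyzed plan of a DataFrame over the given AddFiles
val plan = deltaTxn.deltaLog.createDataFrame(deltaTxn.snapshot, files)
  .queryExecution.analyzed

// re-alias the scan's output to the target's column names (and expression IDs)
val aliases = plan.output.map { newAttr =>
  val targetAttr = target.output
    .find(_.name == newAttr.name)
    .get // simplification: assumes every column resolves by name
  Alias(newAttr, targetAttr.name)(exprId = targetAttr.exprId)
}
val newTarget: LogicalPlan = Project(aliases, plan)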

Note

The output columns of the target delta table are available through the Metadata of an OptimisticTransaction:

deltaTxn.metadata.schema

writeInsertsOnlyWhenNoMatchedClauses

writeInsertsOnlyWhenNoMatchedClauses(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction): Seq[AddFile]

writeInsertsOnlyWhenNoMatchedClauses...FIXME

Logging

Enable ALL logging level for org.apache.spark.sql.delta.commands.MergeIntoCommand logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.delta.commands.MergeIntoCommand=ALL

Refer to Logging.

