MergeIntoCommand

MergeIntoCommand is a DeltaCommand (indirectly as a MergeIntoCommandBase) that represents a DeltaMergeInto logical command at execution.

MergeIntoCommand is transactional (and starts a new transaction when executed).

MergeIntoCommand can optimize output generation (ClassicMergeExecutor or InsertOnlyMergeExecutor).
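
For illustration, the following MERGE INTO query (with hypothetical table and column names) is what ends up planned as a MergeIntoCommand at execution:

spark.sql("""
  MERGE INTO target t
  USING source s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET t.value = s.value
  WHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.value)
""")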

Creating Instance

MergeIntoCommand takes the following to be created:

  • Source LogicalPlan
  • Target LogicalPlan
  • TahoeFileIndex of the target table
  • Merge condition Expression
  • Matched clauses (DeltaMergeIntoMatchedClauses)
  • Not-matched clauses (DeltaMergeIntoNotMatchedClauses)
  • Migrated Schema

MergeIntoCommand is created when:

  • PreprocessTableMerge logical resolution rule is executed (on a DeltaMergeInto logical command)

Migrated Schema

migratedSchema: Option[StructType]

MergeIntoCommand can be given a migratedSchema (Spark SQL).

Output Attributes

Command
output: Seq[Attribute]

output is part of the Command (Spark SQL) abstraction.

output is a fixed-length collection of the following AttributeReferences:

Name                Type
------------------  --------
num_affected_rows   LongType
num_updated_rows    LongType
num_deleted_rows    LongType
num_inserted_rows   LongType
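
The output surfaces as the schema of the DataFrame that a MERGE INTO query returns. A minimal sketch (the target and source tables are hypothetical):

val result = spark.sql("""
  MERGE INTO target t
  USING source s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET *
""")
result.printSchema()
// root
//  |-- num_affected_rows: long (nullable = true)
//  ... (the remaining three metric columns above)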

Running Merge

MergeIntoCommandBase
runMerge(
  spark: SparkSession): Seq[Row]

runMerge is part of the MergeIntoCommandBase abstraction.

runMerge records the start time.

runMerge starts a new transaction (on the targetDeltaLog).

If hasBeenExecuted, runMerge announces the updates of the metrics and quits early (returns no Rows).

FIXME When would hasBeenExecuted happen?

DeltaAnalysisException

In case the schema of the target table changed (compared to the time the transaction started), runMerge throws a DeltaAnalysisException.


The schema of a delta table is in the Metadata of the OptimisticTransactionImpl.

With Auto Schema Merging enabled (that boils down to schema.autoMerge.enabled), runMerge updates the metadata.

runMerge prepares the source DataFrame and determines the reason for materializing the source, if at all (prepareSourceDFAndReturnMaterializeReason).

At this stage, runMerge is finally ready to apply all the necessary changes to the delta table (execute this merge) that result in a collection of FileActions (deltaActions).

runMerge writes out inserts or all changes based on the following:

  • Whether this merge is insert-only
  • spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled configuration property

runMerge and MergeOutputGenerations

runMerge uses InsertOnlyMergeExecutor or ClassicMergeExecutor output generators.

runMerge collects the merge statistics.

runMerge requests the CacheManager (Spark SQL) to re-cache all the cached logical plans that refer to the target logical plan (since it has just changed).

runMerge announces the updates of the metrics.

In the end, runMerge returns the following performance metrics (as a single Row with the output):

Column Name         Metric
------------------  ----------------------------------------------------------
num_affected_rows   Total of the number of updated, deleted and inserted rows
num_updated_rows    number of updated rows
num_deleted_rows    number of deleted rows
num_inserted_rows   number of inserted rows
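
Reading the metrics off that Row is then a matter of column lookups. A sketch (mergeQuery is a hypothetical MERGE INTO statement):

val row = spark.sql(mergeQuery).head()
val numAffectedRows = row.getAs[Long]("num_affected_rows")
val numInsertedRows = row.getAs[Long]("num_inserted_rows")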

FIXME Review the sections

  1. Begin Transaction
    1. schema.autoMerge.enabled
    2. FileActions
    3. Register Metrics
  2. Commit Transaction
  3. Re-Cache Target Delta Table
  4. Post Metric Updates

Begin Transaction

run starts a new transaction (on the target delta table).
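
As a minimal sketch, starting a transaction boils down to the following DeltaLog calls (the table path is hypothetical):

import org.apache.spark.sql.delta.DeltaLog

val deltaLog = DeltaLog.forTable(spark, "/tmp/delta/target")
val txn = deltaLog.startTransaction()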

schema.autoMerge.enabled

Only when the spark.databricks.delta.schema.autoMerge.enabled configuration property is enabled does run update the metadata (of the transaction) with the following (see the sketch after this list):

  • migratedSchema (if defined) or the schema of the target
  • isOverwriteMode flag off
  • rearrangeOnly flag off
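
A sketch of enabling the property (so a merge can evolve the target schema):

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")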

FileActions

run determines FileActions.

Single Insert-Only Merge

For a single insert-only merge with spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled configuration property enabled, run writeInsertsOnlyWhenNoMatchedClauses.
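
A minimal sketch of such a merge with the DeltaTable API (the table path and the source Dataset are hypothetical); a single whenNotMatched clause makes it insert-only:

import io.delta.tables.DeltaTable

DeltaTable.forPath(spark, "/tmp/delta/target").as("t")
  .merge(source.as("s"), "t.id = s.id")
  .whenNotMatched()
  .insertAll()
  .execute()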

Other Merges

Otherwise, run finds the files to rewrite (i.e., AddFiles with the rows that satisfy the merge condition) and uses them to write out merge changes.

The AddFiles are converted into RemoveFiles.

run gives the RemoveFiles and the written-out FileActions.
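
AddFile.remove gives the corresponding RemoveFile. A sketch of the conversion (the helper below is made up for illustration):

import org.apache.spark.sql.delta.actions.{AddFile, RemoveFile}

// Marks the file as removed as of the current timestamp (with dataChange enabled)
def toRemoveFile(addFile: AddFile): RemoveFile =
  addFile.remove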

Register Metrics

run registers the SQL metrics (with the current transaction).

Commit Transaction

run commits the current transaction (with the FileActions and MERGE operation).

Re-Cache Target Delta Table

run requests the CacheManager to re-cache the target plan.
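
Re-caching boils down to CacheManager.recacheByPlan with the target's logical plan. A sketch (the target table name is hypothetical):

val targetPlan = spark.table("target").queryExecution.analyzed
spark.sharedState.cacheManager.recacheByPlan(spark, targetPlan)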

Post Metric Updates

In the end, run posts the SQL metric updates (as a SparkListenerDriverAccumUpdates (Apache Spark) Spark event) to SparkListeners (incl. Spark UI).

Note

Use SparkListener (Apache Spark) to intercept SparkListenerDriverAccumUpdates events.
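
A minimal sketch of such a listener (the class name is made up):

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates

class DriverAccumUpdatesListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SparkListenerDriverAccumUpdates(executionId, accumUpdates) =>
      println(s"Execution $executionId: $accumUpdates")
    case _ => // ignore all other events
  }
}

spark.sparkContext.addSparkListener(new DriverAccumUpdatesListener)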

commitAndRecordStats

commitAndRecordStats(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction,
  mergeActions: Seq[FileAction],
  startTime: Long,
  materializeSourceReason: MergeIntoMaterializeSourceReason.MergeIntoMaterializeSourceReason): Unit

commitAndRecordStats...FIXME

Logging

Enable ALL logging level for org.apache.spark.sql.delta.commands.MergeIntoCommand logger to see what happens inside.

Add the following line to conf/log4j2.properties:

logger.MergeIntoCommand.name = org.apache.spark.sql.delta.commands.MergeIntoCommand
logger.MergeIntoCommand.level = all

Refer to Logging.