
MergeIntoCommand

MergeIntoCommand is a DeltaCommand (indirectly as a MergeIntoCommandBase) that represents a DeltaMergeInto logical command at execution.

MergeIntoCommand is transactional (and starts a new transaction when executed).

MergeIntoCommand can optimize output generation, using either ClassicMergeExecutor or InsertOnlyMergeExecutor.

Creating Instance

MergeIntoCommand takes the following to be created:

MergeIntoCommand is created when:

Migrated Schema

migratedSchema: Option[StructType]

MergeIntoCommand can be given a migratedSchema (Spark SQL).

Output Attributes

Command
output: Seq[Attribute]

output is part of the Command (Spark SQL) abstraction.

output is a fixed-length collection of the following AttributeReferences:

Name               Type
num_affected_rows  LongType
num_updated_rows   LongType
num_deleted_rows   LongType
num_inserted_rows  LongType
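
The shape of output can be sketched as follows (a sketch mirroring the table above, not the verbatim Delta Lake source):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.LongType

// Sketch of the fixed-length output collection
// (field names per the table above; nullability and exact
// construction in Delta Lake's source may differ)
val output: Seq[AttributeReference] = Seq(
  AttributeReference("num_affected_rows", LongType)(),
  AttributeReference("num_updated_rows", LongType)(),
  AttributeReference("num_deleted_rows", LongType)(),
  AttributeReference("num_inserted_rows", LongType)())
```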

Running Merge

MergeIntoCommandBase
runMerge(
  spark: SparkSession): Seq[Row]

runMerge is part of the MergeIntoCommandBase abstraction.

runMerge records the start time.

runMerge starts a new transaction (on the targetDeltaLog).

If hasBeenExecuted, runMerge announces the updates of the metrics and quits early (returns no Rows).

FIXME When would hasBeenExecuted happen?

DeltaAnalysisException

In case the schema of the target table changed (compared to the time the transaction started), runMerge throws a DeltaAnalysisException.


The schema of a delta table is in the Metadata of the OptimisticTransactionImpl.

With Auto Schema Merging enabled (that boils down to schema.autoMerge.enabled), runMerge updates the metadata.
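
Auto Schema Merging is controlled by the spark.databricks.delta.schema.autoMerge.enabled configuration property. A minimal sketch of enabling it (assumes an existing SparkSession with Delta Lake configured):

```scala
// Enable automatic schema evolution for MERGE
// (schema.autoMerge.enabled boils down to this property)
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
```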

runMerge prepares the source DataFrame and determines the reason the source was (or was not) materialized (prepareSourceDFAndReturnMaterializeReason).

At this stage, runMerge is finally ready to apply all the necessary changes to the delta table (execute this merge) that result in a collection of FileActions (deltaActions).

runMerge writes out inserts or all changes based on the following:

runMerge and MergeOutputGenerations

runMerge uses InsertOnlyMergeExecutor or ClassicMergeExecutor output generators.

runMerge collects the merge statistics.

runMerge requests the CacheManager (Spark SQL) to re-cache all the cached logical plans that refer to the target logical plan (since it has just changed).

runMerge announces the updates of the metrics.

In the end, runMerge returns the following performance metrics (as a single Row with the output):

Column Name        Metric
num_affected_rows  Total of the three metrics below
num_updated_rows   Number of updated rows
num_deleted_rows   Number of deleted rows
num_inserted_rows  Number of inserted rows
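
These metrics surface as the single-row result of a MERGE INTO statement. A hedged sketch (the target and source table names are made up; assumes a SparkSession with Delta Lake configured and both tables registered):

```scala
// MERGE INTO executed via SQL yields a single-Row DataFrame
// with the four metric columns described above
val result = spark.sql("""
  MERGE INTO target t
  USING source s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")
result.show()
// +-----------------+----------------+----------------+-----------------+
// |num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
// +-----------------+----------------+----------------+-----------------+
```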

Logging

Enable ALL logging level for org.apache.spark.sql.delta.commands.MergeIntoCommand logger to see what happens inside.

Add the following line to conf/log4j2.properties:

logger.MergeIntoCommand.name = org.apache.spark.sql.delta.commands.MergeIntoCommand
logger.MergeIntoCommand.level = all

Refer to Logging.