MergeIntoCommand¶
MergeIntoCommand is a DeltaCommand (indirectly as a MergeIntoCommandBase) that represents a DeltaMergeInto logical command at execution.
MergeIntoCommand is transactional (and starts a new transaction when executed).
MergeIntoCommand can optimize output generation (ClassicMergeExecutor or InsertOnlyMergeExecutor).
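For example, the following MERGE INTO query ends up as a MergeIntoCommand at execution (a sketch; the target and updates delta tables are hypothetical):

```scala
// Hypothetical delta tables: target(id, value) and updates(id, value)
spark.sql("""
  MERGE INTO target t
  USING updates s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET t.value = s.value
  WHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.value)
""")
```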
Creating Instance¶
MergeIntoCommand takes the following to be created:
- Source Table
- Target table (LogicalPlan)
- TahoeFileIndex
- Merge Condition (Expression)
- WHEN MATCHED Clauses
- WHEN NOT MATCHED Clauses
- WHEN NOT MATCHED BY SOURCE Clauses
- Migrated Schema
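A simplified sketch of the shape of the class (the real class extends MergeIntoCommandBase; exact signatures and package locations vary across Delta Lake versions):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{DeltaMergeIntoMatchedClause, DeltaMergeIntoNotMatchedBySourceClause, DeltaMergeIntoNotMatchedClause, LogicalPlan}
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.types.StructType

// A sketch only, not the Delta Lake sources
case class MergeIntoCommand(
    source: LogicalPlan,
    target: LogicalPlan,
    targetFileIndex: TahoeFileIndex,
    condition: Expression,
    matchedClauses: Seq[DeltaMergeIntoMatchedClause],
    notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause],
    notMatchedBySourceClauses: Seq[DeltaMergeIntoNotMatchedBySourceClause],
    migratedSchema: Option[StructType])
```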
MergeIntoCommand is created when:
- PreprocessTableMerge logical resolution rule is executed (to resolve a DeltaMergeInto logical command)
Migrated Schema¶
```scala
migratedSchema: Option[StructType]
```
MergeIntoCommand can be given a migratedSchema (Spark SQL).
Output Attributes¶
output is a fixed-length collection of the following AttributeReferences:
| Name | Type |
|---|---|
| num_affected_rows | LongType |
| num_updated_rows | LongType |
| num_deleted_rows | LongType |
| num_inserted_rows | LongType |
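A sketch of how such a fixed output is typically declared (not the literal Delta Lake sources):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.LongType

val output: Seq[AttributeReference] = Seq(
  AttributeReference("num_affected_rows", LongType)(),
  AttributeReference("num_updated_rows", LongType)(),
  AttributeReference("num_deleted_rows", LongType)(),
  AttributeReference("num_inserted_rows", LongType)())
```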
Running Merge¶
```scala
runMerge(
  spark: SparkSession): Seq[Row]
```
runMerge is part of the MergeIntoCommandBase abstraction.
runMerge records the start time.
runMerge starts a new transaction (on the targetDeltaLog).
If hasBeenExecuted, runMerge announces the updates of the metrics and quits early (returns no Rows).
FIXME When would hasBeenExecuted happen?
DeltaAnalysisException
In case the schema of the target table changed (compared to the time the transaction started), runMerge throws a DeltaAnalysisException.
The schema of a delta table is in the Metadata of the OptimisticTransactionImpl.
With Auto Schema Merging enabled (based on the spark.databricks.delta.schema.autoMerge.enabled configuration property), runMerge updates the metadata (of the transaction).
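For example (a sketch; the tables are hypothetical), a MERGE can migrate the target schema once the property is enabled:

```scala
// Opt in to automatic schema evolution for MERGE
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

// With extra columns in the (hypothetical) updates table,
// INSERT * can now add them to the target schema
spark.sql("""
  MERGE INTO target t
  USING updates s
  ON t.id = s.id
  WHEN NOT MATCHED THEN INSERT *
""")
```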
runMerge prepares the source DataFrame and determines the reason for (not) materializing the source (prepareSourceDFAndReturnMaterializeReason).
At this stage, runMerge is finally ready to apply all the necessary changes to the delta table (execute this merge) that result in a collection of FileActions (deltaActions).
runMerge writes out inserts or all changes based on the following:
- Whether this merge is insert-only and merge.optimizeInsertOnlyMerge.enabled is enabled
- Whether there are any files to rewrite
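An insert-only merge is one with WHEN NOT MATCHED clauses only, e.g. (a sketch over hypothetical tables):

```scala
// No WHEN MATCHED clauses, so (with
// spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled on,
// the default in recent releases) no existing files need rewriting
spark.sql("""
  MERGE INTO target t
  USING updates s
  ON t.id = s.id
  WHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.value)
""")
```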
runMerge and MergeOutputGeneration
runMerge uses one of the two available MergeOutputGenerations: InsertOnlyMergeExecutor or ClassicMergeExecutor.
runMerge collects the merge statistics.
runMerge requests the CacheManager (Spark SQL) to re-cache all the cached logical plans that refer to the target logical plan (since it has just changed).
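That matters for cached DataFrames over the target table, e.g. (a sketch; the path is hypothetical):

```scala
val target = spark.read.format("delta").load("/tmp/delta/target")
target.cache()
target.count()   // materializes the cache

// ...a MERGE INTO the table runs here...

target.count()   // re-cached, so the merged data is visible
```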
runMerge announces the updates of the metrics.
In the end, runMerge returns the following performance metrics (as a single Row with the output):
| Column Name | Metric |
|---|---|
| num_affected_rows | Total of the values of the following metrics: number of updated rows, number of deleted rows, number of inserted rows |
| num_updated_rows | number of updated rows |
| num_deleted_rows | number of deleted rows |
| num_inserted_rows | number of inserted rows |
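The metrics can be read off the returned Row, e.g. (a sketch over hypothetical tables):

```scala
val summary = spark.sql("""
  MERGE INTO target t
  USING updates s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET t.value = s.value
  WHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.value)
""").collect().head

val affected = summary.getLong(summary.fieldIndex("num_affected_rows"))
```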
FIXME Review the sections
Begin Transaction¶
run starts a new transaction (on the target delta table).
schema.autoMerge.enabled¶
Only when spark.databricks.delta.schema.autoMerge.enabled configuration property is enabled, run updates the metadata (of the transaction) with the following:
- migratedSchema (if defined) or the schema of the target
- isOverwriteMode flag off
- rearrangeOnly flag off
FileActions¶
run determines FileActions.
Single Insert-Only Merge¶
For a single insert-only merge with the spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled configuration property enabled, run writes out the inserts only (writeInsertsOnlyWhenNoMatchedClauses).
Other Merges¶
Otherwise, run finds the files to rewrite (i.e., AddFiles with the rows that satisfy the merge condition) and uses them to write out merge changes.
The AddFiles are converted into RemoveFiles.
run gives the RemoveFiles and the written-out FileActions.
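A sketch of the bookkeeping (AddFile.remove is part of Delta's action model; the helper below is hypothetical):

```scala
import org.apache.spark.sql.delta.actions.{AddFile, FileAction}

// Hypothetical helper: every rewritten file is removed and the
// newly-written files are added in the same commit
def deltaActions(
    filesToRewrite: Seq[AddFile],
    newFiles: Seq[FileAction]): Seq[FileAction] =
  filesToRewrite.map(_.remove) ++ newFiles
```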
Register Metrics¶
run registers the SQL metrics (with the current transaction).
Commit Transaction¶
run commits the current transaction (with the FileActions and MERGE operation).
Re-Cache Target Delta Table¶
run requests the CacheManager to re-cache the target plan.
Post Metric Updates¶
In the end, run posts the SQL metric updates (as a SparkListenerDriverAccumUpdates (Apache Spark) Spark event) to SparkListeners (incl. Spark UI).
Note
Use SparkListener (Apache Spark) to intercept SparkListenerDriverAccumUpdates events.
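For example (a minimal sketch; the listener name is made up):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates

// Prints driver-side accumulator updates (accumulator id -> value)
class DriverAccumUpdatesListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SparkListenerDriverAccumUpdates(executionId, accumUpdates) =>
      println(s"executionId=$executionId accumUpdates=$accumUpdates")
    case _ => // ignore other events
  }
}

spark.sparkContext.addSparkListener(new DriverAccumUpdatesListener)
```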
commitAndRecordStats¶
```scala
commitAndRecordStats(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction,
  mergeActions: Seq[FileAction],
  startTime: Long,
  materializeSourceReason: MergeIntoMaterializeSourceReason.MergeIntoMaterializeSourceReason): Unit
```
commitAndRecordStats...FIXME
Logging¶
Enable ALL logging level for org.apache.spark.sql.delta.commands.MergeIntoCommand logger to see what happens inside.
Add the following line to conf/log4j2.properties:
```text
logger.MergeIntoCommand.name = org.apache.spark.sql.delta.commands.MergeIntoCommand
logger.MergeIntoCommand.level = all
```
Refer to Logging.