# MergeIntoCommand

`MergeIntoCommand` is a DeltaCommand (indirectly, as a MergeIntoCommandBase) that represents a DeltaMergeInto logical command at execution.

`MergeIntoCommand` is transactional (and starts a new transaction when executed).

`MergeIntoCommand` can optimize output generation (ClassicMergeExecutor or InsertOnlyMergeExecutor).
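For illustration, a `MERGE INTO` SQL statement like the following (the table and column names are made up) is what eventually executes as a `MergeIntoCommand`:

```scala
// Assumes a SparkSession `spark` and made-up delta tables `target` and `source`.
// This SQL MERGE eventually executes as a MergeIntoCommand.
spark.sql("""
  MERGE INTO target t
  USING source s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET t.value = s.value
  WHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.value)
""")
```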
## Creating Instance

`MergeIntoCommand` takes the following to be created:

- Source Table
- Target table (`LogicalPlan`)
- `TahoeFileIndex`
- Merge Condition (`Expression`)
- WHEN MATCHED Clauses
- WHEN NOT MATCHED Clauses
- WHEN NOT MATCHED BY SOURCE Clauses
- Migrated Schema
`MergeIntoCommand` is created when:

- PreprocessTableMerge logical resolution rule is executed (to resolve a DeltaMergeInto logical command)
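For example, the `DeltaTable.merge` operator of the Scala API builds a DeltaMergeInto logical command that PreprocessTableMerge resolves into a `MergeIntoCommand` (the path and column names below are made up):

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.col

// Made-up path and column names, for illustration only
val target = DeltaTable.forPath(spark, "/tmp/delta/target")
val source = spark.read.format("delta").load("/tmp/delta/source")

target.as("t")
  .merge(source.as("s"), col("t.id") === col("s.id"))
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()
```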
## Migrated Schema

```scala
migratedSchema: Option[StructType]
```

`MergeIntoCommand` can be given a `migratedSchema` (a `StructType` in Spark SQL).
## Output Attributes

`output` is a fixed-length collection of the following `AttributeReference`s:

| Name | Type |
|------|------|
| `num_affected_rows` | `LongType` |
| `num_updated_rows` | `LongType` |
| `num_deleted_rows` | `LongType` |
| `num_inserted_rows` | `LongType` |
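These attributes surface as the columns of the single-row result of a merge. A quick way to see them (assuming a SparkSession `spark` and the made-up tables from the earlier example):

```scala
// The result of a SQL MERGE is a single row with the output attributes
val metrics = spark.sql("""
  MERGE INTO target t
  USING source s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")
metrics.printSchema() // num_affected_rows, num_updated_rows, num_deleted_rows, num_inserted_rows
metrics.show()
```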
## Running Merge

```scala
runMerge(
  spark: SparkSession): Seq[Row]
```

`runMerge` is part of the MergeIntoCommandBase abstraction.

`runMerge` records the start time.

`runMerge` starts a new transaction (on the targetDeltaLog).

If hasBeenExecuted, `runMerge` announces the updates of the metrics and quits early (returns no `Row`s).
FIXME: When would `hasBeenExecuted` happen?
In case the schema of the target table changed (compared to the time the transaction started), `runMerge` throws a `DeltaAnalysisException`.

Note

The schema of a delta table is in the Metadata of the OptimisticTransactionImpl.
With Auto Schema Merging enabled (which boils down to schema.autoMerge.enabled), `runMerge` updates the metadata.

`runMerge` prepares the source DataFrame and determines the materialization reason (prepareSourceDFAndReturnMaterializeReason).
At this stage, `runMerge` is finally ready to apply all the necessary changes to the delta table (execute this merge) that result in a collection of FileActions (`deltaActions`).

`runMerge` writes out inserts or all changes based on the following:

- Whether this merge is insert-only and merge.optimizeInsertOnlyMerge.enabled is enabled
- Whether there are any files to rewrite
Note

`runMerge` uses either the InsertOnlyMergeExecutor or the ClassicMergeExecutor output generators (MergeOutputGenerations).
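A simplified, self-contained sketch of that decision follows. All names here are hypothetical stand-ins for the actual internals, not the Delta Lake sources:

```scala
// Hypothetical stand-ins: FileAction, the two write paths, and the
// inputs to the decision are all simplified for illustration
sealed trait FileAction

def writeOnlyInserts(): Seq[FileAction] = Seq.empty // InsertOnlyMergeExecutor-like path
def writeAllChanges(): Seq[FileAction] = Seq.empty  // ClassicMergeExecutor-like path

def generateOutput(
    isInsertOnlyMerge: Boolean,
    optimizeInsertOnlyMergeEnabled: Boolean,
    filesToRewrite: Seq[String]): Seq[FileAction] =
  if (isInsertOnlyMerge && optimizeInsertOnlyMergeEnabled) {
    // Insert-only fast path: no existing files have to be rewritten
    writeOnlyInserts()
  } else if (filesToRewrite.isEmpty) {
    // Nothing to rewrite: only new data (if any) is written out
    writeOnlyInserts()
  } else {
    // Classic path: rewrite the touched files with all merge changes
    writeAllChanges()
  }
```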
`runMerge` collects the merge statistics.

`runMerge` requests the `CacheManager` (Spark SQL) to re-cache all the cached logical plans that refer to the target logical plan (since it has just changed).

`runMerge` announces the updates of the metrics.

In the end, `runMerge` returns the following performance metrics (as a single `Row` with the output):
| Column Name | Metric |
|-------------|--------|
| `num_affected_rows` | Total of the number of updated, deleted and inserted rows |
| `num_updated_rows` | Number of updated rows |
| `num_deleted_rows` | Number of deleted rows |
| `num_inserted_rows` | Number of inserted rows |
FIXME: Review the sections below.
### Begin Transaction

`run` starts a new transaction (on the target delta table).
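Roughly, the transactional wrapping looks as follows. This is a sketch that assumes the internal `DeltaLog.withNewTransaction` helper (whose signature may differ across Delta Lake versions) and a made-up table path:

```scala
import org.apache.spark.sql.delta.DeltaLog

// Made-up path; DeltaLog and withNewTransaction are internal Delta Lake APIs
val deltaLog = DeltaLog.forTable(spark, "/tmp/delta/target")

deltaLog.withNewTransaction { txn =>
  // All reads, writes and the final commit of a merge happen
  // within a single OptimisticTransaction
  println(s"Table version read by this transaction: ${txn.readVersion}")
}
```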
### schema.autoMerge.enabled

Only when the spark.databricks.delta.schema.autoMerge.enabled configuration property is enabled does `run` update the metadata (of the transaction) with the following:

- migratedSchema (if defined) or the schema of the target
- isOverwriteMode flag off
- rearrangeOnly flag off
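For example, automatic schema evolution for merges can be switched on for the current session before running the merge:

```scala
// Session-scoped: enables automatic schema evolution for MERGE
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
```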
### FileActions

`run` determines FileActions.
### Single Insert-Only Merge

For a single insert-only merge with the spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled configuration property enabled, `run` writes out the inserts only (writeInsertsOnlyWhenNoMatchedClauses).
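An insert-only merge has WHEN NOT MATCHED clauses only, e.g. (made-up tables, assuming the configuration property is enabled, which it is by default):

```scala
// No WHEN MATCHED clauses, so this merge can take the insert-only path
spark.sql("""
  MERGE INTO target t
  USING source s
  ON t.id = s.id
  WHEN NOT MATCHED THEN INSERT *
""")
```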
### Other Merges

Otherwise, `run` finds the files to rewrite (i.e., AddFiles with the rows that satisfy the merge condition) and uses them to write out merge changes.

The `AddFile`s are converted into RemoveFiles.

`run` returns the `RemoveFile`s together with the written-out FileActions.
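Schematically, the conversion uses the internal `AddFile.remove` helper (a sketch, assuming the internal Delta Lake action types):

```scala
import org.apache.spark.sql.delta.actions.{AddFile, RemoveFile}

// Every file selected for rewrite is marked as removed; the rewritten
// rows land in new AddFiles written out separately
def toRemoveFiles(filesToRewrite: Seq[AddFile]): Seq[RemoveFile] =
  filesToRewrite.map(_.remove)
```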
### Register Metrics

`run` registers the SQL metrics (with the current transaction).
### Commit Transaction

`run` commits the current transaction (with the FileActions and the MERGE operation).
### Re-Cache Target Delta Table

`run` requests the `CacheManager` to re-cache the target plan.
### Post Metric Updates

In the end, `run` posts the SQL metric updates (as a `SparkListenerDriverAccumUpdates` (Apache Spark) Spark event) to `SparkListener`s (incl. Spark UI).

Note

Use a `SparkListener` (Apache Spark) to intercept `SparkListenerDriverAccumUpdates` events.
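A minimal sketch of such a listener (assuming a SparkSession `spark`):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates

// Prints every SparkListenerDriverAccumUpdates event, e.g. the metric
// updates posted at the end of a merge
spark.sparkContext.addSparkListener(new SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SparkListenerDriverAccumUpdates(executionId, accumUpdates) =>
      println(s"executionId=$executionId accumUpdates=$accumUpdates")
    case _ => // ignore other events
  }
})
```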
## commitAndRecordStats

```scala
commitAndRecordStats(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction,
  mergeActions: Seq[FileAction],
  startTime: Long,
  materializeSourceReason: MergeIntoMaterializeSourceReason.MergeIntoMaterializeSourceReason): Unit
```

`commitAndRecordStats`...FIXME
## Logging

Enable `ALL` logging level for the org.apache.spark.sql.delta.commands.MergeIntoCommand logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

```text
logger.MergeIntoCommand.name = org.apache.spark.sql.delta.commands.MergeIntoCommand
logger.MergeIntoCommand.level = all
```

Refer to Logging.