DeltaMergeInto Logical Command¶
DeltaMergeInto is a logical Command (Spark SQL).
Creating Instance¶
DeltaMergeInto takes the following to be created:
- Target Table (Spark SQL)
- Source Table or Subquery (Spark SQL)
- Condition Expression
- DeltaMergeIntoMatchedClauses
- DeltaMergeIntoNotMatchedClauses
- DeltaMergeIntoNotMatchedBySourceClauses
- migrateSchema flag
- Final schema (StructType)
When created, DeltaMergeInto verifies the actions in the matchedClauses and notMatchedClauses clauses.
DeltaMergeInto is created (using apply and resolveReferences utilities) when:
- DeltaMergeBuilder is requested to execute
- DeltaAnalysis logical resolution rule is executed
Logical Resolution¶
DeltaMergeInto is resolved to MergeIntoCommand by PreprocessTableMerge logical resolution rule.
migrateSchema Flag¶
DeltaMergeInto is given the migrateSchema flag when created:
- apply always uses false
- resolveReferences uses true only with the spark.databricks.delta.schema.autoMerge.enabled configuration property enabled and with * (UnresolvedStar) actions only (in matched and not-matched clauses)
migrateSchema is used when:
- PreprocessTableMerge logical resolution rule is executed
SupportsSubquery¶
DeltaMergeInto is a SupportsSubquery (Spark SQL).
Creating DeltaMergeInto¶
apply(
  target: LogicalPlan,
  source: LogicalPlan,
  condition: Expression,
  whenClauses: Seq[DeltaMergeIntoClause]): DeltaMergeInto
apply ...FIXME
apply is used when:
- DeltaMergeBuilder is requested to execute (when mergePlan)
- DeltaAnalysis logical resolution rule is executed (and resolves MergeIntoTable logical command)
AnalysisExceptions¶
apply throws an AnalysisException when whenClauses is empty:
There must be at least one WHEN clause in a MERGE statement
apply throws an AnalysisException if there is a matched clause with no condition (except the last matched clause):
When there are more than one MATCHED clauses in a MERGE statement,
only the last MATCHED clause can omit the condition.
apply throws an AnalysisException if there is a not-matched (insert) clause with no condition (except the last not-matched clause):
When there are more than one NOT MATCHED clauses in a MERGE statement,
only the last NOT MATCHED clause can omit the condition.
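The three checks above can be sketched in plain Scala. This is a simplified model, not Delta Lake's code: Clause, Matched, NotMatched and validate are hypothetical names, conditions are plain strings, and the function returns the error message instead of throwing an AnalysisException.

```scala
object MergeClauseChecks {
  // Hypothetical, simplified stand-ins for Delta's merge clause classes.
  sealed trait Clause { def condition: Option[String] }
  final case class Matched(condition: Option[String]) extends Clause
  final case class NotMatched(condition: Option[String]) extends Clause

  // Returns the message of the first failed check, or None when all checks pass.
  def validate(whenClauses: Seq[Clause]): Option[String] = {
    val matched = whenClauses.collect { case m: Matched => m }
    val notMatched = whenClauses.collect { case n: NotMatched => n }

    // Every clause except the last one of its kind must have a condition.
    def onlyLastMayOmitCondition(clauses: Seq[Clause]): Boolean =
      clauses.dropRight(1).forall(_.condition.isDefined)

    if (whenClauses.isEmpty)
      Some("There must be at least one WHEN clause in a MERGE statement")
    else if (!onlyLastMayOmitCondition(matched))
      Some("When there are more than one MATCHED clauses in a MERGE statement, " +
        "only the last MATCHED clause can omit the condition.")
    else if (!onlyLastMayOmitCondition(notMatched))
      Some("When there are more than one NOT MATCHED clauses in a MERGE statement, " +
        "only the last NOT MATCHED clause can omit the condition.")
    else None
  }
}
```

For example, a conditional matched clause followed by an unconditional matched clause and a single unconditional not-matched clause passes all three checks.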
Resolving Merge (incl. Schema Evolution)¶
resolveReferencesAndSchema(
  merge: DeltaMergeInto,
  conf: SQLConf)(
  resolveExpr: (Expression, LogicalPlan) => Expression): DeltaMergeInto
resolveReferencesAndSchema creates a new DeltaMergeInto with the following changes (compared to the given DeltaMergeInto):
- The condition, matched, notMatched and notMatchedBySource clauses resolved
- The migrateSchema flag reflects schema.autoMerge.enabled
- The final schema assigned
resolveReferencesAndSchema destructures the given DeltaMergeInto (skipping the migrateSchema and finalSchema parts).
resolveReferencesAndSchema creates two artificial (dummy) logical plans for the target and source plans of the DeltaMergeInto (for expression resolution of merge clauses).
resolveReferencesAndSchema resolves all merge expressions:
- condition
- matchedClauses
- notMatchedClauses (against the fakeSourcePlan)
- notMatchedBySourceClauses (against the fakeTargetPlan)
resolveReferencesAndSchema builds the final schema (taking schema.autoMerge.enabled into account).
With schema.autoMerge.enabled disabled, resolveReferencesAndSchema uses the schema of the target table as the final schema.
With schema.autoMerge.enabled enabled, resolveReferencesAndSchema does the following:
- Collects assignments to be the targetColNameParts of the DeltaMergeActions (among the actions) of the matchedClauses and notMatchedClauses
- Checks if there are any UnresolvedStars among the actions (they are not DeltaMergeActions, so they were skipped in the previous step) (containsStarAction)
- Builds a migrationSchema with the fields of the source table that are referenced by merge clauses
- Merges the schema of the target table with the migrationSchema (allowing conversions from the source types to the target's)
Schema Evolution and Types
Implicit conversions are allowed, so resolveReferencesAndSchema can change the type of source columns to match the target's.
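How the final schema depends on schema.autoMerge.enabled can be illustrated with a flat, simplified model. The Schema type and finalSchema function are hypothetical, types are plain strings, and appending source-only columns at the end is an assumption of this sketch. Delta Lake works on StructType and handles nested fields and type compatibility, which this sketch does not.

```scala
object FinalSchemaSketch {
  // A flat schema as ordered (column name, type name) pairs.
  // Hypothetical stand-in for Spark SQL's StructType.
  type Schema = Seq[(String, String)]

  def finalSchema(
      target: Schema,
      migrationSchema: Schema,
      autoMergeEnabled: Boolean): Schema =
    if (!autoMergeEnabled) {
      // Schema merging disabled: the target schema is the final schema.
      target
    } else {
      val targetNames = target.map(_._1).toSet
      // Columns already in the target keep the target's type.
      // Columns found only in the migration schema are appended
      // (an assumption of this sketch).
      target ++ migrationSchema.filterNot { case (name, _) => targetNames.contains(name) }
    }
}
```

With a target of (id: long, name: string) and a migration schema of (id: int, email: string), the enabled case yields id: long, name: string, email: string, so the source's id is expected to be converted to the target's long.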
In the end, resolveReferencesAndSchema creates a new DeltaMergeInto.
resolveReferencesAndSchema is used when:
- DeltaMergeBuilder is requested to execute
- DeltaAnalysis logical resolution rule is executed (to resolve MergeIntoTable logical command)
Resolving Single Clause¶
resolveClause[T <: DeltaMergeIntoClause](
  clause: T,
  planToResolveAction: LogicalPlan): T
resolveClause resolves the actions of the given DeltaMergeIntoClause (UnresolvedStar or DeltaMergeAction):
- With Schema Merging disabled, resolveClause expands *s (UnresolvedStars) to the target columns with source columns resolved against the fake source plan. There will be a new DeltaMergeAction for every column in the target plan (with targetColNameResolved flag enabled)
- With Schema Merging enabled, resolveClause expands *s (UnresolvedStars) differently for WHEN NOT MATCHED THEN INSERT and WHEN MATCHED UPDATE clauses
- For DeltaMergeActions, resolveClause resolves the targetColNameParts against the fake target plan and, if that fails and Schema Merging is enabled, against the fake source plan
In the end, resolveClause resolves the condition of the DeltaMergeIntoClause.
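The star expansion with Schema Merging disabled can be sketched as follows. MergeAction and expandStar are hypothetical, simplified stand-ins (columns are plain names and the source expression is a string), and a failed require stands in for a resolution error.

```scala
object StarExpansionSketch {
  // Hypothetical, simplified stand-in for DeltaMergeAction.
  final case class MergeAction(
      targetColNameParts: Seq[String],
      sourceExpr: String,
      targetColNameResolved: Boolean)

  // Expands a star into one action per target column (Schema Merging disabled).
  // Every target column must have a source column of the same name.
  def expandStar(
      targetColumns: Seq[String],
      sourceColumns: Seq[String]): Seq[MergeAction] =
    targetColumns.map { col =>
      require(sourceColumns.contains(col), s"cannot resolve $col in the source")
      MergeAction(Seq(col), s"source.$col", targetColNameResolved = true)
    }
}
```

The result follows the column order of the target, and extra source columns are ignored, which matches the description of one new action for every column in the target plan.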
Filtering Schema¶
filterSchema(
  sourceSchema: StructType,
  basePath: Seq[String]): StructType
filterSchema filters the source schema to retain only fields that are referenced by a merge clause.
Recursive Method
filterSchema is recursive so it can handle StructType fields.
There are the following two base cases (that terminate recursion):
- There is an exact match (and the field is included in the final schema)
- A field and its children are not assigned to in any * or non-* action (and the field is skipped in the final schema)
For every field in the given StructType, filterSchema checks if the field is amongst (referenced by) the clause assignments and does one of the following:
- If there is an exact match, filterSchema keeps the field
- If the type of the field is a StructType and one of the children is assigned to in a merge clause or there is a * action, filterSchema calls itself recursively with the struct and the field path
- For a non-StructType field, if there is a * action, filterSchema keeps the field
- Otherwise, filterSchema drops (filters out) the field
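The four rules above can be sketched over a minimal schema model. The DataType, Struct and Field types below are hypothetical stand-ins for Spark SQL's types, and assignments and containsStarAction are passed in explicitly (the signature above suggests the real method gets them from its enclosing scope). Names are compared exactly, which is a simplification.

```scala
object FilterSchemaSketch {
  // Hypothetical, minimal schema model (not Spark SQL's StructType).
  sealed trait DataType
  case object IntType extends DataType
  case object StringType extends DataType
  final case class Struct(fields: Seq[Field]) extends DataType
  final case class Field(name: String, dataType: DataType)

  def filterSchema(
      schema: Struct,
      basePath: Seq[String],
      assignments: Seq[Seq[String]],
      containsStarAction: Boolean): Struct = {
    val kept = schema.fields.flatMap { field =>
      val path = basePath :+ field.name
      val exactMatch = assignments.contains(path)
      // True when some assignment targets a field nested below this one.
      val childAssigned =
        assignments.exists(a => a.length > path.length && a.startsWith(path))
      field.dataType match {
        // Exact match: keep the field as-is.
        case _ if exactMatch => Some(field)
        // Struct with an assigned child or a star action: filter its children.
        case s: Struct if childAssigned || containsStarAction =>
          Some(field.copy(dataType = filterSchema(s, path, assignments, containsStarAction)))
        // Struct that is not referenced at all: drop it.
        case _: Struct => None
        // Non-struct field kept because of a star action.
        case _ if containsStarAction => Some(field)
        // Not referenced by any action: drop it.
        case _ => None
      }
    }
    Struct(kept)
  }
}
```

Given assignments to id and address.city and no star action, a source schema of id, name, address (city, zip) and extra is reduced to id and address (city). With a star action and no assignments, the whole source schema is retained.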