DeltaMergeInto Logical Command¶
DeltaMergeInto is a logical Command (Spark SQL).
Creating Instance¶
DeltaMergeInto takes the following to be created:
- Target Table (Spark SQL)
- Source Table or Subquery (Spark SQL)
- Condition Expression
- DeltaMergeIntoMatchedClauses
- DeltaMergeIntoNotMatchedClauses
- DeltaMergeIntoNotMatchedBySourceClauses
- migrateSchema flag
- Final schema (StructType)
When created, DeltaMergeInto verifies the actions in the matchedClauses and notMatchedClauses clauses.
DeltaMergeInto is created (using apply and resolveReferences utilities) when:
- DeltaMergeBuilder is requested to execute
- DeltaAnalysis logical resolution rule is executed
Logical Resolution¶
DeltaMergeInto is resolved to MergeIntoCommand by PreprocessTableMerge logical resolution rule.
migrateSchema Flag¶
DeltaMergeInto is given migrateSchema flag when created:
- apply uses false always
- resolveReferences is true only with the spark.databricks.delta.schema.autoMerge.enabled configuration property enabled and *s only (in matched and not-matched clauses)
migrateSchema is used when:
- PreprocessTableMerge logical resolution rule is executed
SupportsSubquery¶
DeltaMergeInto is a SupportsSubquery (Spark SQL).
Creating DeltaMergeInto¶
apply(
target: LogicalPlan,
source: LogicalPlan,
condition: Expression,
whenClauses: Seq[DeltaMergeIntoClause]): DeltaMergeInto
apply...FIXME
apply is used when:
- DeltaMergeBuilder is requested to execute (when mergePlan)
- DeltaAnalysis logical resolution rule is executed (and resolves MergeIntoTable logical command)
AnalysisExceptions¶
apply throws an AnalysisException when whenClauses is empty:
There must be at least one WHEN clause in a MERGE statement
apply throws an AnalysisException if there is a matched clause with no condition (except the last matched clause):
When there are more than one MATCHED clauses in a MERGE statement,
only the last MATCHED clause can omit the condition.
apply throws an AnalysisException if there is an insert clause with no condition (except the last not-matched clause):
When there are more than one NOT MATCHED clauses in a MERGE statement,
only the last NOT MATCHED clause can omit the condition.
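The three checks above can be illustrated with a minimal sketch. It is not Delta Lake's code: `Clause`, `Matched`, `NotMatched` and `MergeClauseChecks` are hypothetical stand-ins for the real clause classes, conditions are plain strings, and the sketch returns `Left(message)` where apply would throw an AnalysisException.

```scala
// Hypothetical, simplified stand-ins for the Delta Lake merge clause classes.
sealed trait Clause { def condition: Option[String] }
final case class Matched(condition: Option[String]) extends Clause
final case class NotMatched(condition: Option[String]) extends Clause

object MergeClauseChecks {
  // Returns Left(message) in the cases where apply is described as throwing.
  def validate(whenClauses: Seq[Clause]): Either[String, Unit] = {
    val matched = whenClauses.collect { case c: Matched => c }
    val notMatched = whenClauses.collect { case c: NotMatched => c }

    // Every clause except the last one of its kind must carry a condition.
    def onlyLastMayOmitCondition(clauses: Seq[Clause]): Boolean =
      clauses.dropRight(1).forall(_.condition.isDefined)

    if (whenClauses.isEmpty)
      Left("There must be at least one WHEN clause in a MERGE statement")
    else if (!onlyLastMayOmitCondition(matched))
      Left("When there are more than one MATCHED clauses in a MERGE statement, " +
        "only the last MATCHED clause can omit the condition.")
    else if (!onlyLastMayOmitCondition(notMatched))
      Left("When there are more than one NOT MATCHED clauses in a MERGE statement, " +
        "only the last NOT MATCHED clause can omit the condition.")
    else Right(())
  }
}
```

For example, `MergeClauseChecks.validate(Seq(Matched(None), Matched(Some("s.v > 0"))))` is rejected because the unconditional MATCHED clause is not the last one.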
Resolving Merge (incl. Schema Evolution)¶
resolveReferencesAndSchema(
merge: DeltaMergeInto,
conf: SQLConf)(
resolveExpr: (Expression, LogicalPlan) => Expression): DeltaMergeInto
resolveReferencesAndSchema creates a new DeltaMergeInto with the following changes (compared to the given DeltaMergeInto):
- The condition, matched, notMatched and notMatchedBySource clauses resolved
- The migrateSchema flag reflects schema.autoMerge.enabled
- The final schema assigned
resolveReferencesAndSchema destructures the given DeltaMergeInto (skipping the migrateSchema and finalSchema parts).
resolveReferencesAndSchema creates two artificial (dummy) logical plans for the target and source plans of the DeltaMergeInto (for expression resolution of merge clauses).
resolveReferencesAndSchema resolves all merge expressions:
- condition
- matchedClauses
- notMatchedClauses (against the fakeSourcePlan)
- notMatchedBySourceClauses (against the fakeTargetPlan)
resolveReferencesAndSchema builds the final schema (taking schema.autoMerge.enabled into account).
With schema.autoMerge.enabled disabled, resolveReferencesAndSchema uses the schema of the target table as the final schema.
With schema.autoMerge.enabled enabled, resolveReferencesAndSchema does the following:
- Collects assignments to be the targetColNameParts of the DeltaMergeActions (among the actions) of the matchedClauses and notMatchedClauses
- Checks if there are any UnresolvedStars among the actions (they are not DeltaMergeActions so they were skipped in the previous step) (containsStarAction)
- Builds a migrationSchema with the fields of the source table that are referenced by merge clauses
- Merges the schema of the target table with the migrationSchema (allowing conversions from the source types to the target's)
Schema Evolution and Types
Implicit conversions are allowed, so resolveReferencesAndSchema can change the type of source columns to match the target's.
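The choice of the final schema can be sketched as follows. This is an illustration under simplifying assumptions, not the Delta Lake implementation: `Column` and `FinalSchemaSketch` are hypothetical names, schemas are flat, column names are compared exactly, and no type-compatibility check is performed.

```scala
// Hypothetical flat column model (a stand-in for StructField).
final case class Column(name: String, dataType: String)

object FinalSchemaSketch {
  def finalSchema(
      target: Seq[Column],
      migrationSchema: Seq[Column],
      autoMergeEnabled: Boolean): Seq[Column] =
    if (!autoMergeEnabled) {
      // Schema evolution disabled: the target schema is the final schema.
      target
    } else {
      val targetNames = target.map(_.name).toSet
      // Columns already in the target keep the target's type;
      // only source columns unknown to the target are appended.
      target ++ migrationSchema.filterNot(c => targetNames.contains(c.name))
    }
}
```

In this sketch, a source column `id` of type `int` does not change a target column `id` of type `long`, which mirrors the note above that source columns are converted to the target's types.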
In the end, resolveReferencesAndSchema creates a new DeltaMergeInto.
resolveReferencesAndSchema is used when:
- DeltaMergeBuilder is requested to execute
- DeltaAnalysis logical resolution rule is executed (to resolve MergeIntoTable logical command)
Resolving Single Clause¶
resolveClause[T <: DeltaMergeIntoClause](
clause: T,
planToResolveAction: LogicalPlan): T
resolveClause resolves the actions of the given DeltaMergeIntoClause (UnresolvedStar or DeltaMergeAction):
- With Schema Merging disabled, resolveClause expands *s (UnresolvedStars) to the target columns with source columns resolved against the fake source plan. There will be a new DeltaMergeAction for every column in the target plan (with targetColNameResolved flag enabled)
- With Schema Merging enabled, resolveClause expands *s (UnresolvedStars) differently for WHEN NOT MATCHED THEN INSERT and WHEN MATCHED UPDATE clauses
- For DeltaMergeActions, resolveClause resolves the targetColNameParts against the fake target plan and, if failed yet Schema Merging is enabled, uses the fake source plan
In the end, resolveClause resolves the condition of the DeltaMergeIntoClause.
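As a rough illustration of the first case (Schema Merging disabled), the sketch below expands a `*` into one action per target column. It is a simplified model only: `MergeAction` and `StarExpansionSketch` are hypothetical, columns are plain top-level names, and the `Left` result with its message is an assumption standing in for whatever resolution failure the real code reports.

```scala
// Hypothetical stand-in for DeltaMergeAction, with a string instead of an Expression.
final case class MergeAction(
    targetColNameParts: Seq[String],
    expr: String,
    targetColNameResolved: Boolean)

object StarExpansionSketch {
  // One action per target column, each reading the same-named source column.
  // Left(...) models a target column that cannot be resolved in the source (assumed behavior).
  def expandStar(
      targetColumns: Seq[String],
      sourceColumns: Seq[String]): Either[String, Seq[MergeAction]] =
    targetColumns.find(c => !sourceColumns.contains(c)) match {
      case Some(missing) =>
        Left(s"cannot resolve $missing in the source")
      case None =>
        Right(targetColumns.map { c =>
          MergeAction(Seq(c), s"source.$c", targetColNameResolved = true)
        })
    }
}
```

Note that extra source columns are ignored in this mode: the expansion is driven by the target columns only.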
Filtering Schema¶
filterSchema(
sourceSchema: StructType,
basePath: Seq[String]): StructType
filterSchema filters the source schema to retain only fields that are referenced by a merge clause.
Recursive Method
filterSchema is recursive so it can handle StructType fields.
There are the following two base cases (that terminate recursion):
- There is an exact match (and the field is included in the final schema)
- A field and its children are not assigned to in any * or non-* action (and the field is skipped in the final schema)
For every field in the given StructType, filterSchema checks if the field is amongst (referenced by) the clause assignments and does one of the following:
- If there is an exact match, filterSchema keeps the field
- If the type of the field is a StructType and one of the children is assigned to in a merge clause or there is a * action, recursively filterSchema with the struct and the field path
- For non-StructType fields and there is a * action, filterSchema keeps the field
- Otherwise, filterSchema drops (filters out) the field
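The recursion described above can be sketched with a small stand-alone model. The types `DataType`, `Leaf`, `Struct` and `Field` are hypothetical substitutes for Spark's schema classes, `assignments` and `containsStarAction` are passed explicitly as parameters, and name parts are compared exactly (an assumption made to keep the sketch short).

```scala
// Hypothetical minimal schema model (stand-ins for StructType and StructField).
sealed trait DataType
case object Leaf extends DataType // any non-struct type
final case class Struct(fields: Seq[Field]) extends DataType
final case class Field(name: String, dataType: DataType)

object FilterSchemaSketch {
  def filterSchema(
      source: Struct,
      basePath: Seq[String],
      assignments: Seq[Seq[String]],
      containsStarAction: Boolean): Struct = {
    val kept = source.fields.flatMap { field =>
      val fieldPath = basePath :+ field.name
      // True when some assignment targets a field nested under fieldPath.
      val childAssigned = assignments.exists { a =>
        a.length > fieldPath.length && a.startsWith(fieldPath)
      }
      field.dataType match {
        // Exact match: keep the field with all its nested fields.
        case _ if assignments.contains(fieldPath) => Some(field)
        // Struct with an assigned child, or any * action: keep and filter the children.
        case s: Struct if childAssigned || containsStarAction =>
          Some(field.copy(dataType =
            filterSchema(s, fieldPath, assignments, containsStarAction)))
        // Non-struct field kept only because of a * action.
        case Leaf if containsStarAction => Some(field)
        // Not referenced by any * or non-* action: drop.
        case _ => None
      }
    }
    Struct(kept)
  }
}
```

With assignments `id` and `addr.city` and no `*` action, a source schema of `id`, `extra` and `addr` (with `city` and `zip`) is reduced to `id` and `addr.city` in this sketch.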