DeltaMergeInto Logical Command

DeltaMergeInto is a logical Command (Spark SQL).

Creating Instance

DeltaMergeInto takes the following to be created:

When created, DeltaMergeInto verifies the actions in the matchedClauses and notMatchedClauses.

DeltaMergeInto is created (using apply and resolveReferences utilities) when:

Logical Resolution

DeltaMergeInto is resolved to MergeIntoCommand by PreprocessTableMerge logical resolution rule.

migrateSchema Flag

DeltaMergeInto is given migrateSchema flag when created:

migrateSchema is used when:

SupportsSubquery

DeltaMergeInto is a SupportsSubquery (Spark SQL).

Creating DeltaMergeInto

apply(
  target: LogicalPlan,
  source: LogicalPlan,
  condition: Expression,
  whenClauses: Seq[DeltaMergeIntoClause]): DeltaMergeInto

apply...FIXME


apply is used when:

  • DeltaMergeBuilder is requested to execute (when mergePlan)
  • DeltaAnalysis logical resolution rule is executed (and resolves MergeIntoTable logical command)

AnalysisExceptions

apply throws an AnalysisException when whenClauses is empty:

There must be at least one WHEN clause in a MERGE statement

apply throws an AnalysisException if there is a matched clause with no condition (except the last matched clause):

When there are more than one MATCHED clauses in a MERGE statement,
only the last MATCHED clause can omit the condition.

apply throws an AnalysisException if there is a not matched (insert) clause with no condition (except the last not matched clause):

When there are more than one NOT MATCHED clauses in a MERGE statement,
only the last NOT MATCHED clause can omit the condition.
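
The three checks above can be sketched in plain Scala. This is a minimal illustration, not Delta Lake's code: Clause, Matched, NotMatched and validate are hypothetical stand-ins, conditions are plain strings, and the sketch returns the error message as an Option instead of throwing an AnalysisException.

```scala
object MergeClauseChecks {
  // Hypothetical, simplified stand-ins for Delta's merge clause classes.
  sealed trait Clause { def condition: Option[String] }
  final case class Matched(condition: Option[String]) extends Clause
  final case class NotMatched(condition: Option[String]) extends Clause

  // Returns the message of the first violated rule, or None if all rules hold.
  def validate(whenClauses: Seq[Clause]): Option[String] = {
    val matched = whenClauses.collect { case c: Matched => c }
    val notMatched = whenClauses.collect { case c: NotMatched => c }

    // Every clause except the last one of its kind must carry a condition.
    def onlyLastMayOmitCondition(clauses: Seq[Clause]): Boolean =
      clauses.dropRight(1).forall(_.condition.isDefined)

    if (whenClauses.isEmpty) {
      Some("There must be at least one WHEN clause in a MERGE statement")
    } else if (!onlyLastMayOmitCondition(matched)) {
      Some("When there are more than one MATCHED clauses in a MERGE statement, " +
        "only the last MATCHED clause can omit the condition.")
    } else if (!onlyLastMayOmitCondition(notMatched)) {
      Some("When there are more than one NOT MATCHED clauses in a MERGE statement, " +
        "only the last NOT MATCHED clause can omit the condition.")
    } else {
      None
    }
  }
}
```

For example, a conditional matched clause followed by an unconditional one passes, while the reverse order is rejected.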

Resolving Merge (incl. Schema Evolution)

resolveReferencesAndSchema(
  merge: DeltaMergeInto,
  conf: SQLConf)(
  resolveExpr: (Expression, LogicalPlan) => Expression): DeltaMergeInto

resolveReferencesAndSchema creates a new DeltaMergeInto with the following changes (compared to the given DeltaMergeInto):

  1. The condition, matched, notMatched and notMatchedBySource clauses resolved
  2. The migrateSchema flag reflects schema.autoMerge.enabled
  3. The final schema assigned

resolveReferencesAndSchema destructures the given DeltaMergeInto (skipping the migrateSchema and finalSchema parts).

resolveReferencesAndSchema creates two artificial (dummy) logical plans, fakeTargetPlan and fakeSourcePlan, for the target and source plans of the DeltaMergeInto (used for expression resolution of merge clauses).

resolveReferencesAndSchema resolves all merge expressions:

  1. condition
  2. matchedClauses
  3. notMatchedClauses (against the fakeSourcePlan)
  4. notMatchedBySourceClauses (against the fakeTargetPlan)
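
The choice of resolution plan determines which columns an expression can see. The sketch below models that with sets of column names. All names in it are hypothetical, and it assumes that the condition and the matched clauses can see both the target and the source columns, which the list above does not state explicitly.

```scala
object MergeResolutionScopes {
  // Hypothetical tags for the four kinds of merge expressions listed above.
  sealed trait MergeExpr
  case object Condition extends MergeExpr
  case object MatchedClause extends MergeExpr
  case object NotMatchedClause extends MergeExpr
  case object NotMatchedBySourceClause extends MergeExpr

  // Columns visible while resolving an expression of the given kind.
  def visibleColumns(
      kind: MergeExpr,
      targetColumns: Set[String],
      sourceColumns: Set[String]): Set[String] = kind match {
    // Assumption: both sides are in scope for the condition and matched clauses.
    case Condition | MatchedClause => targetColumns ++ sourceColumns
    // Resolved against the fake source plan: no target row exists yet.
    case NotMatchedClause => sourceColumns
    // Resolved against the fake target plan: no source row matched.
    case NotMatchedBySourceClause => targetColumns
  }

  // Left with an error message if the column is out of scope, Right otherwise.
  def resolve(
      kind: MergeExpr,
      column: String,
      targetColumns: Set[String],
      sourceColumns: Set[String]): Either[String, String] =
    if (visibleColumns(kind, targetColumns, sourceColumns).contains(column)) Right(column)
    else Left(s"cannot resolve $column in $kind")
}
```

Under this model, a not matched (insert) clause that refers to a target column fails to resolve, since only source columns are in scope.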

resolveReferencesAndSchema builds the final schema (taking schema.autoMerge.enabled into account).

With schema.autoMerge.enabled disabled, resolveReferencesAndSchema uses the schema of the target table as the final schema.

With schema.autoMerge.enabled enabled, resolveReferencesAndSchema does the following:

  1. Collects the assignments, i.e. the targetColNameParts of the DeltaMergeActions (among the actions) of the matchedClauses and notMatchedClauses
  2. Checks if there are any UnresolvedStars among the actions (they are not DeltaMergeActions, so they are skipped in the previous step) (containsStarAction)
  3. Builds a migrationSchema with the fields of the source table that are referenced by merge clauses
  4. Merges the schema of the target table with the migrationSchema (allowing conversions from the source types to the target's)
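
The way the final schema depends on schema.autoMerge.enabled can be sketched with a flat schema model. This is an illustration under simplifying assumptions, not the actual implementation: Schema and finalSchema are hypothetical, nested fields are ignored, column names are compared case-sensitively, and a column present on both sides simply keeps the target's type.

```scala
object FinalSchemaSketch {
  // Hypothetical flat schema: ordered (column name, type name) pairs.
  type Schema = Seq[(String, String)]

  def finalSchema(
      target: Schema,
      source: Schema,
      assignedColumns: Set[String],
      containsStarAction: Boolean,
      autoMergeEnabled: Boolean): Schema =
    if (!autoMergeEnabled) {
      // Schema evolution is off: the target schema is the final schema.
      target
    } else {
      // Source fields referenced by merge clauses (all of them with a * action).
      val migrationSchema = source.filter { case (name, _) =>
        containsStarAction || assignedColumns.contains(name)
      }
      val targetNames = target.map(_._1).toSet
      // Existing columns keep the target's type; only new columns are appended.
      target ++ migrationSchema.filterNot { case (name, _) => targetNames.contains(name) }
    }
}
```

With a target of (id: long, value: string) and an assignment to a new source column extra, the sketch yields the target schema with extra appended, while an unreferenced source column is left out unless a * action is present.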

Schema Evolution and Types

Implicit conversions are allowed, so resolveReferencesAndSchema can change the type of source columns to match the target's.

In the end, resolveReferencesAndSchema creates a new DeltaMergeInto.


resolveReferencesAndSchema is used when:

  • DeltaMergeBuilder is requested to execute
  • DeltaAnalysis logical resolution rule is executed (to resolve MergeIntoTable logical command)

Resolving Single Clause

resolveClause[T <: DeltaMergeIntoClause](
  clause: T,
  planToResolveAction: LogicalPlan): T

resolveClause resolves the actions of the given DeltaMergeIntoClause (UnresolvedStar or DeltaMergeAction):

In the end, resolveClause resolves the condition of the DeltaMergeIntoClause.

Filtering Schema

filterSchema(
  sourceSchema: StructType,
  basePath: Seq[String]): StructType

filterSchema filters the source schema to retain only fields that are referenced by a merge clause.

Recursive Method

filterSchema is recursive so it can handle StructType fields.

There are the following two base cases (that terminate recursion):

  1. There is an exact match (and the field is included in the final schema)
  2. A field and its children are not assigned to in any * or non-* action (and the field is skipped in the final schema)

For every field in the given StructType, filterSchema checks if the field is amongst (referenced by) the clause assignments and does one of the following:

  1. If there is an exact match, filterSchema keeps the field
  2. If the type of the field is a StructType and one of its children is assigned to in a merge clause or there is a * action, filterSchema recurses into the struct with the field path
  3. If the field is not a StructType and there is a * action, filterSchema keeps the field
  4. Otherwise, filterSchema drops (filters out) the field
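
The four cases above can be sketched as a recursive function over a tiny schema model. This is a sketch under stated assumptions rather than Delta Lake's code: DataType, Leaf, Struct and Field are hypothetical stand-ins for Spark's types, the assignments and the containsStarAction flag are passed in explicitly, and paths are compared case-sensitively.

```scala
object FilterSchemaSketch {
  // Hypothetical, minimal schema model (stand-in for Spark's StructType).
  sealed trait DataType
  case object Leaf extends DataType // any non-struct type
  final case class Struct(fields: Seq[Field]) extends DataType
  final case class Field(name: String, dataType: DataType)

  def filterSchema(
      sourceSchema: Struct,
      basePath: Seq[String],
      assignments: Seq[Seq[String]],
      containsStarAction: Boolean): Struct =
    Struct(sourceSchema.fields.flatMap { field =>
      val fieldPath = basePath :+ field.name
      // Is there an assignment to a field nested strictly below this one?
      val childAssigned = assignments.exists { a =>
        a.length > fieldPath.length && a.startsWith(fieldPath)
      }
      field.dataType match {
        // 1. Exact match: keep the field as-is.
        case _ if assignments.contains(fieldPath) => Some(field)
        // 2. Struct with an assigned child or a * action: recurse.
        case s: Struct if childAssigned || containsStarAction =>
          Some(Field(field.name, filterSchema(s, fieldPath, assignments, containsStarAction)))
        // 3. Non-struct field with a * action: keep the field.
        case Leaf if containsStarAction => Some(field)
        // 4. Otherwise: drop the field.
        case _ => None
      }
    })
}
```

Given a source schema with id, address (city, zip) and unused, and assignments to id and address.city only, the sketch keeps id and address.city and drops the rest. With a * action, every field survives.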