MergeIntoMaterializeSource

MergeIntoMaterializeSource is an abstraction of merge commands that can materialize the source table when executed.

Implementations

Source DataFrame

sourceDF: Option[DataFrame] = None
Mutable Variable

sourceDF is a Scala var.

Learn more in the Scala Language Specification.

sourceDF is undefined (None) when MergeIntoMaterializeSource is created and after runWithMaterializedSourceLostRetries.

sourceDF is assigned a DataFrame in prepareSourceDFAndReturnMaterializeReason:

  1. When configured not to materialize (in which case the DataFrame is created directly from the source plan)
  2. After local checkpointing

When materialized, sourceDF's execution plan is printed out in a DEBUG message:

Materialized MERGE source plan:
[sourceDF]

When defined, sourceDF is available using getSourceDF.

getSourceDF

getSourceDF: DataFrame

getSourceDF returns the source DataFrame, if defined, or throws an IllegalStateException with the following message:

sourceDF was not initialized! Call prepareSourceDFAndReturnMaterializeReason before.
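
The "undefined until prepared" behaviour of sourceDF and getSourceDF can be illustrated with a minimal sketch. SourceHolder, its type parameter, and the prepare and reset methods are hypothetical names introduced only for this illustration (the real abstraction holds a Spark DataFrame); only the exception type and message come from the description above.

```scala
// A simplified stand-in for the sourceDF / getSourceDF pair.
// SourceHolder, prepare and reset are hypothetical names, not Delta Lake API.
class SourceHolder[A] {
  // Undefined (None) until a source has been prepared.
  private var source: Option[A] = None

  // Plays the role of prepareSourceDFAndReturnMaterializeReason assigning sourceDF.
  def prepare(value: A): Unit = { source = Some(value) }

  // Plays the role of sourceDF becoming undefined again.
  def reset(): Unit = { source = None }

  // Returns the source if defined, or fails with the message quoted above.
  def getSource: A = source.getOrElse {
    throw new IllegalStateException(
      "sourceDF was not initialized! Call prepareSourceDFAndReturnMaterializeReason before.")
  }
}
```

Keeping the variable private and exposing it only through an accessor that fails fast means an out-of-order call is reported immediately rather than surfacing later as a missing value.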

getSourceDF is used when:

mergeMaterializedSource RDD

materializedSourceRDD: Option[RDD[InternalRow]] = None
Mutable Variable

materializedSourceRDD is a Scala var.

Learn more in the Scala Language Specification.

MergeIntoMaterializeSource defines an internal materializedSourceRDD registry for the RDD of a checkpointed source DataFrame (using Dataset.localCheckpoint operator).

The name of materializedSourceRDD is mergeMaterializedSource.

materializedSourceRDD is undefined (None) when MergeIntoMaterializeSource is created and after an attempt failed in runWithMaterializedSourceLostRetries.

materializedSourceRDD is assigned an RDD in prepareSourceDFAndReturnMaterializeReason.

materializedSourceRDD is unpersisted (using RDD.unpersist operator) after an attempt failed in runWithMaterializedSourceLostRetries.

shouldMaterializeSource

shouldMaterializeSource(
  spark: SparkSession,
  source: LogicalPlan,
  isInsertOnly: Boolean): (Boolean, MergeIntoMaterializeSourceReason)

shouldMaterializeSource returns a pair of the following:

  1. Whether the given source plan should be materialized (checkpointed) or not (Boolean)
  2. The reason for the decision (MergeIntoMaterializeSourceReason)

shouldMaterializeSource is controlled by spark.databricks.delta.merge.materializeSource.

merge.materializeSource         Boolean   MergeIntoMaterializeSourceReason
ALL                             true      MATERIALIZE_ALL
AUTO                            (see the table below)
NONE                            false     NOT_MATERIALIZED_NONE
any other value incl. invalid   true      INVALID_CONFIG

For AUTO, shouldMaterializeSource checks the following conditions in order and uses the first one that holds:

Boolean   MergeIntoMaterializeSourceReason     Condition
false     NOT_MATERIALIZED_AUTO_INSERT_ONLY    The given isInsertOnly flag is enabled and merge.optimizeInsertOnlyMerge.enabled is enabled
true      NON_DETERMINISTIC_SOURCE_NON_DELTA   The given source plan contains non-Delta scans
true      NON_DETERMINISTIC_SOURCE_OPERATORS   The given source plan is non-deterministic
false     NOT_MATERIALIZED_AUTO                None of the above
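
The decision described by the two tables above can be sketched in plain Scala, without a Spark dependency. The reason names are taken from the tables; the Inputs case class and its fields are hypothetical and stand for facts the real method derives from the SparkSession and the source LogicalPlan. The configuration values are matched exactly as spelled in the first table, which is an assumption about their spelling.

```scala
// A sketch of the decision tables, not the Delta Lake implementation.
object MaterializeDecision {
  sealed trait Reason
  case object MATERIALIZE_ALL extends Reason
  case object NOT_MATERIALIZED_NONE extends Reason
  case object INVALID_CONFIG extends Reason
  case object NOT_MATERIALIZED_AUTO_INSERT_ONLY extends Reason
  case object NON_DETERMINISTIC_SOURCE_NON_DELTA extends Reason
  case object NON_DETERMINISTIC_SOURCE_OPERATORS extends Reason
  case object NOT_MATERIALIZED_AUTO extends Reason

  // Hypothetical summary of the inputs (assumed names, for illustration only).
  final case class Inputs(
      materializeSource: String,               // value of merge.materializeSource
      isInsertOnly: Boolean,                   // the given isInsertOnly flag
      optimizeInsertOnlyMergeEnabled: Boolean, // merge.optimizeInsertOnlyMerge.enabled
      sourceHasOnlyDeltaScans: Boolean,        // false if the plan contains non-Delta scans
      sourceIsDeterministic: Boolean)          // false if the plan is non-deterministic

  def shouldMaterializeSource(inputs: Inputs): (Boolean, Reason) =
    inputs.materializeSource match {
      case "ALL"  => (true, MATERIALIZE_ALL)
      case "NONE" => (false, NOT_MATERIALIZED_NONE)
      case "AUTO" =>
        // Conditions are checked in order; the first one that holds wins.
        if (inputs.isInsertOnly && inputs.optimizeInsertOnlyMergeEnabled)
          (false, NOT_MATERIALIZED_AUTO_INSERT_ONLY)
        else if (!inputs.sourceHasOnlyDeltaScans)
          (true, NON_DETERMINISTIC_SOURCE_NON_DELTA)
        else if (!inputs.sourceIsDeterministic)
          (true, NON_DETERMINISTIC_SOURCE_OPERATORS)
        else
          (false, NOT_MATERIALIZED_AUTO)
      // Any other value, including an invalid one, materializes to be safe.
      case _ => (true, INVALID_CONFIG)
    }
}
```

Note that in this sketch an unrecognized configuration value leads to materialization, matching the INVALID_CONFIG row of the first table.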

isInsertOnly flag

The given isInsertOnly flag is driven by isInsertOnly.


shouldMaterializeSource is used when:

Preparing Source Table

prepareSourceDFAndReturnMaterializeReason(
  spark: SparkSession,
  source: LogicalPlan,
  condition: Expression,
  matchedClauses: Seq[DeltaMergeIntoMatchedClause],
  notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause],
  isInsertOnly: Boolean): MergeIntoMaterializeSourceReason.MergeIntoMaterializeSourceReason

prepareSourceDFAndReturnMaterializeReason uses shouldMaterializeSource to decide whether to materialize the source.

shouldMaterializeSource

shouldMaterializeSource gives whether the source table should be materialized or not (materialize) and the reason for the decision (materializeReason).

When the decision is not to materialize, prepareSourceDFAndReturnMaterializeReason creates a DataFrame for the given source logical plan, makes it available as the sourceDF from now on, and returns early with the reason for not materializing.

prepareSourceDFAndReturnMaterializeReason finds the columns used in this MERGE command and creates a Project logical operator with the columns and the given source logical plan.

Column Pruning

With the Project logical operator and the columns used for this MERGE command, prepareSourceDFAndReturnMaterializeReason hopes for Column Pruning optimization.

prepareSourceDFAndReturnMaterializeReason creates a DataFrame (with the Project operator over the source logical plan) that is then checkpointed (using Dataset.localCheckpoint operator).

Local Checkpointing Lazily

prepareSourceDFAndReturnMaterializeReason uses Dataset.localCheckpoint operator with eager flag disabled so materialization (checkpointing) happens at first access. This is for more precise performance metrics.

prepareSourceDFAndReturnMaterializeReason stores the RDD of the checkpointed DataFrame so that it is available later as the materializedSourceRDD. The name of the RDD is mergeMaterializedSource.

prepareSourceDFAndReturnMaterializeReason adds hints to the plan (with the source and the analyzed logical plan of the checkpointed and column-pruned source Dataset) and creates a DataFrame that is available as the sourceDF from now on.

prepareSourceDFAndReturnMaterializeReason caches (using Dataset.persist operator) the RDD with the storage level based on the following configuration properties:

prepareSourceDFAndReturnMaterializeReason prints out the following DEBUG messages:

Materializing MERGE with pruned columns [referencedSourceColumns].
Materialized MERGE source plan:
[getSourceDF]

In the end, prepareSourceDFAndReturnMaterializeReason returns the reason to materialize (materializeReason from shouldMaterializeSource).


prepareSourceDFAndReturnMaterializeReason is used when:

runWithMaterializedSourceLostRetries

runWithMaterializedSourceLostRetries(
  spark: SparkSession,
  deltaLog: DeltaLog,
  metrics: Map[String, SQLMetric],
  runMergeFunc: SparkSession => Seq[Row]): Seq[Row]

runWithMaterializedSourceLostRetries executes the runMergeFunc function until it succeeds or runs out of configured allowed attempts (based on spark.databricks.delta.merge.materializeSource.maxAttempts).
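
The "run until success or out of attempts" behaviour can be sketched as a generic retry loop. This is a simplified illustration under stated assumptions, not the Delta Lake code: RetrySketch, runWithRetries, and the onFailure hook are hypothetical names, and the sketch retries on any non-fatal exception, whereas the real method may only retry for specific failures. The onFailure hook stands in for the cleanup between attempts (such as unpersisting and resetting materializedSourceRDD).

```scala
import scala.util.{Failure, Success, Try}

// A generic retry loop; all names here are hypothetical.
object RetrySketch {
  // Runs `attempt` (given the 1-based attempt number) until it succeeds
  // or `maxAttempts` attempts have been made. `onFailure` runs after every
  // failed attempt that is going to be retried.
  def runWithRetries[A](maxAttempts: Int)(onFailure: () => Unit)(attempt: Int => A): A = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")
    var attemptNo = 1
    var result: Option[A] = None
    while (result.isEmpty) {
      Try(attempt(attemptNo)) match {
        case Success(value) =>
          result = Some(value)
        case Failure(_) if attemptNo < maxAttempts =>
          onFailure()    // clean up state left by the failed attempt
          attemptNo += 1 // and try again
        case Failure(error) =>
          throw error    // out of attempts: propagate the last failure
      }
    }
    result.get
  }
}
```

A caller would pass the configured maximum number of attempts and a function that runs the merge, for example RetrySketch.runWithRetries(3)(() => ())(n => s"attempt $n").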

In the end, runWithMaterializedSourceLostRetries does the following:


runWithMaterializedSourceLostRetries is used when:

  • MergeIntoCommand is requested to run

Logging

MergeIntoMaterializeSource is an abstract class and logging is configured using the logger of MergeIntoCommand.