MergeIntoMaterializeSource
MergeIntoMaterializeSource is an abstraction of merge commands that can materialize the source table when executed.
Implementations
Source DataFrame

```scala
sourceDF: Option[DataFrame] = None
```
sourceDF is undefined (None) when MergeIntoMaterializeSource is created and after runWithMaterializedSourceLostRetries.
sourceDF is assigned a DataFrame in prepareSourceDFAndReturnMaterializeReason:
- When configured not to materialize (it simply creates a DataFrame from the source plan directly)
- After local checkpointing
When materialized, sourceDF's execution plan is printed out in a DEBUG message:

```text
Materialized MERGE source plan:
[sourceDF]
```
When defined, sourceDF is available using getSourceDF.
getSourceDF

```scala
getSourceDF: DataFrame
```
getSourceDF returns the source DataFrame (sourceDF) if defined, or throws an IllegalStateException with the following message:

```text
sourceDF was not initialized! Call prepareSourceDFAndReturnMaterializeReason before.
```
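The Option-backed guard that getSourceDF implements can be sketched in plain Scala as follows. This is a simplified model, not Delta's code: Frame is a hypothetical stand-in for DataFrame, and prepare and reset are hypothetical helpers that play the roles of prepareSourceDFAndReturnMaterializeReason and the cleanup in runWithMaterializedSourceLostRetries.

```scala
// Simplified, Spark-free model of sourceDF and getSourceDF (not Delta's code).
object SourceHolderSketch {
  // Hypothetical stand-in for a DataFrame
  final case class Frame(description: String)

  final class SourceHolder {
    // Undefined (None) until the source is prepared
    private var sourceDF: Option[Frame] = None

    // Hypothetical: plays the role of prepareSourceDFAndReturnMaterializeReason
    def prepare(frame: Frame): Unit = sourceDF = Some(frame)

    // Hypothetical: plays the role of the cleanup in runWithMaterializedSourceLostRetries
    def reset(): Unit = sourceDF = None

    // Returns the source if defined; otherwise fails with IllegalStateException
    def getSourceDF: Frame = sourceDF.getOrElse {
      throw new IllegalStateException(
        "sourceDF was not initialized! Call prepareSourceDFAndReturnMaterializeReason before.")
    }
  }
}
```

Keeping the field private and exposing only the throwing accessor means callers never see a null or an empty Option; they either get a prepared source or fail loudly.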
getSourceDF is used when:
- ClassicMergeExecutor is requested to findTouchedFiles and write out merge changes
- InsertOnlyMergeExecutor is requested to write out inserts
mergeMaterializedSource RDD

```scala
materializedSourceRDD: Option[RDD[InternalRow]] = None
```
Mutable Variable
materializedSourceRDD is a Scala var.
Learn more in the Scala Language Specification.
MergeIntoMaterializeSource defines an internal materializedSourceRDD registry for the RDD of the checkpointed source DataFrame (created using the Dataset.localCheckpoint operator).
The name of materializedSourceRDD is mergeMaterializedSource.
materializedSourceRDD is undefined (None) when MergeIntoMaterializeSource is created, and is reset to None by runWithMaterializedSourceLostRetries when an attempt fails.
materializedSourceRDD is assigned an RDD in prepareSourceDFAndReturnMaterializeReason.
materializedSourceRDD is unpersisted (using the RDD.unpersist operator) by runWithMaterializedSourceLostRetries when an attempt fails.
shouldMaterializeSource

```scala
shouldMaterializeSource(
  spark: SparkSession,
  source: LogicalPlan,
  isInsertOnly: Boolean): (Boolean, MergeIntoMaterializeSourceReason)
```
shouldMaterializeSource returns a pair of the following:
- Whether the given source plan should be materialized (checkpointed) or not (Boolean)
- The reason for the decision (MergeIntoMaterializeSourceReason)
shouldMaterializeSource is controlled by spark.databricks.delta.merge.materializeSource.
| merge.materializeSource | Boolean | MergeIntoMaterializeSourceReason |
|---|---|---|
| ALL | true | MATERIALIZE_ALL |
| AUTO | see the table below | |
| NONE | false | NOT_MATERIALIZED_NONE |
| any other value (incl. invalid) | true | INVALID_CONFIG |
For AUTO, shouldMaterializeSource checks the following conditions in order (the first one that holds decides the result):
| Boolean | MergeIntoMaterializeSourceReason | Condition |
|---|---|---|
| false | NOT_MATERIALIZED_AUTO_INSERT_ONLY | The given isInsertOnly flag is enabled |
| true | NON_DETERMINISTIC_SOURCE_NON_DELTA | The given source plan contains non-Delta scans |
| true | NON_DETERMINISTIC_SOURCE_OPERATORS | The given source plan is non-deterministic |
| false | NOT_MATERIALIZED_AUTO | None of the above |
isInsertOnly flag
The given isInsertOnly flag is driven by the isInsertOnly property of the merge command.
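A minimal, Spark-free sketch of the two decision tables above follows. It assumes the configuration value is matched case-insensitively, reduces the source plan to two hypothetical flags (PlanInfo.onlyDeltaScans and PlanInfo.deterministic), and treats the isInsertOnly flag alone as enough for NOT_MATERIALIZED_AUTO_INSERT_ONLY, which may be a simplification of the real check.

```scala
import java.util.Locale

// Simplified model of shouldMaterializeSource (not Delta's implementation).
object MaterializeDecisionSketch {
  sealed trait Reason
  case object MATERIALIZE_ALL extends Reason
  case object NOT_MATERIALIZED_NONE extends Reason
  case object INVALID_CONFIG extends Reason
  case object NOT_MATERIALIZED_AUTO_INSERT_ONLY extends Reason
  case object NON_DETERMINISTIC_SOURCE_NON_DELTA extends Reason
  case object NON_DETERMINISTIC_SOURCE_OPERATORS extends Reason
  case object NOT_MATERIALIZED_AUTO extends Reason

  // Hypothetical summary of the source logical plan
  final case class PlanInfo(onlyDeltaScans: Boolean, deterministic: Boolean)

  def shouldMaterializeSource(
      materializeSource: String,
      plan: PlanInfo,
      isInsertOnly: Boolean): (Boolean, Reason) =
    // Assumption: the config value is compared case-insensitively
    materializeSource.toLowerCase(Locale.ROOT) match {
      case "all"  => (true, MATERIALIZE_ALL)
      case "none" => (false, NOT_MATERIALIZED_NONE)
      case "auto" =>
        // Conditions are checked in order; the first one that holds decides
        if (isInsertOnly) (false, NOT_MATERIALIZED_AUTO_INSERT_ONLY)
        else if (!plan.onlyDeltaScans) (true, NON_DETERMINISTIC_SOURCE_NON_DELTA)
        else if (!plan.deterministic) (true, NON_DETERMINISTIC_SOURCE_OPERATORS)
        else (false, NOT_MATERIALIZED_AUTO)
      // Any other (invalid) value falls back to materializing
      case _ => (true, INVALID_CONFIG)
    }
}
```

Falling back to materialization for an invalid value errs on the side of correctness: a materialized source cannot change between the passes of a MERGE.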
shouldMaterializeSource is used when:
- MergeIntoCommandBase is requested to run
- MergeIntoMaterializeSource is requested to prepare the source table
Preparing Source Table

```scala
prepareSourceDFAndReturnMaterializeReason(
  spark: SparkSession,
  source: LogicalPlan,
  condition: Expression,
  matchedClauses: Seq[DeltaMergeIntoMatchedClause],
  notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause],
  isInsertOnly: Boolean): MergeIntoMaterializeSourceReason.MergeIntoMaterializeSourceReason
```
prepareSourceDFAndReturnMaterializeReason calls shouldMaterializeSource to decide whether the source table should be materialized (materialize) and to get the reason for the decision (materializeReason).
If the source should not be materialized, prepareSourceDFAndReturnMaterializeReason creates a DataFrame for the given source logical plan (available as sourceDF from now on) and stops, returning the reason for not materializing.
Otherwise, prepareSourceDFAndReturnMaterializeReason finds the source columns referenced in this MERGE command (by the given condition and merge clauses) and creates a Project logical operator with these columns over the given source logical plan.
Column Pruning
With the Project logical operator over only the columns this MERGE command uses, prepareSourceDFAndReturnMaterializeReason relies on the Column Pruning optimization to avoid reading (and materializing) the other source columns.
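The pruning step can be sketched as a filter over the source output. This is a hypothetical model, not Delta's code: prunedSourceColumns is an illustrative helper, the condition and clauses are reduced to the sets of column names they reference, and columns are matched by name (Spark matches attributes by expression ID instead).

```scala
// Hypothetical model of column pruning before materialization (not Delta's code).
object ColumnPruningSketch {
  def prunedSourceColumns(
      sourceOutput: Seq[String],
      conditionRefs: Set[String],
      matchedClauseRefs: Seq[Set[String]],
      notMatchedClauseRefs: Seq[Set[String]]): Seq[String] = {
    // Every column mentioned by the condition or any clause
    // (this may include target columns, which the filter below ignores)
    val referenced = conditionRefs ++ matchedClauseRefs.flatten ++ notMatchedClauseRefs.flatten
    // Keep only the referenced source columns, in the source's own order
    sourceOutput.filter(referenced.contains)
  }
}
```

For a source with columns id, name, payload and ts, where the MERGE only touches id, name and ts, payload is dropped before the source is checkpointed.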
prepareSourceDFAndReturnMaterializeReason creates a DataFrame (with the Project operator over the source logical plan) that is then checkpointed using the Dataset.localCheckpoint operator.
Local Checkpointing Lazily
prepareSourceDFAndReturnMaterializeReason uses the Dataset.localCheckpoint operator with the eager flag disabled, so materialization (checkpointing) happens on first access rather than up front. This gives more precise performance metrics.
prepareSourceDFAndReturnMaterializeReason stores the RDD of the checkpointed DataFrame as materializedSourceRDD (for later use) and names the RDD mergeMaterializedSource.
prepareSourceDFAndReturnMaterializeReason adds hints to the plan (using the given source plan and the analyzed logical plan of the checkpointed and column-pruned source Dataset) and creates a DataFrame that is available as sourceDF from now on.
prepareSourceDFAndReturnMaterializeReason persists the RDD (using the RDD.persist operator) with a storage level that depends on the attempt:
- spark.databricks.delta.merge.materializeSource.rddStorageLevel for the first attempt
- spark.databricks.delta.merge.materializeSource.rddStorageLevelRetry for retries
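Choosing a storage level per attempt can be sketched as below. storageLevelName is a hypothetical helper, attempts are assumed to be numbered from 1, the configuration is modeled as a plain Map, and the default passed in is a caller-supplied placeholder rather than Delta's actual default.

```scala
// Hypothetical sketch of picking the RDD storage level name per attempt.
object StorageLevelSketch {
  val FirstKey = "spark.databricks.delta.merge.materializeSource.rddStorageLevel"
  val RetryKey = "spark.databricks.delta.merge.materializeSource.rddStorageLevelRetry"

  /** `attempt` is 1-based: attempt 1 is the initial run, later attempts are retries. */
  def storageLevelName(conf: Map[String, String], attempt: Int, default: String): String = {
    require(attempt >= 1, "attempt is 1-based")
    val key = if (attempt == 1) FirstKey else RetryKey
    // Fall back to the caller-supplied placeholder when the key is not set
    conf.getOrElse(key, default)
  }
}
```

In Spark, a storage level name like these can be turned into a StorageLevel with org.apache.spark.storage.StorageLevel.fromString. A separate retry setting lets a retry use a more resilient level (for example, a replicated one) after the first materialization was lost.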
prepareSourceDFAndReturnMaterializeReason prints out the following DEBUG messages:

```text
Materializing MERGE with pruned columns [referencedSourceColumns].
Materialized MERGE source plan:
[getSourceDF]
```
In the end, prepareSourceDFAndReturnMaterializeReason returns the reason to materialize (materializeReason from shouldMaterializeSource).
prepareSourceDFAndReturnMaterializeReason is used when:
- MergeIntoCommand is requested to run merge
runWithMaterializedSourceLostRetries

```scala
runWithMaterializedSourceLostRetries(
  spark: SparkSession,
  deltaLog: DeltaLog,
  metrics: Map[String, SQLMetric],
  runMergeFunc: SparkSession => Seq[Row]): Seq[Row]
```
runWithMaterializedSourceLostRetries executes the runMergeFunc function, retrying it when the materialized source is lost, until it succeeds or runs out of the allowed attempts (spark.databricks.delta.merge.materializeSource.maxAttempts).
In the end, runWithMaterializedSourceLostRetries does the following:
- Unpersists the materializedSourceRDD (if available)
- Resets (to None) the materializedSourceRDD and the sourceDF
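The retry-and-cleanup behavior can be sketched without Spark as follows. This is a simplified model, not Delta's code: it assumes that only a hypothetical SourceLostException (standing in for a lost materialized source) is retried, passes the 1-based attempt number to the merge function instead of a SparkSession, and uses strings in place of the RDD and the DataFrame.

```scala
// Hypothetical, Spark-free sketch of runWithMaterializedSourceLostRetries.
object RetrySketch {
  /** Hypothetical marker for "the materialized source RDD was lost". */
  final class SourceLostException(msg: String) extends RuntimeException(msg)

  final class MaterializedState {
    var materializedSourceRDD: Option[String] = None // stands in for RDD[InternalRow]
    var sourceDF: Option[String] = None              // stands in for DataFrame
    var unpersistCalls: Int = 0                      // counts simulated RDD.unpersist calls

    def unpersistAndClear(): Unit = {
      materializedSourceRDD.foreach(_ => unpersistCalls += 1)
      materializedSourceRDD = None
      sourceDF = None
    }
  }

  def runWithRetries[A](state: MaterializedState, maxAttempts: Int)(runMerge: Int => A): A = {
    require(maxAttempts >= 1, "at least one attempt is required")
    try {
      var attempt = 1
      var result: Option[A] = None
      while (result.isEmpty) {
        try {
          result = Some(runMerge(attempt))
        } catch {
          // Retry only a lost source, and only while attempts remain;
          // anything else (or the last failure) propagates to the caller
          case _: SourceLostException if attempt < maxAttempts =>
            state.unpersistAndClear()
            attempt += 1
        }
      }
      result.get
    } finally {
      // In the end, unpersist and reset the materialized source and sourceDF
      state.unpersistAndClear()
    }
  }
}
```

Cleaning up in finally means the materialized source is released whether the merge succeeds or exhausts its attempts.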
runWithMaterializedSourceLostRetries is used when:
- MergeIntoCommand is requested to run
Logging
MergeIntoMaterializeSource is an abstract class and logging is configured using the logger of MergeIntoCommand.