MergeIntoMaterializeSource
MergeIntoMaterializeSource is an abstraction of merge commands that can materialize the source table when executed.
Implementations
Source DataFrame
sourceDF: Option[DataFrame] = None
sourceDF is undefined (None) when MergeIntoMaterializeSource is created and after runWithMaterializedSourceLostRetries.
sourceDF is assigned a DataFrame in prepareSourceDFAndReturnMaterializeReason:
- When the source is not to be materialized (the DataFrame is simply created from the source plan directly)
- After local checkpointing
When materialized, sourceDF's execution plan is printed out in a DEBUG message:
Materialized MERGE source plan:
[sourceDF]
When defined, sourceDF is available using getSourceDF.
getSourceDF
getSourceDF: DataFrame
getSourceDF returns the source DataFrame, if defined, or throws an IllegalStateException with the following message:
sourceDF was not initialized! Call prepareSourceDFAndReturnMaterializeReason before.
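The sourceDF registry and getSourceDF follow the usual Option-guard pattern in Scala. Below is a minimal, Spark-free sketch of that pattern, assuming a type parameter T stands in for DataFrame. The SourceHolder class and its prepare and reset methods are hypothetical and only mimic the lifecycle described above.

```scala
// Hypothetical, Spark-free model of the sourceDF registry and getSourceDF.
// T stands in for DataFrame so that the sketch runs without Spark.
class SourceHolder[T] {
  // Undefined (None) when created, like sourceDF
  private var sourceDF: Option[T] = None

  // Hypothetical: what prepareSourceDFAndReturnMaterializeReason does to sourceDF
  def prepare(df: T): Unit = sourceDF = Some(df)

  // Hypothetical: what runWithMaterializedSourceLostRetries does to sourceDF in the end
  def reset(): Unit = sourceDF = None

  // Returns the source if defined, or throws with the message documented above
  def getSourceDF: T = sourceDF.getOrElse {
    throw new IllegalStateException(
      "sourceDF was not initialized! Call prepareSourceDFAndReturnMaterializeReason before.")
  }
}
```

Calling getSourceDF before prepare (or after reset) fails fast instead of silently working on a missing source.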
getSourceDF is used when:
- ClassicMergeExecutor is requested to findTouchedFiles and write out merge changes
- InsertOnlyMergeExecutor is requested to write out inserts
mergeMaterializedSource RDD
materializedSourceRDD: Option[RDD[InternalRow]] = None
Mutable Variable
materializedSourceRDD is a Scala var. Learn more in the Scala Language Specification.
MergeIntoMaterializeSource defines an internal materializedSourceRDD registry for the RDD of the locally-checkpointed source DataFrame (using Dataset.localCheckpoint operator).
The name of the materializedSourceRDD RDD is mergeMaterializedSource.
materializedSourceRDD is undefined (None) when MergeIntoMaterializeSource is created and at the end of every attempt in runWithMaterializedSourceLostRetries.
materializedSourceRDD is assigned an RDD in prepareSourceDFAndReturnMaterializeReason.
materializedSourceRDD is unpersisted (using RDD.unpersist operator) at the end of every attempt in runWithMaterializedSourceLostRetries.
shouldMaterializeSource
shouldMaterializeSource(
spark: SparkSession,
source: LogicalPlan,
isInsertOnly: Boolean): (Boolean, MergeIntoMaterializeSourceReason)
shouldMaterializeSource returns a pair of the following:
- Whether the given source plan should be materialized (checkpointed) or not (Boolean)
- The reason for the decision (MergeIntoMaterializeSourceReason)
shouldMaterializeSource is controlled by spark.databricks.delta.merge.materializeSource.
merge.materializeSource | Boolean | MergeIntoMaterializeSourceReason |
---|---|---|
ALL | true | MATERIALIZE_ALL |
AUTO | see the table below | |
NONE | false | NOT_MATERIALIZED_NONE |
any other value incl. invalid | true | INVALID_CONFIG |
For AUTO, shouldMaterializeSource is as follows (in order):
Boolean | MergeIntoMaterializeSourceReason | Condition |
---|---|---|
false | NOT_MATERIALIZED_AUTO_INSERT_ONLY | The given isInsertOnly flag is enabled |
true | NON_DETERMINISTIC_SOURCE_NON_DELTA | The given source plan contains non-Delta scans |
true | NON_DETERMINISTIC_SOURCE_OPERATORS | The given source plan is non-deterministic |
false | NOT_MATERIALIZED_AUTO | Otherwise |
isInsertOnly flag
The given isInsertOnly flag is driven by the isInsertOnly property of the merge command.
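The two decision tables can be sketched as a single pattern match. This is a Spark-free sketch under assumptions: the plan checks (non-Delta scans, non-determinism) are passed in as precomputed booleans, MaterializeReason and MaterializeDecision are hypothetical names (only the reason values are copied from the tables), and the configuration value is matched case-insensitively.

```scala
import java.util.Locale

// Hypothetical reason values, mirroring the names in the tables above
object MaterializeReason extends Enumeration {
  type MaterializeReason = Value
  val MATERIALIZE_ALL, NOT_MATERIALIZED_NONE, INVALID_CONFIG,
    NOT_MATERIALIZED_AUTO_INSERT_ONLY, NON_DETERMINISTIC_SOURCE_NON_DELTA,
    NON_DETERMINISTIC_SOURCE_OPERATORS, NOT_MATERIALIZED_AUTO = Value
}

object MaterializeDecision {
  import MaterializeReason._

  def shouldMaterializeSource(
      materializeSourceConf: String, // value of spark.databricks.delta.merge.materializeSource
      isInsertOnly: Boolean,
      onlyDeltaScans: Boolean,       // assumption: precomputed from the source plan
      isDeterministic: Boolean       // assumption: precomputed from the source plan
  ): (Boolean, MaterializeReason) =
    materializeSourceConf.toLowerCase(Locale.ROOT) match {
      case "all"  => (true, MATERIALIZE_ALL)
      case "none" => (false, NOT_MATERIALIZED_NONE)
      case "auto" =>
        // Conditions are checked in the order of the AUTO table
        if (isInsertOnly) (false, NOT_MATERIALIZED_AUTO_INSERT_ONLY)
        else if (!onlyDeltaScans) (true, NON_DETERMINISTIC_SOURCE_NON_DELTA)
        else if (!isDeterministic) (true, NON_DETERMINISTIC_SOURCE_OPERATORS)
        else (false, NOT_MATERIALIZED_AUTO)
      // Any other value, including an invalid one, materializes the source
      case _ => (true, INVALID_CONFIG)
    }
}
```

Note how an insert-only MERGE is never materialized under AUTO, even if its source is non-deterministic, because that check comes first.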
shouldMaterializeSource is used when:
- MergeIntoCommandBase is requested to run
- MergeIntoMaterializeSource is requested to prepare the source table
Preparing Source Table
prepareSourceDFAndReturnMaterializeReason(
spark: SparkSession,
source: LogicalPlan,
condition: Expression,
matchedClauses: Seq[DeltaMergeIntoMatchedClause],
notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause],
isInsertOnly: Boolean): MergeIntoMaterializeSourceReason.MergeIntoMaterializeSourceReason
prepareSourceDFAndReturnMaterializeReason uses shouldMaterializeSource to decide whether to materialize the source table or not (materialize) and to find the reason for the decision (materializeReason).
When decided not to materialize, prepareSourceDFAndReturnMaterializeReason creates a DataFrame for the given source logical plan that is available as the sourceDF from now on. prepareSourceDFAndReturnMaterializeReason stops and returns the reason for this decision.
Otherwise, prepareSourceDFAndReturnMaterializeReason finds the columns used in this MERGE command and creates a Project logical operator with the columns and the given source logical plan.
Column Pruning
With the Project logical operator over the columns used in this MERGE command, prepareSourceDFAndReturnMaterializeReason relies on the Column Pruning optimization so that only these columns are materialized.
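The column selection can be sketched with plain column names, assuming each part of the MERGE (the condition and every matched and not-matched clause) has already been reduced to the set of column names it references. The referencedSourceColumns helper below is hypothetical; Delta itself works on Catalyst attributes, not strings.

```scala
object ColumnPruning {
  // Hypothetical: keep only the source columns that the MERGE refers to
  def referencedSourceColumns(
      sourceColumns: Seq[String],
      conditionRefs: Set[String],
      matchedClauseRefs: Seq[Set[String]],
      notMatchedClauseRefs: Seq[Set[String]]): Seq[String] = {
    // Union of every column name referenced by the condition and the clauses
    // (target columns may be in there too; the filter below drops them)
    val referenced = conditionRefs ++ matchedClauseRefs.flatten ++ notMatchedClauseRefs.flatten
    // Preserve the source's column order for the projection
    sourceColumns.filter(referenced.contains)
  }
}
```

A source column that no part of the MERGE mentions (payload in the test) never reaches the checkpoint.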
prepareSourceDFAndReturnMaterializeReason creates a DataFrame (with the Project operator over the source logical plan) that is then checkpointed (using Dataset.localCheckpoint operator).
Local Checkpointing Lazily
prepareSourceDFAndReturnMaterializeReason uses Dataset.localCheckpoint operator with the eager flag disabled so that materialization (checkpointing) happens on first access. This is for more precise performance metrics.
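The effect of disabling the eager flag can be illustrated with a Scala lazy val. This is only an analogy, not how Spark implements checkpointing: declaring the checkpoint computes nothing, and the source is computed once, on first access, and reused afterwards. LazyCheckpoint and computeCount are hypothetical names.

```scala
// Hypothetical analogy of a non-eager local checkpoint:
// the data is computed on first access (not at declaration) and then reused.
class LazyCheckpoint[T](compute: () => Seq[T]) {
  private var count = 0

  // How many times the (expensive) source computation has run so far
  def computeCount: Int = count

  // Materialized on first access only, like localCheckpoint(eager = false)
  lazy val materialized: Seq[T] = {
    count += 1
    compute()
  }
}
```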
prepareSourceDFAndReturnMaterializeReason stores the RDD of the checkpointed DataFrame as the materializedSourceRDD (to be available later). The name of the RDD is mergeMaterializedSource.
prepareSourceDFAndReturnMaterializeReason adds the hints of the source plan to the analyzed logical plan of the checkpointed and column-pruned source Dataset, and creates a DataFrame that is available as the sourceDF from now on.
prepareSourceDFAndReturnMaterializeReason caches the RDD (using RDD.persist operator) with the storage level based on the attempt:
- spark.databricks.delta.merge.materializeSource.rddStorageLevel for the first attempt
- spark.databricks.delta.merge.materializeSource.rddStorageLevelRetry for retries
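The per-attempt choice of the storage level property can be sketched as follows, assuming the session configuration is given as a plain Map and attempts are counted from 1. The StorageLevelChoice object and its storageLevelName helper are hypothetical. In Spark, the returned name could then be converted with StorageLevel.fromString (in org.apache.spark.storage) before calling RDD.persist.

```scala
object StorageLevelChoice {
  val RddStorageLevelKey =
    "spark.databricks.delta.merge.materializeSource.rddStorageLevel"
  val RddStorageLevelRetryKey =
    "spark.databricks.delta.merge.materializeSource.rddStorageLevelRetry"

  // Hypothetical helper: the first attempt uses rddStorageLevel,
  // every retry (attempt > 1) uses rddStorageLevelRetry
  def storageLevelName(conf: Map[String, String], attempt: Int): String = {
    require(attempt >= 1, s"attempts are 1-based, got $attempt")
    val key = if (attempt == 1) RddStorageLevelKey else RddStorageLevelRetryKey
    conf(key) // throws NoSuchElementException when the property is not set
  }
}
```

Keeping a separate property for retries makes it possible to pick a more resilient storage level once the first attempt has lost its materialized source.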
prepareSourceDFAndReturnMaterializeReason prints out the following DEBUG messages:
Materializing MERGE with pruned columns [referencedSourceColumns].
Materialized MERGE source plan:
[getSourceDF]
In the end, prepareSourceDFAndReturnMaterializeReason returns the reason to materialize (materializeReason from shouldMaterializeSource).
prepareSourceDFAndReturnMaterializeReason is used when:
- MergeIntoCommand is requested to run merge
runWithMaterializedSourceLostRetries
runWithMaterializedSourceLostRetries(
spark: SparkSession,
deltaLog: DeltaLog,
metrics: Map[String, SQLMetric],
runMergeFunc: SparkSession => Seq[Row]): Seq[Row]
runWithMaterializedSourceLostRetries executes the runMergeFunc function, retrying it when the materialized source has been lost, until it succeeds or runs out of the allowed attempts (spark.databricks.delta.merge.materializeSource.maxAttempts).
In the end, runWithMaterializedSourceLostRetries does the following:
- Unpersists the materializedSourceRDD (if available)
- Removes (sets to None) the materializedSourceRDD and the sourceDF
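The retry loop and the final clean-up can be sketched without Spark, assuming a hypothetical SourceLostException marks failures caused by a lost materialized source and that the clean-up runs after every attempt (as in a finally block). This is a simplification: other exceptions are rethrown as-is, and when the last allowed attempt fails, this sketch rethrows the original exception rather than a dedicated error.

```scala
// Hypothetical marker for failures caused by a lost materialized source
class SourceLostException(msg: String) extends RuntimeException(msg)

object MaterializedSourceRetries {
  // Runs runMerge (given the 1-based attempt number) until it succeeds,
  // retrying only on SourceLostException while attempts remain.
  def runWithRetries[T](maxAttempts: Int)(cleanup: () => Unit)(runMerge: Int => T): T = {
    require(maxAttempts >= 1, s"maxAttempts must be positive, got $maxAttempts")
    var attempt = 1
    var result: Option[T] = None
    while (result.isEmpty) {
      try {
        result = Some(runMerge(attempt))
      } catch {
        // Lost source and attempts left: try again
        case _: SourceLostException if attempt < maxAttempts =>
          attempt += 1
        // Any other exception (or a failed last attempt) is not caught and propagates
      } finally {
        // Runs after every attempt, like unpersisting materializedSourceRDD
        // and resetting materializedSourceRDD and sourceDF to None
        cleanup()
      }
    }
    result.get
  }
}
```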
runWithMaterializedSourceLostRetries is used when:
- MergeIntoCommand is requested to run
Logging
MergeIntoMaterializeSource is an abstraction and logging is configured using the logger of MergeIntoCommand.