MergeIntoCommand¶
MergeIntoCommand is a transactional DeltaCommand that represents a DeltaMergeInto logical command at execution.
Performance Metrics¶
Name | web UI |
---|---|
numSourceRows | number of source rows |
numSourceRowsInSecondScan | number of source rows (during repeated scan) |
numTargetRowsCopied | number of target rows rewritten unmodified |
numTargetRowsInserted | number of inserted rows |
numTargetRowsUpdated | number of updated rows |
numTargetRowsDeleted | number of deleted rows |
numTargetFilesBeforeSkipping | number of target files before skipping |
numTargetFilesAfterSkipping | number of target files after skipping |
numTargetFilesRemoved | number of files removed to target |
numTargetFilesAdded | number of files added to target |
numTargetChangeFilesAdded | number of change data capture files generated |
numTargetChangeFileBytes | total size of change data capture files generated |
numTargetBytesBeforeSkipping | number of target bytes before skipping |
numTargetBytesAfterSkipping | number of target bytes after skipping |
numTargetBytesRemoved | number of target bytes removed |
numTargetBytesAdded | number of target bytes added |
numTargetPartitionsAfterSkipping | number of target partitions after skipping |
numTargetPartitionsRemovedFrom | number of target partitions from which files were removed |
numTargetPartitionsAddedTo | number of target partitions to which files were added |
executionTimeMs | time taken to execute the entire operation |
scanTimeMs | time taken to scan the files for matches |
rewriteTimeMs | time taken to rewrite the matched files |
number of target rows rewritten unmodified¶
numTargetRowsCopied performance metric (like the other metrics) is turned into a non-deterministic user-defined function (UDF). numTargetRowsCopied becomes the incrNoopCountExpr UDF.
The incrNoopCountExpr UDF is resolved on a joined plan and used to create a JoinedRowProcessor for processing partitions of the joined plan Dataset.
Creating Instance¶
MergeIntoCommand takes the following to be created:
- Source Data
- Target Data (LogicalPlan)
- TahoeFileIndex
- Merge Condition (Expression)
- Matched Clauses
- Non-Matched Insert Clauses
- Migrated Schema
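Put together, the constructor corresponds roughly to the following Scala signature (a sketch based on the list above; imports are omitted, and parameter names, modifiers and exact clause types vary across Delta Lake versions):
case class MergeIntoCommand(
    source: LogicalPlan,                                // Source Data
    target: LogicalPlan,                                // Target Data
    targetFileIndex: TahoeFileIndex,                    // TahoeFileIndex
    condition: Expression,                              // Merge Condition
    matchedClauses: Seq[DeltaMergeIntoMatchedClause],   // Matched Clauses
    notMatchedClauses: Seq[DeltaMergeIntoInsertClause], // Non-Matched Insert Clauses
    migratedSchema: Option[StructType])                 // Migrated Schema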
MergeIntoCommand is created when:
- PreprocessTableMerge logical resolution rule is executed (to resolve a DeltaMergeInto logical command)
Source Data¶
source: LogicalPlan
When created, MergeIntoCommand is given a LogicalPlan (Spark SQL) for the source data to merge from (internally referred to as source).
The source is used twice:
- Firstly, in one of the following:
  - An inner join (in findTouchedFiles) that is count in web UI
  - A leftanti join (in writeInsertsOnlyWhenNoMatchedClauses)
- Secondly, in a right or full outer join (in writeAllChanges)
Tip
Enable DEBUG logging level for org.apache.spark.sql.delta.commands.MergeIntoCommand logger to see the inner-workings of writeAllChanges.
Migrated Schema¶
migratedSchema: Option[StructType]
MergeIntoCommand can be given a migratedSchema (a Spark SQL StructType).
Target Delta Table¶
targetDeltaLog: DeltaLog
targetDeltaLog is the DeltaLog (of the TahoeFileIndex) of the target delta table.
targetDeltaLog is used for the following:
- Start a new transaction when executed
- To access the Data Path when finding files to rewrite
Lazy Value
targetDeltaLog is a Scala lazy value to guarantee that the code to initialize it is executed only once (when accessed for the first time) and the result is cached afterwards.
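A minimal sketch of such a lazy value (assuming the TahoeFileIndex constructor argument is available as targetFileIndex):
// Evaluated on first access only, then cached for the lifetime of this command
lazy val targetDeltaLog: DeltaLog = targetFileIndex.deltaLog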
Executing Command¶
run(
spark: SparkSession): Seq[Row]
run is part of the RunnableCommand (Spark SQL) abstraction.
run is a transactional operation that is made up of the following steps:
Begin Transaction¶
run starts a new transaction (on the target delta table).
schema.autoMerge.enabled¶
Only when spark.databricks.delta.schema.autoMerge.enabled configuration property is enabled, run updates the metadata (of the transaction) with the following:
- migratedSchema (if defined) or the schema of the target
- isOverwriteMode flag off
- rearrangeOnly flag off
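Schema auto-merge is disabled by default; it can be enabled at the session level, e.g.:
// Let MERGE update the transaction metadata (schema evolution) as described above
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")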
FileActions¶
run determines FileActions.
Single Insert-Only Merge¶
For a single insert-only merge with spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled configuration property enabled, run uses writeInsertsOnlyWhenNoMatchedClauses.
Other Merges¶
Otherwise, run finds the files to rewrite (i.e., AddFiles with the rows that satisfy the merge condition) and uses them to write out merge changes.
The AddFiles are converted into RemoveFiles.
run returns the RemoveFiles together with the written-out FileActions.
Register Metrics¶
run registers the SQL metrics (with the current transaction).
Commit Transaction¶
run commits the current transaction (with the FileActions and the MERGE operation).
Re-Cache Target Delta Table¶
run requests the CacheManager to re-cache the target plan.
Post Metric Updates¶
In the end, run posts the SQL metric updates (as a SparkListenerDriverAccumUpdates (Apache Spark) Spark event) to SparkListeners (incl. Spark UI).
Note
Use SparkListener (Apache Spark) to intercept SparkListenerDriverAccumUpdates events.
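A minimal listener sketch (the event class is Spark's org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates; the println handling is illustrative only):
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates

// Prints driver-side accumulator updates (e.g. MERGE metrics) as they are posted
class MergeMetricsListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SparkListenerDriverAccumUpdates(executionId, accumUpdates) =>
      println(s"SQL execution $executionId: accumulator updates $accumUpdates")
    case _ => // not interested in other events
  }
}

spark.sparkContext.addSparkListener(new MergeMetricsListener)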
Building Target Logical Query Plan for AddFiles¶
buildTargetPlanWithFiles(
deltaTxn: OptimisticTransaction,
files: Seq[AddFile]): LogicalPlan
buildTargetPlanWithFiles creates a DataFrame to represent the given AddFiles and access the analyzed logical query plan. buildTargetPlanWithFiles requests the given OptimisticTransaction for the DeltaLog to create a DataFrame (for the Snapshot and the given AddFiles).
In the end, buildTargetPlanWithFiles creates a Project logical operator with Alias expressions so the output columns of the analyzed logical query plan (of the DataFrame of the AddFiles) reference the target's output columns (by name).
Note
The output columns of the target delta table are associated with an OptimisticTransaction as the Metadata:
deltaTxn.metadata.schema
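A rough sketch of the aliasing step using the Catalyst API (the name resolution is simplified and targetOutputCols is an assumed input):
import org.apache.spark.sql.catalyst.expressions.{Alias, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

// Alias each output column of the scanned plan to the matching target column name
def aliasToTarget(plan: LogicalPlan, targetOutputCols: Seq[NamedExpression]): LogicalPlan = {
  val aliases = plan.output.map { attr =>
    // simplified: case-insensitive name matching, falling back to the attribute's own name
    val name = targetOutputCols
      .find(_.name.equalsIgnoreCase(attr.name))
      .map(_.name)
      .getOrElse(attr.name)
    Alias(attr, name)()
  }
  Project(aliases, plan)
}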
Exceptions¶
run throws an AnalysisException when the target schema is different from the delta table's (i.e., it has changed after the analysis phase):
The schema of your Delta table has changed in an incompatible way since your DataFrame or DeltaTable object was created. Please redefine your DataFrame or DeltaTable object. Changes:
[schemaDiff]
This check can be turned off by setting the session configuration key spark.databricks.delta.checkLatestSchemaOnRead to false.
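As the message itself suggests, the check can be disabled at the session level (use with care):
// Disable the staleness check mentioned in the error message above
spark.conf.set("spark.databricks.delta.checkLatestSchemaOnRead", "false")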
Writing Out Single Insert-Only Merge¶
writeInsertsOnlyWhenNoMatchedClauses(
spark: SparkSession,
deltaTxn: OptimisticTransaction): Seq[FileAction]
In the end, writeInsertsOnlyWhenNoMatchedClauses returns the FileActions (from writing out data).
writeInsertsOnlyWhenNoMatchedClauses is used when:
- MergeIntoCommand is executed (for a single insert-only merge with spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled enabled)
Target Output Columns¶
writeInsertsOnlyWhenNoMatchedClauses gets the names of the output (target) columns.
writeInsertsOnlyWhenNoMatchedClauses creates a collection of output columns with the target names and the resolved DeltaMergeActions of a single DeltaMergeIntoInsertClause (as Alias expressions).
Source DataFrame¶
writeInsertsOnlyWhenNoMatchedClauses creates a UDF to update the numSourceRows metric.
writeInsertsOnlyWhenNoMatchedClauses creates a source DataFrame for the source data with Dataset.filter operators using the UDF and the condition of the DeltaMergeIntoInsertClause (if defined) or Literal.TrueLiteral.
Use condition for filter pushdown optimization
The condition of this single DeltaMergeIntoInsertClause is pushed down to the source when Spark SQL optimizes the query.
Data-Skipped AddFiles¶
writeInsertsOnlyWhenNoMatchedClauses splits conjunctive predicates (And expressions) in the merge condition and determines the so-called targetOnlyPredicates (predicates with the target columns only). writeInsertsOnlyWhenNoMatchedClauses requests the given OptimisticTransaction to filterFiles matching the target-only predicates (that gives AddFiles).
Merge Condition and Data Skipping
The merge condition of this MergeIntoCommand is used for Data Skipping.
Target DataFrame¶
writeInsertsOnlyWhenNoMatchedClauses creates a target DataFrame for the data-skipped AddFiles.
Insert DataFrame¶
writeInsertsOnlyWhenNoMatchedClauses creates a UDF to update the numTargetRowsInserted metric.
writeInsertsOnlyWhenNoMatchedClauses left-anti joins the source DataFrame with the target DataFrame on the merge condition. writeInsertsOnlyWhenNoMatchedClauses selects the output columns and uses Dataset.filter with the UDF.
Demo: Left-Anti Join
val source = Seq(0, 1, 2, 3).toDF("id") // (1)!
val target = Seq(3, 4, 5).toDF("id") // (2)!
val usingColumns = Seq("id")
val q = source.join(target, usingColumns, "leftanti")
1. Equivalent to spark.range(4)
   +---+
   | id|
   +---+
   |  0|
   |  1|
   |  2|
   |  3|
   +---+
2. Equivalent to spark.range(3, 6)
   +---+
   | id|
   +---+
   |  3|
   |  4|
   |  5|
   +---+
scala> q.show
+---+
| id|
+---+
| 0|
| 1|
| 2|
+---+
writeFiles¶
writeInsertsOnlyWhenNoMatchedClauses requests the given OptimisticTransaction to write out the insertDf (possibly repartitioned, via repartitionIfNeeded, on the partitionColumns of the target delta table).
Note
This step triggers a Spark write job (and this active transaction is marked as hasWritten).
Update Metrics¶
In the end, writeInsertsOnlyWhenNoMatchedClauses updates the metrics.
Writing Out Merged Data¶
writeAllChanges(
spark: SparkSession,
deltaTxn: OptimisticTransaction,
filesToRewrite: Seq[AddFile]): Seq[FileAction]
In the end, writeAllChanges returns the FileActions (from writing out data).
writeAllChanges is used when:
- MergeIntoCommand is executed (unless it is a single insert-only merge with spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled configuration property enabled, in which case writeInsertsOnlyWhenNoMatchedClauses is used instead)
targetOutputCols¶
writeAllChanges builds the target output schema (possibly with some nulls for the target columns that are not in the current schema).
newTarget Query Plan¶
writeAllChanges builds a target logical query plan for the given filesToRewrite AddFiles.
Join Type¶
writeAllChanges determines the join type to use:
- rightOuter for a matched-only merge with spark.databricks.delta.merge.optimizeMatchedOnlyMerge.enabled configuration property enabled
- fullOuter otherwise
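A sketch of this decision and the subsequent join (sourceDF, targetDF, condition and the configuration default are assumptions based on the surrounding steps):
import org.apache.spark.sql.Column

// With only MATCHED clauses, unmatched source rows are irrelevant,
// so a right outer join (keeping all target rows) is enough
val matchedOnlyOptimization = spark.conf
  .get("spark.databricks.delta.merge.optimizeMatchedOnlyMerge.enabled", "true") // default assumed
  .toBoolean
val joinType = if (isMatchedOnly && matchedOnlyOptimization) "rightOuter" else "fullOuter"

val joinedDF = sourceDF.join(targetDF, new Column(condition), joinType)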
writeAllChanges prints out the following DEBUG message to the logs:
writeAllChanges using [joinType] join:
source.output: [outputSet]
target.output: [outputSet]
condition: [condition]
newTarget.output: [outputSet]
Source DataFrame¶
writeAllChanges creates a source DataFrame for the source query plan and an extra column:
- _source_row_present_ with the value of a UDF that increments the numSourceRowsInSecondScan metric
New Target DataFrame¶
writeAllChanges creates a target DataFrame for the newTarget query plan and an extra column:
- _target_row_present_ with true literal
Joined DataFrame¶
writeAllChanges creates a joined DataFrame that is a Dataset.join of the source and new target DataFrames with the given join condition and the join type.
Output Schema¶
writeAllChanges creates an output schema based on cdcEnabled:
- FIXME
Create JoinedRowProcessor¶
writeAllChanges creates a JoinedRowProcessor (that is then used to map over partitions of the joined DataFrame).
Output DataFrame¶
writeAllChanges creates a DataFrame for the joinedPlan and uses the Dataset.mapPartitions operator to let the JoinedRowProcessor processPartition.
writeAllChanges drops the row_dropped and incr_row_count columns.
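A sketch of these two steps (joinedDF, joinedRowProcessor, outputRowEncoder and the exact column names are assumptions based on this page):
// Process the joined rows partition by partition, then drop the bookkeeping columns
val outputDF = joinedDF
  .mapPartitions(joinedRowProcessor.processPartition)(outputRowEncoder)
  .drop("_row_dropped_", "_incr_row_count_")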
writeAllChanges prints out the following DEBUG message to the logs:
writeAllChanges: join output plan:
[outputDF.queryExecution]
writeFiles¶
writeAllChanges requests the given OptimisticTransaction to write out the output DataFrame (possibly repartitioned, via repartitionIfNeeded, on the partition columns).
Note
This step triggers a Spark write job (and this active transaction is marked as hasWritten).
Update Metrics¶
In the end, writeAllChanges updates the metrics.
Single Insert-Only Merge¶
isSingleInsertOnly: Boolean
isSingleInsertOnly holds true when this MERGE command is a single WHEN NOT MATCHED THEN INSERT:
- No MATCHED clauses
- There is just a single notMatchedClauses
MERGE INTO merge_demo to
USING merge_demo_source from
ON to.id = from.id
WHEN NOT MATCHED THEN INSERT *;
Matched-Only Merge¶
isMatchedOnly: Boolean
isMatchedOnly holds true when this MERGE command is WHEN MATCHED THEN-only:
- No NOT MATCHED clauses
- MATCHED clauses only
MERGE INTO merge_demo to
USING merge_demo_source from
ON to.id = from.id
WHEN MATCHED AND to.id < 3 THEN DELETE
WHEN MATCHED THEN UPDATE SET *;
Creating Metric Update UDF¶
makeMetricUpdateUDF(
name: String): Expression
makeMetricUpdateUDF looks up the performance metric (by name) in the metrics.
In the end, makeMetricUpdateUDF defines a non-deterministic UDF to increment the metric (when executed).
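A minimal sketch of such a UDF-backed expression (metrics is assumed to be the command's name-to-SQLMetric map):
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.functions.udf

// Non-deterministic so the optimizer cannot collapse or reorder it away;
// returns true so the expression can double as a pass-through predicate
def makeMetricUpdateUDF(name: String): Expression = {
  val metric = metrics(name) // assumed: Map[String, SQLMetric]
  udf { () => metric += 1; true }.asNondeterministic().apply().expr
}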
Note
This Expression is used to increment performance metrics such as numSourceRows, numSourceRowsInSecondScan and numTargetRowsInserted.
makeMetricUpdateUDF is used when:
- MergeIntoCommand is requested to findTouchedFiles, writeInsertsOnlyWhenNoMatchedClauses and writeAllChanges
notMatchedClauseOutput¶
notMatchedClauseOutput(
clause: DeltaMergeIntoInsertClause): Seq[Seq[Expression]]
notMatchedClauseOutput returns the main data output followed by the CDF data output when cdcEnabled:
(mainDataOutput)
or
(mainDataOutput, insertCdcOutput)
notMatchedClauseOutput creates the output (resolved expressions) of the main data based on the following:
- The Expressions from the DeltaMergeActions of the given DeltaMergeIntoInsertClause
- FalseLiteral
- The UDF to increment the numTargetRowsInserted metric
- null literal (to indicate the main data, not CDF's)
With cdcEnabled, notMatchedClauseOutput creates the output (resolved expressions) of the CDF data based on the following:
- The Expressions from the DeltaMergeActions of the given DeltaMergeIntoInsertClause
- FalseLiteral
- TrueLiteral
- insert literal
Note
The first two expressions are the same in the main data and CDF data outputs.
notMatchedClauseOutput is used when:
- MergeIntoCommand is requested to write out merged data (for notMatchedOutputs to create a JoinedRowProcessor based on the non-matched insert clauses)
Finding Files to Rewrite¶
findTouchedFiles(
spark: SparkSession,
deltaTxn: OptimisticTransaction): Seq[AddFile]
Important
findTouchedFiles is such a fine piece of art (a Delta gem). It uses a custom accumulator, a UDF (that uses this accumulator to record touched file names) and the input_file_name() standard function for the names of the files read.
It is always worth keeping in mind that Delta Lake uses files for data storage, and that is why the input_file_name() standard function works. It would not work for non-file-based data sources.
Example 1: Understanding the Internals of findTouchedFiles
The following query writes out a 10-element dataset using the default parquet data source to the /tmp/parquet directory:
val target = "/tmp/parquet"
spark.range(10).write.save(target)
The number of parquet part files varies based on the number of partitions (CPU cores).
The following query loads the parquet dataset back alongside the input_file_name() standard function to mimic findTouchedFiles's behaviour.
val FILE_NAME_COL = "_file_name_"
val dataFiles = spark.read.parquet(target).withColumn(FILE_NAME_COL, input_file_name())
scala> dataFiles.show(truncate = false)
+---+---------------------------------------------------------------------------------------+
|id |_file_name_ |
+---+---------------------------------------------------------------------------------------+
|4 |file:///tmp/parquet/part-00007-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|0 |file:///tmp/parquet/part-00001-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|3 |file:///tmp/parquet/part-00006-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|6 |file:///tmp/parquet/part-00011-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|1 |file:///tmp/parquet/part-00003-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|8 |file:///tmp/parquet/part-00014-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|2 |file:///tmp/parquet/part-00004-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|7 |file:///tmp/parquet/part-00012-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|5 |file:///tmp/parquet/part-00009-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
|9 |file:///tmp/parquet/part-00015-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
+---+---------------------------------------------------------------------------------------+
As you may have noticed, not all part files contain data, so they are not included in the dataset. That is where findTouchedFiles uses the groupBy operator and the count action to calculate the match frequency.
val counts = dataFiles.groupBy(FILE_NAME_COL).count()
scala> counts.show(truncate = false)
+---------------------------------------------------------------------------------------+-----+
|_file_name_ |count|
+---------------------------------------------------------------------------------------+-----+
|file:///tmp/parquet/part-00015-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00007-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00003-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00011-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00012-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00006-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00001-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00004-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00009-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00014-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
+---------------------------------------------------------------------------------------+-----+
Let's load all the part files in the /tmp/parquet directory and find which file(s) have no data.
import scala.sys.process._
val cmd = (s"ls $target" #| "grep .parquet").lineStream
val allFiles = cmd.toArray.toSeq.toDF(FILE_NAME_COL)
.select(concat(lit(s"file://$target/"), col(FILE_NAME_COL)) as FILE_NAME_COL)
val joinType = "left_anti" // MergeIntoCommand uses inner as it wants data files
val noDataFiles = allFiles.join(dataFiles, Seq(FILE_NAME_COL), joinType)
Mind that which part files end up with or without data can vary between runs, but that does not interfere with the main reasoning flow.
scala> noDataFiles.show(truncate = false)
+---------------------------------------------------------------------------------------+
|_file_name_ |
+---------------------------------------------------------------------------------------+
|file:///tmp/parquet/part-00000-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|
+---------------------------------------------------------------------------------------+
findTouchedFiles registers an accumulator to collect all the distinct files that need to be rewritten (touched files).
Note
The name of the accumulator is internal.metrics.MergeIntoDelta.touchedFiles and the internal.metrics part is supposed to hide it from the web UI as potentially large (a set of file names to be rewritten).
recordTouchedFileName¶
findTouchedFiles defines a nondeterministic UDF that adds the file names to the accumulator (recordTouchedFileName).
dataSkippedFiles¶
findTouchedFiles splits conjunctive predicates (And binary expressions) in the condition expression and collects the predicates that use the target's columns (targetOnlyPredicates). findTouchedFiles requests the given OptimisticTransaction for the files that match the target-only predicates (and creates a dataSkippedFiles collection of AddFiles).
Note
This step looks similar to filter predicate pushdown.
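The splitting step can be sketched as follows (a simplified re-implementation; Catalyst's PredicateHelper provides the real splitConjunctivePredicates, and condition and target are assumed inputs):
import org.apache.spark.sql.catalyst.expressions.{And, Expression}

// Break `a AND b AND c` into Seq(a, b, c)
def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
  case And(left, right) => splitConjunctivePredicates(left) ++ splitConjunctivePredicates(right)
  case other => Seq(other)
}

// Keep only the predicates that reference target columns exclusively
val targetOnlyPredicates =
  splitConjunctivePredicates(condition).filter(_.references.subsetOf(target.outputSet))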
findTouchedFiles creates one DataFrame for the source data (using a Spark SQL utility).
findTouchedFiles builds a logical query plan for the files (matching the predicates) and creates another DataFrame for the target data. findTouchedFiles adds two columns to the target dataframe:
- _row_id_ for the monotonically_increasing_id() standard function
- _file_name_ for the input_file_name() standard function
findTouchedFiles creates (a DataFrame that is) an INNER JOIN of the source and target DataFrames using the condition expression.
findTouchedFiles takes the joined dataframe and selects the _row_id_ column and the recordTouchedFileName UDF applied to the _file_name_ column (as a column named one). The DataFrame is internally known as collectTouchedFiles.
findTouchedFiles uses the groupBy operator on _row_id_ to calculate a sum of all the values in the one column (as a count column) in the two-column collectTouchedFiles dataset. The DataFrame is internally known as matchedRowCounts.
Note
No Spark job has been submitted yet. findTouchedFiles is still in "query preparation" mode.
findTouchedFiles uses filter on the count column (in the matchedRowCounts dataset) with values greater than 1. If there are any, findTouchedFiles throws an UnsupportedOperationException exception:
Cannot perform MERGE as multiple source rows matched and attempted to update the same
target row in the Delta table. By SQL semantics of merge, when multiple source rows match
on the same target row, the update operation is ambiguous as it is unclear which source
should be used to update the matching target row.
You can preprocess the source table to eliminate the possibility of multiple matches.
Note
Since findTouchedFiles uses the count action, there should be a Spark SQL query reported (and possibly Spark jobs) in web UI.
findTouchedFiles requests the touchedFilesAccum accumulator for the touched file names.
Example 2: Understanding the Internals of findTouchedFiles
val TOUCHED_FILES_ACCUM_NAME = "MergeIntoDelta.touchedFiles"
val touchedFilesAccum = spark.sparkContext.collectionAccumulator[String](TOUCHED_FILES_ACCUM_NAME)
val recordTouchedFileName = udf { (fileName: String) => {
touchedFilesAccum.add(fileName)
1
}}.asNondeterministic()
val target = "/tmp/parquet"
spark.range(10).write.save(target)
val FILE_NAME_COL = "_file_name_"
val dataFiles = spark.read.parquet(target).withColumn(FILE_NAME_COL, input_file_name())
val collectTouchedFiles = dataFiles.select(col(FILE_NAME_COL), recordTouchedFileName(col(FILE_NAME_COL)).as("one"))
scala> collectTouchedFiles.show(truncate = false)
+---------------------------------------------------------------------------------------+---+
|_file_name_ |one|
+---------------------------------------------------------------------------------------+---+
|file:///tmp/parquet/part-00007-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00001-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00006-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00011-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00003-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00014-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00004-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00012-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00009-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
|file:///tmp/parquet/part-00015-76df546f-91f8-4cbb-8fcc-f51478e0db31-c000.snappy.parquet|1 |
+---------------------------------------------------------------------------------------+---+
import scala.collection.JavaConverters._
val touchedFileNames = touchedFilesAccum.value.asScala.toSeq
Use the Stages tab in web UI to review the accumulator values.
findTouchedFiles prints out the following TRACE message to the logs:
findTouchedFiles: matched files:
[touchedFileNames]
findTouchedFiles generates a candidate file map (generateCandidateFileMap) for the files that match the target-only predicates.
findTouchedFiles resolves every touched file name to an AddFile (getTouchedFile).
findTouchedFiles updates the following performance metrics:
- numTargetFilesBeforeSkipping: adds the numOfFiles of the Snapshot of the given OptimisticTransaction
- numTargetFilesAfterSkipping: adds the number of the files that match the target-only predicates
- numTargetFilesRemoved: adds the number of the touched files
In the end, findTouchedFiles returns the touched files (as AddFiles).
repartitionIfNeeded¶
repartitionIfNeeded(
spark: SparkSession,
df: DataFrame,
partitionColumns: Seq[String]): DataFrame
repartitionIfNeeded repartitions the given DataFrame by the partitionColumns (using the Dataset.repartition operation) when all the following hold:
- There is at least one partition column (among the given partitionColumns)
- spark.databricks.delta.merge.repartitionBeforeWrite.enabled configuration property is true
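A sketch of that behaviour (the configuration default shown is an assumption):
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Repartition by the table's partition columns before writing, if enabled
def repartitionIfNeeded(
    spark: SparkSession,
    df: DataFrame,
    partitionColumns: Seq[String]): DataFrame = {
  val enabled = spark.conf
    .get("spark.databricks.delta.merge.repartitionBeforeWrite.enabled", "true") // default assumed
    .toBoolean
  if (partitionColumns.nonEmpty && enabled) df.repartition(partitionColumns.map(col): _*)
  else df
}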
repartitionIfNeeded is used when:
- MergeIntoCommand is executed (and writes data out for Single Insert-Only Merge and other merges)
LeafRunnableCommand¶
MergeIntoCommand is a LeafRunnableCommand (Spark SQL) logical operator.
Demo¶
Logging¶
Enable ALL logging level for org.apache.spark.sql.delta.commands.MergeIntoCommand logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.delta.commands.MergeIntoCommand=ALL
Refer to Logging.