OptimizeExecutor¶
OptimizeExecutor is a DeltaCommand for OptimizeTableCommand to optimize a delta table.

OptimizeExecutor is a SQLMetricsReporting.
Creating Instance¶
OptimizeExecutor takes the following to be created:

- SparkSession (Spark SQL)
- OptimisticTransaction
- Partition predicate expressions (Spark SQL)
- Z-OrderBy Column Names
- isAutoCompact flag
- DeltaOptimizeContext

OptimizeExecutor is created when:
Curve¶
curve: String

curve can be one of the two supported values:

- zorder for one or more zOrderByColumns
- hilbert for no zOrderByColumns and the clustered tables feature enabled

curve is used when:

- OptimizeExecutor is requested to runOptimizeBinJob
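For example (the table names below are made up), an OPTIMIZE statement with ZORDER BY columns leads to the zorder curve, while OPTIMIZE of a liquid-clustered table with no ZORDER BY columns leads to hilbert:

```scala
// Illustrative only; `events` and `clustered_events` are made-up table names.
spark.sql("OPTIMIZE events ZORDER BY (eventType, eventDate)") // one or more zOrderByColumns => "zorder"
spark.sql("OPTIMIZE clustered_events")                        // clustered table, no zOrderByColumns => "hilbert"
```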
Performing Optimization¶
optimize(): Seq[Row]
optimize is used when:

- OptimizeTableCommand is executed
optimize reads the following configuration properties:
- spark.databricks.delta.optimize.minFileSize for the threshold of the size of files to be grouped together and rewritten as larger ones
- spark.databricks.delta.optimize.maxFileSize for the maximum desired file size (in bytes) of the compaction output files
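For instance, in a spark-shell session (where spark is the active SparkSession), the two thresholds could be tuned before running OPTIMIZE; the values and the table name my_delta_table are illustrative only:

```scala
// Illustrative values only; the property names come from the list above.
spark.conf.set("spark.databricks.delta.optimize.minFileSize", 128L * 1024 * 1024)  // files below ~128 MB are compaction candidates
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", 1024L * 1024 * 1024) // aim for ~1 GB output files
spark.sql("OPTIMIZE my_delta_table")
```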
optimize requests the DeltaLog to startTransaction.

optimize requests the OptimisticTransaction for the files matching the partition predicates.
optimize finds the files smaller than the spark.databricks.delta.optimize.minFileSize threshold (the files considered for compaction) and groups them by partition values.

optimize groups the files into bins (of at most the spark.databricks.delta.optimize.maxFileSize size).
Note
A bin is a group of files, whose total size does not exceed the desired size. They will be coalesced into a single output file.
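A minimal sketch of that bin-packing idea, assuming only that AddFile exposes the file size (it does, as size); the real groupFilesIntoBins also works per partition and applies extra rules:

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.delta.actions.AddFile

// Simplified sketch: append files to the current bin until adding one more would push
// the bin's total size over the desired maximum, then start a new bin.
def binPack(files: Seq[AddFile], maxBinSize: Long): Seq[Seq[AddFile]] = {
  val bins = ArrayBuffer.empty[Seq[AddFile]]
  val currentBin = ArrayBuffer.empty[AddFile]
  var currentSize = 0L
  files.foreach { file =>
    if (currentBin.nonEmpty && currentSize + file.size > maxBinSize) {
      bins += currentBin.toSeq
      currentBin.clear()
      currentSize = 0L
    }
    currentBin += file
    currentSize += file.size
  }
  if (currentBin.nonEmpty) bins += currentBin.toSeq
  bins.toSeq
}
```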
optimize creates a ForkJoinPool with spark.databricks.delta.optimize.maxThreads threads (with the OptimizeJob thread prefix). The task pool is then used to parallelize the submission of runOptimizeBinJob optimization jobs to Spark.
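The fan-out pattern could be sketched as follows (assuming the scala-parallel-collections module is on the classpath; the actual code also installs the OptimizeJob thread-name prefix, which is omitted here):

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.CollectionConverters._
import scala.collection.parallel.ForkJoinTaskSupport

// Sketch only: process each bin on a dedicated ForkJoinPool so the per-bin Spark jobs
// are submitted concurrently rather than one after another.
def runBinsInParallel[Bin, Result](bins: Seq[Bin], maxThreads: Int)(
    runBin: Bin => Seq[Result]): Seq[Result] = {
  val pool = new ForkJoinPool(maxThreads)
  try {
    val parBins = bins.par
    parBins.tasksupport = new ForkJoinTaskSupport(pool)
    parBins.flatMap(runBin).seq.toSeq
  } finally {
    pool.shutdownNow()
  }
}
```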
Once the compaction jobs are done, optimize tries to commit the transaction (the given actions to the log) when there were any AddFiles.

In the end, optimize returns a Row with the data path (of the Delta table) and the optimize statistics.
groupFilesIntoBins¶
groupFilesIntoBins(
partitionsToCompact: Seq[(Map[String, String], Seq[AddFile])],
maxTargetFileSize: Long): Seq[(Map[String, String], Seq[AddFile])]
groupFilesIntoBins...FIXME
pruneCandidateFileList¶
pruneCandidateFileList(
minFileSize: Long,
maxDeletedRowsRatio: Double,
files: Seq[AddFile]): Seq[AddFile]
pruneCandidateFileList...FIXME
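The body is not described here. Purely as an assumption-laden sketch of what the signature suggests (not Delta Lake's actual logic), a pruning step could keep the files that are small enough to be worth compacting or whose share of deleted rows is high enough to be worth rewriting; deletedRowsRatio is a hypothetical helper, not a Delta Lake API:

```scala
import org.apache.spark.sql.delta.actions.AddFile

// Assumption-based sketch only: keep small files and files with many deleted rows.
def pruneCandidates(
    minFileSize: Long,
    maxDeletedRowsRatio: Double,
    files: Seq[AddFile])(deletedRowsRatio: AddFile => Double): Seq[AddFile] =
  files.filter(f => f.size < minFileSize || deletedRowsRatio(f) > maxDeletedRowsRatio)
```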
runOptimizeBinJob¶
runOptimizeBinJob(
txn: OptimisticTransaction,
partition: Map[String, String],
bin: Seq[AddFile],
maxFileSize: Long): Seq[FileAction]
maxFileSize

maxFileSize is controlled by the spark.databricks.delta.optimize.maxFileSize configuration property, unless the optimization is executed as part of Auto Compaction, which uses the spark.databricks.delta.autoCompact.maxFileSize configuration property instead.
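As an illustration (the values are made up; the property names come from the note above):

```scala
// Illustrative values only.
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", 1024L * 1024 * 1024)   // target for OPTIMIZE
spark.conf.set("spark.databricks.delta.autoCompact.maxFileSize", 128L * 1024 * 1024) // target for Auto Compaction
```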
runOptimizeBinJob creates an input DataFrame for scanning the data of the given AddFiles. runOptimizeBinJob requests the DeltaLog (of the given OptimisticTransaction) to create the DataFrame (with the Optimize action type).
runOptimizeBinJob creates a so-called repartitionDF as follows (sketched below):

- With multi-dimensional clustering enabled (i.e., Z-Order or Liquid Clustering), runOptimizeBinJob does the following:
    - Calculates the approximate number of files (as the total size of all the given AddFiles divided by the given maxFileSize)
    - Repartitions the DataFrame to as many partitions as the approximate number of files using multi-dimensional clustering (with the DataFrame to scan, the approximate number of files, the clusteringColumns and the curve)
- Otherwise, runOptimizeBinJob repartitions the DataFrame into a single partition using one of the following DataFrame operators, based on the spark.databricks.delta.optimize.repartition.enabled configuration property:
    - DataFrame.repartition with spark.databricks.delta.optimize.repartition.enabled enabled
    - DataFrame.coalesce otherwise
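A minimal sketch of that branching, assuming nothing about Delta internals; clusterByExpression stands in for the multi-dimensional clustering step and is not a real API:

```scala
import org.apache.spark.sql.DataFrame

// Sketch of the decision described above.
def repartitionForBin(
    scanDF: DataFrame,
    isMultiDimClustering: Boolean,
    totalBinSize: Long,
    maxFileSize: Long,
    repartitionEnabled: Boolean)(
    clusterByExpression: (DataFrame, Int) => DataFrame): DataFrame = {
  if (isMultiDimClustering) {
    // approximate number of output files for this bin
    val approxNumFiles = math.max(1L, totalBinSize / maxFileSize).toInt
    clusterByExpression(scanDF, approxNumFiles)
  } else if (repartitionEnabled) {
    scanDF.repartition(1) // full shuffle into a single output partition
  } else {
    scanDF.coalesce(1)    // narrow dependency; no shuffle
  }
}
```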
runOptimizeBinJob sets a custom description for the job group (for all future Spark jobs started by this thread).
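For example (illustrative names and values; not the exact description Delta Lake uses):

```scala
// Illustrative only: a job group with a custom description makes all Spark jobs triggered
// by this bin's rewrite easy to spot in the Spark UI.
val binFileCount = 12                  // made-up values for the description
val partitionDesc = "date=2024-01-01"
spark.sparkContext.setJobGroup(
  "optimize-bin",                      // hypothetical group id
  s"Optimizing $binFileCount files in partition $partitionDesc",
  interruptOnCancel = false)
```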
runOptimizeBinJob writes out the repartitioned DataFrame. runOptimizeBinJob requests the given OptimisticTransaction to write the data out.
runOptimizeBinJob marks all the AddFiles (the result of writing the data out) as not dataChange.
IllegalStateException for non-AddFiles

No other FileActions are expected, or runOptimizeBinJob throws an IllegalStateException:

Unexpected action [other] with type [class].
File compaction job output should only have AddFiles
runOptimizeBinJob creates RemoveFiles for all the given AddFiles. runOptimizeBinJob uses the current timestamp, and the dataChange flag is disabled (as with the AddFiles earlier).

In the end, runOptimizeBinJob returns the AddFiles and RemoveFiles.
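That bookkeeping could be sketched as follows, assuming AddFile's case-class copy and a removeWithTimestamp helper as found in recent Delta Lake versions; OPTIMIZE rearranges data without changing it, hence dataChange = false on both sides:

```scala
import org.apache.spark.sql.delta.actions.{AddFile, FileAction}

// Sketch only: rewritten files are added and the original files are removed,
// both with dataChange = false.
def toOptimizeActions(newFiles: Seq[AddFile], originalFiles: Seq[AddFile]): Seq[FileAction] = {
  val timestamp = System.currentTimeMillis()
  val addActions    = newFiles.map(_.copy(dataChange = false))
  val removeActions = originalFiles.map(_.removeWithTimestamp(timestamp, dataChange = false))
  addActions ++ removeActions
}
```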
commitAndRetry¶
commitAndRetry(
txn: OptimisticTransaction,
optimizeOperation: Operation,
actions: Seq[Action],
metrics: Map[String, SQLMetric])(f: OptimisticTransaction => Boolean): Unit
commitAndRetry...FIXME
createMetrics¶
createMetrics(
sparkContext: SparkContext,
addedFiles: Seq[AddFile],
removedFiles: Seq[RemoveFile],
removedDVs: Seq[DeletionVectorDescriptor]): Map[String, SQLMetric]
createMetrics...FIXME
isMultiDimClustering Flag¶
OptimizeExecutor defines the isMultiDimClustering flag that is enabled (true) when either of the following holds (see the sketch below):

- Clustered Tables are supported
- ZORDER BY Columns are specified

The most important use of the isMultiDimClustering flag is for multi-dimensional clustering in runOptimizeBinJob.

OptimizeExecutor also uses it for the following:

- Determine the data (parquet) files to optimize (which in fact keeps all the files by partition)
- Create a ZOrderStats at the end of optimize
- Keep all the files of a partition in a single bin in groupFilesIntoBins
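Expressed directly in code (the names follow this section, not the actual source):

```scala
// The rule above: multi-dimensional clustering applies to clustered tables
// and to OPTIMIZE with ZORDER BY columns.
def isMultiDimClustering(isClusteredTable: Boolean, zOrderByColumns: Seq[String]): Boolean =
  isClusteredTable || zOrderByColumns.nonEmpty
```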
isClusteredTable Flag¶
OptimizeExecutor defines the isClusteredTable flag that is enabled (true) when clustered tables are supported (based on the Protocol of the table snapshot under the OptimisticTransaction).

isClusteredTable is used in the following: