OptimizeExecutor¶
OptimizeExecutor is a DeltaCommand used by OptimizeTableCommand to optimize a Delta table.
OptimizeExecutor is a SQLMetricsReporting.
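For context, an optimization ends up in OptimizeExecutor when a Delta table is optimized from user code, either with the OPTIMIZE SQL command or with the DeltaTable API. A minimal sketch (the table path and column name are placeholders; spark is an active SparkSession):

```scala
import io.delta.tables.DeltaTable

// OPTIMIZE with Z-Ordering via SQL
spark.sql("OPTIMIZE delta.`/tmp/delta/events` ZORDER BY (eventType)")

// The same via the DeltaTable API: plain compaction or Z-Ordering
val table = DeltaTable.forPath(spark, "/tmp/delta/events")
table.optimize().executeCompaction()
table.optimize().executeZOrderBy("eventType")
```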
Creating Instance¶
OptimizeExecutor takes the following to be created:
- SparkSession (Spark SQL)
- OptimisticTransaction
- Partition predicate expressions (Spark SQL)
- Z-OrderBy Column Names
- isAutoCompact flag
- DeltaOptimizeContext
OptimizeExecutor is created when:

- OptimizeTableCommand is executed
- Auto Compaction is performed (with the isAutoCompact flag enabled)
Curve¶
curve: String
curve can be one of the two supported values:
- zorder for one or more zOrderByColumns
- hilbert for no zOrderByColumns and the Clustered Tables feature enabled
curve is used when:

- OptimizeExecutor is requested to runOptimizeBinJob
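The selection rule above can be sketched as follows (a simplified illustration, not the actual OptimizeExecutor code; the Clustered Tables check is left implicit):

```scala
// Picks the space-filling curve per the rules above (simplified sketch):
// "zorder" when Z-OrderBy columns were given, "hilbert" otherwise
// (the hilbert case assumes the Clustered Tables feature is enabled).
def curveFor(zOrderByColumns: Seq[String]): String =
  if (zOrderByColumns.nonEmpty) "zorder" else "hilbert"
```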
Performing Optimization¶
optimize(): Seq[Row]
optimize is used when:

- OptimizeTableCommand is executed
optimize reads the following configuration properties:
- spark.databricks.delta.optimize.minFileSize for the threshold of the size of files to be grouped together and rewritten as larger ones
- spark.databricks.delta.optimize.maxFileSize for the maximum desired file size (in bytes) of the compaction output files
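Both thresholds can be tuned per session; a small example (the values are illustrative, not necessarily the defaults; spark is an active SparkSession):

```scala
// Rewrite files smaller than 512 MB into output files of up to 1 GB
spark.conf.set("spark.databricks.delta.optimize.minFileSize", 512L * 1024 * 1024)
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", 1024L * 1024 * 1024)
```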
optimize requests the DeltaLog to startTransaction.
optimize requests the OptimisticTransaction for the files matching the partition predicates.
optimize finds the files smaller than the spark.databricks.delta.optimize.minFileSize threshold (the files considered for compaction) and groups them by partition values.
optimize then groups the files into bins of at most spark.databricks.delta.optimize.maxFileSize in total size.
Note
A bin is a group of files whose total size does not exceed the desired size; they will be coalesced into a single output file.
optimize creates a ForkJoinPool with spark.databricks.delta.optimize.maxThreads threads (with the OptimizeJob thread prefix). The task pool is then used to parallelize the submission of runOptimizeBinJob optimization jobs to Spark.
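The fan-out can be pictured with a plain Scala sketch (an illustration of bounding concurrency with a ForkJoinPool, not the actual OptimizeExecutor code; the bins and their processing are stand-ins):

```scala
import java.util.concurrent.ForkJoinPool
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

// Stand-ins for the (partition values, files) bins to compact
val bins = (1 to 8).toVector
// Bounded pool, like spark.databricks.delta.optimize.maxThreads
val pool = new ForkJoinPool(4)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
try {
  val jobs = bins.map { bin =>
    Future {
      // each bin would submit one Spark job (as runOptimizeBinJob does) and return its file actions
      bin * 2
    }
  }
  val updates = Await.result(Future.sequence(jobs), Duration.Inf)
  println(updates)
} finally pool.shutdown()
```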
Once the compaction jobs are done, optimize commits the transaction (with the resulting actions) if there are any AddFiles.
In the end, optimize returns a Row with the data path (of the Delta table) and the optimize statistics.
groupFilesIntoBins¶
groupFilesIntoBins(
partitionsToCompact: Seq[(Map[String, String], Seq[AddFile])],
maxTargetFileSize: Long): Seq[(Map[String, String], Seq[AddFile])]
groupFilesIntoBins...FIXME
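Until the walkthrough is filled in, here is a simplified bin-packing sketch that matches the bin description above (not the actual implementation; ordering and per-partition special cases are ignored):

```scala
import org.apache.spark.sql.delta.actions.AddFile
import scala.collection.mutable.ArrayBuffer

// Within each partition, accumulate files into a bin until adding the next file
// would exceed maxTargetFileSize, then start a new bin (simplified sketch).
def groupFilesIntoBinsSketch(
    partitionsToCompact: Seq[(Map[String, String], Seq[AddFile])],
    maxTargetFileSize: Long): Seq[(Map[String, String], Seq[AddFile])] = {
  partitionsToCompact.flatMap { case (partition, files) =>
    val bins = ArrayBuffer.empty[Seq[AddFile]]
    val current = ArrayBuffer.empty[AddFile]
    var currentSize = 0L
    files.foreach { file =>
      if (current.nonEmpty && currentSize + file.size > maxTargetFileSize) {
        bins += current.toVector
        current.clear()
        currentSize = 0L
      }
      current += file
      currentSize += file.size
    }
    if (current.nonEmpty) bins += current.toVector
    bins.map(bin => (partition, bin))
  }
}
```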
pruneCandidateFileList¶
pruneCandidateFileList(
minFileSize: Long,
maxDeletedRowsRatio: Double,
files: Seq[AddFile]): Seq[AddFile]
pruneCandidateFileList...FIXME
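Going only by the signature, a plausible reading is that a file stays a rewrite candidate when it is smaller than minFileSize or when the share of rows deleted by its deletion vector exceeds maxDeletedRowsRatio. A sketch under that assumption (deletedRowsRatio is a hypothetical helper, not part of the API):

```scala
import org.apache.spark.sql.delta.actions.AddFile

// Speculative sketch: keep small files and files with too many deleted rows.
def pruneCandidateFileListSketch(
    minFileSize: Long,
    maxDeletedRowsRatio: Double,
    files: Seq[AddFile])(deletedRowsRatio: AddFile => Double): Seq[AddFile] =
  files.filter { file =>
    file.size < minFileSize || deletedRowsRatio(file) > maxDeletedRowsRatio
  }
```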
runOptimizeBinJob¶
runOptimizeBinJob(
txn: OptimisticTransaction,
partition: Map[String, String],
bin: Seq[AddFile],
maxFileSize: Long): Seq[FileAction]
maxFileSize
maxFileSize is controlled by the spark.databricks.delta.optimize.maxFileSize configuration property, unless optimization runs as part of Auto Compaction, which uses the spark.databricks.delta.autoCompact.maxFileSize configuration property instead.
runOptimizeBinJob creates an input DataFrame for scanning data of the given AddFiles. runOptimizeBinJob requests the DeltaLog (of the given OptimisticTransaction) to create the DataFrame (with Optimize action type).
runOptimizeBinJob creates a so-called repartitionDF as follows:

- With multi-dimensional clustering enabled (i.e., Z-Order or Liquid Clustering), runOptimizeBinJob does the following:
    - Calculates the approximate number of files (as the total size of all the given AddFiles divided by the given maxFileSize)
    - Repartitions the DataFrame to as many partitions as the approximate number of files using multi-dimensional clustering (with the DataFrame to scan, the approximate number of files, the clusteringColumns and the curve)
- Otherwise, runOptimizeBinJob repartitions the DataFrame to a single partition using one of the following DataFrame operators, based on the spark.databricks.delta.optimize.repartition.enabled configuration property:
    - DataFrame.repartition with spark.databricks.delta.optimize.repartition.enabled enabled
    - DataFrame.coalesce otherwise
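The single-partition branch can be sketched as follows (an illustration of the two operators, not the actual code):

```scala
import org.apache.spark.sql.DataFrame

// repartition(1) shuffles the data into one evenly-written partition,
// coalesce(1) avoids the shuffle but funnels the write through one task.
def toSinglePartition(df: DataFrame, repartitionEnabled: Boolean): DataFrame =
  if (repartitionEnabled) df.repartition(1) // spark.databricks.delta.optimize.repartition.enabled
  else df.coalesce(1)
```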
runOptimizeBinJob sets a custom description for the job group (for all future Spark jobs started by this thread).
runOptimizeBinJob writes out the repartitioned DataFrame. runOptimizeBinJob requests the given OptimisticTransaction to write data out.
runOptimizeBinJob marks all the AddFiles (the result of writing data out) as not dataChange.
IllegalStateException for non-AddFiles
No other FileActions are expected; otherwise, runOptimizeBinJob throws an IllegalStateException:
Unexpected action [other] with type [class].
File compaction job output should only have AddFiles
runOptimizeBinJob creates RemoveFiles for all the given AddFiles. runOptimizeBinJob uses the current timestamp and disables the dataChange flag (just as for the new AddFiles earlier).
In the end, runOptimizeBinJob returns the AddFiles and RemoveFiles.
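The last two steps can be sketched with the action types involved (a simplified illustration under the assumption that AddFile.removeWithTimestamp produces the RemoveFiles; not necessarily the exact code):

```scala
import org.apache.spark.sql.delta.actions.{AddFile, FileAction}

// Mark the newly written files as non-data-changing and emit matching
// RemoveFiles (also with dataChange = false) for the rewritten input files.
def binJobOutputSketch(newFiles: Seq[AddFile], rewrittenFiles: Seq[AddFile]): Seq[FileAction] = {
  val now = System.currentTimeMillis()
  val adds = newFiles.map(_.copy(dataChange = false))
  val removes = rewrittenFiles.map(_.removeWithTimestamp(now, dataChange = false))
  adds ++ removes
}
```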
commitAndRetry¶
commitAndRetry(
txn: OptimisticTransaction,
optimizeOperation: Operation,
actions: Seq[Action],
metrics: Map[String, SQLMetric])(f: OptimisticTransaction => Boolean): Unit
commitAndRetry...FIXME
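Pending the walkthrough, a speculative sketch of a commit-and-retry loop that matches the signature (the actual conflict handling, metric registration, and retry limits may differ):

```scala
import java.util.ConcurrentModificationException
import org.apache.spark.sql.delta.DeltaOperations.Operation
import org.apache.spark.sql.delta.OptimisticTransaction
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.execution.metric.SQLMetric

// Commit the actions; on a concurrent modification, start a fresh transaction
// and retry only if the caller-supplied predicate f approves the new transaction.
// (Metric registration is omitted in this sketch.)
def commitAndRetrySketch(
    txn: OptimisticTransaction,
    optimizeOperation: Operation,
    actions: Seq[Action],
    metrics: Map[String, SQLMetric])(f: OptimisticTransaction => Boolean): Unit = {
  try {
    txn.commit(actions, optimizeOperation)
  } catch {
    case _: ConcurrentModificationException =>
      val newTxn = txn.deltaLog.startTransaction()
      if (f(newTxn)) commitAndRetrySketch(newTxn, optimizeOperation, actions, metrics)(f)
  }
}
```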
createMetrics¶
createMetrics(
sparkContext: SparkContext,
addedFiles: Seq[AddFile],
removedFiles: Seq[RemoveFile],
removedDVs: Seq[DeletionVectorDescriptor]): Map[String, SQLMetric]
createMetrics...FIXME
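Pending the walkthrough, a sketch of how such a metrics map can be assembled with Spark's SQLMetrics (the metric names and the reduced parameter list are illustrative, not necessarily what OptimizeExecutor registers):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.delta.actions.{AddFile, RemoveFile}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Build named SQL metrics from the optimize outcome (simplified sketch).
def createMetricsSketch(
    sparkContext: SparkContext,
    addedFiles: Seq[AddFile],
    removedFiles: Seq[RemoveFile]): Map[String, SQLMetric] = {
  val numAddedFiles = SQLMetrics.createMetric(sparkContext, "number of files added")
  numAddedFiles.set(addedFiles.size)
  val numRemovedFiles = SQLMetrics.createMetric(sparkContext, "number of files removed")
  numRemovedFiles.set(removedFiles.size)
  Map("numAddedFiles" -> numAddedFiles, "numRemovedFiles" -> numRemovedFiles)
}
```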
isMultiDimClustering Flag¶
OptimizeExecutor defines the isMultiDimClustering flag that is enabled (true) when either of the following holds:
- Clustered Tables are supported
- ZORDER BY Columns are specified
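In other words (a one-line sketch of the rule above):

```scala
// Multi-dimensional clustering applies to clustered tables or explicit Z-Ordering.
def isMultiDimClustering(isClusteredTable: Boolean, zOrderByColumns: Seq[String]): Boolean =
  isClusteredTable || zOrderByColumns.nonEmpty
```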
The most important use of the isMultiDimClustering flag is multi-dimensional clustering in runOptimizeBinJob.
OptimizeExecutor also uses it for the following:
- Determine data (parquet) files to optimize (that in fact keeps all the files by partition)
- Create a ZOrderStats at the end of optimize
- Keep all the files of a partition in a single bin in groupFilesIntoBins
isClusteredTable Flag¶
OptimizeExecutor defines isClusteredTable flag that is enabled (true) when clustered tables are supported (based on the Protocol of the table snapshot under the OptimisticTransaction).
isClusteredTable is used in the following:

- isMultiDimClustering flag
- curve