OptimizeExecutor

OptimizeExecutor is a DeltaCommand for OptimizeTableCommand to optimize a delta table.

OptimizeExecutor is a SQLMetricsReporting.

Creating Instance

OptimizeExecutor takes the following to be created:

OptimizeExecutor is created when:

Curve

curve: String

curve can be one of the two supported values:


curve is used when:

Performing Optimization

optimize(): Seq[Row]

optimize is used when:


optimize reads the following configuration properties:

optimize requests the DeltaLog to startTransaction.

optimize requests the OptimisticTransaction for the files matching the partition predicates.

optimize finds the files smaller than the spark.databricks.delta.optimize.minFileSize threshold (the candidates for compaction) and groups them by partition values.
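The selection step can be pictured with a minimal sketch. It is not the Delta Lake implementation: FileEntry is a hypothetical, simplified stand-in for AddFile, and smallFilesByPartition is an illustrative name.

```scala
object CandidateSelectionSketch {
  // Hypothetical stand-in for AddFile: only the fields this sketch needs.
  final case class FileEntry(
      path: String,
      partitionValues: Map[String, String],
      size: Long)

  // Keep only the files below the minFileSize threshold,
  // then group the survivors by their partition values.
  def smallFilesByPartition(
      files: Seq[FileEntry],
      minFileSize: Long): Map[Map[String, String], Seq[FileEntry]] =
    files.filter(_.size < minFileSize).groupBy(_.partitionValues)
}
```

Files at or above the threshold are simply left out of the result, so a partition with no small files does not appear at all.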

optimize groups the files into bins (of the spark.databricks.delta.optimize.maxFileSize size).

Note

A bin is a group of files, whose total size does not exceed the desired size. They will be coalesced into a single output file.
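One way to form such bins is greedy packing, sketched below. This is an illustration under assumptions, not the code of groupFilesIntoBins: FileEntry and pack are hypothetical names, and sorting by ascending size is a choice made for this sketch.

```scala
object BinPackingSketch {
  // Hypothetical stand-in for AddFile: a path and a size in bytes.
  final case class FileEntry(path: String, size: Long)

  // Greedy packing: walk the files in ascending size order and close the
  // current bin whenever the next file would push it over maxBinSize.
  // A single file larger than maxBinSize ends up alone in its own bin.
  def pack(files: Seq[FileEntry], maxBinSize: Long): Seq[Seq[FileEntry]] = {
    val bins = scala.collection.mutable.ArrayBuffer.empty[Vector[FileEntry]]
    var current = Vector.empty[FileEntry]
    var currentSize = 0L
    files.sortBy(_.size).foreach { f =>
      if (current.nonEmpty && currentSize + f.size > maxBinSize) {
        bins += current
        current = Vector.empty
        currentSize = 0L
      }
      current :+= f
      currentSize += f.size
    }
    if (current.nonEmpty) bins += current
    bins.toSeq
  }
}
```

In this sketch a bin that holds a single file is still returned. Whether such bins are worth rewriting is a separate decision that the sketch does not model.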

optimize creates a ForkJoinPool with spark.databricks.delta.optimize.maxThreads threads (with the OptimizeJob thread prefix). The thread pool is then used to parallelize the submission of runOptimizeBinJob optimization jobs to Spark.
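The pattern of submitting one job per bin from a bounded thread pool can be sketched with the standard library alone. The names runAll and job are assumptions for illustration (job stands in for a per-bin Spark job), and the sketch skips the custom thread name prefix.

```scala
import java.util.concurrent.ForkJoinPool
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ParallelBinsSketch {
  // Runs `job` for every bin on a dedicated pool of `maxThreads` threads
  // and blocks until all results are available (in the order of `bins`).
  def runAll[B, R](bins: Seq[B], maxThreads: Int)(job: B => R): Seq[R] = {
    val pool = new ForkJoinPool(maxThreads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = bins.map(b => Future(job(b)))
      Await.result(Future.sequence(futures), Duration.Inf)
    } finally pool.shutdown()
  }
}
```

The pool only bounds how many jobs are submitted concurrently from the driver. The work of each job still runs on the Spark executors.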

Once the compaction jobs are done, optimize tries to commit the transaction (writing the resulting actions to the log) if any AddFiles were produced.

In the end, optimize returns a Row with the data path (of the Delta table) and the optimize statistics.

groupFilesIntoBins

groupFilesIntoBins(
  partitionsToCompact: Seq[(Map[String, String], Seq[AddFile])],
  maxTargetFileSize: Long): Seq[(Map[String, String], Seq[AddFile])]

groupFilesIntoBins...FIXME

pruneCandidateFileList

pruneCandidateFileList(
  minFileSize: Long,
  maxDeletedRowsRatio: Double,
  files: Seq[AddFile]): Seq[AddFile]

pruneCandidateFileList...FIXME

runOptimizeBinJob

runOptimizeBinJob(
  txn: OptimisticTransaction,
  partition: Map[String, String],
  bin: Seq[AddFile],
  maxFileSize: Long): Seq[FileAction]

maxFileSize

maxFileSize is controlled by the spark.databricks.delta.optimize.maxFileSize configuration property, unless runOptimizeBinJob is executed as part of Auto Compaction, which uses the spark.databricks.delta.autoCompact.maxFileSize configuration property instead.

runOptimizeBinJob creates an input DataFrame for scanning data of the given AddFiles. runOptimizeBinJob requests the DeltaLog (of the given OptimisticTransaction) to create the DataFrame (with Optimize action type).

runOptimizeBinJob creates a so-called repartitionDF as follows:

runOptimizeBinJob sets a custom description for the job group (for all future Spark jobs started by this thread).

runOptimizeBinJob writes out the repartitioned DataFrame. runOptimizeBinJob requests the given OptimisticTransaction to write data out.

runOptimizeBinJob marks all the AddFiles (the result of writing data out) as not dataChange.

IllegalStateException for non-AddFiles

No other FileActions are expected. Otherwise, runOptimizeBinJob throws an IllegalStateException:

Unexpected action [other] with type [class].
File compaction job output should only have AddFiles

runOptimizeBinJob creates RemoveFiles for all the given AddFiles, using the current timestamp and with the dataChange flag disabled (as with the AddFiles earlier).

In the end, runOptimizeBinJob returns the AddFiles and RemoveFiles.
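The last steps (marking the written AddFiles, rejecting other actions, and creating RemoveFiles for the inputs) can be sketched as follows. The types below are hypothetical, heavily simplified stand-ins for Delta's FileAction hierarchy, and toCommitActions is an illustrative name rather than a method of OptimizeExecutor.

```scala
object CompactionActionsSketch {
  // Simplified stand-ins: the real actions carry many more fields.
  sealed trait FileAction
  final case class AddFile(path: String, dataChange: Boolean = true)
    extends FileAction
  final case class RemoveFile(
      path: String,
      deletionTimestamp: Long,
      dataChange: Boolean = true) extends FileAction

  // `written` models the actions returned by the write,
  // `bin` the input files that were compacted.
  def toCommitActions(
      written: Seq[FileAction],
      bin: Seq[AddFile],
      now: Long = System.currentTimeMillis()): Seq[FileAction] = {
    val adds = written.map {
      // Compaction rearranges data without changing it.
      case a: AddFile => a.copy(dataChange = false)
      case other =>
        throw new IllegalStateException(
          s"Unexpected action $other with type ${other.getClass}. " +
            "File compaction job output should only have AddFiles")
    }
    // Every input file is logically removed as of `now`.
    val removes = bin.map(f => RemoveFile(f.path, now, dataChange = false))
    adds ++ removes
  }
}
```

Disabling dataChange on both sides reflects the fact that compaction only rewrites existing rows into different files.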

commitAndRetry

commitAndRetry(
  txn: OptimisticTransaction,
  optimizeOperation: Operation,
  actions: Seq[Action],
  metrics: Map[String, SQLMetric])(f: OptimisticTransaction => Boolean): Unit

commitAndRetry...FIXME

createMetrics

createMetrics(
  sparkContext: SparkContext,
  addedFiles: Seq[AddFile],
  removedFiles: Seq[RemoveFile],
  removedDVs: Seq[DeletionVectorDescriptor]): Map[String, SQLMetric]

createMetrics...FIXME

isMultiDimClustering Flag

OptimizeExecutor defines isMultiDimClustering flag that is enabled (true) when either holds:

The most important use of the isMultiDimClustering flag is for multi-dimensional clustering in runOptimizeBinJob.

OptimizeExecutor uses it also for the following:

  • Determine data (parquet) files to optimize (that in fact keeps all the files by partition)
  • Create a ZOrderStats at the end of optimize
  • Keep all the files of a partition in a single bin in groupFilesIntoBins

isClusteredTable Flag

OptimizeExecutor defines isClusteredTable flag that is enabled (true) when clustered tables are supported (based on the Protocol of the table snapshot under the OptimisticTransaction).

isClusteredTable is used in the following: