
OptimizeExecutor

OptimizeExecutor is a DeltaCommand with SQLMetricsReporting.

Creating Instance

OptimizeExecutor takes the following to be created:

  • SparkSession (Spark SQL)
  • DeltaLog (of the Delta table to be optimized)
  • Partition predicate expressions (Spark SQL)
  • Z-OrderBy Columns (Names)

OptimizeExecutor is created when:

  • OptimizeTableCommand is requested to run
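
A sketch of that call site, assuming the constructor arguments are already available (the argument names are illustrative, not the exact code of OptimizeTableCommand):

// Hypothetical call site inside OptimizeTableCommand.run
new OptimizeExecutor(sparkSession, deltaLog, partitionPredicates, zOrderByColumns)
  .optimize()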

optimize

optimize(): Seq[Row]

optimize is used when:

  • OptimizeTableCommand is requested to run
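
From the user side, the same code path can be reached with the DeltaTable API (assuming Delta Lake 1.2+; the SQL OPTIMIZE command goes through OptimizeTableCommand as well):

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/events")

// Plain compaction (no Z-OrderBy columns)
deltaTable.optimize().executeCompaction()

// Z-Ordering (enables isMultiDimClustering)
deltaTable.optimize().executeZOrderBy("eventType", "eventTime")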

optimize reads the following configuration properties:

  • spark.databricks.delta.optimize.minFileSize
  • spark.databricks.delta.optimize.maxFileSize
  • spark.databricks.delta.optimize.maxThreads
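
For illustration, the thresholds can be tuned before OPTIMIZE runs (the values below are made up, not the defaults):

spark.conf.set("spark.databricks.delta.optimize.minFileSize", 256L * 1024 * 1024)
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", 1024L * 1024 * 1024)
spark.conf.set("spark.databricks.delta.optimize.maxThreads", 10)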

optimize requests the DeltaLog to startTransaction.

optimize requests the OptimisticTransaction for the files matching the partition predicates.

optimize finds the files with size below the spark.databricks.delta.optimize.minFileSize threshold (the files considered for compaction) and groups them by partition values.
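
A simplified sketch of this selection step (candidateFiles and minFileSize stand in for the values optimize already has at hand):

import org.apache.spark.sql.delta.actions.AddFile

// Files below the threshold, grouped by their partition values
val filesToCompact: Seq[(Map[String, String], Seq[AddFile])] =
  candidateFiles
    .filter(_.size < minFileSize)
    .groupBy(_.partitionValues)
    .toSeq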

optimize groups the files into bins (of up to the spark.databricks.delta.optimize.maxFileSize size).

Note

A bin is a group of files, whose total size does not exceed the desired size. They will be coalesced into a single output file.
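
A rough sketch of the bin-packing idea (not the exact groupFilesIntoBins code; the real method also special-cases Z-Ordering):

import org.apache.spark.sql.delta.actions.AddFile
import scala.collection.mutable.ArrayBuffer

// Pack the files of a single partition into bins of at most maxFileSize bytes.
def binPack(files: Seq[AddFile], maxFileSize: Long): Seq[Seq[AddFile]] = {
  val bins = ArrayBuffer.empty[Seq[AddFile]]
  val currentBin = ArrayBuffer.empty[AddFile]
  var currentBinSize = 0L
  files.sortBy(_.path).foreach { file =>
    if (currentBinSize + file.size > maxFileSize && currentBin.nonEmpty) {
      bins += currentBin.toVector
      currentBin.clear()
      currentBinSize = 0L
    }
    currentBin += file
    currentBinSize += file.size
  }
  if (currentBin.nonEmpty) bins += currentBin.toVector
  bins.toSeq
}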

optimize creates a ForkJoinPool with spark.databricks.delta.optimize.maxThreads threads (with the OptimizeJob thread prefix). The task pool is then used to parallelize the submission of runOptimizeBinJob optimization jobs to Spark.
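
A hedged sketch of that parallelism using a Scala parallel collection backed by a dedicated ForkJoinPool (the real code uses Delta's thread utilities and names the threads with the OptimizeJob prefix; jobs, txn and maxFileSize are stand-ins):

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.CollectionConverters._  // Scala 2.13 parallel-collections module

val maxThreads = 10  // spark.databricks.delta.optimize.maxThreads
val threadPool = new ForkJoinPool(maxThreads)
val updates =
  try {
    val parJobs = jobs.par  // jobs: Seq[(Map[String, String], Seq[AddFile])], one entry per bin
    parJobs.tasksupport = new ForkJoinTaskSupport(threadPool)
    parJobs.flatMap { case (partition, bin) =>
      runOptimizeBinJob(txn, partition, bin, maxFileSize)
    }.seq
  } finally {
    threadPool.shutdownNow()
  }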

Once the compaction jobs are done, optimize tries to commit the transaction (adding the collected actions to the log), but only when there were any AddFiles.

In the end, optimize returns a Row with the data path (of the Delta table) and the optimize statistics.

runOptimizeBinJob

runOptimizeBinJob(
  txn: OptimisticTransaction,
  partition: Map[String, String],
  bin: Seq[AddFile],
  maxFileSize: Long): Seq[FileAction]

runOptimizeBinJob creates an input DataFrame to represent data described by the given AddFiles. runOptimizeBinJob requests the deltaLog (of the given OptimisticTransaction) to create the DataFrame with Optimize action type.

For Z-Ordering (isMultiDimClustering flag is enabled), runOptimizeBinJob does the following:

  1. Calculates the approximate number of files (as the total size of all the given AddFiles divided by the given maxFileSize)
  2. Repartitions the DataFrame to as many partitions as the approximate number of files using multi-dimensional clustering for the z-orderby columns

Otherwise, runOptimizeBinJob coalesces the DataFrame to 1 partition (using DataFrame.coalesce operator).
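
A hedged sketch of that decision (input is the DataFrame created above; repartitionByRange over the Z-OrderBy columns is only a stand-in for Delta's multi-dimensional clustering):

import org.apache.spark.sql.functions.col

// At least one output file per maxFileSize worth of input data
val approxNumFiles = math.max(1L, bin.map(_.size).sum / maxFileSize).toInt

val repartitioned =
  if (isMultiDimClustering) {
    // The real code applies multi-dimensional (Z-order) clustering here.
    input.repartitionByRange(approxNumFiles, zOrderByColumns.map(col): _*)
  } else {
    input.coalesce(1)
  }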

runOptimizeBinJob sets a custom description for the job group (for all future Spark jobs started by this thread).
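
For example (a sketch; the group id and description text are assumptions, not the exact strings Delta uses):

sparkSession.sparkContext.setJobGroup(
  "optimize-" + java.util.UUID.randomUUID(),  // hypothetical group id
  s"Optimize: compacting ${bin.size} files in partition $partition")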

runOptimizeBinJob writes out the repartitioned DataFrame. runOptimizeBinJob requests the given OptimisticTransaction to writeFiles.

runOptimizeBinJob marks all the AddFiles (the result of writeFiles) as not dataChange. No other FileActions are expected; otherwise, runOptimizeBinJob throws an IllegalStateException:

Unexpected action [other] with type [class].
File compaction job output should only have AddFiles

runOptimizeBinJob creates RemoveFiles for all the given AddFiles, using the current timestamp and with the dataChange flag disabled (just as with the AddFiles above).

In the end, runOptimizeBinJob returns the AddFiles and RemoveFiles.
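
A hedged sketch of that bookkeeping (removeWithTimestamp is the AddFile helper for producing the matching RemoveFile; treat the exact wiring as an assumption):

import org.apache.spark.sql.delta.actions.{AddFile, FileAction}

// Mark the newly written files as not being a data change.
val addFiles: Seq[FileAction] = txn.writeFiles(repartitioned).map {
  case addFile: AddFile => addFile.copy(dataChange = false)
  case other =>
    throw new IllegalStateException(
      s"Unexpected action $other with type ${other.getClass}. " +
        "File compaction job output should only have AddFiles")
}

// Remove the compacted input files, also with dataChange disabled.
val removeFiles: Seq[FileAction] = bin.map { inputFile =>
  inputFile.removeWithTimestamp(System.currentTimeMillis(), dataChange = false)
}

addFiles ++ removeFiles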

commitAndRetry

commitAndRetry(
  txn: OptimisticTransaction,
  optimizeOperation: Operation,
  actions: Seq[Action],
  metrics: Map[String, SQLMetric])(f: OptimisticTransaction => Boolean): Unit

commitAndRetry...FIXME

isMultiDimClustering Flag

OptimizeExecutor defines the isMultiDimClustering flag based on whether there are zOrderByColumns specified or not. In other words, isMultiDimClustering is true for OPTIMIZE ZORDER.
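
In essence (a sketch of the definition):

// Z-Ordering is requested when any Z-OrderBy columns were given.
private val isMultiDimClustering: Boolean = zOrderByColumns.nonEmpty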

The most important use of the isMultiDimClustering flag is for multi-dimensional clustering in runOptimizeBinJob.

OptimizeExecutor also uses the flag for the following:

  • Determine the data (parquet) files to optimize (with Z-Ordering, all the candidate files are kept regardless of size)
  • Create a ZOrderStats at the end of optimize
  • Keep all the files of a partition in a single bin in groupFilesIntoBins