Demo: Optimize
This demo shows the OPTIMIZE command in action.
Create Delta Table
sql("DROP TABLE IF EXISTS nums")
spark.range(10e4.toLong)
.repartitionByRange(3, $"id" % 10)
.writeTo("nums")
.using("delta")
.create
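As a quick sanity check (a minimal sketch), you can confirm the managed table holds the 100000 rows produced by spark.range above:

// Expect 100000 rows (from spark.range(10e4.toLong) above)
spark.table("nums").count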
Let's review the on-disk table representation.
tree -s spark-warehouse/nums
[ 288] spark-warehouse/nums
├── [ 128] _delta_log
│ └── [ 1624] 00000000000000000000.json
├── [ 161036] part-00000-9d2b6a10-d00e-4cdb-aa24-84ff7346bf08-c000.snappy.parquet
├── [ 121265] part-00001-cb9456b6-037d-469f-b0d1-c384064beadd-c000.snappy.parquet
└── [ 121037] part-00002-4aa15e4b-9db4-4a4a-9f97-dd4f4396792b-c000.snappy.parquet
2 directories, 4 files
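You can get a similar file-level summary without leaving the Spark shell. DESCRIBE DETAIL is a Delta SQL command that reports, among other columns, numFiles and sizeInBytes (3 data files at this point):

sql("DESCRIBE DETAIL nums")
  .select("numFiles", "sizeInBytes")
  .show(truncate = false)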
Optimize Table
val optimizeMetrics = sql("OPTIMIZE nums")
scala> optimizeMetrics.printSchema
root
|-- path: string (nullable = true)
|-- metrics: struct (nullable = true)
| |-- numFilesAdded: long (nullable = false)
| |-- numFilesRemoved: long (nullable = false)
| |-- filesAdded: struct (nullable = true)
| | |-- min: long (nullable = true)
| | |-- max: long (nullable = true)
| | |-- avg: double (nullable = false)
| | |-- totalFiles: long (nullable = false)
| | |-- totalSize: long (nullable = false)
| |-- filesRemoved: struct (nullable = true)
| | |-- min: long (nullable = true)
| | |-- max: long (nullable = true)
| | |-- avg: double (nullable = false)
| | |-- totalFiles: long (nullable = false)
| | |-- totalSize: long (nullable = false)
| |-- partitionsOptimized: long (nullable = false)
| |-- zOrderStats: struct (nullable = true)
| | |-- strategyName: string (nullable = true)
| | |-- inputCubeFiles: struct (nullable = true)
| | | |-- num: long (nullable = false)
| | | |-- size: long (nullable = false)
| | |-- inputOtherFiles: struct (nullable = true)
| | | |-- num: long (nullable = false)
| | | |-- size: long (nullable = false)
| | |-- inputNumCubes: long (nullable = false)
| | |-- mergedFiles: struct (nullable = true)
| | | |-- num: long (nullable = false)
| | | |-- size: long (nullable = false)
| | |-- numOutputCubes: long (nullable = false)
| | |-- mergedNumCubes: long (nullable = true)
| |-- numBatches: long (nullable = false)
| |-- totalConsideredFiles: long (nullable = false)
| |-- totalFilesSkipped: long (nullable = false)
| |-- preserveInsertionOrder: boolean (nullable = false)
| |-- numFilesSkippedToReduceWriteAmplification: long (nullable = false)
| |-- numBytesSkippedToReduceWriteAmplification: long (nullable = false)
| |-- startTimeMs: long (nullable = false)
| |-- endTimeMs: long (nullable = false)
| |-- totalClusterParallelism: long (nullable = false)
| |-- totalScheduledTasks: long (nullable = false)
| |-- autoCompactParallelismStats: struct (nullable = true)
| | |-- maxClusterActiveParallelism: long (nullable = true)
| | |-- minClusterActiveParallelism: long (nullable = true)
| | |-- maxSessionActiveParallelism: long (nullable = true)
| | |-- minSessionActiveParallelism: long (nullable = true)
optimizeMetrics.show(truncate = false)
+-----------------+--------------------------------------------------------------------------------------------------------------------------------------------------+
|path |metrics |
+-----------------+--------------------------------------------------------------------------------------------------------------------------------------------------+
|file:/tmp/numbers|{1, 3, {402620, 402620, 402620.0, 1, 402620}, {121037, 161036, 134446.0, 3, 403338}, 1, null, 1, 3, 0, false, 0, 0, 1678104379055, 0, 16, 0, null}|
+-----------------+--------------------------------------------------------------------------------------------------------------------------------------------------+
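OPTIMIZE can also be run programmatically. A minimal sketch using the DeltaTable Scala API (available since Delta Lake 2.0), equivalent to the SQL command above:

import io.delta.tables.DeltaTable

// Bin-packing compaction (no Z-Ordering); returns the same metrics DataFrame
val metricsViaApi = DeltaTable.forName(spark, "nums")
  .optimize()
  .executeCompaction()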
Let's review the on-disk table representation (after OPTIMIZE).
tree -s spark-warehouse/nums
[ 352] spark-warehouse/nums
├── [ 192] _delta_log
│ ├── [ 1624] 00000000000000000000.json
│ └── [ 1431] 00000000000000000001.json
├── [ 402620] part-00000-86af4440-5374-4c18-ae6f-07b0d3680e27-c000.snappy.parquet
├── [ 161036] part-00000-9d2b6a10-d00e-4cdb-aa24-84ff7346bf08-c000.snappy.parquet
├── [ 121265] part-00001-cb9456b6-037d-469f-b0d1-c384064beadd-c000.snappy.parquet
└── [ 121037] part-00002-4aa15e4b-9db4-4a4a-9f97-dd4f4396792b-c000.snappy.parquet
2 directories, 6 files
Note one extra file (part-00000-86af4440-5374-4c18-ae6f-07b0d3680e27-c000.snappy.parquet) in the file listing.
Use DeltaLog to Review Log Files
You can review the transaction log (i.e., the 00000000000000000001.json commit file in particular) or use DeltaLog.
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.catalyst.TableIdentifier
val log = DeltaLog.forTable(spark, tableName = TableIdentifier("nums"))
log.getSnapshotAt(1).allFiles.select('path).show(truncate = false)
+-------------------------------------------------------------------+
|path |
+-------------------------------------------------------------------+
|part-00000-86af4440-5374-4c18-ae6f-07b0d3680e27-c000.snappy.parquet|
+-------------------------------------------------------------------+
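You can also inspect the commit file itself. Each line of a commit JSON is a single action (add, remove, commitInfo); a minimal sketch, assuming the default spark-warehouse location:

val commit = spark.read.json("spark-warehouse/nums/_delta_log/00000000000000000001.json")
// One add action (the compacted file) and three remove actions
commit.select("add.path", "remove.path").show(truncate = false)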
Review History
OPTIMIZE is transactional and creates a new version.
val history = sql("desc history nums")
  .select(
    "version",
    "operation",
    "operationParameters",
    "readVersion",
    "isolationLevel",
    "isBlindAppend",
    "operationMetrics")
history.show(truncate = false)
+-------+----------------------+-----------------------------------------------------------------------------+-----------+-----------------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|operation |operationParameters |readVersion|isolationLevel |isBlindAppend|operationMetrics |
+-------+----------------------+-----------------------------------------------------------------------------+-----------+-----------------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1 |OPTIMIZE |{predicate -> [], zOrderBy -> []} |0 |SnapshotIsolation|false |{numRemovedFiles -> 3, numRemovedBytes -> 403338, p25FileSize -> 402620, minFileSize -> 402620, numAddedFiles -> 1, maxFileSize -> 402620, p75FileSize -> 402620, p50FileSize -> 402620, numAddedBytes -> 402620}|
|0 |CREATE TABLE AS SELECT|{isManaged -> true, description -> null, partitionBy -> [], properties -> {}}|null |Serializable |true |{numFiles -> 3, numOutputRows -> 100000, numOutputBytes -> 403338} |
+-------+----------------------+-----------------------------------------------------------------------------+-----------+-----------------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
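The removed files stay in the table directory until VACUUM, so the pre-OPTIMIZE version is still readable with time travel. A minimal sketch (SQL time travel requires Delta Lake 2.1 or later); both versions should report 100000 rows, as OPTIMIZE only rewrites files:

sql("SELECT count(*) FROM nums VERSION AS OF 0").show()
sql("SELECT count(*) FROM nums VERSION AS OF 1").show()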