Skip to content

Demo: Data Skipping

This demo shows Data Skipping in action.

Logging

Enable logging for PrepareDeltaScan and the others used in data skipping.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.delta.stats=ALL

Start Spark Shell

./bin/spark-shell \
  --packages io.delta:delta-core_2.12:2.1.0rc1 \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
import org.apache.spark.sql.delta.sources.DeltaSQLConf
assert(spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STATS_SKIPPING), "Data skipping should be enabled")

Create Delta Table

val tableName = "d01"
sql(s"DROP TABLE IF EXISTS $tableName")
spark.range(5).writeTo(tableName).using("delta").create

Show Column Statistics

import org.apache.spark.sql.delta._
import org.apache.spark.sql.catalyst.TableIdentifier
val d01 = DeltaLog.forTable(spark, TableIdentifier(tableName))
val partitionFilters = Nil
val filesWithStatsForScan = d01.snapshot.filesWithStatsForScan(partitionFilters)
filesWithStatsForScan.printSchema
root
 |-- path: string (nullable = true)
 |-- partitionValues: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- size: long (nullable = false)
 |-- modificationTime: long (nullable = false)
 |-- dataChange: boolean (nullable = false)
 |-- stats: struct (nullable = true)
 |    |-- numRecords: long (nullable = true)
 |    |-- minValues: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |-- maxValues: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |-- nullCount: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |-- tags: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
val tableStats = filesWithStatsForScan.select('path, 'size, $"stats.*")
tableStats.orderBy('path).show
+--------------------+----+----------+---------+---------+---------+
|                path|size|numRecords|minValues|maxValues|nullCount|
+--------------------+----+----------+---------+---------+---------+
|part-00000-43b9e4...| 296|         0|   {null}|   {null}|   {null}|
|part-00003-2685fb...| 478|         1|      {0}|      {0}|      {0}|
|part-00006-815e72...| 478|         1|      {1}|      {1}|      {0}|
|part-00009-654322...| 478|         1|      {2}|      {2}|      {0}|
|part-00012-f3a708...| 478|         1|      {3}|      {3}|      {0}|
|part-00015-5ca541...| 478|         1|      {4}|      {4}|      {0}|
+--------------------+----+----------+---------+---------+---------+

Execute Query with Data Skipping

val q = sql(s"SELECT * FROM $tableName WHERE id IN (2, 3)")
q.show

You should see the following logs and the output.

22/05/29 22:58:02 INFO PrepareDeltaScan: DELTA: Filtering files for query
22/05/29 22:58:02 INFO PrepareDeltaScan: DELTA: Done
+---+
| id|
+---+
|  3|
|  2|
+---+

web UI

Open the web UI to review the query and the associated job with the name Filtering files for query.