
Demo: dataChange

This demo shows the dataChange option in action.

In combination with the Overwrite save mode, the dataChange option can be used to transactionally rearrange the data of a delta table without changing the data itself.

Start Spark Shell

./bin/spark-shell \
  --packages io.delta:delta-core_2.12:1.0.0 \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

Create Delta Table

val path = "/tmp/delta/d01"

Make sure that there is no delta table at the location. If there is one, remove it and start over.

import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, path)
assert(deltaLog.tableExists == false)
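
If the assertion fails, the directory has to be removed first. A minimal sketch for a local filesystem path follows. deleteRecursively is a hypothetical helper written for this demo, not part of Delta Lake or Spark, and the path is assumed to be the demo location used above.

```scala
import java.io.File

// Hypothetical helper (not a Delta Lake API): recursively deletes a local file or directory.
def deleteRecursively(f: File): Unit = {
  // listFiles returns null for regular files and for paths that do not exist
  Option(f.listFiles).foreach(_.foreach(deleteRecursively))
  f.delete()
}

// Assumption: the demo table lives on the local filesystem at this location
deleteRecursively(new File("/tmp/delta/d01"))
```

Since the DeltaLog instance created above may be cached, it is probably safest to restart the Spark shell after the cleanup and run the check again.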

Create the demo delta table (using SQL).

sql(s"""
  CREATE TABLE delta.`$path`
  USING delta
  VALUES ((0, 'Jacek'), (1, 'Agata')) AS (id, name)
  """)

Show History (Before)

import io.delta.tables.DeltaTable
val dt = DeltaTable.forPath(path)
assert(dt.history.count == 1)

Repartition Table

The following dataChange example shows a batch query that repartitions a delta table, possibly while other queries are using it.

Let's check out the number of partitions.

spark.read.format("delta").load(path).rdd.getNumPartitions

The key items to pay attention to are:

  1. The batch query is independent of any other streaming or batch queries running over the delta table
  2. The batch query reads from the same delta table it saves data to
  3. The save mode is overwrite
  4. The dataChange option is disabled (set to false)

spark
  .read
  .format("delta")
  .load(path)
  .repartition(10)
  .write
  .format("delta")
  .mode("overwrite")
  .option("dataChange", false)
  .save(path)

Let's check out the number of partitions after the repartition.

spark.read.format("delta").load(path).rdd.getNumPartitions
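
Since the repartitioning is supposed to rearrange files only, the rows should be the same before and after. A quick sanity check, sketched below, uses the versionAsOf read option (time travel) to compare version 0 with the latest version. It assumes the spark-shell session of this demo, that the overwrite above was committed as version 1, and that the table is small enough to collect to the driver.

```scala
val path = "/tmp/delta/d01"

// Version 0 is the table as created, before the repartitioning
val before = spark.read.format("delta").option("versionAsOf", 0).load(path)
// The latest version, after the overwrite with dataChange disabled
val after = spark.read.format("delta").load(path)

// Compare counts and row sets, since repartitioning does not preserve row order
assert(before.count == after.count)
assert(before.collect().toSet == after.collect().toSet)
```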

Show History (After)

assert(dt.history.count == 2)
dt.history
  .select('version, 'operation, 'operationParameters, 'operationMetrics)
  .orderBy('version.asc)
  .show(truncate = false)
+-------+----------------------+------------------------------------------------------------------------------+-----------------------------------------------------------+
|version|operation             |operationParameters                                                           |operationMetrics                                           |
+-------+----------------------+------------------------------------------------------------------------------+-----------------------------------------------------------+
|0      |CREATE TABLE AS SELECT|{isManaged -> false, description -> null, partitionBy -> [], properties -> {}}|{numFiles -> 1, numOutputBytes -> 1273, numOutputRows -> 1}|
|1      |WRITE                 |{mode -> Overwrite, partitionBy -> []}                                        |{numFiles -> 2, numOutputBytes -> 1992, numOutputRows -> 1}|
+-------+----------------------+------------------------------------------------------------------------------+-----------------------------------------------------------+
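
The history above does not show the dataChange flag itself. One way to look at it is to read the commit file of version 1 directly from the transaction log. The sketch below assumes the spark-shell session of this demo and that the overwrite was committed as version 1. The column aliases are made up for readability.

```scala
val path = "/tmp/delta/d01"
val version = 1L

// Commit files are JSON, named after the table version zero-padded to 20 digits
val commitFile = f"$path/_delta_log/$version%020d.json"

// Every line of a commit file is a single action (e.g. commitInfo, add, remove)
spark.read.json(commitFile)
  .where("add IS NOT NULL OR remove IS NOT NULL")
  .select(
    $"add.path" as "added_file",
    $"add.dataChange" as "add_dataChange",
    $"remove.path" as "removed_file",
    $"remove.dataChange" as "remove_dataChange")
  .show(truncate = false)
```

With the dataChange option disabled, the dataChange flags of the add and remove actions are expected to be false, which tells readers of the table that the commit rearranged files without changing the data.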

Last update: 2021-03-17