
Demo: DeltaTable, DeltaLog And Snapshots

Create Delta Table

import org.apache.spark.sql.SparkSession
assert(spark.isInstanceOf[SparkSession])
val tableName = "users"
sql(s"DROP TABLE IF EXISTS $tableName")
sql(s"""
    | CREATE TABLE $tableName (id bigint, name string, city string, country string)
    | USING delta
    """.stripMargin)
scala> spark.catalog.listTables.show
+-----+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+-----+--------+-----------+---------+-----------+
|users| default|       null|  MANAGED|      false|
+-----+--------+-----------+---------+-----------+
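To confirm that `users` is backed by Delta Lake (and not, say, plain Parquet), you could also ask for the table details. A minimal sketch, assuming a Delta Lake release that supports the `DESCRIBE DETAIL` command (0.7.0 or later, to the best of my knowledge) and reusing `tableName` from the same spark-shell session:

```scala
// Table-level details of the Delta table (a single row)
val detail = spark.sql(s"DESCRIBE DETAIL $tableName")

// format should be "delta"; a freshly created table references no data files yet
detail.select("format", "location", "numFiles").show(truncate = false)
```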

Access Transaction Log (DeltaLog)

import org.apache.spark.sql.catalyst.TableIdentifier
val tid = TableIdentifier(tableName)

import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, tid)
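A `DeltaLog` knows both the table's data directory and its transaction log directory (`_delta_log`). A short sketch that prints both and lists the log files with the Hadoop `FileSystem` API; `dataPath` and `logPath` are `DeltaLog` members, which are internal and may change between releases:

```scala
import org.apache.hadoop.fs.Path

// The table's data directory and its transaction log directory
val dataPath: Path = deltaLog.dataPath
val logPath: Path = deltaLog.logPath
println(s"data: $dataPath")
println(s"log:  $logPath")

// List the files in the transaction log directory (commit files and friends)
val fs = logPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
val logFiles = fs.listStatus(logPath).map(_.getPath.getName).sorted
logFiles.foreach(println)
```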

Update the state of the Delta table to the most recent version (deltaLog.update() returns the latest Snapshot).

val snapshot = deltaLog.update()
assert(snapshot.version == 0)
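Snapshot version 0 corresponds to the first commit file in `_delta_log`. Commit files are named after their version, zero-padded to 20 digits (e.g. `00000000000000000000.json`). A standalone sketch of that naming convention; the `CommitFileNames` object is hypothetical, not a Delta Lake API:

```scala
object CommitFileNames {
  // A commit file name is the version zero-padded to 20 digits plus ".json"
  def commitFileName(version: Long): String = f"$version%020d.json"

  // Recovers the version from a commit file name (None for any other file)
  def versionOf(fileName: String): Option[Long] = {
    val CommitFile = """(\d{20})\.json""".r
    fileName match {
      case CommitFile(digits) => Some(digits.toLong)
      case _                  => None
    }
  }
}
```

Zero-padding keeps a plain lexicographic listing of the log directory in version order.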
val state = snapshot.state
scala> :type state
org.apache.spark.sql.Dataset[org.apache.spark.sql.delta.actions.SingleAction]
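Each row of `state` is a `SingleAction`: a wrapper with one nullable field per action type, of which exactly one is set per row. Conceptually, the state of a snapshot is the result of replaying the actions of all commits up to its version: a later `protocol` or `metaData` action replaces an earlier one, and a `remove` cancels an earlier `add` of the same path. The following is a deliberately simplified, Spark-free model of that replay; `Action`, `Protocol`, `Metadata`, `AddFile`, `RemoveFile` and `State` here are hypothetical stand-ins, not Delta Lake's own classes:

```scala
object ToyLogReplay {
  // Hypothetical, simplified stand-ins for Delta Lake actions
  sealed trait Action
  final case class Protocol(minReaderVersion: Int, minWriterVersion: Int) extends Action
  final case class Metadata(id: String, columns: Seq[String]) extends Action
  final case class AddFile(path: String) extends Action
  final case class RemoveFile(path: String) extends Action

  // The reconstructed table state: latest protocol and metadata, live files
  final case class State(
      protocol: Option[Protocol],
      metadata: Option[Metadata],
      files: Set[String])

  // Replays commits in version order (element i holds the actions of version i)
  def replay(commits: Seq[Seq[Action]]): State =
    commits.flatten.foldLeft(State(None, None, Set.empty)) {
      case (s, p: Protocol)   => s.copy(protocol = Some(p))
      case (s, m: Metadata)   => s.copy(metadata = Some(m))
      case (s, AddFile(p))    => s.copy(files = s.files + p)
      case (s, RemoveFile(p)) => s.copy(files = s.files - p)
    }
}
```

The real reconstruction also handles checkpoints, transaction identifiers and tombstone retention, which this model leaves out.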

Review the cached RDD for the state snapshot in the Storage tab of the web UI (e.g. http://localhost:4040/storage/).

Snapshot (Cached RDD) in web UI

The version part of the cached RDD's name (Delta Table State) should match the version of the snapshot (0).
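The cached RDDs can also be listed programmatically. A minimal sketch based on `SparkContext.getPersistentRDDs`; filtering on the `Delta Table State` name prefix is an assumption taken from the web UI and may differ across Delta Lake versions:

```scala
// Persisted RDDs registered with the SparkContext whose name marks a Delta state
val deltaStateRdds = spark.sparkContext.getPersistentRDDs.values
  .filter(rdd => Option(rdd.name).exists(_.startsWith("Delta Table State")))
  .toSeq

// Id, name and storage level, roughly what the Storage tab shows
deltaStateRdds.foreach { rdd =>
  println(s"${rdd.id}: ${rdd.name} (${rdd.getStorageLevel.description})")
}
```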

Show the changes (actions).

scala> state.show
+----+----+------+--------------------+--------+----+----------+
| txn| add|remove|            metaData|protocol| cdc|commitInfo|
+----+----+------+--------------------+--------+----+----------+
|null|null|  null|                null|  {1, 2}|null|      null|
|null|null|  null|{90316970-5bf1-45...|    null|null|      null|
+----+----+------+--------------------+--------+----+----------+
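The two rows are the `protocol` action (`{1, 2}` are its `minReaderVersion` and `minWriterVersion`) and the `metaData` action (the truncated value starts with the table ID). The snapshot exposes both as typed objects, so there is no need to dig through `state`. A sketch, assuming the `protocol` and `metadata` accessors of `Snapshot` (internal, so subject to change between releases):

```scala
// The protocol and metadata actions of the snapshot, as typed objects
val protocol = snapshot.protocol
val metadata = snapshot.metadata

println(s"minReaderVersion: ${protocol.minReaderVersion}")
println(s"minWriterVersion: ${protocol.minWriterVersion}")
println(s"table id:         ${metadata.id}")

// The table schema is stored (as JSON) in the metadata action
metadata.schema.printTreeString()
```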

DeltaTable as DataFrame

DeltaTable

import io.delta.tables.DeltaTable
val dt = DeltaTable.forName(tableName)
val h = dt.history.select('version, 'operation, 'operationParameters, 'operationMetrics)
scala> h.show(truncate = false)
+-------+------------+-----------------------------------------------------------------------------+----------------+
|version|operation   |operationParameters                                                          |operationMetrics|
+-------+------------+-----------------------------------------------------------------------------+----------------+
|0      |CREATE TABLE|{isManaged -> true, description -> null, partitionBy -> [], properties -> {}}|{}              |
+-------+------------+-----------------------------------------------------------------------------+----------------+
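`history` returns the full commit history, newest commit first. When only the most recent commit matters, `history(limit)` keeps the result small. A short sketch using `dt` from above:

```scala
// At most one row: the most recent commit
val latest = dt.history(1).select("version", "operation").head()
println(s"version ${latest.getLong(0)}: ${latest.getString(1)}")
```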

Converting DeltaTable into DataFrame

val users = dt.toDF
scala> users.show
+---+----+----+-------+
| id|name|city|country|
+---+----+----+-------+
+---+----+----+-------+
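An empty Delta table still carries its schema (from the `metaData` action), so `users` can be inspected before any data is written. A quick check:

```scala
// Column names and types as declared in CREATE TABLE
users.printSchema()
val columns = users.schema.fields.map(f => f.name -> f.dataType.simpleString).toSeq
columns.foreach { case (name, tpe) => println(s"$name: $tpe") }
```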

Add new users

val newUsers = Seq(
  (0L, "Agata", "Warsaw", "Poland"),
  (1L, "Bartosz", "Paris", "France")
).toDF("id", "name", "city", "country")
scala> newUsers.show
+---+-------+------+-------+
| id|   name|  city|country|
+---+-------+------+-------+
|  0|  Agata|Warsaw| Poland|
|  1|Bartosz| Paris| France|
+---+-------+------+-------+
// Alternatively: newUsers.write.format("delta").mode("append").saveAsTable(tableName)
newUsers.writeTo(tableName).append
assert(deltaLog.snapshot.version == 1)
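The append created a new commit (version 1) with `add` actions for the new data files. A sketch that lists those files through the latest snapshot; `allFiles` is an internal `Snapshot` method (a `Dataset[AddFile]`), so treat it as subject to change. The number of files depends on how many partitions `newUsers` had when it was written:

```scala
// Bring the snapshot up to date and inspect the data files it references
val snapshot1 = deltaLog.update()
println(s"version: ${snapshot1.version}")
snapshot1.allFiles.select("path", "size").show(truncate = false)
```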

Review the cached RDD for the new state snapshot (version 1) in the Storage tab of the web UI (e.g. http://localhost:4040/storage/).

Note that the DataFrame of the Delta table (users) has been refreshed automatically and shows the new rows, so REFRESH TABLE is unnecessary.
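`users` always reads the latest version of the table. To read an earlier version instead, Delta Lake's time travel can pin a read to a specific version. A minimal sketch using the `versionAsOf` read option with the table's data path (taken from `deltaLog`):

```scala
// The same table as of version 0, i.e. before the append
val usersV0 = spark.read
  .format("delta")
  .option("versionAsOf", 0)
  .load(deltaLog.dataPath.toString)

println(s"latest:    ${users.count()} rows")
println(s"version 0: ${usersV0.count()} rows")
```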

scala> users.show
+---+-------+------+-------+
| id|   name|  city|country|
+---+-------+------+-------+
|  1|Bartosz| Paris| France|
|  0|  Agata|Warsaw| Poland|
+---+-------+------+-------+
val h = dt.history.select('version, 'operation, 'operationParameters, 'operationMetrics)
scala> h.show(truncate = false)
+-------+------------+-----------------------------------------------------------------------------+-----------------------------------------------------------+
|version|operation   |operationParameters                                                          |operationMetrics                                           |
+-------+------------+-----------------------------------------------------------------------------+-----------------------------------------------------------+
|1      |WRITE       |{mode -> Append, partitionBy -> []}                                          |{numFiles -> 2, numOutputBytes -> 2299, numOutputRows -> 2}|
|0      |CREATE TABLE|{isManaged -> true, description -> null, partitionBy -> [], properties -> {}}|{}                                                         |
+-------+------------+-----------------------------------------------------------------------------+-----------------------------------------------------------+
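The history above is derived from the `commitInfo` action of every commit. To see all the actions of every commit, `DeltaLog` can read the log incrementally. A sketch, assuming `DeltaLog.getChanges(startVersion)` (an internal API returning an iterator of version and actions pairs, which may change between releases):

```scala
import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata}

// (version, actions) pairs for every commit from version 0 on
val changes: List[(Long, Seq[Action])] = deltaLog.getChanges(0).toList

// Action types per commit, e.g. protocol and metadata in 0, added files in 1
changes.foreach { case (version, actions) =>
  val kinds = actions.map(_.getClass.getSimpleName).mkString(", ")
  println(s"$version: $kinds")
}
```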

Last update: 2021-07-09