Demo: DeltaTable, DeltaLog And Snapshots
Create Delta Table
import org.apache.spark.sql.SparkSession
assert(spark.isInstanceOf[SparkSession])
val tableName = "users"
sql(s"DROP TABLE IF EXISTS $tableName")
sql(s"""
    | CREATE TABLE $tableName (id bigint, name string, city string, country string)
    | USING delta
    """.stripMargin)
scala> spark.catalog.listTables.show
+-----+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+-----+--------+-----------+---------+-----------+
|users| default|       null|  MANAGED|      false|
+-----+--------+-----------+---------+-----------+
Access Transaction Log (DeltaLog)
import org.apache.spark.sql.catalyst.TableIdentifier
val tid = TableIdentifier(tableName)
import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, tid)
Update the state of the delta table to the most recent version.
val snapshot = deltaLog.update()
assert(snapshot.version == 0)
val state = snapshot.state
scala> :type state
org.apache.spark.sql.Dataset[org.apache.spark.sql.delta.actions.SingleAction]
Review the cached RDD for the state snapshot in the Storage tab of the web UI (e.g. http://localhost:4040/storage/).

The version in the name of the cached RDD (Delta Table State) should match the version of the snapshot.
Show the changes (actions).
scala> state.show
+----+----+------+--------------------+--------+----+----------+
| txn| add|remove|            metaData|protocol| cdc|commitInfo|
+----+----+------+--------------------+--------+----+----------+
|null|null|  null|                null|  {1, 2}|null|      null|
|null|null|  null|{90316970-5bf1-45...|    null|null|      null|
+----+----+------+--------------------+--------+----+----------+
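The two rows above are the protocol and metaData actions of version 0. As a rough sketch, the same information can be read from the snapshot itself. This assumes the spark-shell session above (snapshot in scope). Snapshot, Metadata and Protocol are internal Delta Lake classes, so the members used here may differ between releases.

```scala
// Assumes the spark-shell session above: `snapshot` is the result of deltaLog.update().
// These are internal Delta Lake APIs; names may change between releases.
val metadata = snapshot.metadata
println(s"Table ID: ${metadata.id}")
println(s"Schema:   ${metadata.schema.simpleString}")

val protocol = snapshot.protocol
println(s"Reader/writer versions: ${protocol.minReaderVersion}/${protocol.minWriterVersion}")

// allFiles is a Dataset of AddFile actions, i.e. the data files of this version.
// A freshly created, empty table has none.
println(s"Data files: ${snapshot.allFiles.count}")
```

The reader and writer versions printed here correspond to the protocol column in the state.show output.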
DeltaTable as DataFrame
DeltaTable
import io.delta.tables.DeltaTable
val dt = DeltaTable.forName(tableName)
val h = dt.history.select('version, 'operation, 'operationParameters, 'operationMetrics)
scala> h.show(truncate = false)
+-------+------------+-----------------------------------------------------------------------------+----------------+
|version|operation   |operationParameters                                                          |operationMetrics|
+-------+------------+-----------------------------------------------------------------------------+----------------+
|0      |CREATE TABLE|{isManaged -> true, description -> null, partitionBy -> [], properties -> {}}|{}              |
+-------+------------+-----------------------------------------------------------------------------+----------------+
Converting DeltaTable into DataFrame
val users = dt.toDF
scala> users.show
+---+----+----+-------+
| id|name|city|country|
+---+----+----+-------+
+---+----+----+-------+
Add new users
val newUsers = Seq(
  (0L, "Agata", "Warsaw", "Poland"),
  (1L, "Bartosz", "Paris", "France")
).toDF("id", "name", "city", "country")
scala> newUsers.show
+---+-------+------+-------+
| id|   name|  city|country|
+---+-------+------+-------+
|  0|  Agata|Warsaw| Poland|
|  1|Bartosz| Paris| France|
+---+-------+------+-------+
// newUsers.write.format("delta").mode("append").saveAsTable(tableName)
newUsers.writeTo(tableName).append
assert(deltaLog.snapshot.version == 1)
Review the cached RDD for the state snapshot in the Storage tab of the web UI (e.g. http://localhost:4040/storage/).
Note that the users DataFrame created earlier (the DataFrame variant of the delta table) reflects the new rows automatically (making REFRESH TABLE unnecessary).
scala> users.show
+---+-------+------+-------+
| id|   name|  city|country|
+---+-------+------+-------+
|  1|Bartosz| Paris| France|
|  0|  Agata|Warsaw| Poland|
+---+-------+------+-------+
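For contrast, the earlier version of the table can still be read with time travel. A minimal sketch, assuming the spark-shell session above (users and deltaLog in scope) and using deltaLog.dataPath, an internal API, only to find the location of the managed table:

```scala
// Assumes the spark-shell session above: `users` and `deltaLog` are in scope.
// versionAsOf is the Delta Lake time-travel read option.
val usersV0 = spark.read
  .format("delta")
  .option("versionAsOf", 0)
  .load(deltaLog.dataPath.toString)

// Version 0 is the empty table right after CREATE TABLE,
// while `users` follows the latest version of the table.
assert(usersV0.count == 0)
assert(users.count == 2)
```

Unlike users, usersV0 is pinned to version 0 and does not change after later writes.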
val h = dt.history.select('version, 'operation, 'operationParameters, 'operationMetrics)
scala> h.show(truncate = false)
+-------+------------+-----------------------------------------------------------------------------+-----------------------------------------------------------+
|version|operation   |operationParameters                                                          |operationMetrics                                           |
+-------+------------+-----------------------------------------------------------------------------+-----------------------------------------------------------+
|1      |WRITE       |{mode -> Append, partitionBy -> []}                                          |{numFiles -> 2, numOutputBytes -> 2299, numOutputRows -> 2}|
|0      |CREATE TABLE|{isManaged -> true, description -> null, partitionBy -> [], properties -> {}}|{}                                                         |
+-------+------------+-----------------------------------------------------------------------------+-----------------------------------------------------------+
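The history above is built from the commits in the transaction log. As a sketch, the actions of each commit can also be listed with DeltaLog.getChanges. This assumes the spark-shell session above (deltaLog in scope). DeltaLog is an internal API, so the exact signature may vary between Delta Lake releases.

```scala
// Assumes the spark-shell session above: `deltaLog` is in scope.
// getChanges returns an iterator of (version, actions) pairs starting at the given version.
deltaLog.getChanges(0).foreach { case (version, actions) =>
  // Print the kind of every action in the commit, e.g. Protocol, Metadata, AddFile, CommitInfo
  val kinds = actions.map(_.getClass.getSimpleName).mkString(", ")
  println(s"version $version: $kinds")
}
```

Each version printed corresponds to one row of dt.history.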