Demo: DeltaTable, DeltaLog And Snapshots
Create Delta Table
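```scala
// Run in spark-shell with Delta Lake on the classpath
// (spark-shell imports spark.sql, so `sql(...)` below is spark.sql)
import org.apache.spark.sql.SparkSession
assert(spark.isInstanceOf[SparkSession])

val tableName = "users"

sql(s"DROP TABLE IF EXISTS $tableName")
sql(s"""
  | CREATE TABLE $tableName (id bigint, name string, city string, country string)
  | USING delta
  """.stripMargin)
```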
```text
scala> spark.catalog.listTables.show
+-----+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+-----+--------+-----------+---------+-----------+
|users| default|       null|  MANAGED|      false|
+-----+--------+-----------+---------+-----------+
```
Access Transaction Log (DeltaLog)
```scala
import org.apache.spark.sql.catalyst.TableIdentifier
val tid = TableIdentifier(tableName)

import org.apache.spark.sql.delta.DeltaLog
val deltaLog = DeltaLog.forTable(spark, tid)
```
Update the state of the delta table to the most recent version. `DeltaLog.update` returns the latest `Snapshot`, which is version 0 for the newly-created table.
```scala
val snapshot = deltaLog.update()
assert(snapshot.version == 0)

val state = snapshot.state
```
```text
scala> :type state
org.apache.spark.sql.Dataset[org.apache.spark.sql.delta.actions.SingleAction]
```
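The state of a snapshot is reconstructed by replaying the transaction log. The plain-Scala sketch below is only a rough mental model of that replay and is not Delta Lake's code. It treats each commit as a list of actions and builds the snapshot at version `v` by replaying commits `0` to `v` in order. The last `Protocol` and `Metadata` win, and `AddFile`/`RemoveFile` add or drop a path from the set of live data files. `ToyDeltaLog` and its simplified action classes are hypothetical stand-ins for Delta's actions. Delta also tracks `remove` tombstones and other actions, which this model ignores.

```scala
// Toy model of Delta-style state reconstruction (a sketch, NOT Delta Lake's code).
// Assumption: a commit is a sequence of actions, and the snapshot at version v
// is what you get by replaying commits 0..v in order.
object ToyDeltaLog {
  sealed trait Action
  final case class Protocol(minReaderVersion: Int, minWriterVersion: Int) extends Action
  final case class Metadata(id: String, schemaString: String) extends Action
  final case class AddFile(path: String) extends Action
  final case class RemoveFile(path: String) extends Action

  final case class Snapshot(
      version: Long,
      protocol: Option[Protocol],
      metadata: Option[Metadata],
      files: Set[String])

  // Replays commits 0..version: the last Protocol/Metadata wins,
  // AddFile/RemoveFile add or drop a path from the set of live files.
  def snapshotAt(commits: Vector[Seq[Action]], version: Int): Snapshot = {
    require(version >= 0 && version < commits.length, s"no such version: $version")
    val empty = Snapshot(version.toLong, None, None, Set.empty)
    commits.take(version + 1).flatten.foldLeft(empty) {
      case (s, p: Protocol)   => s.copy(protocol = Some(p))
      case (s, m: Metadata)   => s.copy(metadata = Some(m))
      case (s, AddFile(p))    => s.copy(files = s.files + p)
      case (s, RemoveFile(p)) => s.copy(files = s.files - p)
    }
  }

  // Toy analogue of deltaLog.update(): the snapshot at the most recent commit.
  def latest(commits: Vector[Seq[Action]]): Snapshot =
    snapshotAt(commits, commits.length - 1)
}
```

In this model, commit 0 of the demo table holds a `Protocol(1, 2)` and a `Metadata` action, and no data files. An append adds another commit with `AddFile` actions, so `latest(commits).version` becomes `1`.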
Review the cached RDD for the state snapshot in the Storage tab of the web UI (e.g. http://localhost:4040/storage/). The version number in the name of the cached RDD (the name starts with `Delta Table State`) should match the version of the snapshot.
Show the actions that make up the state of the snapshot.
```text
scala> state.show
+----+----+------+--------------------+--------+----+----------+
| txn| add|remove|            metaData|protocol| cdc|commitInfo|
+----+----+------+--------------------+--------+----+----------+
|null|null|  null|                null|  {1, 2}|null|      null|
|null|null|  null|{90316970-5bf1-45...|    null|null|      null|
+----+----+------+--------------------+--------+----+----------+
```
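The `{1, 2}` in the `protocol` column is the table's protocol version: `minReaderVersion` 1 and `minWriterVersion` 2. A client can only read a table whose minimum reader version it supports, and only write to a table whose minimum writer version it supports. Below is a minimal sketch of that check in plain Scala. `Protocol`, `ClientCapabilities` and `ProtocolCheck` are hypothetical names, and the sketch does not model the table features of newer protocol versions.

```scala
// Sketch of the legacy reader/writer version check (hypothetical names;
// table features of newer Delta protocol versions are not modelled).
final case class Protocol(minReaderVersion: Int, minWriterVersion: Int)

// The highest protocol versions a (hypothetical) client implements.
final case class ClientCapabilities(readerVersion: Int, writerVersion: Int)

object ProtocolCheck {
  // Reading requires support for at least the table's minReaderVersion.
  def canRead(client: ClientCapabilities, table: Protocol): Boolean =
    client.readerVersion >= table.minReaderVersion

  // Writing requires support for at least the table's minWriterVersion.
  def canWrite(client: ClientCapabilities, table: Protocol): Boolean =
    client.writerVersion >= table.minWriterVersion
}
```

For the demo table (`Protocol(1, 2)`), a client with writer version 1 could still read the table but not write to it.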
DeltaTable as DataFrame
DeltaTable
```scala
import io.delta.tables.DeltaTable
val dt = DeltaTable.forName(tableName)

val h = dt.history.select('version, 'operation, 'operationParameters, 'operationMetrics)
```
```text
scala> h.show(truncate = false)
+-------+------------+-----------------------------------------------------------------------------+----------------+
|version|operation   |operationParameters                                                          |operationMetrics|
+-------+------------+-----------------------------------------------------------------------------+----------------+
|0      |CREATE TABLE|{isManaged -> true, description -> null, partitionBy -> [], properties -> {}}|{}              |
+-------+------------+-----------------------------------------------------------------------------+----------------+
```
Converting DeltaTable into DataFrame
```scala
val users = dt.toDF
```
```text
scala> users.show
+---+----+----+-------+
| id|name|city|country|
+---+----+----+-------+
+---+----+----+-------+
```
Add new users
```scala
val newUsers = Seq(
  (0L, "Agata", "Warsaw", "Poland"),
  (1L, "Bartosz", "Paris", "France")
).toDF("id", "name", "city", "country")
```
```text
scala> newUsers.show
+---+-------+------+-------+
| id|   name|  city|country|
+---+-------+------+-------+
|  0|  Agata|Warsaw| Poland|
|  1|Bartosz| Paris| France|
+---+-------+------+-------+
```
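```scala
// Alternatively, with the DataFrameWriter API:
// newUsers.write.format("delta").mode("append").saveAsTable(tableName)
newUsers.writeTo(tableName).append()

// The append is committed as version 1 of the table
assert(deltaLog.snapshot.version == 1)
```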
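Every commit is written to the table's `_delta_log` directory as a JSON file named after its version, zero-padded to 20 digits: `00000000000000000000.json` for version 0, `00000000000000000001.json` for version 1, and so on. That is why the append above produces version 1. The sketch below models the commit step with an in-memory map in place of the directory. `InMemoryLog` and `commit` are hypothetical names, and actions are plain strings for brevity. The "fail if the file already exists" rule stands in for the atomic put-if-absent (mutual exclusion) that Delta Lake expects from the storage layer.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a table's _delta_log directory
// (a sketch, not Delta Lake's LogStore API).
final class InMemoryLog {
  // File name -> actions; TreeMap keeps names (and so versions) in order.
  private val files = mutable.TreeMap.empty[String, Seq[String]]

  // Commit files are named after the version, zero-padded to 20 digits.
  def fileName(version: Long): String = f"$version%020d.json"

  // Version of the latest commit, or -1 for an empty log
  // (commits are contiguous, so this is the file count minus one).
  def latestVersion: Long = files.size - 1L

  // Writes commit readVersion + 1 only if that file does not exist yet,
  // mimicking put-if-absent: a concurrent writer that read the same
  // version loses and gets a Left.
  def commit(readVersion: Long, actions: Seq[String]): Either[String, Long] = {
    require(readVersion >= -1L && readVersion <= latestVersion, s"unknown version: $readVersion")
    val version = readVersion + 1
    val name = fileName(version)
    if (files.contains(name)) Left(s"$name already exists")
    else {
      files.update(name, actions)
      Right(version)
    }
  }

  def fileNames: List[String] = files.keys.toList
}
```

In this model, `CREATE TABLE` and the append are two successful commits. A second writer that also started from version 0 would be rejected, because `00000000000000000001.json` already exists.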
Review the cached RDD for the new state snapshot (version 1) in the Storage tab of the web UI (e.g. http://localhost:4040/storage/).
Note that the DataFrame variant of the delta table (`users`) has been refreshed automatically, so there is no need for REFRESH TABLE.
```text
scala> users.show
+---+-------+------+-------+
| id|   name|  city|country|
+---+-------+------+-------+
|  1|Bartosz| Paris| France|
|  0|  Agata|Warsaw| Poland|
+---+-------+------+-------+
```
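The `users` DataFrame does not keep a copy of the rows. It is a query over the table, and Delta Lake looks up the latest snapshot when the query runs. The toy sketch below contrasts an eager copy with a lazy view that re-reads the current state on every evaluation. `Table`, `eagerCopy` and `lazyView` are hypothetical names. This is an analogy, not how Spark or Delta Lake implement DataFrames.

```scala
// Toy contrast between an eager copy and a lazy view (hypothetical names;
// not how Spark or Delta Lake implement DataFrames).
final class Table {
  private var rows = Vector.empty[(Long, String)]
  def append(newRows: Seq[(Long, String)]): Unit = rows = rows ++ newRows
  def currentRows: Vector[(Long, String)] = rows
}

object Views {
  // Captures the rows once; later appends stay invisible without a "refresh".
  def eagerCopy(t: Table): () => Vector[(Long, String)] = {
    val captured = t.currentRows
    () => captured
  }

  // Reads the table's current rows on every call, like a query that is
  // resolved against the latest snapshot each time it runs.
  def lazyView(t: Table): () => Vector[(Long, String)] = () => t.currentRows
}
```

After an append, only the lazy view sees the new rows, which mirrors how `users.show` picks up version 1 without REFRESH TABLE.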
```scala
val h = dt.history.select('version, 'operation, 'operationParameters, 'operationMetrics)
```
```text
scala> h.show(truncate = false)
+-------+------------+-----------------------------------------------------------------------------+-----------------------------------------------------------+
|version|operation   |operationParameters                                                          |operationMetrics                                           |
+-------+------------+-----------------------------------------------------------------------------+-----------------------------------------------------------+
|1      |WRITE       |{mode -> Append, partitionBy -> []}                                          |{numFiles -> 2, numOutputBytes -> 2299, numOutputRows -> 2}|
|0      |CREATE TABLE|{isManaged -> true, description -> null, partitionBy -> [], properties -> {}}|{}                                                         |
+-------+------------+-----------------------------------------------------------------------------+-----------------------------------------------------------+
```
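`DeltaTable.history` returns one row per commit, newest version first. `history(limit)` keeps only the latest `limit` commits, so `dt.history(1)` would return just the `WRITE`. Below is a plain-Scala sketch of that ordering over a hypothetical, simplified `CommitInfo` record. It is not Delta Lake's `CommitInfo` class. The values come from the history output of the demo table, with the metrics kept as strings as in the `operationMetrics` map.

```scala
// Hypothetical, simplified commit-info record (not Delta Lake's CommitInfo class).
final case class CommitInfo(
    version: Long,
    operation: String,
    operationMetrics: Map[String, String])

object History {
  // Newest commit first; an optional limit keeps only the latest commits.
  def history(commits: Seq[CommitInfo], limit: Option[Int] = None): Seq[CommitInfo] = {
    val newestFirst = commits.sortBy(c => -c.version)
    limit.fold(newestFirst)(n => newestFirst.take(n))
  }
}
```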