Skip to content

Demo: Using Delta Lake as Streaming Sink in Structured Streaming

// Sanity checks on the environment. `spark` is expected to be already in scope
// (as it is in spark-shell).
assert(spark.isInstanceOf[org.apache.spark.sql.SparkSession])
// Accept every Spark 2.4.x release with x >= 2, as the message promises.
// The dots are escaped so they match a literal '.' only, and the patch-level
// alternative is open-ended so that 2.4.5 and later are not rejected.
assert(
  spark.version.matches("""2\.4\.([2-9]|[1-9]\d+)"""),
  "Delta Lake supports Spark 2.4.2+")

/** Input data "format": one record of the users stream. */
case class User(
    id: Long,
    name: String,
    city: String)

// The memory data source stands in for any streaming data source;
// it is used here because it gives control over the input stream.
import org.apache.spark.sql.execution.streaming.MemoryStream
// Declared implicit so that it can be picked up by MemoryStream below
implicit val ctx = spark.sqlContext
val usersIn = MemoryStream[User]
// DataFrame view of the stream; this is what the streaming query reads from
val users = usersIn.toDF()

// Root directory of the demo and the users table underneath it
val deltaLake: String = "/tmp/delta-lake"
val path: String = s"$deltaLake/users"
// Directory passed to the streaming query as its checkpoint location
val checkpointLocation: String = "/tmp/delta-checkpointLocation"
// Name of the column the written data is partitioned by
val partitionBy: String = "city"

// Streaming query that writes the users stream out to the Delta table at `path`,
// partitioned by the `partitionBy` column
val sq = users
  .writeStream
  .format("delta")
  .partitionBy(partitionBy)
  .option("path", path)
  .option("checkpointLocation", checkpointLocation)
  .start()

// TIP: You could use git to version the path directory
//      and track the changes of every micro-batch

// TIP: Use web UI to monitor execution, e.g. http://localhost:4040

// FIXME: Run DESCRIBE HISTORY after every micro-batch

// First micro-batch: two users, both in Warsaw
val batch1 = Seq(
  User(id = 0L, name = "Agata", city = "Warsaw"),
  User(id = 1L, name = "Jacek", city = "Warsaw"))
val offset = usersIn.addData(batch1)
// Block until the streaming query has processed everything added so far
sq.processAllAvailable()

// SQL statement that describes the history of the Delta table at `path`
val history = s"DESCRIBE HISTORY delta.`$path`"
// The history columns selected for display in this demo
val clmns = Seq(
  $"version",
  $"timestamp",
  $"operation",
  $"operationParameters",
  $"isBlindAppend")

// Table history, oldest version first
val h = sql(history)
  .select(clmns: _*)
  .orderBy($"version".asc)
scala> h.show(truncate = false)
+-------+-------------------+----------------+-------------------------------------------------------------------------------------+-------------+
|version|timestamp          |operation       |operationParameters                                                                  |isBlindAppend|
+-------+-------------------+----------------+-------------------------------------------------------------------------------------+-------------+
|0      |2019-12-06 10:06:20|STREAMING UPDATE|[outputMode -> Append, queryId -> f3990048-f0b7-48b6-9bf6-397004c36e53, epochId -> 0]|true         |
+-------+-------------------+----------------+-------------------------------------------------------------------------------------+-------------+

// Second micro-batch: two users in Paris
val batch2 = Seq(
  User(id = 2L, name = "Bartek", city = "Paris"),
  User(id = 3L, name = "Jacek", city = "Paris"))
val offset = usersIn.addData(batch2)
// Block until the streaming query has processed everything added so far
sq.processAllAvailable()

// The history SQL command has to be executed again: it materializes
// immediately with whatever data is available at the time it runs
val h = sql(history)
  .select(clmns: _*)
  .orderBy($"version".asc)
scala> h.show(truncate = false)
+-------+-------------------+----------------+-------------------------------------------------------------------------------------+-------------+
|version|timestamp          |operation       |operationParameters                                                                  |isBlindAppend|
+-------+-------------------+----------------+-------------------------------------------------------------------------------------+-------------+
|0      |2019-12-06 10:06:20|STREAMING UPDATE|[outputMode -> Append, queryId -> f3990048-f0b7-48b6-9bf6-397004c36e53, epochId -> 0]|true         |
|1      |2019-12-06 10:13:27|STREAMING UPDATE|[outputMode -> Append, queryId -> f3990048-f0b7-48b6-9bf6-397004c36e53, epochId -> 1]|true         |
+-------+-------------------+----------------+-------------------------------------------------------------------------------------+-------------+

// Third micro-batch: a single user in Ljubljana
val batch3 = Seq(
  User(id = 4L, name = "Gorazd", city = "Ljubljana"))
val offset = usersIn.addData(batch3)
// Block until the streaming query has processed everything added so far
sq.processAllAvailable()

// This time the history is read through the DeltaTable API instead of SQL

import io.delta.tables.DeltaTable
// Handle to the Delta table stored at `path`
val usersDT = DeltaTable.forPath(path)

// Same selected columns and ordering as the SQL-based history
val h = usersDT
  .history()
  .select(clmns: _*)
  .orderBy($"version".asc)
scala> h.show(truncate = false)
+-------+-------------------+----------------+-------------------------------------------------------------------------------------+-------------+
|version|timestamp          |operation       |operationParameters                                                                  |isBlindAppend|
+-------+-------------------+----------------+-------------------------------------------------------------------------------------+-------------+
|0      |2019-12-06 10:06:20|STREAMING UPDATE|[outputMode -> Append, queryId -> f3990048-f0b7-48b6-9bf6-397004c36e53, epochId -> 0]|true         |
|1      |2019-12-06 10:13:27|STREAMING UPDATE|[outputMode -> Append, queryId -> f3990048-f0b7-48b6-9bf6-397004c36e53, epochId -> 1]|true         |
|2      |2019-12-06 10:20:56|STREAMING UPDATE|[outputMode -> Append, queryId -> f3990048-f0b7-48b6-9bf6-397004c36e53, epochId -> 2]|true         |
+-------+-------------------+----------------+-------------------------------------------------------------------------------------+-------------+