Demo: RocksDB State Store for Streaming Aggregation¶
This demo shows the RocksDB State Store used to keep the state of a streaming aggregation.
FIXME
- Use Memory Data Source as the source
- Logging does not work
Configure RocksDB State Store¶
Configure RocksDBStateStoreProvider to be the StateStoreProvider in spark-shell.
./bin/spark-shell \
--conf spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
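Once inside spark-shell, you can confirm the configuration took effect by reading the property back (RocksDBStateStoreProvider is available since Spark 3.2.0):

// Should print org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
spark.conf.get("spark.sql.streaming.stateStore.providerClass")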
Send Records¶
// The rate data source generates records with timestamp and value columns
val events = spark
  .readStream
  .format("rate")
  .load
  .withColumnRenamed("timestamp", "event_time")
  .withColumn("gid", $"value" % 5) // create 5 groups
Define Windowed Streaming Aggregation¶
The streaming query uses Update output mode with a streaming watermark.
val windowed = events
  .withWatermark(eventTime = "event_time", delayThreshold = "10 seconds")
  .groupBy(
    $"gid",
    window(
      timeColumn = $"event_time",
      windowDuration = "5 seconds"))
  .agg(
    collect_list("value") as "vs")
windowed.printSchema
root
 |-- gid: long (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- vs: array (nullable = false)
 |    |-- element: long (containsNull = false)
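Before starting the query, you can also review the query plan and look for the stateful operators (StateStoreRestore and StateStoreSave) that use the configured state store provider:

windowed.explain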
Start Streaming Query¶
import java.time.Clock
val timeOffset = Clock.systemUTC.instant.getEpochSecond
val queryName = s"Demo: RocksDB for Streaming Aggregation ($timeOffset)"
val checkpointLocation = s"/tmp/demo-checkpoint-$timeOffset"
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.OutputMode.Update
import org.apache.spark.sql.streaming.Trigger
val sq = windowed
  .writeStream
  .format("console")
  .option("checkpointLocation", checkpointLocation)
  .option("truncate", false)
  .outputMode(Update)
  .queryName(queryName)
  .trigger(Trigger.ProcessingTime(5.seconds))
  .start
The streaming query gets executed and prints out an (empty) Batch 0 to the console.
-------------------------------------------
Batch: 0
-------------------------------------------
+---+------+---+
|gid|window|vs |
+---+------+---+
+---+------+---+
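After a couple of batches, you can peek at the state store metrics of the most recent micro-batch. With RocksDBStateStoreProvider, StateOperatorProgress.customMetrics includes RocksDB-specific metrics (names prefixed with rocksdb). A minimal sketch:

// Inspect the state operator progress of the last completed micro-batch
val progress = sq.lastProgress
progress.stateOperators.foreach { op =>
  println(s"numRowsTotal = ${op.numRowsTotal}")
  op.customMetrics.forEach { (name, value) =>
    if (name.startsWith("rocksdb")) println(s"$name = $value")
  }
}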
Cleanup¶
spark.streams.active.foreach(_.stop)
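The state itself is checkpointed under the checkpoint location at state/&lt;operatorId&gt;/&lt;partitionId&gt; and survives stopping the query. A minimal sketch to list the files of operator 0, partition 0 (the exact file layout is an implementation detail of the RocksDB state store):

import java.nio.file.{Files, Paths}

val stateDir = Paths.get(s"$checkpointLocation/state/0/0")
if (Files.exists(stateDir)) {
  Files.list(stateDir).forEach(p => println(p))
}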