Skip to content

Demo: RocksDB State Store for Streaming Aggregation

This demo shows RocksDB State Store used to keep state of a streaming aggregation.


  1. Use Memory Data Source as the source
  2. Logging does not work

Configure RocksDB State Store

Configure RocksDBStateStoreProvider to be the StateStoreProvider in spark-shell.

./bin/spark-shell \
  --conf spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider

Send Records

val events = spark
  .withColumnRenamed("timestamp", "event_time")
  .withColumn("gid", $"value" % 5) // create 5 groups

Define Windowed Streaming Aggregation

The streaming query uses Update output mode with a streaming watermark.

val windowed = events
  .withWatermark(eventTime = "event_time", delayThreshold = "10 seconds")
      timeColumn = $"event_time",
      windowDuration = "5 seconds"))
    collect_list("value") as "vs")
 |-- gid: long (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- vs: array (nullable = false)
 |    |-- element: long (containsNull = false)

Start Streaming Query

import java.time.Clock
val timeOffset = Clock.systemUTC.instant.getEpochSecond
val queryName = s"Demo: RocksDB for Streaming Aggregation ($timeOffset)"
val checkpointLocation = s"/tmp/demo-checkpoint-$timeOffset"

import scala.concurrent.duration._
import org.apache.spark.sql.streaming.OutputMode.Update
import org.apache.spark.sql.streaming.Trigger
val sq = windowed
  .option("checkpointLocation", checkpointLocation)
  .option("truncate", false)

The streaming query gets executed and prints out Batch 0 to the console.

Batch: 0
|id |window|vs |seconds|
