Demo: Streaming Watermark
This demo shows the internals of the streaming watermark, using the Kafka Data Source as the input.
Note
Please start a Kafka cluster and spark-shell as described in Demo: Kafka Data Source.
Streaming Query with Watermark
import java.time.Clock
// Epoch seconds at demo start; also keeps the query name and checkpoint directory unique per run
val timeOffset = Clock.systemUTC.instant.getEpochSecond
val queryName = s"Demo: Streaming Watermark ($timeOffset)"
val checkpointLocation = s"/tmp/demo-checkpoint-$timeOffset"
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
// Event time = third token (in seconds) shifted by the demo start time
val eventTimeCol = (($"tokens"(2) cast "long") + lit(timeOffset)) cast "timestamp" as "event_time"
val sq = spark
  .readStream
  .format("kafka")
  .option("subscribe", "demo.streaming-watermark")
  .option("kafka.bootstrap.servers", ":9092")
  .load
  .select($"value" cast "string")
  .select(split($"value", ",") as "tokens")
  .select(
    $"tokens"(0) cast "long" as "id",
    $"tokens"(1) as "name",
    eventTimeCol)
  .withWatermark(eventTime = "event_time", delayThreshold = "5 seconds")
  .writeStream
  .format("console")
  .queryName(queryName)
  .trigger(Trigger.ProcessingTime(1.seconds))
  .option("checkpointLocation", checkpointLocation)
  .option("truncate", false)
  .start
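To make the column expressions above concrete, the following plain-Scala sketch (no Spark involved) mirrors how a single comma-separated record would be mapped to id, name and event_time. DemoEvent and parseRecord are hypothetical names introduced only for this illustration, and the fixed timeOffset of 1000 is an assumed value. Unlike this sketch, which throws on malformed input, Spark's cast typically yields null for unparseable strings when ANSI mode is off.

```scala
import java.time.Instant

// Hypothetical type and helper for illustration; not part of the demo or of Spark.
final case class DemoEvent(id: Long, name: String, eventTime: Instant)

def parseRecord(value: String, timeOffset: Long): DemoEvent = {
  // Same split as split($"value", ",") in the query
  val tokens = value.split(",")
  DemoEvent(
    id = tokens(0).toLong,
    name = tokens(1),
    // Third token is an offset in seconds, shifted by the demo start time
    eventTime = Instant.ofEpochSecond(tokens(2).toLong + timeOffset))
}

// Assumed start time of 1000 epoch seconds, chosen only to keep the numbers small
val event = parseRecord("10,ten,10", timeOffset = 1000L)
println(event)
```

Shifting the third token by timeOffset keeps event times close to the current wall-clock time, while the records themselves only need small, easy-to-read offsets.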
Send Events
echo "0,zero,0" | kcat -P -b :9092 -t demo.streaming-watermark
echo "10,ten,10" | kcat -P -b :9092 -t demo.streaming-watermark
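As a rough model of what the query tracks for these two events, the sketch below computes the watermark by hand. It is a simplification, not Spark's implementation: it assumes each event lands in its own micro-batch, that the watermark starts at 0, and that it advances to the maximum event time seen minus the delay threshold without ever moving backwards. nextWatermark and the start time of 1000 epoch seconds are hypothetical and serve only as an illustration.

```scala
// Simplified model (an assumption, not Spark's code): after a micro-batch the
// watermark becomes max(current, max event time in the batch - delay threshold).
def nextWatermark(current: Long, batchEventTimes: Seq[Long], delaySeconds: Long): Long =
  if (batchEventTimes.isEmpty) current
  else math.max(current, batchEventTimes.max - delaySeconds)

val timeOffset = 1000L   // assumed demo start time in epoch seconds
val delaySeconds = 5L    // matches delayThreshold = "5 seconds"

// Event "0,zero,0" has event time timeOffset + 0
val afterFirst = nextWatermark(0L, Seq(timeOffset + 0L), delaySeconds)
// Event "10,ten,10" has event time timeOffset + 10
val afterSecond = nextWatermark(afterFirst, Seq(timeOffset + 10L), delaySeconds)
// An older event (timeOffset + 2) arriving afterwards does not move the watermark back
val afterLate = nextWatermark(afterSecond, Seq(timeOffset + 2L), delaySeconds)

println(s"afterFirst=$afterFirst afterSecond=$afterSecond afterLate=$afterLate")
```

In the running query, the corresponding values should be visible in the eventTime section of the streaming progress, which reports the watermark alongside the max, min and avg event times of a batch.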