Demo: Kafka Data Source

This demo shows how to use Kafka Data Source in a streaming query.

Start Kafka Cluster
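If the cluster is not running yet, it can be brought up with Docker Compose (a sketch, assuming a docker-compose.yml for Confluent Platform in the current directory):

```shell
docker compose up -d
```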

$ docker compose ps
NAME                COMMAND                  SERVICE             STATUS              PORTS
broker              "/etc/confluent/dock…"   broker              running   >9092/tcp,>9101/tcp
connect             "/etc/confluent/dock…"   connect             running   >8083/tcp, 9092/tcp
control-center      "/etc/confluent/dock…"   control-center      running   >9021/tcp
rest-proxy          "/etc/confluent/dock…"   rest-proxy          running   >8082/tcp
schema-registry     "/etc/confluent/dock…"   schema-registry     running   >8081/tcp
zookeeper           "/etc/confluent/dock…"   zookeeper           running             2888/tcp,>2181/tcp, 3888/tcp

Start Spark Shell

Run spark-shell with the spark-sql-kafka-0-10 external module.
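A minimal invocation sketch, assuming Spark 3.5.x built for Scala 2.12 (adjust the version suffix and artifact version to match your Spark installation):

```shell
./bin/spark-shell \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0
```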

Define Source

val events = spark
  .readStream
  .format("kafka")
  .option("subscribe", "demo.kafka-data-source")
  .option("kafka.bootstrap.servers", ":9092")
  .load
scala> events.printSchema
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
val kvs = events.select(
    $"key" cast "string",
    $"value" cast "string")
scala> kvs.printSchema
root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
scala> kvs.explain
== Physical Plan ==
*(1) Project [cast(key#277 as string) AS key#295, cast(value#278 as string) AS value#296]
+- StreamingRelation kafka, [key#277, value#278, topic#279, partition#280, offset#281L, timestamp#282, timestampType#283]

Start Streaming Query

import java.time.Clock
val timeOffset = Clock.systemUTC.instant.getEpochSecond
val queryName = s"Demo: Kafka Data Source ($timeOffset)"
val checkpointLocation = s"/tmp/demo-checkpoint-$timeOffset"
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.Trigger
val sq = kvs
  .writeStream
  .format("console")
  .queryName(queryName)
  .option("checkpointLocation", checkpointLocation)
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(30.seconds))
  .start

Batch 0 is printed out to the console immediately (with no rows, since no events have been sent yet).

-------------------------------------------
Batch: 0
-------------------------------------------
+---+-----+
|key|value|
+---+-----+
+---+-----+

Send Event

echo "k1:v1" | kcat -P -b :9092 -K : -t demo.kafka-data-source

spark-shell should print out the event as Batch 1.

-------------------------------------------
Batch: 1
-------------------------------------------
+---+-----+
|key|value|
+---+-----+
|k1 |v1   |
+---+-----+



Exit spark-shell and stop the Kafka cluster (Confluent Platform).
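Stop the streaming query with sq.stop and exit the REPL with :quit. Then, assuming the same Compose project used above, stop and remove the containers:

```shell
docker compose down
```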