Demo: Kafka Data Source
This demo shows how to use the Kafka Data Source in a streaming query.
Start Kafka Cluster
./start-confluent.sh
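The script itself is not shown here; assuming it simply wraps the standard Confluent Platform docker-compose.yml, a minimal equivalent is:

docker compose up -d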
$ docker compose ps
NAME              COMMAND                   SERVICE           STATUS    PORTS
broker            "/etc/confluent/dock…"    broker            running   0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp
connect           "/etc/confluent/dock…"    connect           running   0.0.0.0:8083->8083/tcp, 9092/tcp
control-center    "/etc/confluent/dock…"    control-center    running   0.0.0.0:9021->9021/tcp
rest-proxy        "/etc/confluent/dock…"    rest-proxy        running   0.0.0.0:8082->8082/tcp
schema-registry   "/etc/confluent/dock…"    schema-registry   running   0.0.0.0:8081->8081/tcp
zookeeper         "/etc/confluent/dock…"    zookeeper         running   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
Start Spark Shell
Run spark-shell with the spark-sql-kafka-0-10 external module.
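A typical invocation uses --packages to pull the module from Maven Central. The Scala and Spark versions below are assumptions; match them to your installation.

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0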
Define Source
val events = spark
  .readStream
  .format("kafka")
  .option("subscribe", "demo.kafka-data-source")
  .option("kafka.bootstrap.servers", ":9092")
  .load
scala> events.printSchema
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
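Only subscribe and kafka.bootstrap.servers are required. Other standard options of the Kafka data source can be set the same way, e.g. startingOffsets (shown below with an assumed value; the default for streaming queries is latest):

val eventsFromBeginning = spark
  .readStream
  .format("kafka")
  .option("subscribe", "demo.kafka-data-source")
  .option("kafka.bootstrap.servers", ":9092")
  .option("startingOffsets", "earliest") // re-read the topic from the earliest available offsets
  .load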
val kvs = events.
  select(
    $"key" cast "string",
    $"value" cast "string")
scala> kvs.printSchema
root
|-- key: string (nullable = true)
|-- value: string (nullable = true)
scala> kvs.explain
== Physical Plan ==
*(1) Project [cast(key#277 as string) AS key#295, cast(value#278 as string) AS value#296]
+- StreamingRelation kafka, [key#277, value#278, topic#279, partition#280, offset#281L, timestamp#282, timestampType#283]
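An equivalent projection can be written with selectExpr and SQL-style casts (kvs2 is just an illustrative name); the physical plan comes out the same:

val kvs2 = events.selectExpr(
  "CAST(key AS STRING)",
  "CAST(value AS STRING)")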
Start Streaming Query
import java.time.Clock
val timeOffset = Clock.systemUTC.instant.getEpochSecond
val queryName = s"Demo: Kafka Data Source ($timeOffset)"
val checkpointLocation = s"/tmp/demo-checkpoint-$timeOffset"
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.Trigger
val sq = kvs
  .writeStream
  .format("console")
  .option("checkpointLocation", checkpointLocation)
  .option("truncate", false)
  .queryName(queryName)
  .trigger(Trigger.ProcessingTime(1.seconds))
  .start
Batch 0 is printed out to the console immediately.
-------------------------------------------
Batch: 0
-------------------------------------------
+---+-----+
|key|value|
+---+-----+
+---+-----+
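While the query is running, it can be inspected from the same spark-shell session with the standard StreamingQuery API:

sq.isActive      // true while the query keeps running
sq.status        // current status, e.g. waiting for the next trigger
sq.lastProgress  // metrics of the most recent micro-batch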
Send Event
echo "k1:v1" | kcat -P -b :9092 -K : -t demo.kafka-data-source
spark-shell should print out the event as Batch 1.
-------------------------------------------
Batch: 1
-------------------------------------------
+---+-----+
|key|value|
+---+-----+
|k1 |v1 |
+---+-----+
Cleanup
sq.stop()
Exit spark-shell and stop the Kafka cluster (Confluent Platform).
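Assuming the same docker compose setup as above:

docker compose down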