Demo: Developing a Kafka Streams Application
Build Topology using StreamsBuilder
A Kafka Streams application requires a Topology. You can create one directly, or describe it indirectly with StreamsBuilder and then build it.
```scala
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._

val streamBuilder = new StreamsBuilder

// The implicit String serdes (from Serdes._) supply the Consumed and Produced instances
val records = streamBuilder.stream[String, String](topic = "streams-demo-input")
records.to(topic = "streams-demo-output")

val topology: Topology = streamBuilder.build()
```
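You can check the pipeline without a running broker by running the topology through TopologyTestDriver. The sketch below makes a few assumptions. The kafka-streams-test-utils artifact must be on the classpath, at a version that provides TestInputTopic and TestOutputTopic. The object name TopologyCheck and the test application id are hypothetical. The driver never contacts the dummy bootstrap address.

```scala
import java.util.Properties

import scala.jdk.CollectionConverters._

import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.streams.{StreamsConfig, Topology, TopologyTestDriver}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._

object TopologyCheck {
  // Rebuilds the same pass-through topology: streams-demo-input -> streams-demo-output
  def buildTopology(): Topology = {
    val builder = new StreamsBuilder
    builder.stream[String, String]("streams-demo-input").to("streams-demo-output")
    builder.build()
  }

  // Pipes records through the topology and returns everything written to the output topic
  def run(records: Seq[(String, String)]): List[(String, String)] = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-demo-test")
    // Never contacted by TopologyTestDriver (some Kafka versions still require it to be set)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092")

    val driver = new TopologyTestDriver(buildTopology(), props)
    try {
      val input = driver.createInputTopic(
        "streams-demo-input", new StringSerializer, new StringSerializer)
      val output = driver.createOutputTopic(
        "streams-demo-output", new StringDeserializer, new StringDeserializer)

      records.foreach { case (key, value) => input.pipeInput(key, value) }
      // Drain the output topic in the order records were written
      output.readKeyValuesToList().asScala.map(kv => (kv.key, kv.value)).toList
    } finally driver.close()
  }
}
```

This topology forwards every record unchanged, so `TopologyCheck.run(Seq("k1" -> "hello"))` should return `List(("k1", "hello"))`.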
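A topology can be described. Topology.describe returns a TopologyDescription, and its string form lists the sub-topologies and their nodes.

```scala
println(topology.describe())
```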
```text
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-demo-input])
      --> KSTREAM-SINK-0000000001
    Sink: KSTREAM-SINK-0000000001 (topic: streams-demo-output)
      <-- KSTREAM-SOURCE-0000000000
```
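The same description is available programmatically. The sketch below walks the TopologyDescription nodes and collects the topics that Source nodes read and Sink nodes write. This is useful for checking which topics must exist before the application starts. The object and helper names (DescribeTopology, sourceAndSinkTopics) are hypothetical, and the sketch assumes the kafka-streams-scala artifact is on the classpath.

```scala
import scala.jdk.CollectionConverters._

import org.apache.kafka.streams.{Topology, TopologyDescription}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._

object DescribeTopology {
  // Same pass-through topology as in the demo
  def buildTopology(): Topology = {
    val builder = new StreamsBuilder
    builder.stream[String, String]("streams-demo-input").to("streams-demo-output")
    builder.build()
  }

  // Collects the topics read by Source nodes and written by Sink nodes
  def sourceAndSinkTopics(topology: Topology): (Set[String], Set[String]) = {
    val nodes = topology.describe().subtopologies().asScala.toList
      .flatMap(subtopology => subtopology.nodes().asScala)
    val sources = nodes.collect {
      // topicSet() is null for pattern-based sources, so wrap it in an Option
      case source: TopologyDescription.Source => Option(source.topicSet()).toList.flatMap(_.asScala)
    }.flatten.toSet
    val sinks = nodes.collect {
      // topic() is null when the sink uses a dynamic TopicNameExtractor
      case sink: TopologyDescription.Sink => Option(sink.topic())
    }.flatten.toSet
    (sources, sinks)
  }
}
```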
Create Kafka Topics
Kafka Streams requires all input topics to exist before it can be started; otherwise, a MissingSourceTopicException is thrown. Create both the input and the output topic of the demo:
```shell
./bin/kafka-topics.sh \
  --bootstrap-server :9092 \
  --create \
  --topic streams-demo-input \
  --partitions 1 \
  --replication-factor 1

./bin/kafka-topics.sh \
  --bootstrap-server :9092 \
  --create \
  --topic streams-demo-output \
  --partitions 1 \
  --replication-factor 1
```
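You can also create the topics from code with the Admin client. This minimal sketch assumes a broker at localhost:9092, and the object and helper names are hypothetical. The topic specs are built in a separate function so you can inspect them without a broker.

```scala
import java.util.Properties

import scala.jdk.CollectionConverters._

import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewTopic}

object CreateDemoTopics {
  // Mirrors the kafka-topics.sh calls: 1 partition, replication factor 1
  def demoTopics: List[NewTopic] =
    List("streams-demo-input", "streams-demo-output")
      .map(name => new NewTopic(name, 1, 1.toShort))

  def createDemoTopics(bootstrapServers: String): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    val admin = Admin.create(props)
    try {
      // all().get() blocks until the broker confirms, or throws (e.g. if a topic already exists)
      admin.createTopics(demoTopics.asJava).all().get()
    } finally admin.close()
  }

  // Assumed broker address; adjust to your environment
  def main(args: Array[String]): Unit = createDemoTopics("localhost:9092")
}
```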
StreamsConfig
The execution environment of a Kafka Streams application is configured using StreamsConfig.
```scala
import org.apache.kafka.streams.StreamsConfig
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

// Only the required configuration properties (application.id and bootstrap.servers),
// plus poll.ms for demo purposes: block up to 15 seconds waiting for input records
val props = Map[String, Any](
  StreamsConfig.APPLICATION_ID_CONFIG -> "kafka-streams-demo",
  StreamsConfig.BOOTSTRAP_SERVERS_CONFIG -> ":9092",
  StreamsConfig.POLL_MS_CONFIG -> 15.seconds.toMillis).asJava
val config = new StreamsConfig(props)
```
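StreamsConfig parses and validates the properties it receives. You can read the effective values back with the typed getters it inherits from AbstractConfig. The sketch below rebuilds the same three properties and reads two of them back; the object name DemoConfig is hypothetical. It only parses configuration and does not contact a broker.

```scala
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

import org.apache.kafka.streams.StreamsConfig

object DemoConfig {
  val config: StreamsConfig = new StreamsConfig(Map[String, Any](
    StreamsConfig.APPLICATION_ID_CONFIG -> "kafka-streams-demo",
    StreamsConfig.BOOTSTRAP_SERVERS_CONFIG -> ":9092",
    StreamsConfig.POLL_MS_CONFIG -> 15.seconds.toMillis
  ).asJava)

  // Typed getters return the parsed values (poll.ms is a LONG-typed property)
  def applicationId: String = config.getString(StreamsConfig.APPLICATION_ID_CONFIG)
  def pollMs: Long = config.getLong(StreamsConfig.POLL_MS_CONFIG)
}
```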
KafkaStreams
The execution environment of a Kafka Streams application is KafkaStreams, created from a Topology and a StreamsConfig.
```scala
import org.apache.kafka.streams.KafkaStreams
val streams = new KafkaStreams(topology, config)
```
Finally, start KafkaStreams to begin stream processing.

```scala
streams.start()
```
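KafkaStreams.start returns once the stream threads are started, and processing continues in the background. A long-running application therefore usually closes the instance from a JVM shutdown hook. The minimal sketch below rebuilds the same topology and configuration so it is self-contained. It also registers a state listener that logs every state transition; the listener must be set before start() is called. The object and method names are hypothetical.

```scala
import java.time.Duration

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._

object DemoApp {
  def newStreams(): KafkaStreams = {
    val builder = new StreamsBuilder
    builder.stream[String, String]("streams-demo-input").to("streams-demo-output")
    val props = Map[String, Any](
      StreamsConfig.APPLICATION_ID_CONFIG -> "kafka-streams-demo",
      StreamsConfig.BOOTSTRAP_SERVERS_CONFIG -> ":9092",
      StreamsConfig.POLL_MS_CONFIG -> 15.seconds.toMillis).asJava
    val streams = new KafkaStreams(builder.build(), new StreamsConfig(props))
    // Log every state transition; only allowed while the instance is still CREATED
    streams.setStateListener((newState, oldState) => println(s"$oldState -> $newState"))
    streams
  }

  def main(args: Array[String]): Unit = {
    val streams = newStreams()
    // Close the instance (waiting at most 10 seconds) when the JVM shuts down
    sys.addShutdownHook(streams.close(Duration.ofSeconds(10)))
    streams.start()
  }
}
```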
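kcat
With the application running, use kcat to produce records to the input topic (-P). In another terminal, consume them from the output topic (-C).

```shell
kcat -P -b localhost -t streams-demo-input
kcat -C -b localhost -t streams-demo-output
```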
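kcat -P sends one record per input line, and without a key delimiter (-K) the records have no key. You can do the same from Scala with KafkaProducer. This minimal sketch assumes a broker at localhost:9092 and uses hypothetical object names. It sends its command-line arguments as record values.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object DemoProducer {
  // Builds null-keyed records for the demo's input topic, one per line
  def records(lines: Seq[String]): Seq[ProducerRecord[String, String]] =
    lines.map(line => new ProducerRecord[String, String]("streams-demo-input", line))

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val producer = new KafkaProducer[String, String](props, new StringSerializer, new StringSerializer)
    // close() blocks until all previously sent records have completed
    try records(args.toSeq).foreach(r => producer.send(r))
    finally producer.close()
  }
}
```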