
Demo: Developing Kafka Streams Application

Build Topology using StreamsBuilder

A Kafka Streams application requires a Topology that can be created directly or described (and built) indirectly using StreamsBuilder.

import org.apache.kafka.streams.scala.StreamsBuilder
val streamBuilder = new StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._
val records = streamBuilder.stream[String, String](topic = "streams-demo-input")
records.to(topic = "streams-demo-output")
import org.apache.kafka.streams.Topology
val topology = streamBuilder.build()

A topology can be described to show its sub-topologies and processor nodes.

println(topology.describe)
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-demo-input])
      --> KSTREAM-SINK-0000000001
    Sink: KSTREAM-SINK-0000000001 (topic: streams-demo-output)
      <-- KSTREAM-SOURCE-0000000000
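
Before starting anything against a broker, the same pass-through topology can be exercised in memory. The following is a minimal sketch, assuming the kafka-streams-test-utils library (which provides TopologyTestDriver) is on the classpath. The names builder, testTopology, testProps and driver, and the application ID and bootstrap address, are placeholders chosen for illustration.

```scala
import java.util.Properties

import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.streams.{StreamsConfig, TopologyTestDriver}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._

// Same pass-through topology as in the demo
val builder = new StreamsBuilder
builder.stream[String, String]("streams-demo-input").to("streams-demo-output")
val testTopology = builder.build()

// The test driver never connects to a broker; these values are placeholders
val testProps = new Properties()
testProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-demo-test")
testProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092")

val driver = new TopologyTestDriver(testTopology, testProps)
val (outKey, outValue) =
  try {
    val input = driver.createInputTopic(
      "streams-demo-input", new StringSerializer, new StringSerializer)
    val output = driver.createOutputTopic(
      "streams-demo-output", new StringDeserializer, new StringDeserializer)
    // Pipe one record in and read the record the sink node wrote
    input.pipeInput("k1", "hello")
    val record = output.readKeyValue()
    (record.key, record.value)
  } finally driver.close()
```

Since the topology only forwards records from the source to the sink, the record read from the output topic should carry the same key and value that were piped in.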

Create Kafka Topics

Kafka Streams requires that all input topics are available before it can be started (or MissingSourceTopicException is thrown).

./bin/kafka-topics.sh \
  --bootstrap-server :9092 \
  --create \
  --topic streams-demo-input \
  --partitions 1 \
  --replication-factor 1
./bin/kafka-topics.sh \
  --bootstrap-server :9092 \
  --create \
  --topic streams-demo-output \
  --partitions 1 \
  --replication-factor 1
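
The topics could also be created from Scala with the Kafka Admin client. This is only a sketch under stated assumptions: createDemoTopics and demoTopics are hypothetical names, and the helper needs a broker reachable at the address passed in.

```scala
import java.util.Properties

import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewTopic}
import scala.jdk.CollectionConverters._

// One partition and replication factor 1, matching the CLI commands
val demoTopics = List("streams-demo-input", "streams-demo-output")
  .map(name => new NewTopic(name, 1, 1.toShort))

// Hypothetical helper; needs a broker reachable at bootstrapServers
def createDemoTopics(bootstrapServers: String): Unit = {
  val adminProps = new Properties()
  adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
  val admin = Admin.create(adminProps)
  // get() blocks until the broker has answered the request
  try admin.createTopics(demoTopics.asJava).all().get()
  finally admin.close()
}
```

With a local broker running, createDemoTopics("localhost:9092") would issue both requests. If a topic already exists, get() fails with an ExecutionException that wraps a TopicExistsException.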

StreamsConfig

The execution environment of a Kafka Streams application is configured using StreamsConfig.

import org.apache.kafka.streams.StreamsConfig
import scala.jdk.CollectionConverters._
// application.id and bootstrap.servers are the only required properties
// poll.ms (the time to block waiting for input) is raised to 15 seconds for demo purposes
import scala.concurrent.duration._
val props = Map(
  StreamsConfig.APPLICATION_ID_CONFIG -> "kafka-streams-demo",
  StreamsConfig.BOOTSTRAP_SERVERS_CONFIG -> ":9092",
  StreamsConfig.POLL_MS_CONFIG -> 15.seconds.toMillis).asJava
val config = new StreamsConfig(props)
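
To check what a StreamsConfig actually holds, the typed getters it inherits from AbstractConfig can be used. A minimal sketch that rebuilds the configuration above under the illustrative names demoProps and demoConfig:

```scala
import org.apache.kafka.streams.StreamsConfig
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

val demoProps = Map[String, Any](
  StreamsConfig.APPLICATION_ID_CONFIG -> "kafka-streams-demo",
  StreamsConfig.BOOTSTRAP_SERVERS_CONFIG -> ":9092",
  StreamsConfig.POLL_MS_CONFIG -> 15.seconds.toMillis).asJava
val demoConfig = new StreamsConfig(demoProps)

// Values are parsed and validated when the StreamsConfig is created
val appId = demoConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)
val pollMs = demoConfig.getLong(StreamsConfig.POLL_MS_CONFIG).longValue
// A property that was not set falls back to its default
val threads = demoConfig.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG).intValue
```

Leaving out a required property such as application.id makes the StreamsConfig constructor throw a ConfigException, so configuration mistakes surface before KafkaStreams is created.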

KafkaStreams

The execution environment of a Kafka Streams application is KafkaStreams.

import org.apache.kafka.streams.KafkaStreams
val streams = new KafkaStreams(topology, config)

KafkaStreams has to be started before any stream processing takes place.

streams.start()
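
In a long-running application it usually helps to observe state changes and to close KafkaStreams when the JVM exits. The following is a sketch only: startWithShutdownHook is a hypothetical helper and the 10-second close timeout is an arbitrary choice, neither is part of the demo itself.

```scala
import java.time.Duration

import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}

// Hypothetical helper: wires lifecycle handling around an already built topology
def startWithShutdownHook(topology: Topology, config: StreamsConfig): KafkaStreams = {
  val streams = new KafkaStreams(topology, config)
  // The listener has to be registered before start(); it logs transitions
  // such as CREATED -> REBALANCING -> RUNNING
  streams.setStateListener((newState, oldState) =>
    println(s"State change: $oldState -> $newState"))
  // Stop the stream threads on JVM exit, waiting up to 10 seconds
  sys.addShutdownHook(streams.close(Duration.ofSeconds(10)))
  streams.start()
  streams
}
```

With the topology and config values defined earlier, startWithShutdownHook(topology, config) would take the place of creating and starting KafkaStreams by hand. It needs a running broker and the input topic to exist.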

kcat

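Use kcat to produce records to the input topic and, in another terminal, to consume them from the output topic.

kcat -P -b localhost -t streams-demo-input
kcat -C -b localhost -t streams-demo-output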