StreamsBuilder¶
StreamsBuilder is the entry point to the High-Level Streams DSL to define and build a stream processing topology.
All of the high-level operators use the InternalStreamsBuilder behind the scenes. In other words, StreamsBuilder is a façade of InternalStreamsBuilder that offers a more developer-friendly API for developing Kafka Streams applications than using InternalStreamsBuilder directly.
Scala API for Kafka Streams
Use Scala API for Kafka Streams to make your Kafka Streams development more pleasant with Scala.
Creating Instance¶
StreamsBuilder takes no arguments to be created.
import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder
When created, StreamsBuilder creates an empty Topology and requests it for an InternalTopologyBuilder. StreamsBuilder then uses the InternalTopologyBuilder to create an InternalStreamsBuilder.

Topology¶
StreamsBuilder creates a Topology when created.
StreamsBuilder uses the Topology to create an InternalTopologyBuilder.
The Topology is then optimized and returned when StreamsBuilder is requested to build a topology.
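As a quick check of the above, the following sketch (assuming the kafka-streams-scala library is on the classpath) builds a topology from a fresh StreamsBuilder with no operators defined. The names emptyTopology and description are only illustrative. The description of such a topology should list no sub-topologies and no global stores.

```scala
import org.apache.kafka.streams.scala.StreamsBuilder

// A fresh builder starts with an empty Topology
val builder = new StreamsBuilder

// build returns the Topology the builder has been populating (nothing so far)
val emptyTopology = builder.build()

// TopologyDescription gives access to sub-topologies and global stores
val description = emptyTopology.describe
println(description)
```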
Building and Optimizing Topology¶
Topology build() // (1)
Topology build(
  Properties props)
1. Uses undefined properties (null)
build requests the InternalStreamsBuilder to build and optimize a topology. In the end, build returns the Topology.
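The two build variants differ only in whether properties are handed over to the InternalStreamsBuilder. A minimal sketch, assuming that the topology.optimization property (also available as StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG in recent Kafka versions) is the setting of interest and that demo-input is a hypothetical topic name:

```scala
import java.util.Properties
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._

val builder = new StreamsBuilder

// Register a single source so the topology is not empty
// ("demo-input" is a hypothetical topic name)
builder.stream[String, String]("demo-input")

// Ask for all topology optimizations
// (assumption: "all" is the value behind StreamsConfig.OPTIMIZE)
val props = new Properties()
props.setProperty("topology.optimization", "all")

// The properties are passed along when the topology is built and optimized
val topology = builder.build(props)
println(topology.describe)
```

With a single source and no further operators there is little to optimize, so the description is expected to look the same as with the no-argument build.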
globalTable¶
GlobalKTable<K, V> globalTable(
  String topic)
GlobalKTable<K, V> globalTable(
  String topic,
  Consumed<K, V> consumed)
GlobalKTable<K, V> globalTable(
  String topic,
  Consumed<K, V> consumed,
  Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
GlobalKTable<K, V> globalTable(
  String topic,
  Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
globalTable adds a GlobalKTable to a topology.
Demo: Non-queryable GlobalKTable¶
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import serialization.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder
// No Materialized is given, so the state store gets a generated (internal) name
val globalTable = builder.globalTable[String, String](topic = "demo-global-table")
scala> :type globalTable
org.apache.kafka.streams.kstream.GlobalKTable[String,String]
// A GlobalKTable with a generated store name is not queryable
assert(globalTable.queryableStoreName == null)
val topology = builder.build()
scala> println(topology.describe)
Topologies:
   Sub-topology: 0 for global store (will not generate tasks)
    Source: KSTREAM-SOURCE-0000000001 (topics: [demo-global-table])
      --> KTABLE-SOURCE-0000000002
    Processor: KTABLE-SOURCE-0000000002 (stores: [demo-global-table-STATE-STORE-0000000000])
      --> none
      <-- KSTREAM-SOURCE-0000000001
Demo: Queryable GlobalKTable¶
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import serialization.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder
import org.apache.kafka.streams.state.Stores
// In-memory store supplier with an explicit store name
val supplier = Stores.inMemoryKeyValueStore("queryable-store-name")
import org.apache.kafka.streams.scala.kstream.Materialized
val materialized = Materialized.as[String, String](supplier)
val zipCodes = builder.globalTable[String, String](topic = "zip-codes", materialized)
scala> :type zipCodes
org.apache.kafka.streams.kstream.GlobalKTable[String,String]
// The explicit store name makes the GlobalKTable queryable
assert(zipCodes.queryableStoreName == "queryable-store-name")
val topology = builder.build()
scala> println(topology.describe)
Topologies:
   Sub-topology: 0 for global store (will not generate tasks)
    Source: KSTREAM-SOURCE-0000000000 (topics: [zip-codes])
      --> KTABLE-SOURCE-0000000001
    Processor: KTABLE-SOURCE-0000000001 (stores: [queryable-store-name])
      --> none
      <-- KSTREAM-SOURCE-0000000000
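A store supplier is not the only way to make a GlobalKTable queryable. The sketch below assumes that naming the store directly through Materialized.as is enough, and uses the hypothetical names countries (topic) and countries-store (state store). ByteArrayKeyValueStore is the type alias that the Scala API defines for KeyValueStore[Bytes, Array[Byte]].

```scala
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.kstream.Materialized

val builder = new StreamsBuilder

// Name the state store directly instead of passing a store supplier
// ("countries-store" is a hypothetical store name)
val materialized =
  Materialized.as[String, String, ByteArrayKeyValueStore]("countries-store")

// "countries" is a hypothetical topic name
val countries = builder.globalTable[String, String]("countries", materialized)

// The global table is registered as a global store of the topology
val topology = builder.build()
println(topology.describe)
```

Unlike the in-memory supplier in the demo above, a Materialized created from a store name alone falls back to the default store type.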
stream¶
KStream<K, V> stream(
  Collection<String> topics)
KStream<K, V> stream(
  Collection<String> topics,
  Consumed<K, V> consumed)
KStream<K, V> stream(
  Pattern topicPattern)
KStream<K, V> stream(
  Pattern topicPattern,
  Consumed<K, V> consumed)
KStream<K, V> stream(
  String topic)
KStream<K, V> stream(
  String topic,
  Consumed<K, V> consumed)
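stream requests the InternalStreamsBuilder to create a KStream for the given topic, collection of topics or topic pattern (with the Consumed, if specified).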
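The three ways of selecting input topics can be sketched with the Scala API as follows. The topic names and the pattern are hypothetical, and the Scala API accepts a Set[String] where the Java signatures above take a Collection<String>. The Consumed is resolved implicitly from the imported serdes.

```scala
import java.util.regex.Pattern
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._

val builder = new StreamsBuilder

// A single topic
val orders = builder.stream[String, String]("orders")

// A collection of topics read as one stream
val regionalOrders = builder.stream[String, String](Set("orders-eu", "orders-us"))

// All topics whose names match a pattern
val audits = builder.stream[String, String](Pattern.compile("audit-.*"))

// Each unconnected source ends up in a sub-topology of its own
val topology = builder.build()
println(topology.describe)
```

The topic names are kept disjoint on purpose, since a topic can be registered by one source only.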
Demo: Custom Processor Name¶
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import ImplicitConversions._
import serialization.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder
// withName sets the name of the source processor
implicit val consumed = Consumed.`with`[String, String].withName("processorName")
// The implicit Consumed above is picked up by stream
val demo = builder.stream[String, String]("demo")
scala> println(builder.build().describe)
Topologies:
   Sub-topology: 0
    Source: processorName (topics: [demo])
      --> none