
StreamsBuilder

StreamsBuilder is the entry point to the High-Level Streams DSL to define and build a stream processing topology.

All of the high-level operators use the InternalStreamsBuilder behind the scenes. In other words, StreamsBuilder is a façade of InternalStreamsBuilder that offers a more developer-friendly high-level API for developing Kafka Streams applications than using the InternalStreamsBuilder API directly.

Scala API for Kafka Streams

Use the Scala API for Kafka Streams to make Kafka Streams development in Scala more pleasant.
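A minimal sketch of what the Scala API saves you, assuming Kafka Streams 2.7 or later (where the Scala serdes live in `org.apache.kafka.streams.scala.serialization`) and the hypothetical topics `words-in` and `words-out`. With the two implicit imports in scope, `Consumed` and `Produced` are derived from the implicit `String` serdes instead of being passed explicitly:

```scala
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._

val builder = new StreamsBuilder

// Consumed and Produced are derived implicitly from the implicit String serdes
builder
  .stream[String, String]("words-in")          // hypothetical input topic
  .mapValues((v: String) => v.toUpperCase)
  .to("words-out")                             // hypothetical output topic

val topology = builder.build()
println(topology.describe())
```

The resulting topology has a single sub-topology with a source, a mapValues processor and a sink.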

Creating Instance

StreamsBuilder takes no arguments when created.

import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder

While being created, StreamsBuilder creates an empty Topology and requests it for its InternalTopologyBuilder. In the end, StreamsBuilder creates an InternalStreamsBuilder with that InternalTopologyBuilder.
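The Scala `StreamsBuilder` used in the demos is a thin wrapper that delegates to a Java `StreamsBuilder` (a new one by default). A minimal sketch, assuming you want to wrap a Java builder you already have; since nothing has been added yet, the built topology is empty:

```scala
import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ}
import org.apache.kafka.streams.scala.StreamsBuilder

// Java StreamsBuilder: creates the Topology, its InternalTopologyBuilder and the InternalStreamsBuilder
val javaBuilder = new StreamsBuilderJ()

// Scala wrapper that delegates to javaBuilder
val builder = new StreamsBuilder(javaBuilder)

// No sources, processors or global tables have been added yet
val topology = builder.build()
println(topology.describe())
```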

Figure: StreamsBuilder, Topology and InternalStreamsBuilder

Topology

StreamsBuilder creates a Topology when created.

StreamsBuilder uses the Topology to create an InternalTopologyBuilder.

The Topology is then optimized and returned when StreamsBuilder is requested to build a topology.

Building and Optimizing Topology

Topology build() // uses undefined properties (null)
Topology build(
  Properties props)

build requests the InternalStreamsBuilder to build and optimize the topology, using the given properties (if any) to decide which optimizations to apply. In the end, build returns the Topology.
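A minimal sketch of the `build(props)` variant, assuming Kafka Streams 2.7 or later and the hypothetical topics `input` and `output`. The `topology.optimization` key is spelled out as a string literal (rather than a `StreamsConfig` constant) to stay independent of constant renames across versions:

```scala
import java.util.Properties
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._

val builder = new StreamsBuilder
builder
  .stream[String, String]("input")        // hypothetical input topic
  .filter((_, value) => value != null)
  .to("output")                           // hypothetical output topic

// "topology.optimization" enables optimizations; "all" turns on every available one
// (the default is "none")
val props = new Properties()
props.put("topology.optimization", "all")

// props is handed over to InternalStreamsBuilder when building and optimizing
val topology = builder.build(props)
println(topology.describe())
```

Calling `build()` without properties behaves as if no optimizations were requested.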

globalTable

GlobalKTable<K, V> globalTable(
  String topic)
GlobalKTable<K, V> globalTable(
  String topic,
  Consumed<K, V> consumed)
GlobalKTable<K, V> globalTable(
  String topic,
  Consumed<K, V> consumed,
  Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
GlobalKTable<K, V> globalTable(
  String topic,
  Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)

globalTable requests the InternalStreamsBuilder to add a GlobalKTable (backed by a global state store) for the given topic to the topology.
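Each globalTable call registers its own global store, and global stores are described separately from regular sub-topologies. A minimal sketch, assuming Scala 2.13 (for `scala.jdk.CollectionConverters`) and the hypothetical topics `countries` and `currencies`:

```scala
import scala.jdk.CollectionConverters._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._

val builder = new StreamsBuilder
// Two global tables over two hypothetical topics
val countries = builder.globalTable[String, String]("countries")
val currencies = builder.globalTable[String, String]("currencies")

val description = builder.build().describe()

// Collect the source topics of every global store
val globalTopics = description.globalStores().asScala
  .flatMap(_.source().topicSet().asScala)
  .toSet

println(globalTopics)
```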

Demo: Non-queryable GlobalKTable

import org.apache.kafka.streams.scala._
import ImplicitConversions._
import serialization.Serdes._

import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder
val globalTable = builder.globalTable[String, String](topic = "demo-global-table")
scala> :type globalTable
org.apache.kafka.streams.kstream.GlobalKTable[String,String]
assert(globalTable.queryableStoreName == null)
val topology = builder.build()
scala> println(topology.describe)
Topologies:
   Sub-topology: 0 for global store (will not generate tasks)
    Source: KSTREAM-SOURCE-0000000001 (topics: [demo-global-table])
      --> KTABLE-SOURCE-0000000002
    Processor: KTABLE-SOURCE-0000000002 (stores: [demo-global-table-STATE-STORE-0000000000])
      --> none
      <-- KSTREAM-SOURCE-0000000001

Demo: Queryable GlobalKTable

import org.apache.kafka.streams.scala._
import ImplicitConversions._
import serialization.Serdes._

import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder
import org.apache.kafka.streams.state.Stores
val supplier = Stores.inMemoryKeyValueStore("queryable-store-name")

import org.apache.kafka.streams.scala.kstream.Materialized
val materialized = Materialized.as[String, String](supplier)
val zipCodes = builder.globalTable[String, String](topic = "zip-codes", materialized)
scala> :type zipCodes
org.apache.kafka.streams.kstream.GlobalKTable[String,String]
assert(zipCodes.queryableStoreName == "queryable-store-name")
val topology = builder.build()
scala> println(topology.describe)
Topologies:
   Sub-topology: 0 for global store (will not generate tasks)
    Source: KSTREAM-SOURCE-0000000000 (topics: [zip-codes])
      --> KTABLE-SOURCE-0000000001
    Processor: KTABLE-SOURCE-0000000001 (stores: [queryable-store-name])
      --> none
      <-- KSTREAM-SOURCE-0000000000

stream

KStream<K, V> stream(
  Collection<String> topics)
KStream<K, V> stream(
  Collection<String> topics,
  Consumed<K, V> consumed)
KStream<K, V> stream(
  Pattern topicPattern)
KStream<K, V> stream(
  Pattern topicPattern,
  Consumed<K, V> consumed)
KStream<K, V> stream(
  String topic)
KStream<K, V> stream(
  String topic,
  Consumed<K, V> consumed)

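stream requests the InternalStreamsBuilder to create a KStream for the given topic(s) or topic pattern. The optional Consumed configures the key and value serdes, the timestamp extractor, the offset reset policy and the processor name.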
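A minimal sketch of the Pattern variant, assuming Scala 2.13 (for `scala.jdk.CollectionConverters`) and a hypothetical `demo-*` topic naming scheme. It subscribes with a regex and then inspects the source node in the topology description:

```scala
import java.util.regex.Pattern
import scala.jdk.CollectionConverters._
import org.apache.kafka.streams.TopologyDescription
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._

val builder = new StreamsBuilder
// Subscribe to every topic whose name matches the regex (hypothetical naming scheme)
val events = builder.stream[String, String](Pattern.compile("demo-.*"))

// Find the source nodes across all sub-topologies
val sources = builder.build().describe().subtopologies().asScala
  .flatMap(_.nodes().asScala)
  .collect { case s: TopologyDescription.Source => s }
  .toList

println(sources)
```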

Demo: Custom Processor Name

import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import ImplicitConversions._
import serialization.Serdes._

import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder
implicit val consumed = Consumed.`with`[String, String].withName("processorName")
val demo = builder.stream[String, String]("demo")
scala> println(builder.build().describe)
Topologies:
   Sub-topology: 0
    Source: processorName (topics: [demo])
      --> none