StreamsBuilder¶
StreamsBuilder
is the entry point to the High-Level Streams DSL to define and build a stream processing topology.
All of the high-level operators use the InternalStreamsBuilder behind the scenes. In other words, StreamsBuilder
offers a more developer-friendly high-level API for developing Kafka Streams applications than using the InternalStreamsBuilder
API directly (and is a façade of InternalStreamsBuilder
).
Scala API for Kafka Streams
Use Scala API for Kafka Streams to make your Kafka Streams development more pleasant with Scala.
Creating Instance¶
StreamsBuilder
takes no arguments to be created.
import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder
While being created, StreamsBuilder
creates an empty Topology that in turn is requested for an InternalTopologyBuilder. In the end, StreamsBuilder
creates an InternalStreamsBuilder.
Topology¶
StreamsBuilder
creates a Topology when created.
StreamsBuilder
uses the Topology
to create an InternalTopologyBuilder.
The Topology
is then optimized and returned when StreamsBuilder
is requested to build a topology.
Building and Optimizing Topology¶
Topology build() // (1)
Topology build(
Properties props)
- Uses undefined properties (
null
)
build
requests the InternalStreamsBuilder to build and optimize a topology. In the end, build
returns the Topology.
globalTable¶
GlobalKTable<K, V> globalTable(
String topic)
GlobalKTable<K, V> globalTable(
String topic,
Consumed<K, V> consumed)
GlobalKTable<K, V> globalTable(
String topic,
Consumed<K, V> consumed,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
GlobalKTable<K, V> globalTable(
String topic,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
globalTable
adds a GlobalKTable to a topology.
Demo: Non-queryable GlobalKTable¶
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import serialization.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder
val globalTable = builder.globalTable[String, String](topic = "demo-global-table")
scala> :type globalTable
org.apache.kafka.streams.kstream.GlobalKTable[String,String]
assert(globalTable.queryableStoreName == null)
val topology = builder.build()
scala> println(topology.describe)
Topologies:
Sub-topology: 0 for global store (will not generate tasks)
Source: KSTREAM-SOURCE-0000000001 (topics: [demo-global-table])
--> KTABLE-SOURCE-0000000002
Processor: KTABLE-SOURCE-0000000002 (stores: [demo-global-table-STATE-STORE-0000000000])
--> none
<-- KSTREAM-SOURCE-0000000001
Demo: Queryable GlobalKTable¶
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import serialization.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder
import org.apache.kafka.streams.state.Stores
val supplier = Stores.inMemoryKeyValueStore("queryable-store-name")
import org.apache.kafka.streams.scala.kstream.Materialized
val materialized = Materialized.as[String, String](supplier)
val zipCodes = builder.globalTable[String, String](topic = "zip-codes", materialized)
scala> :type zipCodes
org.apache.kafka.streams.kstream.GlobalKTable[String,String]
assert(zipCodes.queryableStoreName == "queryable-store-name")
val topology = builder.build()
scala> println(topology.describe)
Topologies:
Sub-topology: 0 for global store (will not generate tasks)
Source: KSTREAM-SOURCE-0000000000 (topics: [zip-codes])
--> KTABLE-SOURCE-0000000001
Processor: KTABLE-SOURCE-0000000001 (stores: [queryable-store-name])
--> none
<-- KSTREAM-SOURCE-0000000000
stream¶
KStream<K, V> stream(
Collection<String> topics)
KStream<K, V> stream(
Collection<String> topics,
Consumed<K, V> consumed)
KStream<K, V> stream(
Pattern topicPattern)
KStream<K, V> stream(
Pattern topicPattern,
Consumed<K, V> consumed)
KStream<K, V> stream(
String topic)
KStream<K, V> stream(
String topic,
Consumed<K, V> consumed)
stream
requests the InternalStreamsBuilder to stream.
Demo: Custom Processor Name¶
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import ImplicitConversions._
import serialization.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder
implicit val consumed = Consumed.`with`[String, String].withName("processorName")
val demo = builder.stream[String, String]("demo")
scala> println(builder.build().describe)
Topologies:
Sub-topology: 0
Source: processorName (topics: [demo])
--> none