KStream API — Stream of Records¶
KStream<K, V> is an abstraction of a stream of records (of key-value pairs).
KStream can be created directly from one or many Kafka topics (using StreamsBuilder.stream operators) or as a result of transformations on an existing KStream instance.
KStream offers a rich set of operators (KStream API) for building topologies to consume, process and produce (key-value) records.
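The two ways of creating a KStream described above can be sketched in Java as follows. This is a minimal sketch, not taken from the original page: the topic names (topic-a, topic-b, output) and the class name KStreamCreationSketch are hypothetical, and String serdes are assumed for both keys and values.

```java
import java.util.Arrays;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

// Hypothetical class name, for illustration only
public class KStreamCreationSketch {

  static Topology buildTopology() {
    StreamsBuilder builder = new StreamsBuilder();

    // 1. A KStream created directly from two (hypothetical) Kafka topics
    KStream<String, String> source = builder.stream(
        Arrays.asList("topic-a", "topic-b"),
        Consumed.with(Serdes.String(), Serdes.String()));

    // 2. A KStream created as a result of a transformation on an existing KStream
    KStream<String, String> nonEmpty =
        source.filter((key, value) -> value != null && !value.isEmpty());

    // 3. Produce the remaining records to a (hypothetical) output topic
    nonEmpty.to("output", Produced.with(Serdes.String(), Serdes.String()));

    return builder.build();
  }

  public static void main(String[] args) {
    // Prints the sources, processors and sinks of the topology
    System.out.println(buildTopology().describe());
  }
}
```

Building and describing a topology does not need a running Kafka cluster, so the sketch can be used to inspect how operators turn into processor nodes.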
Contract (Subset)¶
flatMap¶
KStream<KR, VR> flatMap(
KeyValueMapper<
? super K,
? super V,
? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper)
KStream<KR, VR> flatMap(
KeyValueMapper<
? super K,
? super V,
? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper,
Named named)
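As an illustration of the flatMap signatures above, the following sketch maps every input record to zero or more output records with a new key and value type. The topic names (lines, words), the processor name (split-into-words) and the class and method names are hypothetical and chosen only for this example.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

// Hypothetical class name, for illustration only
public class FlatMapSketch {

  // Maps one input record to zero or more output records:
  // ("k", "to be") becomes ("to", 2) and ("be", 2)
  static List<KeyValue<String, Integer>> splitIntoWords(String key, String line) {
    List<KeyValue<String, Integer>> words = new ArrayList<>();
    if (line == null) {
      return words; // a null value yields no output records
    }
    for (String word : line.split("\\s+")) {
      if (!word.isEmpty()) {
        words.add(KeyValue.pair(word, word.length()));
      }
    }
    return words;
  }

  static Topology buildTopology() {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> lines =
        builder.stream("lines", Consumed.with(Serdes.String(), Serdes.String()));

    // The flatMap overload with Named gives the processor node an explicit name
    KStream<String, Integer> words =
        lines.flatMap(FlatMapSketch::splitIntoWords, Named.as("split-into-words"));

    words.to("words", Produced.with(Serdes.String(), Serdes.Integer()));
    return builder.build();
  }
}
```

Since flatMap can change the key, Kafka Streams may insert a repartition topic if a key-based operator (such as a grouping or a join) follows it.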
foreach¶
void foreach(
ForeachAction<? super K, ? super V> action)
void foreach(
ForeachAction<? super K, ? super V> action,
Named named)
groupBy¶
KGroupedStream<KR, V> groupBy(
KeyValueMapper<? super K, ? super V, KR> keySelector)
KGroupedStream<KR, V> groupBy(
KeyValueMapper<? super K, ? super V, KR> keySelector,
Grouped<KR, V> grouped)
groupByKey¶
KGroupedStream<K, V> groupByKey()
KGroupedStream<K, V> groupByKey(
Grouped<K, V> grouped)
join¶
KStream<K, RV> join(
GlobalKTable<GK, GV> globalTable,
KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
ValueJoiner<? super V, ? super GV, ? extends RV> joiner)
KStream<K, RV> join(
GlobalKTable<GK, GV> globalTable,
KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
ValueJoiner<? super V, ? super GV, ? extends RV> joiner,
Named named)
KStream<K, RV> join(
GlobalKTable<GK, GV> globalTable,
KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> joiner)
KStream<K, RV> join(
GlobalKTable<GK, GV> globalTable,
KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> joiner,
Named named)
KStream<K, VR> join(
KStream<K, VO> otherStream,
ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
JoinWindows windows)
KStream<K, VR> join(
KStream<K, VO> otherStream,
ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
JoinWindows windows,
StreamJoined<K, V, VO> streamJoined)
KStream<K, VR> join(
KStream<K, VO> otherStream,
ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
JoinWindows windows)
KStream<K, VR> join(
KStream<K, VO> otherStream,
ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
JoinWindows windows,
StreamJoined<K, V, VO> streamJoined)
KStream<K, VR> join(
KTable<K, VT> table,
ValueJoiner<? super V, ? super VT, ? extends VR> joiner)
KStream<K, VR> join(
KTable<K, VT> table,
ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
Joined<K, V, VT> joined)
KStream<K, VR> join(
KTable<K, VT> table,
ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> joiner)
KStream<K, VR> join(
KTable<K, VT> table,
ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> joiner,
Joined<K, V, VT> joined)
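The join overloads above fall into three groups by the first argument: GlobalKTable joins take a keySelector, KStream joins take JoinWindows, and KTable joins take neither. The following is a minimal sketch of the KTable variant with a ValueJoiner and Joined. The topic names (orders, customers, enriched-orders) and the class and field names are hypothetical, and String serdes are assumed throughout.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;

// Hypothetical class name, for illustration only
public class StreamTableJoinSketch {

  // Combines the stream-side value (V) with the table-side value (VT) into a result (VR)
  static final ValueJoiner<String, String, String> ORDER_WITH_CUSTOMER =
      (order, customer) -> order + " ordered by " + customer;

  static Topology buildTopology() {
    StreamsBuilder builder = new StreamsBuilder();

    // Stream side: one record per order, keyed by customer id (an assumption of this sketch)
    KStream<String, String> orders =
        builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()));

    // Table side: the latest customer name per customer id
    KTable<String, String> customers =
        builder.table("customers", Consumed.with(Serdes.String(), Serdes.String()));

    // Joined carries the key serde, the stream value serde and the table value serde
    KStream<String, String> enriched = orders.join(
        customers,
        ORDER_WITH_CUSTOMER,
        Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));

    enriched.to("enriched-orders", Produced.with(Serdes.String(), Serdes.String()));
    return builder.build();
  }
}
```

Declaring the joiner with an explicit ValueJoiner type keeps the call unambiguous next to the ValueJoinerWithKey overload. A stream-table join matches records by key, so both input topics are expected to be co-partitioned.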
Merging KStreams¶
KStream<K, V> merge(
KStream<K, V> stream,
Named named)
... // There are others. Let's focus on the most important parts.
peek¶
KStream<K, V> peek(
ForeachAction<? super K, ? super V> action)
KStream<K, V> peek(
ForeachAction<? super K, ? super V> action,
Named named)
print¶
void print(
Printed<K, V> printed)
process¶
void process(
ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier,
Named named,
String... stateStoreNames)
void process(
ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier,
String... stateStoreNames)
repartition¶
KStream<K, V> repartition()
KStream<K, V> repartition(
Repartitioned<K, V> repartitioned)
split¶
BranchedKStream<K, V> split()
BranchedKStream<K, V> split(
Named named)
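split returns a BranchedKStream, on which branches are then defined one predicate at a time. The sketch below assumes Kafka Streams 2.8 or later (where split and BranchedKStream are available). The topic name (events), the name prefix (events-), the branch names (errors, other) and the class name are hypothetical.

```java
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;

// Hypothetical class name, for illustration only
public class SplitSketch {

  static Map<String, KStream<String, String>> buildBranches(StreamsBuilder builder) {
    KStream<String, String> events =
        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));

    return events
        // The Named prefix is prepended to every branch name in the resulting map
        .split(Named.as("events-"))
        // Records whose value starts with "error" go to the "events-errors" branch
        .branch((key, value) -> value != null && value.startsWith("error"),
            Branched.as("errors"))
        // All remaining records go to the "events-other" branch
        .defaultBranch(Branched.as("other"));
  }

  public static void main(String[] args) {
    Map<String, KStream<String, String>> branches = buildBranches(new StreamsBuilder());
    // Each entry is an ordinary KStream that can be processed further
    branches.keySet().forEach(System.out::println);
  }
}
```

Each record goes to the first branch whose predicate matches, so the order of the branch calls matters.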
to¶
void to(
String topic)
void to(
String topic,
Produced<K, V> produced)
void to(
TopicNameExtractor<K, V> topicExtractor)
void to(
TopicNameExtractor<K, V> topicExtractor,
Produced<K, V> produced)
toTable¶
KTable<K, V> toTable()
KTable<K, V> toTable(
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
KTable<K, V> toTable(
Named named)
KTable<K, V> toTable(
Named named,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
transform¶
KStream<K1, V1> transform(
TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
Named named,
String... stateStoreNames)
KStream<K1, V1> transform(
TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
String... stateStoreNames)
Implementations¶
Demo¶
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import serialization.Serdes._
val builder = new StreamsBuilder
// Specify the key and value types explicitly, i.e. stream[String, String]
// Otherwise, Scala's type inference gives us a stream of "nothing", i.e. KStream[Nothing, Nothing]
val input = builder.stream[String, String]("input")
scala> :type input
org.apache.kafka.streams.scala.kstream.KStream[String,String]