Stores¶
Stores utility is a factory for creating state stores in Kafka Streams.
inMemoryWindowStore¶
WindowBytesStoreSupplier inMemoryWindowStore(
String name,
Duration retentionPeriod,
Duration windowSize,
boolean retainDuplicates)
inMemoryWindowStore ...FIXME
inMemoryWindowStore is used when KStreamImplJoin is requested to sharedOuterJoinWindowStoreBuilder (for left outer join).
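As a minimal sketch of calling inMemoryWindowStore directly (assuming kafka-streams is on the classpath), the snippet below creates a supplier and reads its settings back. The store name and both durations are hypothetical values chosen for illustration. The retention period is kept at least as long as the window size, since the factory is expected to reject a shorter one.

```scala
import java.time.Duration
import org.apache.kafka.streams.state.{Stores, WindowBytesStoreSupplier}

// Hypothetical name and durations, for illustration only.
val windowStoreSupplier: WindowBytesStoreSupplier = Stores.inMemoryWindowStore(
  "demo-window-store",     // name
  Duration.ofMinutes(10),  // retentionPeriod (must not be shorter than windowSize)
  Duration.ofMinutes(1),   // windowSize
  false)                   // retainDuplicates

// The supplier exposes the durations as milliseconds.
println(s"${windowStoreSupplier.name()}: retention=${windowStoreSupplier.retentionPeriod()} ms, window=${windowStoreSupplier.windowSize()} ms")
```

The supplier only describes the store. It is typically wrapped in a StoreBuilder (e.g. with Stores.windowStoreBuilder) before being added to a topology.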
keyValueStoreBuilder¶
StoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(
KeyValueBytesStoreSupplier supplier,
Serde<K> keySerde,
Serde<V> valueSerde)
keyValueStoreBuilder creates a KeyValueStoreBuilder (with the given arguments and Time.SYSTEM).
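A minimal sketch of one possible call, assuming kafka-streams is on the classpath: the supplier comes from Stores.inMemoryKeyValueStore, and the store name and the String/Long serdes are hypothetical choices made for illustration.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.state.{KeyValueStore, StoreBuilder, Stores}

// Hypothetical store name and serdes, for illustration only.
val kvStoreBuilder: StoreBuilder[KeyValueStore[String, java.lang.Long]] =
  Stores.keyValueStoreBuilder(
    Stores.inMemoryKeyValueStore("demo-kv-store"), // KeyValueBytesStoreSupplier
    Serdes.String(),                               // keySerde
    Serdes.Long())                                 // valueSerde

// The builder takes its name from the supplier.
println(kvStoreBuilder.name())
```

Any other KeyValueBytesStoreSupplier (e.g. one from Stores.persistentKeyValueStore) could be passed in the same position.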
Demo¶
This demo uses the Processor API to add a StoreBuilder to a Topology. Once created with the Stores utility, the StoreBuilder is then attached to a Processor using Topology.addStateStore.
Create Topology¶
import org.apache.kafka.streams.Topology
val builder = new Topology()
Add Processor¶
val processorName = "my-custom-processor"
builder.addProcessor(processorName, ...)
Create StoreBuilder¶
import org.apache.kafka.streams.state.Stores
val storeBuilder = Stores.keyValueStoreBuilder(...)
Attach Processor to StateStore(Builder)¶
builder.addStateStore(storeBuilder, processorName)
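The steps above can be put together as one self-contained sketch, assuming a kafka-streams version that ships the org.apache.kafka.streams.processor.api package. Everything not named in the demo is a hypothetical placeholder: the NoopProcessor class, the source node and topic names, the store name, and the String serdes. A source node is added because a processor is expected to have at least one parent.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.api.{Processor, ProcessorSupplier, Record}
import org.apache.kafka.streams.state.Stores

// Hypothetical processor that ignores every record.
// A real one would usually look the store up in init().
class NoopProcessor extends Processor[String, String, Void, Void] {
  override def process(record: Record[String, String]): Unit = ()
}

val builder = new Topology()

// Hypothetical source node and topic, so the processor has a parent.
val sourceName = "demo-source"
builder.addSource(sourceName, "demo-input-topic")

val processorName = "my-custom-processor"
val processorSupplier: ProcessorSupplier[String, String, Void, Void] =
  new ProcessorSupplier[String, String, Void, Void] {
    // Returns a fresh instance on every call.
    override def get(): Processor[String, String, Void, Void] = new NoopProcessor
  }
builder.addProcessor(processorName, processorSupplier, sourceName)

// Hypothetical in-memory store with String keys and values.
val storeBuilder = Stores.keyValueStoreBuilder(
  Stores.inMemoryKeyValueStore("demo-kv-store"),
  Serdes.String(),
  Serdes.String())

// Connect the store to the processor by name.
builder.addStateStore(storeBuilder, processorName)

// Print the topology description to inspect the store attached to the processor.
println(builder.describe())
```

The supplier is written as an explicit anonymous class rather than a lambda to avoid any ambiguity between addProcessor overloads in versions that still carry the older Processor API.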