Stores

The Stores utility is a factory for creating state stores in Kafka Streams.

inMemoryWindowStore

WindowBytesStoreSupplier inMemoryWindowStore(
  String name,
  Duration retentionPeriod,
  Duration windowSize,
  boolean retainDuplicates)

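inMemoryWindowStore creates an in-memory WindowBytesStoreSupplier (with the given name, retention period, window size and retainDuplicates flag).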

inMemoryWindowStore is used when:

  • KStreamImplJoin is requested to sharedOuterJoinWindowStoreBuilder (for left outer join)
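
The following is a minimal sketch of calling inMemoryWindowStore directly, assuming kafka-streams is on the classpath. The store name and durations are hypothetical values chosen for illustration only.

```scala
import java.time.Duration
import org.apache.kafka.streams.state.Stores

// Hypothetical store name and durations (illustration only)
val windowSupplier = Stores.inMemoryWindowStore(
  "demo-window-store",
  Duration.ofMinutes(10), // retentionPeriod
  Duration.ofMinutes(1),  // windowSize
  false)                  // retainDuplicates
```

The returned WindowBytesStoreSupplier exposes the settings through name, retentionPeriod, windowSize (both in milliseconds) and retainDuplicates.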

keyValueStoreBuilder

StoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(
  KeyValueBytesStoreSupplier supplier,
  Serde<K> keySerde,
  Serde<V> valueSerde)

keyValueStoreBuilder creates a KeyValueStoreBuilder (with the given arguments and Time.SYSTEM).
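
As a rough illustration, keyValueStoreBuilder could be called as follows. The in-memory supplier, the store name and the String serdes are assumptions made for this sketch, not something the section prescribes.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.state.{KeyValueStore, StoreBuilder, Stores}

// Hypothetical store name; any KeyValueBytesStoreSupplier would do here
val supplier = Stores.inMemoryKeyValueStore("demo-kv-store")

// The builder takes its name from the supplier
val storeBuilder: StoreBuilder[KeyValueStore[String, String]] =
  Stores.keyValueStoreBuilder(supplier, Serdes.String(), Serdes.String())
```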

Demo

This demo uses the Processor API to add a StoreBuilder to a Topology. Once created with the Stores utility, the StoreBuilder is then attached to a Processor using Topology.addStateStore.

Create Topology

import org.apache.kafka.streams.Topology
val builder = new Topology()

Add Processor

val processorName = "my-custom-processor"
builder.addProcessor(processorName, ...)

Create StoreBuilder

import org.apache.kafka.streams.state.Stores
val storeBuilder = Stores.keyValueStoreBuilder(...)

Attach Processor to StateStore(Builder)

builder.addStateStore(storeBuilder, processorName)
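
For reference, the steps above could be put together as in the sketch below. It assumes Kafka Streams 2.7 or later (for the org.apache.kafka.streams.processor.api package). The source name, topic name, store name, String serdes and the no-op processor are all hypothetical and stand in for the elided arguments. A source node is added first since a processor needs at least one parent.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.api.{Processor, ProcessorSupplier, Record}
import org.apache.kafka.streams.state.Stores

// Hypothetical names (illustration only)
val sourceName = "demo-source"
val inputTopic = "demo-input"
val processorName = "my-custom-processor"
val storeName = "demo-kv-store"

// A no-op processor; the supplier returns a new instance on every call
val processorSupplier: ProcessorSupplier[String, String, String, String] =
  () => new Processor[String, String, String, String] {
    override def process(record: Record[String, String]): Unit = ()
  }

// StoreBuilder backed by an in-memory key-value store (an assumption of this sketch)
val storeBuilder = Stores.keyValueStoreBuilder(
  Stores.inMemoryKeyValueStore(storeName),
  Serdes.String(),
  Serdes.String())

// Source -> processor, with the state store connected to the processor
val builder = new Topology()
  .addSource(sourceName, inputTopic)
  .addProcessor(processorName, processorSupplier, sourceName)
  .addStateStore(storeBuilder, processorName)
```

builder.describe() can be used to check that the store is connected to the processor.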