

KafkaStreams is the execution environment of a single instance of a Kafka Streams application (KafkaStreams instance).

KafkaStreams is a Kafka client for continuous stream processing: it consumes input from one or more input topics and sends output to zero, one, or more output topics.

Creating Instance

KafkaStreams takes the following to be created:

When created, KafkaStreams requests the given InternalTopologyBuilder to rewriteTopology and then to build the task topology and the global task topology.

KafkaStreams then...FIXME


void defaultStreamsUncaughtExceptionHandler(
  Throwable throwable)


Task Topology

KafkaStreams requests the InternalTopologyBuilder to build a task topology when created.

The ProcessorTopology can have persistent local stores.

Global Task Topology

When created, KafkaStreams requests the InternalTopologyBuilder to build a global task topology.


KafkaStreams manages StreamThreads in the threads internal registry.

The threads collection starts empty when KafkaStreams is created.

KafkaStreams adds a new StreamThread when requested to createAndAddStreamThread.

A StreamThread is removed when KafkaStreams is requested for the following:

KafkaStreams uses processStreamThread to work with the StreamThreads.


void processStreamThread(
  java.util.function.Consumer<StreamThread> consumer)



int getNumLiveStreamThreads()
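To make the registry mechanics concrete, here is a minimal, JDK-only sketch of a threads registry with processStreamThread and getNumLiveStreamThreads. SketchStreamThread and ThreadsRegistrySketch are hypothetical stand-ins, not Kafka classes, and counting every thread that is neither DEAD nor PENDING_SHUTDOWN as live is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for StreamThread: only its name and state matter here.
class SketchStreamThread {
    enum State { CREATED, RUNNING, PENDING_SHUTDOWN, DEAD }

    final String name;
    volatile State state = State.CREATED;

    SketchStreamThread(String name) {
        this.name = name;
    }
}

// Hypothetical model of the "threads" internal registry (empty on creation).
class ThreadsRegistrySketch {
    private final List<SketchStreamThread> threads =
            Collections.synchronizedList(new ArrayList<>());

    void add(SketchStreamThread thread) {
        threads.add(thread);
    }

    // Applies the action to every registered thread while holding the registry lock,
    // so threads cannot be added or removed in the middle of the iteration.
    void processStreamThread(Consumer<SketchStreamThread> consumer) {
        synchronized (threads) {
            for (SketchStreamThread thread : threads) {
                consumer.accept(thread);
            }
        }
    }

    // Assumption: a thread is "live" unless it is DEAD or PENDING_SHUTDOWN.
    int getNumLiveStreamThreads() {
        AtomicInteger live = new AtomicInteger();
        processStreamThread(thread -> {
            if (thread.state != SketchStreamThread.State.DEAD
                    && thread.state != SketchStreamThread.State.PENDING_SHUTDOWN) {
                live.incrementAndGet();
            }
        });
        return live.get();
    }
}
```

Routing every traversal through a single method keeps the locking rule in one place, which is the point of having processStreamThread at all.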



KafkaStreams can use a GlobalStreamThread if...FIXME

Starting Streams Client

void start()

start attempts to transition to the REBALANCING state and, if successful, prints out the following INFO message to the logs:

State transition from [oldState] to REBALANCING

start prints out the following DEBUG message to the logs:

Starting Streams client

start requests the GlobalStreamThread to start (if defined).

start requests all the StreamThreads to start.
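The start sequence described above can be sketched as a simplified model, not the Kafka implementation. StartSketch is hypothetical, each Runnable stands in for requesting a thread to start (it is run synchronously here), the log list stands in for the logger, and only the CREATED to REBALANCING transition is modelled. Rejecting a second start with an IllegalStateException is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model of KafkaStreams.start(); not the real implementation.
class StartSketch {
    enum State { CREATED, REBALANCING, RUNNING } // a subset of the real client states

    private State state = State.CREATED;
    private final Optional<Runnable> globalStreamThread; // started first, if defined
    private final List<Runnable> streamThreads;
    final List<String> log = new ArrayList<>();          // stands in for the logger

    StartSketch(Optional<Runnable> globalStreamThread, List<Runnable> streamThreads) {
        this.globalStreamThread = globalStreamThread;
        this.streamThreads = streamThreads;
    }

    // Simplified: only CREATED -> REBALANCING is accepted in this sketch.
    private synchronized boolean setState(State newState) {
        if (state != State.CREATED || newState != State.REBALANCING) {
            return false;
        }
        State oldState = state;
        state = newState;
        log.add("INFO State transition from " + oldState + " to " + newState);
        return true;
    }

    void start() {
        if (!setState(State.REBALANCING)) {
            // Assumption: a client that already left CREATED cannot be started again.
            throw new IllegalStateException("The client is either already started or already stopped");
        }
        log.add("DEBUG Starting Streams client");
        globalStreamThread.ifPresent(Runnable::run); // the global thread (if any) goes first
        streamThreads.forEach(Runnable::run);        // then every StreamThread
    }

    synchronized State state() {
        return state;
    }
}
```

Making the state transition the guard means the INFO message, the DEBUG message and the thread starts all happen at most once per client.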



void setUncaughtExceptionHandler(
  StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)


setUncaughtExceptionHandler is part of the public API.


void handleStreamsUncaughtException(
  Throwable throwable,
  StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)


handleStreamsUncaughtException is used when:


void replaceStreamThread(
  Throwable throwable)
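The following sketch models how an uncaught exception could be routed through the registered handler. It assumes the handler answers with one of three responses named after the StreamThreadExceptionResponse values of StreamsUncaughtExceptionHandler (REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION), and that only REPLACE_THREAD leads to replaceStreamThread. UncaughtHandlingSketch, its Handler interface and its default response are hypothetical; the shutdown responses are only recorded, not performed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical model of routing an uncaught exception through a handler.
class UncaughtHandlingSketch {
    // Named after StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse values.
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    @FunctionalInterface
    interface Handler {
        Response handle(Throwable throwable);
    }

    final List<String> actions = new ArrayList<>(); // records decisions instead of acting on them
    private Handler handler = throwable -> Response.SHUTDOWN_CLIENT; // hypothetical default

    void setUncaughtExceptionHandler(Handler handler) {
        this.handler = Objects.requireNonNull(handler, "handler must not be null");
    }

    void handleStreamsUncaughtException(Throwable throwable) {
        switch (handler.handle(throwable)) {
            case REPLACE_THREAD:
                replaceStreamThread(throwable);
                break;
            case SHUTDOWN_CLIENT:
                actions.add("shut down this client");
                break;
            case SHUTDOWN_APPLICATION:
                actions.add("shut down all clients of the application");
                break;
        }
    }

    private void replaceStreamThread(Throwable throwable) {
        actions.add("replace the failed thread (" + throwable.getMessage() + ")");
    }
}
```

Keeping the decision (the handler) separate from the reaction (the switch) lets user code choose a policy without touching the thread management itself.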



Optional<String> addStreamThread()


addStreamThread is part of the public API.


StreamThread createAndAddStreamThread(
  long cacheSizePerThread,
  int threadIdx)

createAndAddStreamThread creates a StreamThread and requests it to setStateListener with the StreamStateListener.

createAndAddStreamThread registers the StreamThread (in the threads and threadState internal registries).

createAndAddStreamThread requests the QueryableStoreProvider to addStoreProviderForThread (with the name of the StreamThread and a new StreamThreadStateStoreProvider).
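A minimal model of the bookkeeping createAndAddStreamThread performs. CreateAndAddSketch, SketchThread and SketchStoreProvider are hypothetical; thread states are plain strings, the registries are keyed by thread name for simplicity, and the thread name format is made up for illustration. The sketch is single-threaded apart from the synchronized factory method.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the bookkeeping done by createAndAddStreamThread.
class CreateAndAddSketch {
    interface StateListener {
        void onChange(String threadName, String newState);
    }

    static final class SketchThread {
        final String name;
        final long cacheSizeBytes;
        StateListener stateListener;

        SketchThread(String name, long cacheSizeBytes) {
            this.name = name;
            this.cacheSizeBytes = cacheSizeBytes;
        }

        void setStateListener(StateListener listener) {
            this.stateListener = listener;
        }
    }

    // Placeholder for a per-thread state store provider.
    static final class SketchStoreProvider {
        final SketchThread thread;

        SketchStoreProvider(SketchThread thread) {
            this.thread = thread;
        }
    }

    final List<SketchThread> threads = new ArrayList<>();
    final Map<String, String> threadState = new HashMap<>();
    final Map<String, SketchStoreProvider> storeProviders = new HashMap<>();

    // One listener shared by all threads keeps threadState up to date.
    private final StateListener streamStateListener =
            (threadName, newState) -> threadState.put(threadName, newState);

    synchronized SketchThread createAndAddStreamThread(long cacheSizePerThread, int threadIdx) {
        // Made-up naming scheme, for illustration only.
        SketchThread thread = new SketchThread("StreamThread-" + threadIdx, cacheSizePerThread);
        thread.setStateListener(streamStateListener);
        threads.add(thread);                                               // "threads" registry
        threadState.put(thread.name, "CREATED");                           // "threadState" registry
        storeProviders.put(thread.name, new SketchStoreProvider(thread));  // per-thread store provider
        return thread;
    }
}
```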

createAndAddStreamThread is used when:


KafkaStreams creates a new StreamsMetadataState when created (with the endpoint based on the application.server configuration property).

The StreamsMetadataState is used to create a StreamThread and for the following state-related metadata operations:


<K> KeyQueryMetadata queryMetadataForKey(
  String storeName,
  K key,
  Serializer<K> keySerializer)
<K> KeyQueryMetadata queryMetadataForKey(
  String storeName,
  K key,
  StreamPartitioner<? super K, ?> partitioner)

queryMetadataForKey requests the StreamsMetadataState to getKeyQueryMetadataForKey.

Performance Metrics

KafkaStreams gets the configured metrics when created.

KafkaStreams uses the metrics to create a StreamsMetricsImpl right after.

Metrics Recording Service

When created, KafkaStreams creates a rocksDBMetricsRecordingService executor service (a ScheduledExecutorService), but only if the metrics.recording.level configuration property is DEBUG.

KafkaStreams uses the ScheduledExecutorService to submit a RocksDBMetricsRecordingTrigger (of the StreamsMetricsImpl) to be executed every 1 minute (non-configurable).

The ScheduledExecutorService is shut down in shutdownHelper.
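The executor described above can be sketched with the JDK alone. RecordingServiceSketch is hypothetical: the real method takes a clientId and a StreamsConfig, whereas this sketch takes the recording level as a string and lets the caller pick the (hypothetical) thread name. The one-minute period comes from the text; the initial delay of 0 is an assumption.

```java
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical model of maybeCreateRocksDBMetricsRecordingService and its scheduling.
class RecordingServiceSketch {
    // Creates a single-threaded scheduler only for the DEBUG recording level.
    static Optional<ScheduledExecutorService> maybeCreateRecordingService(
            String metricsRecordingLevel, String threadName) {
        if (!"DEBUG".equals(metricsRecordingLevel)) {
            return Optional.empty();
        }
        return Optional.of(Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, threadName);
            thread.setDaemon(true); // a daemon thread does not keep the JVM alive
            return thread;
        }));
    }

    // Runs the trigger every minute; the initial delay of 0 is an assumption.
    static void scheduleTrigger(ScheduledExecutorService service, Runnable trigger) {
        service.scheduleAtFixedRate(trigger, 0, 1, TimeUnit.MINUTES);
    }
}
```

In this sketch, releasing the service is a plain shutdown() or shutdownNow() call on the returned executor.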


ScheduledExecutorService maybeCreateRocksDBMetricsRecordingService(
  String clientId,
  StreamsConfig config)

maybeCreateRocksDBMetricsRecordingService creates a single-threaded executor only when the metrics.recording.level configuration property is DEBUG. The executor's only thread is a daemon thread with the following name:



When created, KafkaStreams creates a StreamsMetricsImpl (with the Metrics, the clientId and configuration property).

KafkaStreams registers ClientMetrics.

KafkaStreams passes the StreamsMetricsImpl to the GlobalStreamThread and the StreamThreads it creates.

When started, KafkaStreams requests the StreamsMetricsImpl for rocksDBMetricsRecordingTrigger (to schedule it at fixed rate using the Metrics Recording Service).


Metrics getMetrics(
  StreamsConfig config,
  Time time,
  String clientId)

getMetrics creates a MetricConfig (Apache Kafka) based on the following configuration properties:

getMetrics requests the given StreamsConfig for configured MetricsReporters (Apache Kafka) per metric.reporters configuration property.

getMetrics always adds JmxReporter to the list of configured MetricsReporters. JmxReporter is configured to use kafka.streams JMX prefix.

In the end, getMetrics creates a Metrics (Apache Kafka) with the MetricConfig, the MetricsReporters and other arguments.


When created, KafkaStreams reads the cache.max.bytes.buffering configuration property (the total cache size) for getCacheSizePerThread.


long getCacheSizePerThread(
  int numStreamThreads)

getCacheSizePerThread returns the totalCacheSize when the given numStreamThreads is 0. Otherwise, getCacheSizePerThread returns the totalCacheSize divided by the number of threads that share it: the given numStreamThreads plus one more if a globalTaskTopology is used.
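The rule above amounts to a single integer division. A minimal sketch, assuming the total cache size and whether a global task topology is used are passed in as plain arguments (CacheSizeSketch is hypothetical):

```java
// Hypothetical model of getCacheSizePerThread.
class CacheSizeSketch {
    // Whole cache when there are no stream threads; otherwise an even split across
    // the stream threads plus one extra share when a global task topology is used.
    static long getCacheSizePerThread(long totalCacheSize, int numStreamThreads, boolean hasGlobalTopology) {
        if (numStreamThreads == 0) {
            return totalCacheSize;
        }
        int shares = numStreamThreads + (hasGlobalTopology ? 1 : 0);
        return totalCacheSize / shares; // integer division; any remainder is left unassigned
    }

    public static void main(String[] args) {
        long total = 10L * 1024 * 1024; // e.g. cache.max.bytes.buffering = 10 MiB
        System.out.println(getCacheSizePerThread(total, 3, true));  // 10485760 / 4 = 2621440
        System.out.println(getCacheSizePerThread(total, 3, false)); // 10485760 / 3 = 3495253
        System.out.println(getCacheSizePerThread(total, 0, true));  // 10485760
    }
}
```

With a 10 MiB total and three stream threads, using a global task topology lowers each thread's share from 3495253 to 2621440 bytes.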

getCacheSizePerThread is used when:


Enable the ALL logging level for the org.apache.kafka.streams.KafkaStreams logger to see what happens inside.

Add the following line to

Refer to Logging.
