
KafkaStreams

KafkaStreams is the execution environment of a single instance of a Kafka Streams application (KafkaStreams instance).

KafkaStreams is a Kafka client for continuous stream processing: it consumes input from one or more input topics and sends output to zero, one, or more output topics.

Creating Instance

KafkaStreams takes the following to be created:

When created, KafkaStreams requests the given InternalTopologyBuilder to rewriteTopology, and then builds the task topology and the global task topology.

KafkaStreams then...FIXME

defaultStreamsUncaughtExceptionHandler

void defaultStreamsUncaughtExceptionHandler(
  Throwable throwable)

defaultStreamsUncaughtExceptionHandler...FIXME

Task Topology

KafkaStreams requests the InternalTopologyBuilder to build a task topology when created.

The ProcessorTopology can have persistent local stores.

Global Task Topology

When created, KafkaStreams requests the InternalTopologyBuilder to build a global task topology.

StreamThreads

KafkaStreams manages StreamThreads in a threads internal registry.

The threads collection starts empty when KafkaStreams is created.

KafkaStreams adds a new StreamThread when requested to createAndAddStreamThread.

A StreamThread is removed when KafkaStreams is requested for the following:

KafkaStreams uses processStreamThread to work with the StreamThreads.

processStreamThread

void processStreamThread(
  java.util.function.Consumer<StreamThread> consumer)

processStreamThread...FIXME
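processStreamThread is not described yet. A minimal sketch of the idea, assuming (not confirmed by this page) that it applies the given Consumer to every thread in a snapshot copy of the threads registry, so that the callback may add or remove threads safely. MiniStreamThread and ThreadRegistry are hypothetical stand-ins, not Kafka Streams classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for StreamThread; it only carries a name.
class MiniStreamThread {
    final String name;

    MiniStreamThread(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for the threads internal registry of KafkaStreams.
class ThreadRegistry {
    private final List<MiniStreamThread> threads =
        Collections.synchronizedList(new ArrayList<>());

    void add(MiniStreamThread thread) {
        threads.add(thread);
    }

    void remove(MiniStreamThread thread) {
        threads.remove(thread);
    }

    int size() {
        return threads.size();
    }

    // Iterates over a snapshot so the consumer can modify the registry
    // without triggering a ConcurrentModificationException.
    void processStreamThread(Consumer<MiniStreamThread> consumer) {
        List<MiniStreamThread> copy;
        synchronized (threads) {
            copy = new ArrayList<>(threads);
        }
        for (MiniStreamThread thread : copy) {
            consumer.accept(thread);
        }
    }
}
```

Working on a copy trades a small allocation for the freedom to remove threads (for example, dead ones) from inside the callback.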

getNumLiveStreamThreads

int getNumLiveStreamThreads()

getNumLiveStreamThreads...FIXME

GlobalStreamThread

KafkaStreams can use a GlobalStreamThread if...FIXME

Starting Streams Client

void start()

start attempts to enter REBALANCING state and, if successful, prints out the following INFO message to the logs:

State transition from [oldState] to REBALANCING

start prints out the following DEBUG message to the logs:

Starting Streams client

start requests the GlobalStreamThread to start (if defined).

start requests all the StreamThreads to start.

start...FIXME
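The steps of start listed above can be sketched as follows. The sketch models only what this page states: the transition to REBALANCING (logged at INFO), the DEBUG message, and starting the optional global thread before the stream threads. MiniKafkaStreams, its reduced State enum, and the list-based "log" are hypothetical; the real state machine allows more transitions. The public API documents that start throws IllegalStateException when the client was already started, which the sketch mimics.

```java
import java.util.ArrayList;
import java.util.List;

class MiniKafkaStreams {
    enum State { CREATED, REBALANCING, RUNNING, NOT_RUNNING }

    private State state = State.CREATED;
    final List<String> log = new ArrayList<>();      // captured log lines
    final List<String> started = new ArrayList<>();  // threads in start order
    private final String globalThread;               // null without a global topology
    private final List<String> streamThreads;

    MiniKafkaStreams(String globalThread, List<String> streamThreads) {
        this.globalThread = globalThread;
        this.streamThreads = streamThreads;
    }

    // Simplified guard: only CREATED may move to REBALANCING in this sketch.
    private synchronized boolean setState(State newState) {
        if (state != State.CREATED || newState != State.REBALANCING) {
            return false;
        }
        log.add("INFO State transition from " + state + " to " + newState);
        state = newState;
        return true;
    }

    synchronized void start() {
        if (!setState(State.REBALANCING)) {
            throw new IllegalStateException("The client was already started or stopped");
        }
        log.add("DEBUG Starting Streams client");
        if (globalThread != null) {
            started.add(globalThread);   // the global thread starts first
        }
        started.addAll(streamThreads);   // then every StreamThread
    }
}
```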

setUncaughtExceptionHandler

void setUncaughtExceptionHandler(
  StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)

setUncaughtExceptionHandler...FIXME

setUncaughtExceptionHandler is part of the public API.

handleStreamsUncaughtException

void handleStreamsUncaughtException(
  Throwable throwable,
  StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)

handleStreamsUncaughtException...FIXME
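handleStreamsUncaughtException is not described yet. As a hedged sketch, assuming it asks the registered handler for a response and dispatches on it: the three constant names below mirror the public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse enum, while UncaughtDispatch and the returned action descriptions are hypothetical (the sketch reports the action instead of performing it).

```java
import java.util.function.Function;

class UncaughtDispatch {
    // Mirrors the constant names of StreamThreadExceptionResponse.
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    // Asks the handler what to do and describes the resulting action.
    static String handle(Throwable throwable, Function<Throwable, Response> handler) {
        Response action = handler.apply(throwable);
        switch (action) {
            case REPLACE_THREAD:
                return "replaceStreamThread: " + throwable.getMessage();
            case SHUTDOWN_CLIENT:
                return "shut down this client";
            case SHUTDOWN_APPLICATION:
                return "shut down all clients of the application";
            default:
                throw new IllegalStateException("Unknown response " + action);
        }
    }
}
```

With the real public API, an application registers the handler through setUncaughtExceptionHandler, for example `streams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD)`.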

handleStreamsUncaughtException is used when:

replaceStreamThread

void replaceStreamThread(
  Throwable throwable)

replaceStreamThread...FIXME

addStreamThread

Optional<String> addStreamThread()

addStreamThread...FIXME

addStreamThread is part of the public API.
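According to the public API documentation, addStreamThread adds a thread only while the client is RUNNING or REBALANCING, and returns the name of the new thread or an empty Optional otherwise. A minimal sketch of that contract follows. ThreadScaling and its "[clientId]-StreamThread-[n]" naming are hypothetical, and the cache resizing that the real method performs is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class ThreadScaling {
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING }

    State state = State.RUNNING;
    final List<String> threadNames = new ArrayList<>();
    private final String clientId;
    private int nextIdx = 1;

    ThreadScaling(String clientId) {
        this.clientId = clientId;
    }

    Optional<String> addStreamThread() {
        if (state != State.RUNNING && state != State.REBALANCING) {
            return Optional.empty();   // threads cannot be added in other states
        }
        String name = clientId + "-StreamThread-" + nextIdx++;   // hypothetical naming
        threadNames.add(name);
        return Optional.of(name);
    }
}
```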

createAndAddStreamThread

StreamThread createAndAddStreamThread(
  long cacheSizePerThread,
  int threadIdx)

createAndAddStreamThread creates a StreamThread and requests it to setStateListener with the StreamStateListener.

createAndAddStreamThread registers the StreamThread (in the threads and threadState internal registries).

createAndAddStreamThread requests the QueryableStoreProvider to addStoreProviderForThread (with the name of the StreamThread and a new StreamThreadStateStoreProvider).
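The three steps of createAndAddStreamThread above (install the state listener, register the thread, register a store provider) can be sketched with hypothetical stand-ins. Worker replaces StreamThread, plain maps replace the threadState registry and the QueryableStoreProvider, and the state listener is a BiConsumer; none of these are the real Kafka Streams types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

class ThreadFactorySketch {
    // Hypothetical stand-in for StreamThread.
    static class Worker {
        final String name;
        final long cacheSize;
        String state = "CREATED";
        BiConsumer<Worker, String> listener;

        Worker(String name, long cacheSize) {
            this.name = name;
            this.cacheSize = cacheSize;
        }

        void setStateListener(BiConsumer<Worker, String> listener) {
            this.listener = listener;
        }
    }

    final List<Worker> threads = new ArrayList<>();
    final Map<String, String> threadState = new HashMap<>();     // thread name -> state
    final Map<String, String> storeProviders = new HashMap<>();  // thread name -> provider
    final BiConsumer<Worker, String> streamStateListener =
        (worker, newState) -> threadState.put(worker.name, newState);

    Worker createAndAddStreamThread(long cacheSizePerThread, int threadIdx) {
        Worker worker = new Worker("client-StreamThread-" + threadIdx, cacheSizePerThread);
        worker.setStateListener(streamStateListener);   // 1. track state changes
        threads.add(worker);                             // 2. register in threads...
        threadState.put(worker.name, worker.state);      //    ...and in threadState
        storeProviders.put(worker.name, "StoreProvider(" + worker.name + ")");   // 3. stores
        return worker;
    }
}
```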

createAndAddStreamThread is used when:

StreamsMetadataState

KafkaStreams creates a new StreamsMetadataState when created (with the endpoint based on application.server configuration property).
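The application.server configuration property is a host:port pair that advertises this instance's endpoint for interactive queries, and it is empty by default. A minimal parsing sketch, loosely modeled on Kafka's HostInfo but entirely hypothetical (EndpointParsing and HostPort are not Kafka classes); returning null for an empty value is a choice of this sketch.

```java
class EndpointParsing {
    // Hypothetical value type for a parsed endpoint.
    static class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Parses "host:port"; returns null when no endpoint is configured.
    static HostPort parse(String endpoint) {
        if (endpoint == null || endpoint.isEmpty()) {
            return null;
        }
        int colon = endpoint.lastIndexOf(':');
        if (colon <= 0 || colon == endpoint.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got " + endpoint);
        }
        return new HostPort(endpoint.substring(0, colon),
                            Integer.parseInt(endpoint.substring(colon + 1)));
    }
}
```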

The StreamsMetadataState is used to create a StreamThread and for the following state-related metadata operations:

queryMetadataForKey

<K> KeyQueryMetadata queryMetadataForKey(
  String storeName,
  K key,
  Serializer<K> keySerializer)

<K> KeyQueryMetadata queryMetadataForKey(
  String storeName,
  K key,
  StreamPartitioner<? super K, ?> partitioner)

queryMetadataForKey requests the StreamsMetadataState to getKeyQueryMetadataForKey.
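Conceptually, the lookup serializes the key, maps it to a partition, and finds the instance that hosts that partition of the store (KeyQueryMetadata exposes the active host, the standby hosts, and the partition). The sketch below shows only the key-to-partition-to-active-host idea. KeyLookupSketch and the partitionToHost map are hypothetical, and Arrays.hashCode is a placeholder: Kafka's default partitioning uses murmur2, so real partition numbers will differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;

class KeyLookupSketch {
    // Placeholder hash (NOT Kafka's murmur2); masks the sign bit to stay non-negative.
    static int partitionFor(byte[] serializedKey, int numPartitions) {
        return (Arrays.hashCode(serializedKey) & 0x7fffffff) % numPartitions;
    }

    // partitionToHost is a hypothetical assignment of store partitions to endpoints.
    static String activeHostFor(String key, int numPartitions, Map<Integer, String> partitionToHost) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);   // stands in for keySerializer
        return partitionToHost.get(partitionFor(bytes, numPartitions));
    }
}
```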

Performance Metrics

When created, KafkaStreams creates the configured Metrics using getMetrics.

KafkaStreams uses the metrics to create a StreamsMetricsImpl right after.

Metrics Recording Service

When created, KafkaStreams creates a rocksDBMetricsRecordingService executor service (ScheduledExecutorService), but only if the metrics.recording.level configuration property is DEBUG.

KafkaStreams uses the ScheduledExecutorService to schedule a RocksDBMetricsRecordingTrigger (of the StreamsMetricsImpl) to run every minute (the interval is not configurable).

The ScheduledExecutorService is shut down in shutdownHelper.
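The scheduling and shutdown of the recording trigger can be sketched with the JDK executor API. RecordingSchedule is hypothetical, the initial delay of 0 is an assumption, and shutdownNow is the sketch's choice for stopping the periodic task (shutdownHelper itself is not reproduced here).

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class RecordingSchedule {
    // Runs the trigger at a fixed rate of once per minute (initial delay 0 assumed).
    static ScheduledFuture<?> schedule(ScheduledExecutorService service, Runnable trigger) {
        return service.scheduleAtFixedRate(trigger, 0, 1, TimeUnit.MINUTES);
    }

    // Stops the service if it was created at all (it is optional).
    static void shutdown(ScheduledExecutorService service) {
        if (service != null) {
            service.shutdownNow();
        }
    }
}
```

A fixed rate keeps recordings aligned to one-minute intervals even if a single run takes a little longer than usual.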

maybeCreateRocksDBMetricsRecordingService

ScheduledExecutorService maybeCreateRocksDBMetricsRecordingService(
  String clientId,
  StreamsConfig config)

Only when the metrics.recording.level configuration property is DEBUG does maybeCreateRocksDBMetricsRecordingService create a single-threaded executor. The executor's only thread is a daemon thread with the following name:

[clientId]-RocksDBMetricsRecordingTrigger
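A sketch of maybeCreateRocksDBMetricsRecordingService using plain java.util.Properties instead of StreamsConfig. RecordingServiceFactory is hypothetical, the INFO fallback reflects the documented default of metrics.recording.level, and returning null for non-DEBUG levels is an assumption of this sketch.

```java
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

class RecordingServiceFactory {
    static String threadName(String clientId) {
        return clientId + "-RocksDBMetricsRecordingTrigger";
    }

    // Creates a single-threaded scheduler with one daemon thread, only at DEBUG level.
    static ScheduledExecutorService maybeCreate(String clientId, Properties config) {
        String level = config.getProperty("metrics.recording.level", "INFO");
        if (!"DEBUG".equals(level)) {
            return null;   // assumption: no service below DEBUG
        }
        return Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, threadName(clientId));
            thread.setDaemon(true);   // must not keep the JVM alive
            return thread;
        });
    }
}
```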

StreamsMetricsImpl

When created, KafkaStreams creates a StreamsMetricsImpl (with the Metrics, the clientId and built.in.metrics.version configuration property).

KafkaStreams registers ClientMetrics.

KafkaStreams passes the StreamsMetricsImpl in while creating a GlobalStreamThread and StreamThreads.

When started, KafkaStreams requests the StreamsMetricsImpl for the rocksDBMetricsRecordingTrigger (to schedule it at a fixed rate using the Metrics Recording Service).

getMetrics

Metrics getMetrics(
  StreamsConfig config,
  Time time,
  String clientId)

getMetrics creates a MetricConfig (Apache Kafka) based on the following configuration properties:

getMetrics requests the given StreamsConfig for configured MetricsReporters (Apache Kafka) per metric.reporters configuration property.

getMetrics always adds JmxReporter to the list of configured MetricsReporters. JmxReporter is configured to use the kafka.streams JMX prefix.

In the end, getMetrics creates a Metrics (Apache Kafka) (with the MetricConfig, the MetricsReporters, et al.)
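The inputs that getMetrics gathers can be sketched with plain Properties. This assumes the MetricConfig is built from the metrics.num.samples, metrics.sample.window.ms and metrics.recording.level configuration properties (with their documented defaults of 2, 30000 ms and INFO). MetricsSettingsSketch is hypothetical and records reporter class names as strings instead of instantiating MetricsReporters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class MetricsSettingsSketch {
    final int samples;
    final long sampleWindowMs;
    final String recordingLevel;
    final List<String> reporters;

    MetricsSettingsSketch(Properties config) {
        samples = Integer.parseInt(config.getProperty("metrics.num.samples", "2"));
        sampleWindowMs = Long.parseLong(config.getProperty("metrics.sample.window.ms", "30000"));
        recordingLevel = config.getProperty("metrics.recording.level", "INFO");
        reporters = new ArrayList<>();
        // metric.reporters is a comma-separated list of class names.
        for (String name : config.getProperty("metric.reporters", "").split(",")) {
            if (!name.trim().isEmpty()) {
                reporters.add(name.trim());
            }
        }
        reporters.add("JmxReporter(prefix=kafka.streams)");   // always added
    }
}
```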

cache.max.bytes.buffering

When created, KafkaStreams reads the cache.max.bytes.buffering configuration property as the total cache size (totalCacheSize) that getCacheSizePerThread splits among threads.

getCacheSizePerThread

long getCacheSizePerThread(
  int numStreamThreads)

getCacheSizePerThread returns the totalCacheSize when the given numStreamThreads is 0. Otherwise, getCacheSizePerThread returns the totalCacheSize divided by the given numStreamThreads plus one extra share for the global stream thread (when there is a global task topology).
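The arithmetic of getCacheSizePerThread can be sketched as a pure function. CacheSizing is hypothetical, and the hasGlobalTopology flag stands in for checking whether a global task topology exists.

```java
class CacheSizing {
    // totalCacheSize comes from cache.max.bytes.buffering (10485760 bytes by default).
    static long getCacheSizePerThread(long totalCacheSize, int numStreamThreads, boolean hasGlobalTopology) {
        if (numStreamThreads == 0) {
            return totalCacheSize;
        }
        // The global stream thread, if any, takes one more equal share.
        return totalCacheSize / (numStreamThreads + (hasGlobalTopology ? 1 : 0));
    }
}
```

For example, with the default 10485760 bytes and 2 stream threads, each thread gets 5242880 bytes; with a global task topology as well, the cache is split three ways and each share is 3495253 bytes (integer division).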

getCacheSizePerThread is used when:

Logging

Enable ALL logging level for org.apache.kafka.streams.KafkaStreams logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.KafkaStreams=ALL

Refer to Logging.
