KafkaStreams

KafkaStreams is the execution environment of a Kafka Streams application.

KafkaStreams is a Kafka client for continuous stream processing: it reads input from one or more input topics and sends output to zero, one, or more output topics.

Creating Instance

KafkaStreams takes the following to be created:

When created, KafkaStreams requests the given InternalTopologyBuilder to rewriteTopology and then to build the task topology and the global task topology.

KafkaStreams then...FIXME

defaultStreamsUncaughtExceptionHandler

void defaultStreamsUncaughtExceptionHandler(
  Throwable throwable)

defaultStreamsUncaughtExceptionHandler...FIXME

Task Topology

KafkaStreams requests the InternalTopologyBuilder to build a task topology when created.

The ProcessorTopology can have persistent local stores.

Global Task Topology

When created, KafkaStreams requests the InternalTopologyBuilder to build a global task topology.

StreamThreads

KafkaStreams manages StreamThreads in a threads internal registry.

The threads collection starts empty when KafkaStreams is created.

KafkaStreams adds a new StreamThread when requested to createAndAddStreamThread.

A StreamThread is removed when KafkaStreams is requested for the following:

KafkaStreams uses processStreamThread to work with the StreamThreads.
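The registry behaviour described in this section can be modelled with plain JDK classes. The sketch below is a simplified model, not the Kafka Streams source: SketchThread and ThreadRegistrySketch are hypothetical names, and applying the consumer to a snapshot of the registry is only one plausible reading of the processStreamThread signature.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a StreamThread (not the Kafka Streams class).
class SketchThread {
    final String name;
    boolean started = false;

    SketchThread(String name) { this.name = name; }

    void start() { started = true; }
}

// Simplified model of the threads internal registry.
class ThreadRegistrySketch {
    // Starts empty, like the threads collection when KafkaStreams is created.
    private final List<SketchThread> threads = new ArrayList<>();

    synchronized SketchThread add(String name) {
        SketchThread thread = new SketchThread(name);
        threads.add(thread);
        return thread;
    }

    synchronized boolean remove(SketchThread thread) {
        return threads.remove(thread);
    }

    synchronized int size() {
        return threads.size();
    }

    // Assumption: the consumer is applied to every registered thread.
    // Iterating over a copy lets the consumer add or remove threads safely.
    void processStreamThread(Consumer<SketchThread> consumer) {
        List<SketchThread> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(threads);
        }
        for (SketchThread thread : snapshot) {
            consumer.accept(thread);
        }
    }

    public static void main(String[] args) {
        ThreadRegistrySketch registry = new ThreadRegistrySketch();
        registry.add("StreamThread-1");
        registry.add("StreamThread-2");
        registry.processStreamThread(SketchThread::start);
        registry.processStreamThread(
            t -> System.out.println(t.name + " started=" + t.started));
    }
}
```

Passing the operation in as a Consumer keeps the iteration and locking in one place, so callers only state what should happen to each thread.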

processStreamThread

void processStreamThread(
  java.util.function.Consumer<StreamThread> consumer)

processStreamThread...FIXME

getNumLiveStreamThreads

int getNumLiveStreamThreads()

getNumLiveStreamThreads...FIXME

GlobalStreamThread

KafkaStreams can use a GlobalStreamThread if...FIXME

Starting Streams Client

void start()

start attempts to enter REBALANCING state and, if successful, prints out the following INFO message to the logs:

State transition from [oldState] to REBALANCING

start prints out the following DEBUG message to the logs:

Starting Streams client

start requests the GlobalStreamThread to start (if defined).

start requests all the StreamThreads to start.

start...FIXME
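The documented part of the start sequence can be sketched in plain Java. This is a JDK-only model under stated assumptions, not the Kafka Streams implementation: StartSketch, its two-value State enum, the in-memory log list, and the use of Runnable for the threads are all hypothetical. Returning false when the state change is rejected is also a simplification; what the real client does in that case is not covered here.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the documented steps of start().
class StartSketch {
    enum State { CREATED, REBALANCING }

    State state = State.CREATED;
    // Collected log lines stand in for a real logger.
    final List<String> log = new ArrayList<>();
    // null models "no GlobalStreamThread defined".
    Runnable globalStreamThread;
    final List<Runnable> streamThreads = new ArrayList<>();

    // Assumption: only the CREATED -> REBALANCING transition is allowed.
    private boolean setState(State newState) {
        if (state != State.CREATED || newState != State.REBALANCING) {
            return false;
        }
        State oldState = state;
        state = newState;
        log.add("INFO State transition from " + oldState + " to " + newState);
        return true;
    }

    boolean start() {
        if (!setState(State.REBALANCING)) {
            return false; // simplification, see the note above
        }
        log.add("DEBUG Starting Streams client");
        if (globalStreamThread != null) {
            globalStreamThread.run();
        }
        streamThreads.forEach(Runnable::run);
        return true;
    }

    public static void main(String[] args) {
        StartSketch client = new StartSketch();
        client.streamThreads.add(() -> System.out.println("stream thread started"));
        client.start();
        client.log.forEach(System.out::println);
    }
}
```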

setUncaughtExceptionHandler

void setUncaughtExceptionHandler(
  StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)

setUncaughtExceptionHandler...FIXME

setUncaughtExceptionHandler is part of the public API.

handleStreamsUncaughtException

void handleStreamsUncaughtException(
  Throwable throwable,
  StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)

handleStreamsUncaughtException...FIXME

handleStreamsUncaughtException is used when:

replaceStreamThread

void replaceStreamThread(
  Throwable throwable)

replaceStreamThread...FIXME

addStreamThread

Optional<String> addStreamThread()

addStreamThread...FIXME

addStreamThread is part of the public API.

createAndAddStreamThread

StreamThread createAndAddStreamThread(
  long cacheSizePerThread,
  int threadIdx)

createAndAddStreamThread creates a StreamThread and requests it to setStateListener with the StreamStateListener.

createAndAddStreamThread registers the StreamThread (in the threads and threadState internal registries).

createAndAddStreamThread requests the QueryableStoreProvider to addStoreProviderForThread (with the name of the StreamThread and a new StreamThreadStateStoreProvider).
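The three steps above can be sketched as follows. This is a JDK-only model, not the Kafka Streams source: CreateAndAddSketch and WorkerSketch are hypothetical names, plain Object values stand in for the StreamStateListener and the StreamThreadStateStoreProvider, and the thread name format, the "CREATED" value, and keying threadState by thread name are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the bookkeeping done by createAndAddStreamThread.
class CreateAndAddSketch {
    // Hypothetical stand-in for a StreamThread.
    static class WorkerSketch {
        final String name;
        final long cacheSize;
        Object stateListener;

        WorkerSketch(String name, long cacheSize) {
            this.name = name;
            this.cacheSize = cacheSize;
        }

        void setStateListener(Object listener) { this.stateListener = listener; }
    }

    final Object streamStateListener = new Object();
    final List<WorkerSketch> threads = new ArrayList<>();
    // Assumption: keyed by thread name, with a string for the state.
    final Map<String, String> threadState = new HashMap<>();
    // Models the store providers registered per thread name.
    final Map<String, Object> storeProviders = new HashMap<>();

    WorkerSketch createAndAddStreamThread(long cacheSizePerThread, int threadIdx) {
        // Step 1: create the thread and hand it the shared state listener.
        WorkerSketch thread =
            new WorkerSketch("StreamThread-" + threadIdx, cacheSizePerThread);
        thread.setStateListener(streamStateListener);

        // Step 2: register the thread in both internal registries.
        threads.add(thread);
        threadState.put(thread.name, "CREATED");

        // Step 3: add a new store provider under the thread's name.
        storeProviders.put(thread.name, new Object());
        return thread;
    }

    public static void main(String[] args) {
        CreateAndAddSketch sketch = new CreateAndAddSketch();
        WorkerSketch thread = sketch.createAndAddStreamThread(1024L, 1);
        System.out.println(thread.name + " registered: "
            + sketch.threadState.containsKey(thread.name));
    }
}
```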

createAndAddStreamThread is used when:

Logging

Enable the ALL logging level for the org.apache.kafka.streams.KafkaStreams logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.KafkaStreams=ALL

Refer to Logging.