KafkaStreams¶
KafkaStreams is the execution environment of a single instance of a Kafka Streams application (KafkaStreams instance).
KafkaStreams is a Kafka client for continuous stream processing: it reads input from one or more input topics and sends output to zero or more output topics.
Creating Instance¶
KafkaStreams takes the following to be created:
- InternalTopologyBuilder (or Topology)
- StreamsConfig
- KafkaClientSupplier (default: DefaultKafkaClientSupplier)
- Time
When created, KafkaStreams requests the given InternalTopologyBuilder to rewriteTopology and then to build the task topology and the global task topology.
KafkaStreams then...FIXME
defaultStreamsUncaughtExceptionHandler¶
void defaultStreamsUncaughtExceptionHandler(
  Throwable throwable)
defaultStreamsUncaughtExceptionHandler...FIXME
Task Topology¶
KafkaStreams requests the InternalTopologyBuilder to build a task topology when created.
The ProcessorTopology can have persistent local stores.
Global Task Topology¶
When created, KafkaStreams requests the InternalTopologyBuilder to build a global task topology.
StreamThreads¶
KafkaStreams manages StreamThreads in a threads internal registry.
The threads collection starts empty when KafkaStreams is created.
KafkaStreams adds a new StreamThread when requested to createAndAddStreamThread.
A StreamThread is removed when KafkaStreams is requested for the following:
- defaultStreamsUncaughtExceptionHandler
- addStreamThread
- removeStreamThread
- getNumLiveStreamThreads
- getNextThreadIndex
 
KafkaStreams uses processStreamThread to work with the StreamThreads.
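The bookkeeping described in this section can be modelled in plain Java. The following is a minimal sketch, not the Kafka Streams source: `Worker`, `WorkerState` and `ThreadRegistry` are hypothetical stand-ins for StreamThread and the threads registry, and the sketch assumes that the entries removed are the ones found in a dead state. The `process` method only mirrors the Consumer-based shape of the processStreamThread signature.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins; none of these types are part of Kafka Streams.
enum WorkerState { RUNNING, DEAD }

final class Worker {
    final String name;
    volatile WorkerState state = WorkerState.RUNNING;

    Worker(String name) {
        this.name = name;
    }
}

final class ThreadRegistry {
    // The registry starts empty, like the threads collection.
    private final List<Worker> threads = new ArrayList<>();

    // Creates a worker and registers it.
    synchronized Worker createAndAdd(String name) {
        Worker worker = new Worker(name);
        threads.add(worker);
        return worker;
    }

    // Applies the action to a snapshot, so the action may remove entries safely.
    synchronized void process(Consumer<Worker> action) {
        for (Worker worker : new ArrayList<>(threads)) {
            action.accept(worker);
        }
    }

    // Counts live workers and trims dead ones as a side effect (assumption of this sketch).
    synchronized int numLive() {
        int[] live = {0};
        process(worker -> {
            if (worker.state == WorkerState.DEAD) {
                threads.remove(worker);
            } else {
                live[0]++;
            }
        });
        return live[0];
    }

    synchronized int size() {
        return threads.size();
    }
}
```

Iterating over a copy is what lets the counting pass remove entries without a ConcurrentModificationException.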
processStreamThread¶
void processStreamThread(
  java.util.function.Consumer<StreamThread> consumer)
processStreamThread...FIXME
getNumLiveStreamThreads¶
int getNumLiveStreamThreads()
getNumLiveStreamThreads...FIXME
GlobalStreamThread¶
KafkaStreams can use a GlobalStreamThread if...FIXME
Starting Streams Client¶
void start()
start attempts to enter REBALANCING state and, if successful, prints out the following INFO message to the logs:
State transition from [oldState] to REBALANCING
start prints out the following DEBUG message to the logs:
Starting Streams client
start requests the GlobalStreamThread to start (if defined).
start requests all the StreamThreads to start.
start...FIXME
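The guarded state transition at the beginning of start can be illustrated with a small model. This is a sketch under stated assumptions rather than the actual implementation: `ClientState` and `StartSketch` are hypothetical names, only entering REBALANCING from a freshly created client is modelled, and returning false on a failed transition is a choice made for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced set of states for illustration only.
enum ClientState { CREATED, REBALANCING, NOT_RUNNING }

final class StartSketch {
    private ClientState state = ClientState.CREATED;

    // Collected log lines, so the order of messages can be inspected.
    final List<String> messages = new ArrayList<>();

    // Assumption: only a freshly created client may enter REBALANCING here.
    private synchronized boolean tryEnterRebalancing() {
        if (state != ClientState.CREATED) {
            return false;
        }
        ClientState oldState = state;
        state = ClientState.REBALANCING;
        messages.add("INFO State transition from " + oldState + " to REBALANCING");
        return true;
    }

    // Returns false when the transition is rejected (a choice made for this sketch).
    synchronized boolean start() {
        if (!tryEnterRebalancing()) {
            return false;
        }
        messages.add("DEBUG Starting Streams client");
        // A real client would now start the global thread (if defined) and the stream threads.
        return true;
    }

    synchronized ClientState state() {
        return state;
    }
}
```

Calling `start()` a second time on the same `StartSketch` leaves the state and the collected messages unchanged.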
setUncaughtExceptionHandler¶
void setUncaughtExceptionHandler(
  StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)
setUncaughtExceptionHandler...FIXME
setUncaughtExceptionHandler is part of the public API.
handleStreamsUncaughtException¶
void handleStreamsUncaughtException(
  Throwable throwable,
  StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)
handleStreamsUncaughtException...FIXME
handleStreamsUncaughtException is used when:
- KafkaStreams is requested to setUncaughtExceptionHandler and defaultStreamsUncaughtExceptionHandler
replaceStreamThread¶
void replaceStreamThread(
  Throwable throwable)
replaceStreamThread...FIXME
addStreamThread¶
Optional<String> addStreamThread()
addStreamThread...FIXME
addStreamThread is part of the public API.
createAndAddStreamThread¶
StreamThread createAndAddStreamThread(
  long cacheSizePerThread,
  int threadIdx)
createAndAddStreamThread creates a StreamThread and requests it to setStateListener with the StreamStateListener.
createAndAddStreamThread registers the StreamThread (in the threads and threadState internal registries).
createAndAddStreamThread requests the QueryableStoreProvider to addStoreProviderForThread (with the name of the StreamThread and a new StreamThreadStateStoreProvider).
createAndAddStreamThread is used when:
- KafkaStreams is created and requested to addStreamThread
StreamsMetadataState¶
KafkaStreams creates a new StreamsMetadataState when created (with the endpoint based on application.server configuration property).
The StreamsMetadataState is used to create a StreamThread and for the following state-related metadata operators:
queryMetadataForKey¶
KeyQueryMetadata queryMetadataForKey(
  String storeName,
  K key,
  Serializer<K> keySerializer)
KeyQueryMetadata queryMetadataForKey(
  String storeName,
  K key,
  StreamPartitioner<? super K, ?> partitioner)
queryMetadataForKey requests the StreamsMetadataState to getKeyQueryMetadataForKey.
Performance Metrics¶
KafkaStreams gets the configured metrics when created.
KafkaStreams uses the metrics to create a StreamsMetricsImpl right after.
Metrics Recording Service¶
When created, KafkaStreams creates a rocksDBMetricsRecordingService executor service (ScheduledExecutorService), but only if the value of the metrics.recording.level configuration property is DEBUG.
KafkaStreams uses the ScheduledExecutorService to submit a RocksDBMetricsRecordingTrigger (of the StreamsMetricsImpl) to be executed every 1 minute (non-configurable).
The ScheduledExecutorService is shut down when KafkaStreams is requested to shutdownHelper.
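Fixed-rate scheduling and shutdown of such a service can be sketched with the JDK alone. The class and method names below are hypothetical, the trigger is any `Runnable`, and the initial delay of 0 is an assumption. Kafka Streams uses a period of 1 minute according to the text above, while the sketch leaves the period as a parameter.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of Kafka Streams.
final class RecordingScheduleSketch {

    // Runs the trigger repeatedly at a fixed rate, starting immediately (assumed initial delay of 0).
    static ScheduledExecutorService schedule(Runnable trigger, long period, TimeUnit unit) {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        service.scheduleAtFixedRate(trigger, 0, period, unit);
        return service;
    }

    // Cancels pending runs and stops the worker thread.
    static void shutdown(ScheduledExecutorService service) {
        service.shutdownNow();
    }
}
```

For a one-minute cadence the call would be `RecordingScheduleSketch.schedule(trigger, 1, TimeUnit.MINUTES)`.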
maybeCreateRocksDBMetricsRecordingService¶
ScheduledExecutorService maybeCreateRocksDBMetricsRecordingService(
  String clientId,
  StreamsConfig config)
maybeCreateRocksDBMetricsRecordingService creates a single-threaded executor only when the metrics.recording.level configuration property is DEBUG. The executor's single thread is a daemon thread with the following name:
[clientId]-RocksDBMetricsRecordingTrigger
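A conditionally created, single-threaded executor with a named daemon thread can be built with a custom `ThreadFactory`. The sketch below is an illustration, not the Kafka Streams code: `RecordingServiceSketch` is a hypothetical class, the `recordingLevel` string stands in for the value read from StreamsConfig, and returning `null` at other levels is an assumption.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical helper; not part of Kafka Streams.
final class RecordingServiceSketch {

    // recordingLevel stands in for the metrics.recording.level value (hypothetical parameter).
    static ScheduledExecutorService maybeCreate(String clientId, String recordingLevel) {
        if (!"DEBUG".equalsIgnoreCase(recordingLevel)) {
            return null; // assumption: no service is created at any other level
        }
        // The thread factory names the single worker thread and marks it as a daemon.
        return Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, clientId + "-RocksDBMetricsRecordingTrigger");
            thread.setDaemon(true);
            return thread;
        });
    }
}
```

A daemon thread does not keep the JVM alive, so a forgotten shutdown of this executor cannot block application exit.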
StreamsMetricsImpl¶
When created, KafkaStreams creates a StreamsMetricsImpl (with the Metrics, the clientId and built.in.metrics.version configuration property).
KafkaStreams registers ClientMetrics.
KafkaStreams passes the StreamsMetricsImpl in while creating a GlobalStreamThread and StreamThreads.
When started, KafkaStreams requests the StreamsMetricsImpl for rocksDBMetricsRecordingTrigger (to schedule it at fixed rate using the Metrics Recording Service).
getMetrics¶
Metrics getMetrics(
  StreamsConfig config,
  Time time,
  String clientId)
getMetrics creates a MetricConfig (Apache Kafka) based on the following configuration properties:
getMetrics requests the given StreamsConfig for configured MetricsReporters (Apache Kafka) per metric.reporters configuration property.
getMetrics always adds JmxReporter to the list of configured MetricsReporters. JmxReporter is configured to use kafka.streams JMX prefix.
In the end, getMetrics creates a Metrics (Apache Kafka) (with the MetricConfig, the MetricsReporters, et al.)
cache.max.bytes.buffering¶
KafkaStreams reads the cache.max.bytes.buffering configuration property when created (for getCacheSizePerThread).
getCacheSizePerThread¶
long getCacheSizePerThread(
  int numStreamThreads)
getCacheSizePerThread returns the totalCacheSize when the given numStreamThreads is 0. Otherwise, getCacheSizePerThread returns the totalCacheSize divided by the number of threads, that is, the given numStreamThreads plus 1 for the globalTaskTopology thread (if used).
getCacheSizePerThread is used when:
- KafkaStreams is created and is requested to addStreamThread and removeStreamThread
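The arithmetic of getCacheSizePerThread can be written out as a standalone function. This is a sketch of the rule as described in this section: `CacheSizing` is a hypothetical class, and `totalCacheSize` and `hasGlobalTopology` are passed as parameters here, whereas the method itself takes only `numStreamThreads`.

```java
// Hypothetical helper mirroring the rule described in this section.
final class CacheSizing {

    static long cacheSizePerThread(long totalCacheSize, int numStreamThreads, boolean hasGlobalTopology) {
        // With no stream threads, the whole cache is returned.
        if (numStreamThreads == 0) {
            return totalCacheSize;
        }
        // The global task topology thread (if used) takes one extra share.
        int shares = numStreamThreads + (hasGlobalTopology ? 1 : 0);
        return totalCacheSize / shares;
    }
}
```

With a total of 10485760 bytes and 4 stream threads, each thread gets 2621440 bytes without a global task topology and 2097152 bytes with one.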
Logging¶
Enable ALL logging level for org.apache.kafka.streams.KafkaStreams logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.org.apache.kafka.streams.KafkaStreams=ALL
Refer to Logging.