KafkaStreams
KafkaStreams is the execution environment of a single instance of a Kafka Streams application (a KafkaStreams instance).
KafkaStreams is a Kafka client for continuous stream processing: it processes input from one or more input topics and sends output to zero, one, or more output topics.
Creating Instance
KafkaStreams takes the following to be created:
- InternalTopologyBuilder (or Topology)
- StreamsConfig
- KafkaClientSupplier (default: DefaultKafkaClientSupplier)
- Time
When created, KafkaStreams requests the given InternalTopologyBuilder to rewriteTopology and then to build the task topology and the global task topology.
KafkaStreams then ...FIXME
defaultStreamsUncaughtExceptionHandler
void defaultStreamsUncaughtExceptionHandler(
    Throwable throwable)
defaultStreamsUncaughtExceptionHandler ...FIXME
Task Topology
KafkaStreams requests the InternalTopologyBuilder to build a task topology (a ProcessorTopology) when created.
The ProcessorTopology can have persistent local stores.
Global Task Topology
When created, KafkaStreams requests the InternalTopologyBuilder to build a global task topology.
StreamThreads
KafkaStreams manages StreamThreads in the threads internal registry.
The threads collection starts empty when KafkaStreams is created.
KafkaStreams adds a new StreamThread when requested to createAndAddStreamThread.
A StreamThread is removed when KafkaStreams is requested to do any of the following:
- defaultStreamsUncaughtExceptionHandler
- addStreamThread
- removeStreamThread
- getNumLiveStreamThreads
- getNextThreadIndex
KafkaStreams uses processStreamThread to work with the StreamThreads.
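To make the registry behaviour concrete, here is a minimal plain-Java sketch. It does not use Kafka Streams classes: ThreadsRegistrySketch, ModelStreamThread and ThreadState are hypothetical stand-ins, and the trimming rule in getNumLiveStreamThreads (drop DEAD threads, do not count PENDING_SHUTDOWN ones) is an assumption about the elided details, not a transcription of the real method. What it illustrates is iterating over a snapshot, so that a consumer passed to processStreamThread can remove entries from the registry safely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, simplified model of the threads registry.
// ModelStreamThread and ThreadState are stand-ins, not Kafka Streams classes.
class ThreadsRegistrySketch {

    enum ThreadState { RUNNING, PENDING_SHUTDOWN, DEAD }

    static final class ModelStreamThread {
        final String name;
        volatile ThreadState state;

        ModelStreamThread(String name, ThreadState state) {
            this.name = name;
            this.state = state;
        }
    }

    // Starts empty, like the threads registry of a newly created KafkaStreams.
    final List<ModelStreamThread> threads = new ArrayList<>();

    // Applies the consumer to a snapshot of the registry, so the consumer
    // may remove entries without a ConcurrentModificationException.
    void processStreamThread(Consumer<ModelStreamThread> consumer) {
        final List<ModelStreamThread> copy;
        synchronized (threads) {
            copy = new ArrayList<>(threads);
        }
        for (ModelStreamThread thread : copy) {
            consumer.accept(thread);
        }
    }

    // Assumption: DEAD threads are trimmed from the registry and
    // PENDING_SHUTDOWN threads are not counted as live.
    int getNumLiveStreamThreads() {
        final AtomicInteger live = new AtomicInteger();
        synchronized (threads) {
            processStreamThread(thread -> {
                if (thread.state == ThreadState.DEAD) {
                    threads.remove(thread);
                } else if (thread.state != ThreadState.PENDING_SHUTDOWN) {
                    live.incrementAndGet();
                }
            });
        }
        return live.get();
    }

    public static void main(String[] args) {
        ThreadsRegistrySketch registry = new ThreadsRegistrySketch();
        registry.threads.add(new ModelStreamThread("t-1", ThreadState.RUNNING));
        registry.threads.add(new ModelStreamThread("t-2", ThreadState.PENDING_SHUTDOWN));
        registry.threads.add(new ModelStreamThread("t-3", ThreadState.DEAD));
        System.out.println(registry.getNumLiveStreamThreads()); // 1
        System.out.println(registry.threads.size());            // 2 (t-3 was trimmed)
    }
}
```

The snapshot copy is the design choice that matters here: removing from the live list while iterating it directly would fail, whereas iterating a copy lets the trimming happen in the same pass.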
processStreamThread
void processStreamThread(
    java.util.function.Consumer<StreamThread> consumer)
processStreamThread ...FIXME
getNumLiveStreamThreads
int getNumLiveStreamThreads()
getNumLiveStreamThreads ...FIXME
GlobalStreamThread
KafkaStreams can use a GlobalStreamThread if ...FIXME
Starting Streams Client
void start()
start attempts to enter the REBALANCING state and, if successful, prints out the following INFO message to the logs:
State transition from [oldState] to REBALANCING
start prints out the following DEBUG message to the logs:
Starting Streams client
start requests the GlobalStreamThread to start (if defined).
start requests all the StreamThreads to start.
start ...FIXME
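The documented part of start can be sketched as a compare-and-set state transition followed by starting the threads. This is a hypothetical plain-Java model: StartSequenceSketch and its State enum are stand-ins, plain Thread objects play the GlobalStreamThread and StreamThreads, log messages go to a list, and throwing IllegalStateException when the transition fails is an assumption, since the text above only covers the successful case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the start sequence; not the Kafka Streams implementation.
class StartSequenceSketch {

    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING }

    private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);
    private final Thread globalStreamThread; // null when there is no global task topology
    private final List<Thread> streamThreads;
    final List<String> log = new ArrayList<>();

    StartSequenceSketch(Thread globalStreamThread, List<Thread> streamThreads) {
        this.globalStreamThread = globalStreamThread;
        this.streamThreads = streamThreads;
    }

    State state() {
        return state.get();
    }

    void start() {
        // Assumption: only a CREATED client may enter REBALANCING;
        // compareAndSet lets exactly one of several concurrent start() calls win.
        if (!state.compareAndSet(State.CREATED, State.REBALANCING)) {
            throw new IllegalStateException("Cannot start a client in state " + state.get());
        }
        log.add("INFO State transition from CREATED to REBALANCING");
        log.add("DEBUG Starting Streams client");
        if (globalStreamThread != null) {
            globalStreamThread.start();
        }
        for (Thread thread : streamThreads) {
            thread.start();
        }
        // Later steps of start are not modelled here.
    }

    public static void main(String[] args) {
        List<Thread> threads = new ArrayList<>();
        threads.add(new Thread(() -> { }, "stream-thread-1"));
        StartSequenceSketch client = new StartSequenceSketch(null, threads);
        client.start();
        System.out.println(client.log);
        System.out.println(client.state()); // REBALANCING
    }
}
```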
setUncaughtExceptionHandler
void setUncaughtExceptionHandler(
    StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)
setUncaughtExceptionHandler ...FIXME
setUncaughtExceptionHandler is part of the public API.
handleStreamsUncaughtException
void handleStreamsUncaughtException(
    Throwable throwable,
    StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)
handleStreamsUncaughtException ...FIXME
handleStreamsUncaughtException is used when:
- KafkaStreams is requested to setUncaughtExceptionHandler and defaultStreamsUncaughtExceptionHandler
replaceStreamThread
void replaceStreamThread(
    Throwable throwable)
replaceStreamThread ...FIXME
addStreamThread
Optional<String> addStreamThread()
addStreamThread ...FIXME
addStreamThread is part of the public API.
createAndAddStreamThread
StreamThread createAndAddStreamThread(
    long cacheSizePerThread,
    int threadIdx)
createAndAddStreamThread creates a StreamThread and requests it to setStateListener with the StreamStateListener.
createAndAddStreamThread registers the StreamThread (in the threads and threadState internal registries).
createAndAddStreamThread requests the QueryableStoreProvider to addStoreProviderForThread (with the name of the StreamThread and a new StreamThreadStateStoreProvider).
createAndAddStreamThread is used when:
- KafkaStreams is created and is requested to addStreamThread
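The three registration steps of createAndAddStreamThread can be modelled as follows. All types are hypothetical stand-ins (ModelStreamThread for StreamThread, StateListener for the StreamStateListener, ModelStoreProvider for StreamThreadStateStoreProvider, and a map for the QueryableStoreProvider), threadState is keyed by thread name for simplicity, and the [clientId]-StreamThread-[threadIdx] naming pattern is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of createAndAddStreamThread; none of these are Kafka classes.
class CreateAndAddSketch {

    enum ThreadState { CREATED, STARTING, RUNNING, DEAD }

    interface StateListener {
        void onChange(ModelStreamThread thread, ThreadState newState, ThreadState oldState);
    }

    static final class ModelStreamThread {
        final String name;
        final long cacheSizeBytes;
        StateListener stateListener;

        ModelStreamThread(String name, long cacheSizeBytes) {
            this.name = name;
            this.cacheSizeBytes = cacheSizeBytes;
        }

        void setStateListener(StateListener listener) {
            this.stateListener = listener;
        }
    }

    // Stand-in for StreamThreadStateStoreProvider: remembers which thread it serves.
    static final class ModelStoreProvider {
        final ModelStreamThread thread;

        ModelStoreProvider(ModelStreamThread thread) {
            this.thread = thread;
        }
    }

    private final String clientId;
    final List<ModelStreamThread> threads = new ArrayList<>();
    final Map<String, ThreadState> threadState = new HashMap<>();
    final Map<String, ModelStoreProvider> storeProviders = new HashMap<>();

    // Stand-in for the StreamStateListener: records each thread's latest state.
    private final StateListener streamStateListener = (thread, newState, oldState) -> {
        synchronized (threads) {
            threadState.put(thread.name, newState);
        }
    };

    CreateAndAddSketch(String clientId) {
        this.clientId = clientId;
    }

    ModelStreamThread createAndAddStreamThread(long cacheSizePerThread, int threadIdx) {
        // Assumption: names follow the [clientId]-StreamThread-[threadIdx] pattern.
        ModelStreamThread thread =
            new ModelStreamThread(clientId + "-StreamThread-" + threadIdx, cacheSizePerThread);
        thread.setStateListener(streamStateListener);
        synchronized (threads) {
            threads.add(thread);                          // threads registry
            threadState.put(thread.name, ThreadState.CREATED); // threadState registry
        }
        // Stand-in for QueryableStoreProvider.addStoreProviderForThread.
        storeProviders.put(thread.name, new ModelStoreProvider(thread));
        return thread;
    }

    public static void main(String[] args) {
        CreateAndAddSketch client = new CreateAndAddSketch("my-app-client");
        ModelStreamThread t = client.createAndAddStreamThread(5_242_880L, 1);
        System.out.println(t.name); // my-app-client-StreamThread-1
        t.stateListener.onChange(t, ThreadState.RUNNING, ThreadState.CREATED);
        System.out.println(client.threadState.get(t.name)); // RUNNING
    }
}
```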
StreamsMetadataState
KafkaStreams creates a new StreamsMetadataState when created (with the endpoint based on the application.server configuration property).
The StreamsMetadataState is used to create StreamThreads and for state-related metadata operations, such as queryMetadataForKey.
queryMetadataForKey
<K> KeyQueryMetadata queryMetadataForKey(
    String storeName,
    K key,
    Serializer<K> keySerializer)
<K> KeyQueryMetadata queryMetadataForKey(
    String storeName,
    K key,
    StreamPartitioner<? super K, ?> partitioner)
queryMetadataForKey requests the StreamsMetadataState to getKeyQueryMetadataForKey.
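One way to picture how the two overloads relate is a small model in which the Serializer<K> variant derives a partitioner from the serialized key bytes and then defers to the partitioner variant. This is a sketch under assumptions, not the StreamsMetadataState logic: Function<K, byte[]> stands in for Serializer<K>, ModelPartitioner for StreamPartitioner, ModelKeyQueryMetadata for KeyQueryMetadata (active host and partition only), a partition-to-host map for the metadata state, and Arrays.hashCode for the key hashing (Kafka's default producer partitioning uses murmur2 instead).

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of the two queryMetadataForKey overloads (not Kafka classes).
class QueryMetadataSketch {

    // Stand-in for StreamPartitioner: maps a key to one of numPartitions partitions.
    interface ModelPartitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Stand-in for KeyQueryMetadata, reduced to the active host and the partition.
    static final class ModelKeyQueryMetadata {
        final String activeHost;
        final int partition;

        ModelKeyQueryMetadata(String activeHost, int partition) {
            this.activeHost = activeHost;
            this.partition = partition;
        }
    }

    private final int numPartitions;
    // Stand-in for what the metadata state knows: which host owns which partition.
    private final Map<Integer, String> activeHostByPartition;

    QueryMetadataSketch(int numPartitions, Map<Integer, String> activeHostByPartition) {
        this.numPartitions = numPartitions;
        this.activeHostByPartition = activeHostByPartition;
    }

    // Serializer overload: hash the serialized key, then defer to the partitioner overload.
    // Arrays.hashCode is a stand-in; Kafka's default partitioning uses murmur2.
    <K> ModelKeyQueryMetadata queryMetadataForKey(
            String storeName, K key, Function<K, byte[]> keySerializer) {
        ModelPartitioner<K> byKeyBytes =
            (k, n) -> (Arrays.hashCode(keySerializer.apply(k)) & 0x7fffffff) % n;
        return queryMetadataForKey(storeName, key, byKeyBytes);
    }

    // Partitioner overload: the given partitioner picks the partition directly.
    // storeName is unused because all stores share one partition layout in this model.
    <K> ModelKeyQueryMetadata queryMetadataForKey(
            String storeName, K key, ModelPartitioner<? super K> partitioner) {
        int partition = partitioner.partition(key, numPartitions);
        return new ModelKeyQueryMetadata(activeHostByPartition.get(partition), partition);
    }

    public static void main(String[] args) {
        Map<Integer, String> hosts = new HashMap<>();
        hosts.put(0, "host-a:8080");
        hosts.put(1, "host-b:8080");
        QueryMetadataSketch metadata = new QueryMetadataSketch(2, hosts);

        ModelKeyQueryMetadata viaSerializer =
            metadata.queryMetadataForKey("counts-store", "a", s -> s.getBytes(StandardCharsets.UTF_8));
        System.out.println(viaSerializer.partition + " " + viaSerializer.activeHost); // 0 host-a:8080

        ModelKeyQueryMetadata viaPartitioner =
            metadata.queryMetadataForKey("counts-store", "a", (k, n) -> n - 1);
        System.out.println(viaPartitioner.partition + " " + viaPartitioner.activeHost); // 1 host-b:8080
    }
}
```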
Performance Metrics
When created, KafkaStreams creates the configured Metrics (using getMetrics).
KafkaStreams then uses the Metrics to create a StreamsMetricsImpl.
Metrics Recording Service
KafkaStreams may create a rocksDBMetricsRecordingService executor service (ScheduledExecutorService) when created (only if the metrics.recording.level configuration property is DEBUG).
KafkaStreams uses the ScheduledExecutorService to schedule the RocksDBMetricsRecordingTrigger (of the StreamsMetricsImpl) for execution every minute (not configurable).
The ScheduledExecutorService is shut down when KafkaStreams is requested to shutdownHelper.
maybeCreateRocksDBMetricsRecordingService
ScheduledExecutorService maybeCreateRocksDBMetricsRecordingService(
    String clientId,
    StreamsConfig config)
Only when the metrics.recording.level configuration property is DEBUG does maybeCreateRocksDBMetricsRecordingService create an executor: a single-threaded scheduled executor whose only thread is a daemon thread with the following name:
[clientId]-RocksDBMetricsRecordingTrigger
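A minimal sketch of maybeCreateRocksDBMetricsRecordingService together with the one-minute schedule, using only java.util.concurrent. The recording level is passed as a plain String instead of a StreamsConfig, the Runnable in main stands in for the RocksDBMetricsRecordingTrigger, returning null when the level is not DEBUG is this sketch's choice, and the zero initial delay is an assumption.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: recordingLevel replaces StreamsConfig, and the Runnable
// in main stands in for the RocksDBMetricsRecordingTrigger of StreamsMetricsImpl.
class RocksDBRecordingServiceSketch {

    // Returns null (no service) unless the recording level is DEBUG.
    static ScheduledExecutorService maybeCreateRocksDBMetricsRecordingService(
            String clientId, String recordingLevel) {
        if (!"DEBUG".equals(recordingLevel)) {
            return null;
        }
        // A single-threaded scheduled executor whose only thread is a named daemon.
        return Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, clientId + "-RocksDBMetricsRecordingTrigger");
            thread.setDaemon(true);
            return thread;
        });
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService service =
            maybeCreateRocksDBMetricsRecordingService("my-app-client", "DEBUG");
        Runnable trigger = () ->
            System.out.println("recording RocksDB metrics on " + Thread.currentThread().getName());
        // Run now and then every minute (the initial delay of 0 is an assumption).
        service.scheduleAtFixedRate(trigger, 0, 1, TimeUnit.MINUTES);
        Thread.sleep(200);
        // KafkaStreams shuts its service down in shutdownHelper.
        service.shutdownNow();
    }
}
```

Making the thread a daemon means a forgotten executor cannot keep the JVM alive on its own.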
StreamsMetricsImpl
When created, KafkaStreams creates a StreamsMetricsImpl (with the Metrics, the clientId and the built.in.metrics.version configuration property).
KafkaStreams registers ClientMetrics.
KafkaStreams passes the StreamsMetricsImpl in when creating the GlobalStreamThread and StreamThreads.
When started, KafkaStreams requests the StreamsMetricsImpl for the rocksDBMetricsRecordingTrigger (to schedule it at a fixed rate using the Metrics Recording Service).
getMetrics
Metrics getMetrics(
    StreamsConfig config,
    Time time,
    String clientId)
getMetrics creates a MetricConfig (Apache Kafka) based on the following configuration properties:
getMetrics requests the given StreamsConfig for the configured MetricsReporters (Apache Kafka) per the metric.reporters configuration property.
getMetrics always adds JmxReporter to the list of configured MetricsReporters. JmxReporter is configured to use the kafka.streams JMX prefix.
In the end, getMetrics creates a Metrics (Apache Kafka) (with the MetricConfig, the MetricsReporters, et al.).
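As a hedged illustration of the reporter handling, the sketch below collects what getMetrics works with into a plain object instead of creating a real Metrics instance. GetMetricsSketch and ModelMetricsSettings are hypothetical, reporters are represented by class names rather than MetricsReporter instances, and the three MetricConfig properties with their defaults (metrics.num.samples = 2, metrics.sample.window.ms = 30000, metrics.recording.level = INFO) are assumptions, since the list of MetricConfig properties is missing above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical model of what getMetrics assembles before creating the Metrics.
// ModelMetricsSettings stands in for MetricConfig plus the reporter list.
class GetMetricsSketch {

    static final String JMX_REPORTER = "org.apache.kafka.common.metrics.JmxReporter";

    static final class ModelMetricsSettings {
        final int samples;
        final long sampleWindowMs;
        final String recordingLevel;
        final List<String> reporters;
        final String jmxPrefix;

        ModelMetricsSettings(int samples, long sampleWindowMs, String recordingLevel,
                             List<String> reporters, String jmxPrefix) {
            this.samples = samples;
            this.sampleWindowMs = sampleWindowMs;
            this.recordingLevel = recordingLevel;
            this.reporters = reporters;
            this.jmxPrefix = jmxPrefix;
        }
    }

    static ModelMetricsSettings getMetrics(Properties config) {
        // Assumption: the MetricConfig is built from these properties and defaults.
        int samples = Integer.parseInt(config.getProperty("metrics.num.samples", "2"));
        long sampleWindowMs = Long.parseLong(config.getProperty("metrics.sample.window.ms", "30000"));
        String recordingLevel = config.getProperty("metrics.recording.level", "INFO");

        // Reporters configured via metric.reporters (a comma-separated list of class names).
        List<String> reporters = new ArrayList<>();
        for (String name : config.getProperty("metric.reporters", "").split(",")) {
            if (!name.trim().isEmpty()) {
                reporters.add(name.trim());
            }
        }
        // JmxReporter is always added, with the kafka.streams JMX prefix.
        reporters.add(JMX_REPORTER);
        return new ModelMetricsSettings(samples, sampleWindowMs, recordingLevel,
            Collections.unmodifiableList(reporters), "kafka.streams");
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        // com.example.MyReporter is a hypothetical reporter class name.
        config.setProperty("metric.reporters", "com.example.MyReporter");
        ModelMetricsSettings settings = getMetrics(config);
        System.out.println(settings.reporters);
        System.out.println(settings.jmxPrefix); // kafka.streams
    }
}
```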
cache.max.bytes.buffering
When created, KafkaStreams reads the cache.max.bytes.buffering configuration property (as the totalCacheSize) for getCacheSizePerThread.
getCacheSizePerThread
long getCacheSizePerThread(
    int numStreamThreads)
getCacheSizePerThread returns the totalCacheSize when the given numStreamThreads is 0. Otherwise, getCacheSizePerThread returns the totalCacheSize divided by the given numStreamThreads plus one more for the global task topology thread (if a global task topology is used).
getCacheSizePerThread is used when:
- KafkaStreams is created
- KafkaStreams is requested to addStreamThread and removeStreamThread
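The arithmetic of getCacheSizePerThread fits in a few lines. In this sketch, totalCacheSize stands for cache.max.bytes.buffering, the hasGlobalTopology flag (an assumption of the sketch, passed in explicitly) says whether a global task topology is used, and 10 MiB is just an illustrative total.

```java
// Hypothetical sketch of the per-thread cache split; parameters replace KafkaStreams state.
class CacheSizePerThreadSketch {

    static long getCacheSizePerThread(long totalCacheSize, int numStreamThreads,
                                      boolean hasGlobalTopology) {
        if (numStreamThreads == 0) {
            return totalCacheSize;
        }
        // One extra share for the global task topology thread, if there is one.
        return totalCacheSize / (numStreamThreads + (hasGlobalTopology ? 1 : 0));
    }

    public static void main(String[] args) {
        long total = 10L * 1024 * 1024; // 10 MiB, for illustration
        System.out.println(getCacheSizePerThread(total, 4, false)); // 2621440
        System.out.println(getCacheSizePerThread(total, 4, true));  // 2097152
        System.out.println(getCacheSizePerThread(total, 0, true));  // 10485760
    }
}
```

With 10 MiB and four StreamThreads, each thread gets 2621440 bytes; adding a global task topology splits the same total five ways, 2097152 bytes each. Integer division means any remainder bytes are simply not assigned.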
Logging
Enable the ALL logging level for the org.apache.kafka.streams.KafkaStreams logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.org.apache.kafka.streams.KafkaStreams=ALL
Refer to Logging.