StreamThread¶
StreamThread is a Thread (Java).
Creating Instance¶
StreamThread takes the following to be created:
- Time
- StreamsConfig
- Admin
- Main Consumer<byte[], byte[]>
- Restore Consumer<byte[], byte[]>
- ChangelogReader
- originalReset
- TaskManager
- StreamsMetricsImpl
- InternalTopologyBuilder
- Thread ID
- LogContext
- assignmentErrorCode
- nextProbingRebalanceMs
- Shutdown Error Hook
- java.util.function.Consumer<Throwable>
- java.util.function.Consumer<Long>
StreamThread is created using the create utility.
commit.interval.ms¶
StreamThread uses the commit.interval.ms configuration property to decide whether it is time to commit tasks.
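As an illustration, the property can be set like any other Kafka Streams configuration entry. The following is a minimal sketch that only builds a java.util.Properties object; the class name, the method name, and the application.id and bootstrap.servers values are placeholders invented for this example.

```java
import java.util.Properties;

public class CommitIntervalConfigSketch {

    // Builds application properties with an explicit commit interval.
    // The application.id and bootstrap.servers values are placeholders.
    public static Properties withCommitInterval(long commitIntervalMs) {
        Properties props = new Properties();
        props.put("application.id", "demo-app");
        props.put("bootstrap.servers", "localhost:9092");
        // The property this section describes, passed as a string value
        props.put("commit.interval.ms", Long.toString(commitIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = withCommitInterval(10_000L);
        System.out.println("commit.interval.ms=" + props.getProperty("commit.interval.ms"));
    }
}
```

Such a Properties object is what an application would normally hand over when creating its StreamsConfig.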
Creating StreamThread¶
StreamThread create(
  InternalTopologyBuilder builder,
  StreamsConfig config,
  KafkaClientSupplier clientSupplier,
  Admin adminClient,
  UUID processId,
  String clientId,
  StreamsMetricsImpl streamsMetrics,
  Time time,
  StreamsMetadataState streamsMetadataState,
  long cacheSizeBytes,
  StateDirectory stateDirectory,
  StateRestoreListener userStateRestoreListener,
  int threadIdx,
  Runnable shutdownErrorHook,
  java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler)
create creates a new ReferenceContainer with the given arguments:
- Admin client (Apache Kafka)
- StreamsMetadataState
- Time
create prints out the following INFO message to the logs:
Creating restore consumer client
create requests the given StreamsConfig for the restore consumer configs (with getRestoreConsumerClientId) and requests the given KafkaClientSupplier for a restore consumer.
create creates a StoreChangelogReader.
create creates a ThreadCache (with the given cacheSizeBytes and the StreamsMetricsImpl of the parent KafkaStreams).
create creates an ActiveTaskCreator, a StandbyTaskCreator and a TaskManager.
create prints out the following INFO message to the logs:
Creating consumer client
create ...FIXME
create is used when:
- KafkaStreams is requested to createAndAddStreamThread
Starting Execution¶
void run()
run ...FIXME
run is part of the Thread (Java) abstraction.
runLoop¶
void runLoop()
runLoop ...FIXME
runOnce¶
void runOnce()
runOnce records the start time and the poll latency of the poll phase.
runOnce continues only while the thread is in a running state. Otherwise, runOnce prints out the following DEBUG message to the logs and returns:
Thread state is already [state], skipping the run once call after poll request
runOnce executes initializeAndRestorePhase and computes the latency.
Only when in RUNNING state, runOnce processes tasks (in iterations).
In the end, runOnce updates the Sensors and every 2 minutes (non-configurable) prints out the following INFO message to the logs:
Processed [totalRecordsProcessedSinceLastSummary] total records,
ran [totalPunctuatorsSinceLastSummary] punctuators,
and committed [totalCommittedSinceLastSummary] total tasks since the last update
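The interval-gated summary described above can be sketched as follows. This is an illustrative sketch only, not the StreamThread code: the class, field and method names are hypothetical, and the strict "greater than" comparison against the 2-minute interval is an assumption.

```java
import java.util.Optional;

public class SummaryLogSketch {

    // 2 minutes, fixed (the text above describes it as non-configurable)
    static final long LOG_SUMMARY_INTERVAL_MS = 2 * 60 * 1000L;

    private long lastLogSummaryMs;
    private long totalRecords;
    private long totalPunctuators;
    private long totalCommitted;

    public SummaryLogSketch(long startMs) {
        this.lastLogSummaryMs = startMs;
    }

    // Accumulates the counters gathered since the last summary.
    public void record(long processed, long punctuated, long committed) {
        totalRecords += processed;
        totalPunctuators += punctuated;
        totalCommitted += committed;
    }

    // Returns the summary message once the interval has elapsed (assumed strict
    // comparison) and resets the counters; returns empty otherwise.
    public Optional<String> maybeSummarize(long nowMs) {
        if (nowMs - lastLogSummaryMs <= LOG_SUMMARY_INTERVAL_MS) {
            return Optional.empty();
        }
        String message = String.format(
            "Processed %d total records, ran %d punctuators, and committed %d total tasks since the last update",
            totalRecords, totalPunctuators, totalCommitted);
        totalRecords = 0;
        totalPunctuators = 0;
        totalCommitted = 0;
        lastLogSummaryMs = nowMs;
        return Optional.of(message);
    }
}
```

A caller would invoke record after every processing pass and maybeSummarize at the end of it, logging the message at INFO level whenever one is returned.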
Number of Iterations¶
StreamThread uses the numIterations internal registry for the maximum number of iterations.
numIterations starts as 1 when StreamThread is created and is updated (incremented or halved) at the end of every iteration (until active tasks processed no records).
numIterations is used as the maximum number of records that the TaskManager is requested to process.
numIterations is printed out twice to the logs at DEBUG level.
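The increment-or-halve behaviour can be pictured with a small sketch. The text above does not say what triggers each adjustment, so the condition used here (halve after an iteration that punctuated or committed, increment otherwise) is an assumption for illustration, as are the class and method names and the lower bound of 1.

```java
public class IterationBudgetSketch {

    // Starts as 1, as described above
    private int numIterations = 1;

    // Hypothetical update rule applied at the end of an iteration:
    // halve (never below 1) when the iteration punctuated or committed,
    // otherwise grow by one.
    public int update(boolean punctuatedOrCommitted) {
        if (punctuatedOrCommitted) {
            numIterations = Math.max(1, numIterations / 2);
        } else {
            numIterations++;
        }
        return numIterations;
    }

    public int current() {
        return numIterations;
    }

    public static void main(String[] args) {
        IterationBudgetSketch budget = new IterationBudgetSketch();
        budget.update(false);
        budget.update(false);
        budget.update(true);
        System.out.println("numIterations=" + budget.current());
    }
}
```

The idea behind such a rule is to let the budget grow while processing is the only work being done and to shrink it quickly once other work starts to take time.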
Processing Tasks (in Iterations)¶
When in RUNNING state, runOnce executes the following steps up to the maximum number of iterations.
runOnce ...FIXME
runOnce requests the TaskManager to punctuate.
runOnce ...FIXME
initializeAndRestorePhase¶
void initializeAndRestorePhase()
initializeAndRestorePhase ...FIXME
initializeAndRestorePhase prints out the following DEBUG message to the logs:
Idempotently invoking restoration logic in state [state]
In the end, initializeAndRestorePhase requests the ChangelogReader to restore state stores and prints out the following DEBUG message to the logs:
Idempotent restore call done. Thread state has not changed.
maybeCommit¶
int maybeCommit()
maybeCommit checks whether to commit active and standby tasks (based on the last commit time and commit.interval.ms).
If the last commit happened long enough ago, maybeCommit prints out the following DEBUG message to the logs:
Committing all active tasks [ids] and standby tasks [ids] since [time]ms has elapsed (commit interval is [time]ms)
maybeCommit requests the TaskManager to commit the tasks that are RUNNING or RESTORING.
If there were offsets committed, maybeCommit requests the TaskManager to maybePurgeCommittedRecords. Otherwise, maybeCommit prints out the following DEBUG message to the logs:
Unable to commit as we are in the middle of a rebalance, will try again when it completes.
If the last commit happened fairly recently, maybeCommit merely requests the TaskManager to maybeCommitActiveTasksPerUserRequested.
Either way, in the end, maybeCommit returns the number of committed offsets.
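The time-based decision at the start of maybeCommit can be sketched as follows. This is a simplified illustration under stated assumptions, not the StreamThread implementation: the class and method names are hypothetical, the strict "greater than" comparison is an assumption, and the actual commit work is reduced to a boolean result.

```java
public class CommitIntervalSketch {

    private final long commitIntervalMs; // value of commit.interval.ms
    private long lastCommitMs;

    public CommitIntervalSketch(long commitIntervalMs, long startMs) {
        this.commitIntervalMs = commitIntervalMs;
        this.lastCommitMs = startMs;
    }

    // Returns true when enough time has passed since the last commit
    // (assumed strict comparison) and remembers the new commit time.
    public boolean shouldCommit(long nowMs) {
        if (nowMs - lastCommitMs > commitIntervalMs) {
            lastCommitMs = nowMs;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CommitIntervalSketch sketch = new CommitIntervalSketch(100L, 0L);
        System.out.println(sketch.shouldCommit(50L));
        System.out.println(sketch.shouldCommit(101L));
    }
}
```

In this sketch, a false result corresponds to the "fairly recently" branch above, where only user-requested commits are considered.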
Logging¶
Enable ALL logging level for the org.apache.kafka.streams.processor.internals.StreamThread logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=ALL
Refer to Logging.