
StreamThread

StreamThread is a Thread (Java).

Creating Instance

StreamThread takes the following to be created:

  • Time
  • StreamsConfig
  • Admin
  • Main Consumer<byte[], byte[]>
  • Restore Consumer<byte[], byte[]>
  • ChangelogReader
  • originalReset
  • TaskManager
  • StreamsMetricsImpl
  • InternalTopologyBuilder
  • Thread ID
  • LogContext
  • assignmentErrorCode
  • nextProbingRebalanceMs
  • Shutdown Error Hook
  • java.util.function.Consumer<Throwable>
  • java.util.function.Consumer<Long>

StreamThread is created using the create utility.

commit.interval.ms

StreamThread uses the commit.interval.ms configuration property to control whether to commit tasks or not.
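As a minimal sketch, the property can be set with plain java.util.Properties before the properties are handed over to a Kafka Streams application. Only the commit.interval.ms key comes from the text above. The CommitIntervalConfig class, the withCommitInterval helper and the 10000 ms value are illustrative assumptions.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka Streams) that sets commit.interval.ms.
final class CommitIntervalConfig {
    // Property name as used in the text above.
    static final String COMMIT_INTERVAL_MS = "commit.interval.ms";

    // Builds a Properties object that carries the given commit interval (in milliseconds).
    static Properties withCommitInterval(long intervalMs) {
        Properties props = new Properties();
        props.setProperty(COMMIT_INTERVAL_MS, Long.toString(intervalMs));
        return props;
    }

    public static void main(String[] args) {
        // 10000 ms is an arbitrary example value, not a recommendation.
        Properties props = withCommitInterval(10_000L);
        System.out.println(COMMIT_INTERVAL_MS + "=" + props.getProperty(COMMIT_INTERVAL_MS));
    }
}
```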

Creating StreamThread

StreamThread create(
  InternalTopologyBuilder builder,
  StreamsConfig config,
  KafkaClientSupplier clientSupplier,
  Admin adminClient,
  UUID processId,
  String clientId,
  StreamsMetricsImpl streamsMetrics,
  Time time,
  StreamsMetadataState streamsMetadataState,
  long cacheSizeBytes,
  StateDirectory stateDirectory,
  StateRestoreListener userStateRestoreListener,
  int threadIdx,
  Runnable shutdownErrorHook,
  java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler)

create creates a new ReferenceContainer with the given arguments:

create prints out the following INFO message to the logs:

Creating restore consumer client

create requests the given StreamsConfig for the restore consumer configs (with getRestoreConsumerClientId) and requests the given KafkaClientSupplier for a restore consumer.

create creates a StoreChangelogReader.

create creates a ThreadCache (with the given cacheSizeBytes and StreamsMetricsImpl of the parent KafkaStreams).

create creates an ActiveTaskCreator, a StandbyTaskCreator and a TaskManager.

create prints out the following INFO message to the logs:

Creating consumer client

create...FIXME

create is used when:

Starting Execution

void run()

run...FIXME

run is part of the Thread (Java) abstraction.

runLoop

void runLoop()

runLoop...FIXME

runOnce

void runOnce()

runOnce records the start time and poll latency of the poll phase.

runOnce continues work only while in running state. Otherwise, runOnce prints out the following DEBUG message to the logs and returns:

Thread state is already [state], skipping the run once call after poll request

runOnce executes initializeAndRestorePhase and computes the latency.

Only when in RUNNING state, runOnce processes tasks (in iterations).

In the end, runOnce updates the Sensors and, every 2 minutes (non-configurable), prints out the following INFO message to the logs:

Processed [totalRecordsProcessedSinceLastSummary] total records,
ran [totalPunctuatorsSinceLastSummary] punctuators,
and committed [totalCommittedSinceLastSummary] total tasks since the last update
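The periodic summary can be pictured with the following sketch. It is not the StreamThread code: the SummaryLog class and its methods are hypothetical, and only the 2-minute interval and the message text are taken from the description above. The counters are assumed to be reset after every summary ("since the last update").

```java
import java.util.Optional;

// Hypothetical model of the "every 2 minutes" summary described above.
final class SummaryLog {
    // 2 minutes, as stated in the text (non-configurable).
    static final long LOG_SUMMARY_INTERVAL_MS = 2 * 60 * 1000L;

    private long lastLogSummaryMs;
    private long totalRecordsProcessed;
    private long totalPunctuators;
    private long totalCommitted;

    SummaryLog(long startMs) {
        this.lastLogSummaryMs = startMs;
    }

    // Accumulates the counters of a single run.
    void record(long processed, long punctuated, long committed) {
        totalRecordsProcessed += processed;
        totalPunctuators += punctuated;
        totalCommitted += committed;
    }

    // Returns the summary line once the interval has elapsed (and resets the counters),
    // or an empty Optional otherwise.
    Optional<String> maybeSummary(long nowMs) {
        if (nowMs - lastLogSummaryMs <= LOG_SUMMARY_INTERVAL_MS) {
            return Optional.empty();
        }
        String message = String.format(
            "Processed %d total records, ran %d punctuators, and committed %d total tasks since the last update",
            totalRecordsProcessed, totalPunctuators, totalCommitted);
        totalRecordsProcessed = 0;
        totalPunctuators = 0;
        totalCommitted = 0;
        lastLogSummaryMs = nowMs;
        return Optional.of(message);
    }

    public static void main(String[] args) {
        SummaryLog log = new SummaryLog(0L);
        log.record(5, 1, 2);
        // Too early: nothing is printed.
        log.maybeSummary(1_000L).ifPresent(System.out::println);
        log.record(5, 0, 0);
        // More than 2 minutes after the start: the summary is printed.
        log.maybeSummary(120_001L).ifPresent(System.out::println);
    }
}
```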

Number of Iterations

StreamThread uses numIterations internal registry for the maximum number of iterations.

The numIterations starts as 1 when StreamThread is created and is updated (incremented or halved) at the end of every iteration (until active tasks processed no records).

The numIterations is used as the maximum number of records that the TaskManager is requested to process.

numIterations is printed out twice to the logs at DEBUG level.
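The following sketch illustrates a counter that starts at 1 and is then incremented or halved. The text above does not say what decides between the two, so the shouldBackOff flag is a placeholder assumption, as is the lower bound of 1. AdaptiveIterations is a hypothetical class, not part of Kafka Streams.

```java
// Hypothetical model of an iteration budget that grows by one or is cut in half.
final class AdaptiveIterations {
    // Starts as 1, as described above.
    private int numIterations = 1;

    int current() {
        return numIterations;
    }

    // shouldBackOff is an assumed signal: when true the budget is halved
    // (never below 1), otherwise it is incremented by one.
    void update(boolean shouldBackOff) {
        if (shouldBackOff) {
            numIterations = Math.max(1, numIterations / 2);
        } else {
            numIterations++;
        }
    }

    public static void main(String[] args) {
        AdaptiveIterations iterations = new AdaptiveIterations();
        boolean[] signals = {false, false, false, true, true, true};
        for (boolean backOff : signals) {
            iterations.update(backOff);
            System.out.println("numIterations=" + iterations.current());
        }
    }
}
```

Growing slowly and shrinking quickly is a common way to keep a loop responsive; whether StreamThread applies exactly this rule is not confirmed here.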

Processing Tasks (in Iterations)

When in RUNNING state, runOnce executes the following steps up to the maximum number of iterations.

runOnce...FIXME

runOnce requests the TaskManager to punctuate.

runOnce...FIXME

initializeAndRestorePhase

void initializeAndRestorePhase()

initializeAndRestorePhase...FIXME

initializeAndRestorePhase prints out the following DEBUG message to the logs:

Idempotently invoking restoration logic in state [state]

In the end, initializeAndRestorePhase requests the ChangelogReader to restore state stores and prints out the following DEBUG message to the logs:

Idempotent restore call done. Thread state has not changed.

maybeCommit

int maybeCommit()

maybeCommit checks whether to commit active and standby tasks (based on the last commit time and commit.interval.ms).

If the last commit happened long enough ago, maybeCommit prints out the following DEBUG message to the logs:

Committing all active tasks [ids] and standby tasks [ids] since [time]ms has elapsed (commit interval is [time]ms)

maybeCommit requests the TaskManager to commit the tasks that are RUNNING or RESTORING.

If there were offsets committed, maybeCommit requests the TaskManager to maybePurgeCommittedRecords. Otherwise, maybeCommit prints out the following DEBUG message to the logs:

Unable to commit as we are in the middle of a rebalance, will try again when it completes.

If the last commit happened fairly recently, maybeCommit merely requests the TaskManager to maybeCommitActiveTasksPerUserRequested.

Either way, in the end, maybeCommit returns the number of committed offsets.
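The time-based part of the decision above can be sketched as follows. CommitTimer is a hypothetical class that models only the comparison of the last commit time against the commit interval. The rebalance case and the user-requested commits are left out, and the strict "greater than" comparison is an assumption.

```java
// Hypothetical model of the time check behind maybeCommit (not the StreamThread code).
final class CommitTimer {
    private final long commitIntervalMs;
    private long lastCommitMs;

    CommitTimer(long commitIntervalMs, long startMs) {
        this.commitIntervalMs = commitIntervalMs;
        this.lastCommitMs = startMs;
    }

    // True when more than the commit interval has elapsed since the last commit.
    boolean isCommitDue(long nowMs) {
        return nowMs - lastCommitMs > commitIntervalMs;
    }

    // Pretends to commit the given number of tasks when a commit is due.
    // Returns how many were committed (0 when the last commit was recent).
    int maybeCommit(long nowMs, int tasksToCommit) {
        if (!isCommitDue(nowMs)) {
            return 0;
        }
        lastCommitMs = nowMs;
        return tasksToCommit;
    }

    public static void main(String[] args) {
        CommitTimer timer = new CommitTimer(100L, 0L);
        System.out.println(timer.maybeCommit(50L, 3));  // interval not elapsed yet
        System.out.println(timer.maybeCommit(101L, 3)); // interval elapsed
    }
}
```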

Logging

Enable ALL logging level for org.apache.kafka.streams.processor.internals.StreamThread logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=ALL

Refer to Logging.
