Skip to content

StreamThread

StreamThread is a Thread (Java).

Creating Instance

StreamThread takes the following to be created:

  • Time
  • StreamsConfig
  • Admin
  • Main Consumer<byte[], byte[]>
  • Restore Consumer<byte[], byte[]>
  • ChangelogReader
  • originalReset
  • TaskManager
  • StreamsMetricsImpl
  • InternalTopologyBuilder
  • Thread ID
  • LogContext
  • assignmentErrorCode
  • nextProbingRebalanceMs
  • Shutdown Error Hook
  • java.util.function.Consumer<Throwable>
  • java.util.function.Consumer<Long>

StreamThread is created using create utility.

commit.interval.ms

StreamThread uses commit.interval.ms configuration property to control whether to commit tasks or not.

Creating StreamThread

StreamThread create(
  InternalTopologyBuilder builder,
  StreamsConfig config,
  KafkaClientSupplier clientSupplier,
  Admin adminClient,
  UUID processId,
  String clientId,
  StreamsMetricsImpl streamsMetrics,
  Time time,
  StreamsMetadataState streamsMetadataState,
  long cacheSizeBytes,
  StateDirectory stateDirectory,
  StateRestoreListener userStateRestoreListener,
  int threadIdx,
  Runnable shutdownErrorHook,
  java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler)

create prints out the following INFO message to the logs:

Creating restore consumer client

create requests the given StreamsConfig for the restore consumer configs (with getRestoreConsumerClientId) and requests the given KafkaClientSupplier for a restore consumer.

create creates a StoreChangelogReader.

create creates a ThreadCache.

create creates a ActiveTaskCreator, a StandbyTaskCreator and a TaskManager.

create prints out the following INFO message to the logs:

Creating consumer client

create...FIXME

create is used when:

Starting Execution

void run()

run...FIXME

run is part of the Thread (Java) abstraction.

runLoop

void runLoop()

runLoop...FIXME

runOnce

void runOnce()

runOnce...FIXME

maybeCommit

int maybeCommit()

maybeCommit checks out whether to commit active and standby tasks (based on the last commit time and commit.interval.ms).

If the last commit happened enough long ago, maybeCommit prints out the following DEBUG message to the logs:

Committing all active tasks [ids] and standby tasks [ids] since [time]ms has elapsed (commit interval is [time]ms)

maybeCommit requests the TaskManager to commit the tasks that are RUNNING or RESTORING.

If there were offsets committed, maybeCommit requests the TaskManager to maybePurgeCommittedRecords. Otherwise, maybeCommit prints out the following DEBUG message to the logs:

Unable to commit as we are in the middle of a rebalance, will try again when it completes.

If the last commit happened fairly recently, maybeCommit merely requests the TaskManager to maybeCommitActiveTasksPerUserRequested

Either way, in the end, maybeCommit returns the number of committed offsets.

Logging

Enable ALL logging level for org.apache.kafka.streams.processor.internals.StreamThread logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=ALL

Refer to Logging.

Back to top