Creating Instance

TaskManager takes the following to be created:

TaskManager is created when:


TaskManager creates a Tasks when created.


TaskManager is given an ActiveTaskCreator when created.

TaskManager uses this ActiveTaskCreator (along with a StandbyTaskCreator) only to create the Tasks.

Punctuating Recurring Actions

int punctuate()

punctuate requests every active task to maybePunctuateStreamTime and maybePunctuateSystemTime, and returns the number of punctuations that were executed.
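The counting loop can be sketched as follows. This is a minimal illustration, not the actual TaskManager code: PunctuateSketch, PunctuatableTask and FixedTask are hypothetical names, and each maybePunctuate method is assumed to return true when it actually ran a punctuation.

```java
import java.util.List;

// Minimal sketch with hypothetical types; not the actual TaskManager code.
final class PunctuateSketch {

    // Hypothetical stand-in for an active task. Each method is assumed to
    // return true when it actually ran a punctuation.
    interface PunctuatableTask {
        boolean maybePunctuateStreamTime();
        boolean maybePunctuateSystemTime();
    }

    // Asks every active task to punctuate on stream time and on system time,
    // counting the punctuations that fired.
    static int punctuate(List<? extends PunctuatableTask> activeTasks) {
        int punctuated = 0;
        for (PunctuatableTask task : activeTasks) {
            if (task.maybePunctuateStreamTime()) {
                punctuated++;
            }
            if (task.maybePunctuateSystemTime()) {
                punctuated++;
            }
        }
        return punctuated;
    }

    // Demo task with fixed answers (hypothetical, for illustration only).
    record FixedTask(boolean streamTime, boolean systemTime) implements PunctuatableTask {
        @Override
        public boolean maybePunctuateStreamTime() {
            return streamTime;
        }

        @Override
        public boolean maybePunctuateSystemTime() {
            return systemTime;
        }
    }
}
```

A task can contribute 0, 1 or 2 to the count per call, depending on which kinds of punctuation were due.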

punctuate is used when:

  • StreamThread is requested to runOnce

Committing (Active) Tasks

int commit(
  Collection<Task> tasksToCommit)

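commit calls commitAndFillInConsumedOffsetsAndMetadataPerTaskMap with the given tasks and a Map<Task, Map<TopicPartition, OffsetAndMetadata>> to collect the consumed offsets and metadata per committed task.

In the end, commit returns the number of committed tasks (or -1 when a rebalance is in progress), as reported by commitAndFillInConsumedOffsetsAndMetadataPerTaskMap.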

commit is used when:

Handling Active and Standby Task Assignment

void handleAssignment(
  Map<TaskId, Set<TopicPartition>> activeTasks,
  Map<TaskId, Set<TopicPartition>> standbyTasks)

handleAssignment prints out the following INFO message to the logs:

Handle new assignment with:
  New active tasks: [activeTasks]
  New standby tasks: [standbyTasks]
  Existing active tasks: [activeTaskIds]
  Existing standby tasks: [standbyTaskIds]

handleAssignment requests the TopologyMetadata to addSubscribedTopicsFromAssignment (with the TopicPartitions from the given activeTasks).

handleAssignment rectifies all the existing tasks.

handleAssignment determines which existing tasks to close (and remove) or recycle, and passes them to handleCloseAndRecycle.

In the end, handleAssignment requests the Tasks to handle the new assignment and create active and standby tasks.
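The close/recycle decision can be sketched as below. It is a simplified illustration under stated assumptions, not the actual TaskManager code: AssignmentSketch, ExistingTask, Decision and classify are hypothetical names, task ids are plain strings, and an existing task is assumed to be kept when its new assignment has the same type (active or standby), recycled when only the type changes, and closed when it is no longer assigned at all.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

// Simplified sketch with hypothetical types; not the actual TaskManager code.
final class AssignmentSketch {

    // Hypothetical view of an existing task: its id and whether it is active.
    record ExistingTask(String id, boolean active) {}

    // Hypothetical result: which existing tasks to keep, recycle, or close.
    record Decision(Set<String> keep, Set<String> recycle, Set<String> close) {}

    static Decision classify(Collection<ExistingTask> existingTasks,
                             Set<String> newActiveTaskIds,
                             Set<String> newStandbyTaskIds) {
        Set<String> keep = new TreeSet<>();
        Set<String> recycle = new TreeSet<>();
        Set<String> close = new TreeSet<>();
        for (ExistingTask task : existingTasks) {
            boolean assignedActive = newActiveTaskIds.contains(task.id());
            boolean assignedStandby = newStandbyTaskIds.contains(task.id());
            if ((assignedActive && task.active()) || (assignedStandby && !task.active())) {
                keep.add(task.id());     // same task, same type: reuse it
            } else if (assignedActive || assignedStandby) {
                recycle.add(task.id());  // same task, type changes (active <-> standby)
            } else {
                close.add(task.id());    // no longer assigned: close and remove
            }
        }
        return new Decision(keep, recycle, close);
    }
}
```

In this sketch, the recycle and close buckets play the role of the tasksToRecycle and tasksToCloseClean arguments of handleCloseAndRecycle.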

handleAssignment is used when:


void handleCloseAndRecycle(
  Set<Task> tasksToRecycle,
  Set<Task> tasksToCloseClean,
  Set<Task> tasksToCloseDirty,
  Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
  Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
  LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions)


Handling TaskCorruptedException

void handleCorruption(
  Set<TaskId> corruptedTasks)


handleCorruption is used when:

  • StreamThread is requested to runLoop (and caught a TaskCorruptedException)


int maybeCommitActiveTasksPerUserRequested()

While a rebalance is in progress, maybeCommitActiveTasksPerUserRequested returns -1 immediately.

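Otherwise, maybeCommitActiveTasksPerUserRequested looks for an active task with both commitRequested and commitNeeded. If there is at least one, maybeCommitActiveTasksPerUserRequested commits the active tasks and returns the number of tasks committed; otherwise, it returns 0.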
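The decision can be sketched as follows. This is a minimal sketch, not the actual TaskManager code. It assumes that a task qualifies only when a commit was both requested by the user and is needed, and that all active tasks are then committed. UserRequestedCommitSketch, ActiveTask and DemoTask are hypothetical names, and the commit step is passed in as a function.

```java
import java.util.List;
import java.util.function.ToIntFunction;

// Minimal sketch with hypothetical types; not the actual TaskManager code.
final class UserRequestedCommitSketch {

    // Hypothetical stand-in for an active task.
    interface ActiveTask {
        boolean commitRequested(); // user asked for a commit (e.g. via ProcessorContext#commit())
        boolean commitNeeded();    // task has uncommitted progress
    }

    static int maybeCommitActiveTasksPerUserRequested(
            boolean rebalanceInProgress,
            List<? extends ActiveTask> activeTasks,
            ToIntFunction<List<? extends ActiveTask>> commit) {
        if (rebalanceInProgress) {
            return -1; // never commit in the middle of a rebalance
        }
        for (ActiveTask task : activeTasks) {
            // Assumption: a task qualifies only if a commit is both requested and needed
            if (task.commitRequested() && task.commitNeeded()) {
                return commit.applyAsInt(activeTasks); // commit all active tasks
            }
        }
        return 0; // no task asked for a commit
    }

    // Demo task (hypothetical); the record accessors implement the interface methods.
    record DemoTask(boolean commitRequested, boolean commitNeeded) implements ActiveTask {}
}
```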

maybeCommitActiveTasksPerUserRequested is used when:


int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(
  Collection<Task> tasksToCommit,
  Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask)

While a rebalance is in progress, commitAndFillInConsumedOffsetsAndMetadataPerTaskMap returns -1 immediately.

For every task in the given tasksToCommit with commitNeeded, commitAndFillInConsumedOffsetsAndMetadataPerTaskMap requests the task to prepareCommit, which gives the offsets and metadata per partition. For active tasks only, commitAndFillInConsumedOffsetsAndMetadataPerTaskMap saves these offsets and metadata in the given consumedOffsetsAndMetadataPerTask map.

commitAndFillInConsumedOffsetsAndMetadataPerTaskMap then calls commitOffsetsOrTransaction with the given consumedOffsetsAndMetadataPerTask (which now includes the offsets of the active tasks, as described above).

Next, commitAndFillInConsumedOffsetsAndMetadataPerTaskMap requests every task with commitNeeded (in the given tasksToCommit) to clearTaskTimeout and postCommit (with the enforceCheckpoint flag disabled).

In the end, commitAndFillInConsumedOffsetsAndMetadataPerTaskMap returns the number of tasks committed.
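The three phases (prepare, commit, post-commit) can be sketched as below, together with the commit method that delegates to them. This is a simplified sketch, not the actual TaskManager code: CommitSketch, CommittableTask and DemoTask are hypothetical names, partitions are plain strings and offsets plain longs (instead of TopicPartition and OffsetAndMetadata), commitOffsetsOrTransaction is passed in as a callback, and timeout handling is left out.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Simplified sketch with hypothetical types; not the actual TaskManager code.
final class CommitSketch {

    interface CommittableTask {
        boolean isActive();
        boolean commitNeeded();
        Map<String, Long> prepareCommit(); // offsets per partition
        void clearTaskTimeout();
        void postCommit(boolean enforceCheckpoint);
    }

    private boolean rebalanceInProgress = false;
    private final Consumer<Map<CommittableTask, Map<String, Long>>> commitOffsetsOrTransaction;

    CommitSketch(Consumer<Map<CommittableTask, Map<String, Long>>> commitOffsetsOrTransaction) {
        this.commitOffsetsOrTransaction = commitOffsetsOrTransaction;
    }

    void setRebalanceInProgress(boolean value) {
        rebalanceInProgress = value;
    }

    // Delegates with a fresh map; returns the number of committed tasks (or -1).
    int commit(Collection<? extends CommittableTask> tasksToCommit) {
        return commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit, new HashMap<>());
    }

    int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(
            Collection<? extends CommittableTask> tasksToCommit,
            Map<CommittableTask, Map<String, Long>> consumedOffsetsPerTask) {
        if (rebalanceInProgress) {
            return -1; // not safe to commit during a rebalance
        }
        // 1. Prepare every task that needs a commit; only active tasks contribute offsets.
        for (CommittableTask task : tasksToCommit) {
            if (task.commitNeeded()) {
                Map<String, Long> offsets = task.prepareCommit();
                if (task.isActive()) {
                    consumedOffsetsPerTask.put(task, offsets);
                }
            }
        }
        // 2. Commit the collected offsets (or the transaction) in one go.
        commitOffsetsOrTransaction.accept(consumedOffsetsPerTask);
        // 3. Finish the commit for the same tasks and count them.
        int committed = 0;
        for (CommittableTask task : tasksToCommit) {
            if (task.commitNeeded()) {
                task.clearTaskTimeout();
                task.postCommit(false); // enforceCheckpoint disabled
                committed++;
            }
        }
        return committed;
    }

    // Demo task (hypothetical): postCommit clears the commitNeeded flag.
    static final class DemoTask implements CommittableTask {
        private final String name;
        private final boolean active;
        private boolean needsCommit;
        int postCommits = 0;

        DemoTask(String name, boolean active, boolean needsCommit) {
            this.name = name;
            this.active = active;
            this.needsCommit = needsCommit;
        }

        @Override public boolean isActive() { return active; }
        @Override public boolean commitNeeded() { return needsCommit; }
        @Override public Map<String, Long> prepareCommit() { return Map.of(name + "-0", 42L); }
        @Override public void clearTaskTimeout() { }
        @Override public void postCommit(boolean enforceCheckpoint) { needsCommit = false; postCommits++; }
    }
}
```

Note that a standby task with commitNeeded is still counted as committed, even though it contributes no consumed offsets to the map.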

commitAndFillInConsumedOffsetsAndMetadataPerTaskMap is used when:

Partition Rebalancing

rebalanceInProgress Flag

TaskManager uses the rebalanceInProgress internal flag to indicate that a partition rebalance is in progress. Committing during a rebalance is considered unsafe, so the flag is used to skip commitAndFillInConsumedOffsetsAndMetadataPerTaskMap and maybeCommitActiveTasksPerUserRequested (both return -1).

The rebalanceInProgress flag is disabled (false) initially. It is turned on (true) in handleRebalanceStart and off in handleRebalanceComplete.


boolean isRebalanceInProgress()

isRebalanceInProgress returns the value of the internal rebalanceInProgress flag.

isRebalanceInProgress is used when:

  • StreamThread is requested to run


void handleRebalanceStart(
  Set<String> subscribedTopics)

handleRebalanceStart requests the InternalTopologyBuilder to addSubscribedTopicsFromMetadata with the given subscribedTopics.

handleRebalanceStart then calls tryToLockAllNonEmptyTaskDirectories and turns the rebalanceInProgress internal flag on (true).

handleRebalanceStart is used when:


void handleRebalanceComplete()

handleRebalanceComplete requests the Consumer to pause (suspend) fetching from the partitions that are assigned to this consumer.

handleRebalanceComplete then calls releaseLockedUnassignedTaskDirectories and turns the rebalanceInProgress internal flag off (false).
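The lifecycle of the rebalanceInProgress flag can be sketched as a tiny state machine. This is a minimal sketch, not the actual TaskManager code: RebalanceFlagSketch and tryCommit are hypothetical names, and topic registration, task directory locking and consumer pausing are only marked with comments.

```java
import java.util.Set;

// Minimal sketch of the rebalanceInProgress flag lifecycle (hypothetical class).
final class RebalanceFlagSketch {
    private boolean rebalanceInProgress = false; // off initially

    void handleRebalanceStart(Set<String> subscribedTopics) {
        // (registering subscribedTopics and tryToLockAllNonEmptyTaskDirectories would go here)
        rebalanceInProgress = true;
    }

    void handleRebalanceComplete() {
        // (pausing the assigned partitions and releaseLockedUnassignedTaskDirectories would go here)
        rebalanceInProgress = false;
    }

    boolean isRebalanceInProgress() {
        return rebalanceInProgress;
    }

    // Hypothetical guard mirroring the commit methods: -1 while rebalancing.
    int tryCommit(int tasksNeedingCommit) {
        return rebalanceInProgress ? -1 : tasksNeedingCommit;
    }
}
```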

handleRebalanceComplete is used when:

Processing Records (with Active Stream Tasks)

int process(
  int maxNumRecords,
  Time time)

process requests every active StreamTask to process records, one at a time, until the task has processed maxNumRecords records or has no more records to process. process returns the total number of records processed across all the active tasks.
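The processing loop can be sketched as follows. This is a simplified sketch, not the actual TaskManager code: ProcessSketch, ProcessingTask and BufferedTask are hypothetical names, the current time is passed as a plain long instead of a Time, each task's process(now) is assumed to return true when it processed one record, and maxNumRecords is assumed to cap the records processed per task.

```java
import java.util.List;

// Simplified sketch with hypothetical types; not the actual TaskManager code.
final class ProcessSketch {

    // Hypothetical task: process(now) returns true if it processed one record,
    // false if it had nothing to process.
    interface ProcessingTask {
        boolean process(long now);
    }

    // Lets each active task process up to maxNumRecords records (assumed per-task cap)
    // and returns the total processed across all tasks.
    static int process(List<? extends ProcessingTask> activeTasks, int maxNumRecords, long now) {
        int totalProcessed = 0;
        for (ProcessingTask task : activeTasks) {
            int processed = 0;
            while (processed < maxNumRecords && task.process(now)) {
                processed++;
            }
            totalProcessed += processed;
        }
        return totalProcessed;
    }

    // Demo task (hypothetical) holding a fixed number of buffered records.
    static final class BufferedTask implements ProcessingTask {
        private int buffered;

        BufferedTask(int buffered) {
            this.buffered = buffered;
        }

        @Override
        public boolean process(long now) {
            if (buffered == 0) {
                return false;
            }
            buffered--;
            return true;
        }

        int remaining() {
            return buffered;
        }
    }
}
```

With tasks buffering 5, 1 and 0 records and maxNumRecords of 3, the sketch processes 3 + 1 + 0 records and leaves 2 records in the first task.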

process is used when:


boolean tryToCompleteRestoration(
  long now,
  java.util.function.Consumer<Set<TopicPartition>> offsetResetter)


tryToCompleteRestoration is used when:

Adding Records to Active StreamTasks

void addRecordsToTasks(
  ConsumerRecords<byte[], byte[]> records)

For every partition in the given ConsumerRecords, addRecordsToTasks finds the active StreamTask that handles records of this partition and passes the partition's records to it.


Every TopicPartition is handled by exactly one active Task, although a single task can handle several partitions.
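The routing can be sketched as a lookup from partition to owning task. This is a simplified sketch, not the actual TaskManager code: AddRecordsSketch and BufferingTask are hypothetical names, partitions and records are plain strings instead of TopicPartition and ConsumerRecord, and throwing IllegalStateException for a partition without an active task is a choice of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch with hypothetical types; not the actual TaskManager code.
final class AddRecordsSketch {

    // Hypothetical active task that buffers records per input partition.
    static final class BufferingTask {
        final Map<String, List<String>> buffered = new HashMap<>();

        void addRecords(String partition, List<String> records) {
            buffered.computeIfAbsent(partition, p -> new ArrayList<>()).addAll(records);
        }
    }

    // Routes the records of every partition to the one active task that owns it.
    static void addRecordsToTasks(Map<String, List<String>> recordsByPartition,
                                  Map<String, BufferingTask> activeTaskByPartition) {
        for (Map.Entry<String, List<String>> entry : recordsByPartition.entrySet()) {
            BufferingTask task = activeTaskByPartition.get(entry.getKey());
            if (task == null) {
                // Choice of this sketch: records for an unowned partition indicate a bug
                throw new IllegalStateException("No active task for partition " + entry.getKey());
            }
            task.addRecords(entry.getKey(), entry.getValue());
        }
    }
}
```

Since one task can own several partitions, the same task may receive records from more than one entry in a single call.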

addRecordsToTasks is used when:


boolean needsInitializationOrRestoration()

needsInitializationOrRestoration is true if any of the Tasks require initialization or restoration.

needsInitializationOrRestoration is used when:


Enable ALL logging level for org.apache.kafka.streams.processor.internals.TaskManager logger to see what happens inside.

Add the following line to

Refer to Logging.

