
TaskManager

Creating Instance

TaskManager takes the following to be created:

TaskManager is created when:

Tasks

TaskManager creates a Tasks when created.

ActiveTaskCreator

TaskManager is given an ActiveTaskCreator when created.

TaskManager uses this ActiveTaskCreator (along with StandbyTaskCreator) merely to create a Tasks.

Punctuating Recurring Actions

int punctuate()

punctuate requests every active task to maybePunctuateStreamTime and maybePunctuateSystemTime (counting punctuators that were executed).
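The counting described above can be sketched as follows. This is a minimal model, not the actual Kafka Streams code: `PunctuatingTask`, `PunctuateSketch` and the `task` helper are hypothetical stand-ins, and it assumes each of the two hooks reports whether a punctuator ran.

```java
import java.util.List;

// Hypothetical stand-in for the two punctuation hooks of an active task.
interface PunctuatingTask {
    boolean maybePunctuateStreamTime();  // assumed to return true if a stream-time punctuator ran
    boolean maybePunctuateSystemTime();  // assumed to return true if a system-time punctuator ran
}

final class PunctuateSketch {
    // Asks every active task to punctuate and counts the punctuations that were executed.
    static int punctuate(List<PunctuatingTask> activeTasks) {
        int punctuated = 0;
        for (PunctuatingTask task : activeTasks) {
            if (task.maybePunctuateStreamTime()) punctuated++;
            if (task.maybePunctuateSystemTime()) punctuated++;
        }
        return punctuated;
    }

    // Builds a task with fixed answers (illustration only).
    static PunctuatingTask task(boolean streamTime, boolean systemTime) {
        return new PunctuatingTask() {
            public boolean maybePunctuateStreamTime() { return streamTime; }
            public boolean maybePunctuateSystemTime() { return systemTime; }
        };
    }

    public static void main(String[] args) {
        List<PunctuatingTask> tasks =
            List.of(task(true, false), task(true, true), task(false, false));
        System.out.println(punctuate(tasks)); // prints 3
    }
}
```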

punctuate is used when:

  • StreamThread is requested to runOnce

Committing (Active) Tasks

int commit(
  Collection<Task> tasksToCommit)

commit calls commitAndFillInConsumedOffsetsAndMetadataPerTaskMap with the given tasks (and a new, empty map that collects the consumed offsets and metadata per committed task, Map<Task, Map<TopicPartition, OffsetAndMetadata>>).

In the end, commit returns the number of tasks committed.

commit is used when:

Handling Active and Standby Task Assignment

void handleAssignment(
  Map<TaskId, Set<TopicPartition>> activeTasks,
  Map<TaskId, Set<TopicPartition>> standbyTasks)

handleAssignment prints out the following INFO message to the logs:

Handle new assignment with:
  New active tasks: [activeTasks]
  New standby tasks: [standbyTasks]
  Existing active tasks: [activeTaskIds]
  Existing standby tasks: [standbyTaskIds]

handleAssignment requests the TopologyMetadata to addSubscribedTopicsFromAssignment (with the TopicPartitions from the given activeTasks).

handleAssignment rectifies all the existing tasks.

handleAssignment determines which existing tasks to close (and remove) or recycle, and passes them to handleCloseAndRecycle.

In the end, handleAssignment requests the Tasks to handle the new assignment and create active and standby tasks.
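The close-or-recycle decision can be illustrated with a simplified model. The rule used here is an assumption made for the sketch, not taken from the text above: a task whose role is unchanged is kept, a task that switches between active and standby is recycled, and a task missing from the new assignment is closed. `AssignmentSketch`, `Action` and `classify` are hypothetical names, and task IDs are plain strings.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class AssignmentSketch {
    enum Action { KEEP, RECYCLE, CLOSE }

    // existingTasks: task id -> true if the task currently runs as active, false if standby.
    static Map<String, Action> classify(Map<String, Boolean> existingTasks,
                                        Set<String> newActive,
                                        Set<String> newStandby) {
        Map<String, Action> plan = new TreeMap<>();
        for (Map.Entry<String, Boolean> entry : existingTasks.entrySet()) {
            String id = entry.getKey();
            boolean isActive = entry.getValue();
            if ((isActive && newActive.contains(id)) || (!isActive && newStandby.contains(id))) {
                plan.put(id, Action.KEEP);      // same role as before (assumed rule)
            } else if (newActive.contains(id) || newStandby.contains(id)) {
                plan.put(id, Action.RECYCLE);   // role changed: active <-> standby (assumed rule)
            } else {
                plan.put(id, Action.CLOSE);     // no longer assigned to this instance (assumed rule)
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        Map<String, Boolean> existing = Map.of("0_0", true, "0_1", true, "0_2", false);
        System.out.println(classify(existing, Set.of("0_0", "0_2"), Set.of()));
        // prints {0_0=KEEP, 0_1=CLOSE, 0_2=RECYCLE}
    }
}
```

Tasks in the new assignment that do not exist yet are the ones left to be created as active or standby tasks.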

handleAssignment is used when:

handleCloseAndRecycle

void handleCloseAndRecycle(
  Set<Task> tasksToRecycle,
  Set<Task> tasksToCloseClean,
  Set<Task> tasksToCloseDirty,
  Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
  Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
  LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions)

handleCloseAndRecycle...FIXME

Handling TaskCorruptedException

void handleCorruption(
  Set<TaskId> corruptedTasks)

handleCorruption...FIXME

handleCorruption is used when:

  • StreamThread is requested to runLoop (and caught a TaskCorruptedException)

maybeCommitActiveTasksPerUserRequested

int maybeCommitActiveTasksPerUserRequested()

With rebalance in progress, maybeCommitActiveTasksPerUserRequested returns -1 immediately.

Otherwise, maybeCommitActiveTasksPerUserRequested finds tasks (among active tasks) with commitRequested or commitNeeded and, if there is at least one, commits them.

maybeCommitActiveTasksPerUserRequested is used when:

commitAndFillInConsumedOffsetsAndMetadataPerTaskMap

int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(
  Collection<Task> tasksToCommit,
  Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask)

With rebalance in progress, commitAndFillInConsumedOffsetsAndMetadataPerTaskMap returns -1 immediately.

commitAndFillInConsumedOffsetsAndMetadataPerTaskMap requests every Task with commitNeeded (in the given tasksToCommit tasks) to prepareCommit (that gives offsets and metadata per partition). commitAndFillInConsumedOffsetsAndMetadataPerTaskMap saves the offsets and metadata per partition for a task (that is active) in the given consumedOffsetsAndMetadataPerTask.

commitAndFillInConsumedOffsetsAndMetadataPerTaskMap then calls commitOffsetsOrTransaction (with the given consumedOffsetsAndMetadataPerTask that may have been updated with some active tasks as described above).

Once again, commitAndFillInConsumedOffsetsAndMetadataPerTaskMap requests every Task with commitNeeded (in the given tasksToCommit tasks) to clearTaskTimeout and postCommit (with enforceCheckpoint flag disabled).

In the end, commitAndFillInConsumedOffsetsAndMetadataPerTaskMap returns the number of tasks committed.
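The steps above can be sketched as a three-phase loop. This is a simplified model under stated assumptions: `CommitTaskSketch` and `CommitSketch` are hypothetical classes, offsets are modeled as partition name to offset (instead of TopicPartition and OffsetAndMetadata), and the actual offset or transaction commit is only marked by a comment.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified task; offsets are modeled as partition name -> offset.
final class CommitTaskSketch {
    final String id;
    final boolean active;
    final boolean commitNeeded;
    boolean postCommitted = false;

    CommitTaskSketch(String id, boolean active, boolean commitNeeded) {
        this.id = id;
        this.active = active;
        this.commitNeeded = commitNeeded;
    }

    Map<String, Long> prepareCommit() { return Map.of(id + "-input", 42L); }
    void postCommit(boolean enforceCheckpoint) { postCommitted = true; }
}

final class CommitSketch {
    static int commitAndFill(boolean rebalanceInProgress,
                             Collection<CommitTaskSketch> tasksToCommit,
                             Map<String, Map<String, Long>> consumedOffsetsPerTask) {
        if (rebalanceInProgress) {
            return -1; // not safe to commit during a rebalance
        }
        // Phase 1: prepare every task that needs a commit; record offsets of active tasks only.
        for (CommitTaskSketch task : tasksToCommit) {
            if (task.commitNeeded) {
                Map<String, Long> offsets = task.prepareCommit();
                if (task.active) {
                    consumedOffsetsPerTask.put(task.id, offsets);
                }
            }
        }
        // Phase 2: commitOffsetsOrTransaction(consumedOffsetsPerTask) would run here.
        // Phase 3: post-commit (enforceCheckpoint disabled) and count the committed tasks.
        int committed = 0;
        for (CommitTaskSketch task : tasksToCommit) {
            if (task.commitNeeded) {
                task.postCommit(false);
                committed++;
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        List<CommitTaskSketch> tasks = List.of(
            new CommitTaskSketch("0_0", true, true),    // active, needs commit
            new CommitTaskSketch("0_1", false, true),   // standby, needs commit
            new CommitTaskSketch("0_2", true, false));  // active, nothing to commit
        Map<String, Map<String, Long>> offsets = new HashMap<>();
        System.out.println(commitAndFill(false, tasks, offsets)); // prints 2
        System.out.println(offsets.keySet());                     // prints [0_0]
        System.out.println(commitAndFill(true, tasks, offsets));  // prints -1
    }
}
```

Note that the standby task counts towards the returned number even though only active tasks contribute consumed offsets.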

commitAndFillInConsumedOffsetsAndMetadataPerTaskMap is used when:

Partition Rebalancing

rebalanceInProgress Flag

TaskManager uses the rebalanceInProgress internal flag to indicate that a partition rebalance is in progress. Committing is not considered safe during a rebalance, so commitAndFillInConsumedOffsetsAndMetadataPerTaskMap and maybeCommitActiveTasksPerUserRequested return immediately while the flag is on.

The rebalanceInProgress flag is disabled (false) initially. It is turned on (true) in handleRebalanceStart and off in handleRebalanceComplete.

isRebalanceInProgress

boolean isRebalanceInProgress()

isRebalanceInProgress returns the value of the internal rebalanceInProgress flag.

isRebalanceInProgress is used when:

  • StreamThread is requested to run

handleRebalanceStart

void handleRebalanceStart(
  Set<String> subscribedTopics)

handleRebalanceStart requests the InternalTopologyBuilder to addSubscribedTopicsFromMetadata with the given subscribedTopics.

handleRebalanceStart calls tryToLockAllNonEmptyTaskDirectories and turns the rebalanceInProgress internal flag on (true).

handleRebalanceStart is used when:

handleRebalanceComplete

void handleRebalanceComplete()

handleRebalanceComplete requests the Consumer to pause (suspend) fetching from the partitions that are assigned to this consumer.

handleRebalanceComplete calls releaseLockedUnassignedTaskDirectories and turns the rebalanceInProgress internal flag off (false).
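The lifecycle of the rebalanceInProgress flag can be sketched as a small state holder. This is an illustrative model only: `RebalanceFlagSketch`, the `assigned` and `paused` sets (standing in for the consumer's assignment and paused partitions) and `maybeCommit` are hypothetical, and state-directory locking is left out.

```java
import java.util.HashSet;
import java.util.Set;

final class RebalanceFlagSketch {
    private boolean rebalanceInProgress = false;   // disabled initially

    final Set<String> assigned = new HashSet<>();  // stand-in for the consumer's assignment
    final Set<String> paused = new HashSet<>();    // stand-in for the consumer's paused partitions

    void handleRebalanceStart() {
        rebalanceInProgress = true;
    }

    void handleRebalanceComplete() {
        paused.addAll(assigned);                   // pause fetching from all assigned partitions
        rebalanceInProgress = false;
    }

    boolean isRebalanceInProgress() {
        return rebalanceInProgress;
    }

    // Commit guard: -1 signals that committing was skipped because of a rebalance.
    int maybeCommit(int tasksNeedingCommit) {
        return rebalanceInProgress ? -1 : tasksNeedingCommit;
    }

    public static void main(String[] args) {
        RebalanceFlagSketch taskManager = new RebalanceFlagSketch();
        taskManager.assigned.add("input-0");
        taskManager.handleRebalanceStart();
        System.out.println(taskManager.maybeCommit(3)); // prints -1
        taskManager.handleRebalanceComplete();
        System.out.println(taskManager.maybeCommit(3)); // prints 3
        System.out.println(taskManager.paused);         // prints [input-0]
    }
}
```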

handleRebalanceComplete is used when:

Processing Records (with Active Stream Tasks)

int process(
  int maxNumRecords,
  Time time)

process requests every active StreamTask to process a record until the number of records processed (across all the active tasks) reaches the given maxNumRecords threshold or there are no more records to process.

process is used when:

tryToCompleteRestoration

boolean tryToCompleteRestoration(
  long now,
  java.util.function.Consumer<Set<TopicPartition>> offsetResetter)

tryToCompleteRestoration...FIXME

tryToCompleteRestoration is used when:

Adding Records to Active StreamTasks

void addRecordsToTasks(
  ConsumerRecords<byte[], byte[]> records)

For every partition in the given ConsumerRecords, addRecordsToTasks finds the active StreamTask that handles the partition and passes the partition's records to it.

Note

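Every TopicPartition is handled by exactly one active Task (while a single task can be assigned a set of partitions, as the Set<TopicPartition> per TaskId in handleAssignment shows).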
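The per-partition routing can be sketched with plain maps. This is a minimal model, not the Kafka Streams implementation: `AddRecordsSketch` is hypothetical, partitions, task IDs and records are plain strings, and throwing an IllegalStateException for a partition without an owning task is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class AddRecordsSketch {
    // records: partition -> fetched records
    // owners:  partition -> id of the active task that reads the partition
    // returns: task id -> records handed over to that task
    static Map<String, List<String>> addRecordsToTasks(Map<String, List<String>> records,
                                                       Map<String, String> owners) {
        Map<String, List<String>> buffered = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : records.entrySet()) {
            String partition = entry.getKey();
            String taskId = owners.get(partition);
            if (taskId == null) {
                // Assumed failure mode for the sketch: a record arrived for an unowned partition.
                throw new IllegalStateException("No active task for partition " + partition);
            }
            buffered.computeIfAbsent(taskId, id -> new ArrayList<>()).addAll(entry.getValue());
        }
        return buffered;
    }

    public static void main(String[] args) {
        Map<String, List<String>> records =
            Map.of("input-0", List.of("a", "b"), "input-1", List.of("c"));
        Map<String, String> owners = Map.of("input-0", "0_0", "input-1", "0_1");
        Map<String, List<String>> buffered = addRecordsToTasks(records, owners);
        System.out.println(buffered.get("0_0")); // prints [a, b]
        System.out.println(buffered.get("0_1")); // prints [c]
    }
}
```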

addRecordsToTasks is used when:

needsInitializationOrRestoration

boolean needsInitializationOrRestoration()

needsInitializationOrRestoration is true if any of the tasks requires initialization or restoration.
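The check amounts to an any-match over the tasks. In the sketch below, `RestorationSketch` and its `State` enum are hypothetical, and the mapping of "requires initialization" to CREATED and "requires restoration" to RESTORING is an assumption made for illustration.

```java
import java.util.List;

final class RestorationSketch {
    // Hypothetical, simplified task states.
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED }

    // True as soon as one task still has to be initialized or restored (assumed state mapping).
    static boolean needsInitializationOrRestoration(List<State> taskStates) {
        return taskStates.stream()
            .anyMatch(state -> state == State.CREATED || state == State.RESTORING);
    }

    public static void main(String[] args) {
        System.out.println(needsInitializationOrRestoration(
            List.of(State.RUNNING, State.RESTORING))); // prints true
        System.out.println(needsInitializationOrRestoration(
            List.of(State.RUNNING, State.RUNNING)));   // prints false
    }
}
```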

needsInitializationOrRestoration is used when:

Logging

Enable the ALL logging level for the org.apache.kafka.streams.processor.internals.TaskManager logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.TaskManager=ALL

Refer to Logging.
