TaskManager

Creating Instance

TaskManager takes the following to be created:

  • Time
  • ChangelogReader
  • Process UUID
  • Log Prefix
  • StreamsMetricsImpl
  • ActiveTaskCreator
  • StandbyTaskCreator
  • InternalTopologyBuilder
  • Admin
  • StateDirectory
  • StreamThread.ProcessingMode

TaskManager is created when:

Committing (Active) Tasks

int commit(
  Collection<Task> tasksToCommit)

commit calls commitAndFillInConsumedOffsetsAndMetadataPerTaskMap with the given tasks.

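In the end, commit returns the number of tasks committed (as the int return type indicates). The consumed offsets and metadata per committed task (Map<Task, Map<TopicPartition, OffsetAndMetadata>>) are collected along the way by commitAndFillInConsumedOffsetsAndMetadataPerTaskMap.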

commit is used when:

handleAssignment

void handleAssignment(
  Map<TaskId, Set<TopicPartition>> activeTasks,
  Map<TaskId, Set<TopicPartition>> standbyTasks)

handleAssignment...FIXME

handleAssignment is used when:

handleCloseAndRecycle

void handleCloseAndRecycle(
  Set<Task> tasksToRecycle,
  Set<Task> tasksToCloseClean,
  Set<Task> tasksToCloseDirty,
  Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
  Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
  LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions)

handleCloseAndRecycle...FIXME

Handling TaskCorruptedException

void handleCorruption(
  Set<TaskId> corruptedTasks)

handleCorruption...FIXME

handleCorruption is used when:

  • StreamThread is requested to runLoop (and catches a TaskCorruptedException)

maybeCommitActiveTasksPerUserRequested

int maybeCommitActiveTasksPerUserRequested()

With rebalance in progress, maybeCommitActiveTasksPerUserRequested returns -1 immediately.

Otherwise, maybeCommitActiveTasksPerUserRequested looks (among the active tasks) for a task with both commitRequested and commitNeeded and, if there is at least one, commits the active tasks.

maybeCommitActiveTasksPerUserRequested is used when:

commitAndFillInConsumedOffsetsAndMetadataPerTaskMap

int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(
  Collection<Task> tasksToCommit,
  Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask)

With rebalance in progress, commitAndFillInConsumedOffsetsAndMetadataPerTaskMap returns -1 immediately.

commitAndFillInConsumedOffsetsAndMetadataPerTaskMap requests every Task with commitNeeded (among the given tasksToCommit) to prepareCommit, which gives the offsets and metadata per partition. For every such task that is active, commitAndFillInConsumedOffsetsAndMetadataPerTaskMap saves these offsets and metadata in the given consumedOffsetsAndMetadataPerTask.

commitAndFillInConsumedOffsetsAndMetadataPerTaskMap then executes commitOffsetsOrTransaction with the given consumedOffsetsAndMetadataPerTask (that may have been updated with some active tasks as described above).

Afterwards, commitAndFillInConsumedOffsetsAndMetadataPerTaskMap requests every Task with commitNeeded (among the given tasksToCommit) to clearTaskTimeout and postCommit (with the enforceCheckpoint flag disabled).

In the end, commitAndFillInConsumedOffsetsAndMetadataPerTaskMap returns the number of tasks committed.
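
The steps above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the Kafka Streams source: SketchTask, CommitFlowSketch, the commitAndFill method name, and the counting stub for commitOffsetsOrTransaction are hypothetical stand-ins, and offsets are modelled as Map<String, Long> rather than Map<TopicPartition, OffsetAndMetadata>.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Task (not the real Kafka Streams interface).
class SketchTask {
    final String id;
    final boolean active;
    boolean commitNeeded;
    final List<String> calls = new ArrayList<>(); // records what was requested

    SketchTask(String id, boolean active, boolean commitNeeded) {
        this.id = id;
        this.active = active;
        this.commitNeeded = commitNeeded;
    }

    // Gives offsets per partition (partition name -> offset in this sketch).
    Map<String, Long> prepareCommit() {
        calls.add("prepareCommit");
        Map<String, Long> offsets = new HashMap<>();
        offsets.put(id + "-partition-0", 42L);
        return offsets;
    }

    void clearTaskTimeout() {
        calls.add("clearTaskTimeout");
    }

    void postCommit(boolean enforceCheckpoint) {
        calls.add("postCommit(" + enforceCheckpoint + ")");
        commitNeeded = false; // assumption: a completed commit clears the need
    }
}

class CommitFlowSketch {
    boolean rebalanceInProgress = false;
    int offsetCommits = 0; // how many times the commit stub was invoked

    int commitAndFill(Collection<SketchTask> tasksToCommit,
                      Map<SketchTask, Map<String, Long>> consumedOffsetsPerTask) {
        if (rebalanceInProgress) {
            return -1; // not safe to commit during a rebalance
        }
        // Step 1: prepareCommit on tasks that need a commit; keep offsets of active ones.
        for (SketchTask task : tasksToCommit) {
            if (task.commitNeeded) {
                Map<String, Long> offsets = task.prepareCommit();
                if (task.active) {
                    consumedOffsetsPerTask.put(task, offsets);
                }
            }
        }
        // Step 2: commit the collected offsets (stubbed out here).
        commitOffsetsOrTransaction(consumedOffsetsPerTask);
        // Step 3: finish the commit on the same tasks and count them.
        int committed = 0;
        for (SketchTask task : tasksToCommit) {
            if (task.commitNeeded) {
                task.clearTaskTimeout();
                task.postCommit(false); // enforceCheckpoint disabled
                committed++;
            }
        }
        return committed;
    }

    // Stub: the real method commits offsets or a transaction.
    void commitOffsetsOrTransaction(Map<SketchTask, Map<String, Long>> offsets) {
        offsetCommits++;
    }

    public static void main(String[] args) {
        CommitFlowSketch taskManager = new CommitFlowSketch();
        SketchTask activeTask = new SketchTask("0_0", true, true);
        SketchTask standbyTask = new SketchTask("0_1", false, true);
        Map<SketchTask, Map<String, Long>> collected = new HashMap<>();
        int committed = taskManager.commitAndFill(
            Arrays.asList(activeTask, standbyTask), collected);
        System.out.println("committed=" + committed
            + ", tasks with offsets=" + collected.size());
    }
}
```

In this sketch both tasks count as committed, but only the active one contributes offsets to the map, which mirrors the distinction the description draws between tasks with commitNeeded and tasks that are also active.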

commitAndFillInConsumedOffsetsAndMetadataPerTaskMap is used when:

Partition Rebalancing

rebalanceInProgress Flag

TaskManager uses the rebalanceInProgress internal flag to indicate that it is in the middle of partition rebalancing. Committing is considered not safe during a rebalance, so the flag is used to skip commitAndFillInConsumedOffsetsAndMetadataPerTaskMap and maybeCommitActiveTasksPerUserRequested.

The rebalanceInProgress flag is disabled (false) initially. It is turned on (true) in handleRebalanceStart and off in handleRebalanceComplete.
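
The lifecycle of the flag can be illustrated with a small model. This is only a sketch: RebalanceFlagSketch and its commit method (with an int argument) are hypothetical simplifications, and the real handleRebalanceStart and handleRebalanceComplete do more than toggle the flag.

```java
// Hypothetical, simplified model of the flag lifecycle (not the Kafka Streams source).
class RebalanceFlagSketch {
    private boolean rebalanceInProgress = false; // disabled initially

    void handleRebalanceStart() {
        rebalanceInProgress = true; // turned on when a rebalance starts
    }

    void handleRebalanceComplete() {
        rebalanceInProgress = false; // turned off when the rebalance completes
    }

    boolean isRebalanceInProgress() {
        return rebalanceInProgress;
    }

    // Stand-in for the commit methods: returns -1 immediately during a rebalance,
    // otherwise pretends that all tasks needing a commit were committed.
    int commit(int tasksNeedingCommit) {
        if (rebalanceInProgress) {
            return -1;
        }
        return tasksNeedingCommit;
    }

    public static void main(String[] args) {
        RebalanceFlagSketch taskManager = new RebalanceFlagSketch();
        System.out.println("before rebalance: " + taskManager.commit(3));
        taskManager.handleRebalanceStart();
        System.out.println("during rebalance: " + taskManager.commit(3));
        taskManager.handleRebalanceComplete();
        System.out.println("after rebalance: " + taskManager.commit(3));
    }
}
```

A caller can tell a skipped commit (-1) apart from a commit that found nothing to do (0), which is presumably why the commit methods return -1 rather than 0 while a rebalance is in progress.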

isRebalanceInProgress

boolean isRebalanceInProgress()

isRebalanceInProgress returns the value of the internal rebalanceInProgress flag.

isRebalanceInProgress is used when:

  • StreamThread is requested to run

handleRebalanceStart

void handleRebalanceStart(
  Set<String> subscribedTopics)

handleRebalanceStart requests the InternalTopologyBuilder to addSubscribedTopicsFromMetadata with the given subscribedTopics.

handleRebalanceStart then executes tryToLockAllNonEmptyTaskDirectories and turns the rebalanceInProgress internal flag on (true).

handleRebalanceStart is used when:

handleRebalanceComplete

void handleRebalanceComplete()

handleRebalanceComplete requests the Consumer to pause (suspend) fetching from the partitions that are assigned to this consumer.

handleRebalanceComplete then executes releaseLockedUnassignedTaskDirectories and turns the rebalanceInProgress internal flag off (false).

handleRebalanceComplete is used when: