Tasks

Creating Instance

Tasks takes the following to be created:

Tasks is created along with TaskManager.

Main Kafka Consumer

Tasks is given a Kafka Consumer<byte[], byte[]> (using the setMainConsumer method) for the following:

The idea is to pass this Consumer on to StreamTasks.

setMainConsumer

void setMainConsumer(
  Consumer<byte[], byte[]> mainConsumer)

setMainConsumer sets the Main Kafka Consumer.

setMainConsumer is used when:

handleNewAssignmentAndCreateTasks

void handleNewAssignmentAndCreateTasks(
  Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
  Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
  Set<TaskId> assignedActiveTasks,
  Set<TaskId> assignedStandbyTasks)

handleNewAssignmentAndCreateTasks requests the ActiveTaskCreator to removeRevokedUnknownTasks (from the assignedActiveTasks).

handleNewAssignmentAndCreateTasks requests the StandbyTaskCreator to removeRevokedUnknownTasks (from the assignedStandbyTasks).

In the end, handleNewAssignmentAndCreateTasks calls createTasks (with the activeTasksToCreate and standbyTasksToCreate).
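The three steps above can be sketched as follows. This is a minimal illustration, not the Kafka Streams source: TaskId and TopicPartition are simplified to String, TaskCreatorStub is a hypothetical stand-in for both ActiveTaskCreator and StandbyTaskCreator, and the sketch assumes that removeRevokedUnknownTasks simply drops every pending (not yet created) task that is no longer assigned.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; task ids and partitions are plain Strings here.
class NewAssignmentSketch {

    // Stand-in for ActiveTaskCreator / StandbyTaskCreator.
    static class TaskCreatorStub {
        // Tasks that were assigned earlier but could not be created yet.
        final Map<String, Set<String>> unknownTasksToBeCreated = new HashMap<>();
        // Tasks "created" so far (the real creators build task objects).
        final Map<String, Set<String>> created = new HashMap<>();

        // Assumption: keep only the pending tasks that are still assigned.
        void removeRevokedUnknownTasks(Set<String> assignedTasks) {
            unknownTasksToBeCreated.keySet().retainAll(assignedTasks);
        }

        void createTasks(Map<String, Set<String>> tasksToCreate) {
            created.putAll(tasksToCreate);
        }
    }

    final TaskCreatorStub activeTaskCreator = new TaskCreatorStub();
    final TaskCreatorStub standbyTaskCreator = new TaskCreatorStub();

    void handleNewAssignmentAndCreateTasks(
            Map<String, Set<String>> activeTasksToCreate,
            Map<String, Set<String>> standbyTasksToCreate,
            Set<String> assignedActiveTasks,
            Set<String> assignedStandbyTasks) {
        // Steps 1 and 2: forget pending tasks that were revoked.
        activeTaskCreator.removeRevokedUnknownTasks(assignedActiveTasks);
        standbyTaskCreator.removeRevokedUnknownTasks(assignedStandbyTasks);
        // Step 3: create the newly assigned tasks.
        createTasks(activeTasksToCreate, standbyTasksToCreate);
    }

    void createTasks(
            Map<String, Set<String>> activeTasksToCreate,
            Map<String, Set<String>> standbyTasksToCreate) {
        activeTaskCreator.createTasks(activeTasksToCreate);
        standbyTaskCreator.createTasks(standbyTasksToCreate);
    }
}
```

In this sketch the cleanup runs before creation, so a task revoked by the new assignment is not left pending.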

handleNewAssignmentAndCreateTasks is used when:

createTasks

void createTasks(
  Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
  Map<TaskId, Set<TopicPartition>> standbyTasksToCreate)

createTasks requests the ActiveTaskCreator to create active tasks (in the given activeTasksToCreate collection). createTasks registers the tasks in the activeTasksPerId and allTasksPerId registries. createTasks registers the inputPartitions of the tasks in the activeTasksPerPartition.

createTasks requests the StandbyTaskCreator to create standby tasks (in the given standbyTasksToCreate collection). createTasks registers the tasks in the standbyTasksPerId and allTasksPerId registries.
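The registry bookkeeping described above can be illustrated with a small sketch. The registry names (activeTasksPerId, standbyTasksPerId, allTasksPerId, activeTasksPerPartition) come from the text. Everything else is an assumption made for illustration: TaskStub is a hypothetical task type, task ids and partitions are plain Strings, and tasks are constructed directly rather than through the task creators.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the registries that createTasks fills.
class CreateTasksSketch {

    // Minimal stand-in for a task: an id plus its input partitions.
    static final class TaskStub {
        final String id;
        final Set<String> inputPartitions;

        TaskStub(String id, Set<String> inputPartitions) {
            this.id = id;
            this.inputPartitions = inputPartitions;
        }
    }

    final Map<String, TaskStub> allTasksPerId = new HashMap<>();
    final Map<String, TaskStub> activeTasksPerId = new HashMap<>();
    final Map<String, TaskStub> standbyTasksPerId = new HashMap<>();
    final Map<String, TaskStub> activeTasksPerPartition = new HashMap<>();

    void createTasks(
            Map<String, Set<String>> activeTasksToCreate,
            Map<String, Set<String>> standbyTasksToCreate) {
        for (Map.Entry<String, Set<String>> entry : activeTasksToCreate.entrySet()) {
            TaskStub task = new TaskStub(entry.getKey(), entry.getValue());
            activeTasksPerId.put(task.id, task);
            allTasksPerId.put(task.id, task);
            // Only active tasks are indexed by input partition.
            for (String partition : task.inputPartitions) {
                activeTasksPerPartition.put(partition, task);
            }
        }
        for (Map.Entry<String, Set<String>> entry : standbyTasksToCreate.entrySet()) {
            TaskStub task = new TaskStub(entry.getKey(), entry.getValue());
            standbyTasksPerId.put(task.id, task);
            allTasksPerId.put(task.id, task);
        }
    }
}
```

Note that standby tasks end up in allTasksPerId but never in activeTasksPerPartition, which matches the description above.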

createTasks is used when:

maybeCreateTasksFromNewTopologies

void maybeCreateTasksFromNewTopologies()

maybeCreateTasksFromNewTopologies requests the TopologyMetadata for the names of the named topologies.

In the end, maybeCreateTasksFromNewTopologies calls createTasks with the active and standby tasks (and their assigned partitions from the ActiveTaskCreator and StandbyTaskCreator).

maybeCreateTasksFromNewTopologies is used when:

convertStandbyToActive

void convertStandbyToActive(
  StandbyTask standbyTask,
  Set<TopicPartition> partitions)

convertStandbyToActive...FIXME

convertStandbyToActive is used when:

activeTasksPerPartition

Map<TopicPartition, Task> activeTasksPerPartition

Tasks defines an activeTasksPerPartition registry of Tasks that handle (records of) a TopicPartition.

A new Task can be added in createTasks, convertStandbyToActive and updateInputPartitionsAndResume.

One or more Tasks can be removed in convertActiveToStandby, updateInputPartitionsAndResume, removeTaskBeforeClosing and clear.

A Task can be looked up using activeTasksForInputPartition.

activeTasksForInputPartition

Task activeTasksForInputPartition(
  TopicPartition partition)

activeTasksForInputPartition looks up the Task for the given TopicPartition in the activeTasksPerPartition registry.
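A minimal sketch of the lookup, together with one way entries could enter and leave the registry. The names activeTasksPerPartition, activeTasksForInputPartition and removeTaskBeforeClosing come from the text; addActiveTask and TaskStub are hypothetical helpers, partitions are plain Strings, and the bodies are assumptions rather than the actual implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a partition-to-task registry.
class ActivePartitionRegistrySketch {

    // Minimal stand-in for an active task.
    static final class TaskStub {
        final String id;
        final Set<String> inputPartitions;

        TaskStub(String id, Set<String> inputPartitions) {
            this.id = id;
            this.inputPartitions = inputPartitions;
        }
    }

    private final Map<String, TaskStub> activeTasksPerPartition = new HashMap<>();

    // Hypothetical helper: index the task under each of its input partitions.
    void addActiveTask(TaskStub task) {
        for (String partition : task.inputPartitions) {
            activeTasksPerPartition.put(partition, task);
        }
    }

    // Assumed behavior: drop every partition entry of the task.
    void removeTaskBeforeClosing(TaskStub task) {
        for (String partition : task.inputPartitions) {
            activeTasksPerPartition.remove(partition);
        }
    }

    // Plain map lookup; in this sketch an unknown partition yields null.
    TaskStub activeTasksForInputPartition(String partition) {
        return activeTasksPerPartition.get(partition);
    }
}
```

Because one task may read several partitions, the same task instance can be the value of more than one key.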

activeTasksForInputPartition is used when:
