Tasks¶
Creating Instance¶
Tasks takes the following to be created:
Tasks is created along with TaskManager.
Main Kafka Consumer¶
Tasks is given a Kafka Consumer<byte[], byte[]> (using the setMainConsumer method) for the following:
The idea is to pass this Consumer on to StreamTasks.
setMainConsumer¶
void setMainConsumer(
Consumer<byte[], byte[]> mainConsumer)
setMainConsumer sets the Main Kafka Consumer.
setMainConsumer is used when:
TaskManager is requested to setMainConsumer (when the StreamThread utility is used to create a StreamThread)
handleNewAssignmentAndCreateTasks¶
void handleNewAssignmentAndCreateTasks(
Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
Set<TaskId> assignedActiveTasks,
Set<TaskId> assignedStandbyTasks)
handleNewAssignmentAndCreateTasks requests the ActiveTaskCreator to removeRevokedUnknownTasks (from the assignedActiveTasks).
handleNewAssignmentAndCreateTasks requests the StandbyTaskCreator to removeRevokedUnknownTasks (from the assignedStandbyTasks).
In the end, handleNewAssignmentAndCreateTasks creates the tasks (using createTasks with the activeTasksToCreate and standbyTasksToCreate).
handleNewAssignmentAndCreateTasks is used when:
TaskManager is requested to handle active and standby task assignment
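The order of the three steps above can be sketched as follows. This is a minimal illustration, not the Kafka Streams code: AssignmentSketch, CreatorSketch, unknownTasks and createdTasks are hypothetical names, task ids and partitions are plain strings, and it assumes that removing revoked unknown tasks means dropping pending entries that are no longer part of the assignment.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins; none of these are the real Kafka Streams classes.
final class AssignmentSketch {
    // Models a task creator that remembers tasks it could not create yet.
    static final class CreatorSketch {
        final Map<String, Set<String>> unknownTasks = new HashMap<>();

        // Assumption: "revoked" means "no longer part of the assignment".
        void removeRevokedUnknownTasks(Set<String> assignedTasks) {
            unknownTasks.keySet().retainAll(assignedTasks);
        }
    }

    final CreatorSketch activeTaskCreator = new CreatorSketch();
    final CreatorSketch standbyTaskCreator = new CreatorSketch();
    final Set<String> createdTasks = new HashSet<>();

    void handleNewAssignmentAndCreateTasks(
            Map<String, Set<String>> activeTasksToCreate,
            Map<String, Set<String>> standbyTasksToCreate,
            Set<String> assignedActiveTasks,
            Set<String> assignedStandbyTasks) {
        // Steps 1 and 2: prune pending tasks that were not re-assigned.
        activeTaskCreator.removeRevokedUnknownTasks(assignedActiveTasks);
        standbyTaskCreator.removeRevokedUnknownTasks(assignedStandbyTasks);
        // Step 3: create the tasks (reduced here to recording their ids).
        createdTasks.addAll(activeTasksToCreate.keySet());
        createdTasks.addAll(standbyTasksToCreate.keySet());
    }
}
```

The point of the sketch is only the ordering: both creators are pruned against the new assignment before any task is created.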
createTasks¶
void createTasks(
Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
Map<TaskId, Set<TopicPartition>> standbyTasksToCreate)
createTasks requests the ActiveTaskCreator to create active tasks (in the given activeTasksToCreate collection). createTasks registers the tasks in the activeTasksPerId and allTasksPerId registries. createTasks registers the inputPartitions of the tasks in the activeTasksPerPartition.
createTasks requests the StandbyTaskCreator to create standby tasks (in the given standbyTasksToCreate collection). createTasks registers the tasks in the standbyTasksPerId and allTasksPerId registries.
createTasks is used when:
Tasks is requested to handleNewAssignmentAndCreateTasks and maybeCreateTasksFromNewTopologies
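The bookkeeping described above could look roughly like the sketch below. It is an illustration under stated assumptions, not the actual implementation: CreateTasksSketch and SketchTask are hypothetical, task ids and partitions are plain strings, and task creation is reduced to a constructor call instead of a request to the ActiveTaskCreator or StandbyTaskCreator. Only the registry names come from the text.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the registries; not the Kafka Streams Tasks class.
final class CreateTasksSketch {
    // A created task: its id, its kind, and its input partitions.
    static final class SketchTask {
        final String id;
        final boolean active;
        final Set<String> inputPartitions;

        SketchTask(String id, boolean active, Set<String> inputPartitions) {
            this.id = id;
            this.active = active;
            this.inputPartitions = inputPartitions;
        }
    }

    final Map<String, SketchTask> allTasksPerId = new HashMap<>();
    final Map<String, SketchTask> activeTasksPerId = new HashMap<>();
    final Map<String, SketchTask> standbyTasksPerId = new HashMap<>();
    final Map<String, SketchTask> activeTasksPerPartition = new HashMap<>();

    void createTasks(Map<String, Set<String>> activeTasksToCreate,
                     Map<String, Set<String>> standbyTasksToCreate) {
        for (Map.Entry<String, Set<String>> entry : activeTasksToCreate.entrySet()) {
            // Stands in for asking the ActiveTaskCreator to create the task.
            SketchTask task = new SketchTask(entry.getKey(), true, entry.getValue());
            activeTasksPerId.put(task.id, task);
            allTasksPerId.put(task.id, task);
            // Only active tasks are indexed by input partition.
            for (String partition : task.inputPartitions) {
                activeTasksPerPartition.put(partition, task);
            }
        }
        for (Map.Entry<String, Set<String>> entry : standbyTasksToCreate.entrySet()) {
            // Stands in for asking the StandbyTaskCreator to create the task.
            SketchTask task = new SketchTask(entry.getKey(), false, entry.getValue());
            standbyTasksPerId.put(task.id, task);
            allTasksPerId.put(task.id, task);
        }
    }
}
```

Note that, following the description, standby tasks land in standbyTasksPerId and allTasksPerId but are not indexed per partition.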
maybeCreateTasksFromNewTopologies¶
void maybeCreateTasksFromNewTopologies()
maybeCreateTasksFromNewTopologies requests the TopologyMetadata for the names of the named topologies.
In the end, maybeCreateTasksFromNewTopologies creates the tasks (using createTasks) with the active and standby tasks (and their assigned partitions from the ActiveTaskCreator and StandbyTaskCreator).
maybeCreateTasksFromNewTopologies is used when:
TaskManager is requested to handleTopologyUpdates
convertStandbyToActive¶
void convertStandbyToActive(
StandbyTask standbyTask,
Set<TopicPartition> partitions)
convertStandbyToActive
...FIXME
convertStandbyToActive is used when:
TaskManager is requested to handleCloseAndRecycle
activeTasksPerPartition¶
Map<TopicPartition, Task> activeTasksPerPartition
Tasks defines an activeTasksPerPartition registry of Tasks that handle (records of) a TopicPartition.
A new Task can be added in createTasks, convertStandbyToActive, and updateInputPartitionsAndResume.
One or more Tasks can be removed in convertActiveToStandby, updateInputPartitionsAndResume, removeTaskBeforeClosing and clear.
A Task can be looked up using activeTasksForInputPartition.
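The add, remove and lookup life cycle of such a per-partition registry can be sketched as below. This is a simplified model, not the Kafka Streams code: PartitionRegistrySketch, register and unregister are hypothetical names, tasks and partitions are plain strings, and returning null for an unknown partition is an assumption that simply follows Map.get.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the activeTasksPerPartition registry.
final class PartitionRegistrySketch {
    private final Map<String, String> activeTasksPerPartition = new HashMap<>();

    // Adding: every input partition of the task points at the task.
    void register(String taskId, Set<String> inputPartitions) {
        for (String partition : inputPartitions) {
            activeTasksPerPartition.put(partition, taskId);
        }
    }

    // Removing: drop every partition entry that points at the task.
    void unregister(String taskId) {
        activeTasksPerPartition.values().removeIf(taskId::equals);
    }

    // Lookup: null when no active task handles the partition (assumption).
    String activeTasksForInputPartition(String partition) {
        return activeTasksPerPartition.get(partition);
    }

    // Clearing: forget all partition-to-task entries.
    void clear() {
        activeTasksPerPartition.clear();
    }
}
```

Keeping the index keyed by partition makes the lookup a single map access, at the cost of touching several entries whenever a task with multiple input partitions is added or removed.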
activeTasksForInputPartition¶
Task activeTasksForInputPartition(
TopicPartition partition)
activeTasksForInputPartition looks up the Task for the given TopicPartition in the activeTasksPerPartition registry.
activeTasksForInputPartition is used when:
TaskManager is requested to add records to active tasks