TaskManager
Creating Instance
TaskManager takes the following to be created:
- Time
- ChangelogReader
- Process UUID
- Log Prefix
- StreamsMetricsImpl
- ActiveTaskCreator
- StandbyTaskCreator
- InternalTopologyBuilder
- Admin Client (Apache Kafka)
- StateDirectory
- StreamThread.ProcessingMode
TaskManager is created when:
- StreamThread utility is used to create a StreamThread
Tasks
TaskManager creates a Tasks when created.
ActiveTaskCreator
TaskManager is given an ActiveTaskCreator when created.
TaskManager uses this ActiveTaskCreator (along with the StandbyTaskCreator) only to create the Tasks.
Punctuating Recurring Actions
int punctuate()
punctuate requests every active task to maybePunctuateStreamTime and maybePunctuateSystemTime, and returns the number of punctuators that were executed.
punctuate is used when:
- StreamThread is requested to runOnce
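The counting loop described above could be sketched in Java as follows. This is a minimal illustration, not Kafka Streams code: PunctuatingTask and PunctuateSketch are hypothetical types, and the sketch assumes that each maybePunctuate method returns true when it executed a punctuator.

```java
import java.util.List;

// Hypothetical, simplified stand-in for an active task: only the two
// punctuation hooks named above. Each returns true when it ran a punctuator.
interface PunctuatingTask {
    boolean maybePunctuateStreamTime();
    boolean maybePunctuateSystemTime();
}

final class PunctuateSketch {
    // Asks every active task to punctuate by stream time and by system time,
    // counting how many punctuations actually fired.
    static int punctuate(List<? extends PunctuatingTask> activeTasks) {
        int punctuated = 0;
        for (PunctuatingTask task : activeTasks) {
            if (task.maybePunctuateStreamTime()) {
                punctuated++;
            }
            if (task.maybePunctuateSystemTime()) {
                punctuated++;
            }
        }
        return punctuated;
    }
}
```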
Committing (Active) Tasks
int commit(
Collection<Task> tasksToCommit)
commit commits the given tasks using commitAndFillInConsumedOffsetsAndMetadataPerTaskMap (with a Map<Task, Map<TopicPartition, OffsetAndMetadata>> to collect the consumed offsets and metadata per committed task).
In the end, commit returns the number of tasks committed.
commit is used when:
- StreamThread is requested to maybeCommit
- TaskManager is requested to maybeCommitActiveTasksPerUserRequested
Handling Active and Standby Task Assignment
void handleAssignment(
Map<TaskId, Set<TopicPartition>> activeTasks,
Map<TaskId, Set<TopicPartition>> standbyTasks)
handleAssignment prints out the following INFO message to the logs:
Handle new assignment with:
New active tasks: [activeTasks]
New standby tasks: [standbyTasks]
Existing active tasks: [activeTaskIds]
Existing standby tasks: [standbyTaskIds]
handleAssignment requests the TopologyMetadata to addSubscribedTopicsFromAssignment (with the TopicPartitions from the given activeTasks).
handleAssignment rectifies all the existing tasks.
handleAssignment determines which existing tasks to close (and remove) or recycle, and passes them to handleCloseAndRecycle.
In the end, handleAssignment requests the Tasks to handle the new assignment and create the active and standby tasks.
handleAssignment is used when:
- StreamsPartitionAssignor is requested to onAssignment
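The classification step of handleAssignment could be sketched as follows. This is a simplified illustration under stated assumptions: task ids are plain Strings, ExistingTask and AssignmentPlan are hypothetical types, and "recycle" is taken to mean an existing task that stays assigned but switches between active and standby. Recycled tasks are dropped from the to-create sets because they are converted rather than created anew.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified view of an existing task: its id and whether it is active.
final class ExistingTask {
    final String id;
    final boolean active;

    ExistingTask(String id, boolean active) {
        this.id = id;
        this.active = active;
    }
}

final class AssignmentPlan {
    final Set<String> toRecycle = new HashSet<>();
    final Set<String> toClose = new HashSet<>();
    final Set<String> activeToCreate;
    final Set<String> standbyToCreate;

    private AssignmentPlan(Set<String> newActive, Set<String> newStandby) {
        // Start from the full new assignment; ids already owned are removed below.
        this.activeToCreate = new HashSet<>(newActive);
        this.standbyToCreate = new HashSet<>(newStandby);
    }

    static AssignmentPlan classify(List<ExistingTask> existing,
                                   Set<String> newActive,
                                   Set<String> newStandby) {
        AssignmentPlan plan = new AssignmentPlan(newActive, newStandby);
        for (ExistingTask task : existing) {
            if (task.active && newActive.contains(task.id)) {
                plan.activeToCreate.remove(task.id);    // keep the existing active task
            } else if (!task.active && newStandby.contains(task.id)) {
                plan.standbyToCreate.remove(task.id);   // keep the existing standby task
            } else if (newActive.contains(task.id) || newStandby.contains(task.id)) {
                // Still assigned but with a different type: recycle instead of creating anew.
                plan.toRecycle.add(task.id);
                plan.activeToCreate.remove(task.id);
                plan.standbyToCreate.remove(task.id);
            } else {
                plan.toClose.add(task.id);              // no longer assigned at all
            }
        }
        return plan;
    }
}
```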
handleCloseAndRecycle
void handleCloseAndRecycle(
Set<Task> tasksToRecycle,
Set<Task> tasksToCloseClean,
Set<Task> tasksToCloseDirty,
Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions)
handleCloseAndRecycle ...FIXME
Handling TaskCorruptedException
void handleCorruption(
Set<TaskId> corruptedTasks)
handleCorruption ...FIXME
handleCorruption is used when:
- StreamThread is requested to runLoop (and catches a TaskCorruptedException)
maybeCommitActiveTasksPerUserRequested
int maybeCommitActiveTasksPerUserRequested()
With a rebalance in progress, maybeCommitActiveTasksPerUserRequested returns -1 immediately.
Otherwise, maybeCommitActiveTasksPerUserRequested finds the active tasks with commitRequested or commitNeeded and, if there is at least one, commits them.
maybeCommitActiveTasksPerUserRequested is used when:
- StreamThread is requested to maybeCommit
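A minimal sketch of the decision described above, assuming a hypothetical CommittableTask interface and a commit callback that stands in for commit. Returning 0 when no task qualifies is an assumption of this sketch; the description does not say what is returned in that case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical, simplified active task exposing the two flags named above.
interface CommittableTask {
    boolean commitRequested();
    boolean commitNeeded();
}

final class UserRequestedCommitSketch {
    // Returns -1 during a rebalance, 0 when no task qualifies (assumption),
    // otherwise whatever the commit callback reports for the selected tasks.
    static int maybeCommitActiveTasksPerUserRequested(
            boolean rebalanceInProgress,
            List<? extends CommittableTask> activeTasks,
            ToIntFunction<List<CommittableTask>> commit) {
        if (rebalanceInProgress) {
            return -1;
        }
        List<CommittableTask> tasksToCommit = new ArrayList<>();
        for (CommittableTask task : activeTasks) {
            if (task.commitRequested() || task.commitNeeded()) {
                tasksToCommit.add(task);
            }
        }
        return tasksToCommit.isEmpty() ? 0 : commit.applyAsInt(tasksToCommit);
    }
}
```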
commitAndFillInConsumedOffsetsAndMetadataPerTaskMap
int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(
Collection<Task> tasksToCommit,
Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask)
With a rebalance in progress, commitAndFillInConsumedOffsetsAndMetadataPerTaskMap returns -1 immediately.
Otherwise, commitAndFillInConsumedOffsetsAndMetadataPerTaskMap requests every Task with commitNeeded (among the given tasksToCommit) to prepareCommit, which gives the offsets and metadata per partition. For every active task, commitAndFillInConsumedOffsetsAndMetadataPerTaskMap saves these offsets and metadata in the given consumedOffsetsAndMetadataPerTask.
commitAndFillInConsumedOffsetsAndMetadataPerTaskMap then calls commitOffsetsOrTransaction with the given consumedOffsetsAndMetadataPerTask (which may have been updated with some active tasks as described above).
Once again, commitAndFillInConsumedOffsetsAndMetadataPerTaskMap requests every Task with commitNeeded (among the given tasksToCommit) to clearTaskTimeout and postCommit (with the enforceCheckpoint flag disabled).
In the end, commitAndFillInConsumedOffsetsAndMetadataPerTaskMap returns the number of tasks committed.
commitAndFillInConsumedOffsetsAndMetadataPerTaskMap is used when:
- TaskManager is requested to commit
- TaskManager is requested to handle a TaskCorruptedException
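The three phases of commitAndFillInConsumedOffsetsAndMetadataPerTaskMap can be sketched as follows. CommitTask, CommitSketch and the commitOffsetsOrTransaction callback are hypothetical stand-ins, and partitions and offsets are modeled as String and Long instead of TopicPartition and OffsetAndMetadata.

```java
import java.util.Collection;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified task. Partitions are plain Strings and offsets plain
// Longs, standing in for TopicPartition and OffsetAndMetadata.
interface CommitTask {
    boolean isActive();
    boolean commitNeeded();
    Map<String, Long> prepareCommit();
    void clearTaskTimeout();
    void postCommit(boolean enforceCheckpoint);
}

final class CommitSketch {
    static int commitAndFillIn(
            boolean rebalanceInProgress,
            Collection<? extends CommitTask> tasksToCommit,
            Map<CommitTask, Map<String, Long>> consumedOffsetsPerTask,
            Consumer<Map<CommitTask, Map<String, Long>>> commitOffsetsOrTransaction) {
        if (rebalanceInProgress) {
            return -1;  // not safe to commit in the middle of a rebalance
        }
        // Phase 1: prepare every task that needs a commit; keep active tasks' offsets.
        for (CommitTask task : tasksToCommit) {
            if (task.commitNeeded()) {
                Map<String, Long> offsets = task.prepareCommit();
                if (task.isActive()) {
                    consumedOffsetsPerTask.put(task, offsets);
                }
            }
        }
        // Phase 2: commit the collected offsets (or the transaction).
        commitOffsetsOrTransaction.accept(consumedOffsetsPerTask);
        // Phase 3: finish the commit on the same tasks and count them.
        int committed = 0;
        for (CommitTask task : tasksToCommit) {
            if (task.commitNeeded()) {
                task.clearTaskTimeout();
                task.postCommit(false);  // enforceCheckpoint disabled
                committed++;
            }
        }
        return committed;
    }
}
```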
Partition Rebalancing
rebalanceInProgress Flag
TaskManager uses the rebalanceInProgress internal flag to indicate that it is in the middle of partition rebalancing. Committing is considered unsafe during a rebalance, so the flag is used to skip commitAndFillInConsumedOffsetsAndMetadataPerTaskMap and maybeCommitActiveTasksPerUserRequested.
The rebalanceInProgress flag is disabled (false) initially. It is turned on (true) in handleRebalanceStart and off (false) in handleRebalanceComplete.
isRebalanceInProgress
boolean isRebalanceInProgress()
isRebalanceInProgress returns the value of the internal rebalanceInProgress flag.
isRebalanceInProgress is used when:
- StreamThread is requested to run
handleRebalanceStart
void handleRebalanceStart(
Set<String> subscribedTopics)
handleRebalanceStart requests the InternalTopologyBuilder to addSubscribedTopicsFromMetadata with the given subscribedTopics.
handleRebalanceStart then calls tryToLockAllNonEmptyTaskDirectories and turns the rebalanceInProgress internal flag on (true).
handleRebalanceStart is used when:
- StreamsPartitionAssignor is requested to handleRebalanceStart
handleRebalanceComplete
void handleRebalanceComplete()
handleRebalanceComplete requests the Consumer to pause (suspend) fetching from the partitions that are assigned to this consumer.
handleRebalanceComplete then calls releaseLockedUnassignedTaskDirectories and turns the rebalanceInProgress internal flag off (false).
handleRebalanceComplete is used when:
- StreamsPartitionAssignor is requested to onPartitionsAssigned
Processing Records (with Active Stream Tasks)
int process(
int maxNumRecords,
Time time)
process requests every active StreamTask to process records, one at a time, until the number of records processed (across all the active tasks) reaches the given maxNumRecords threshold or there are no more records to process.
process is used when:
- StreamThread is requested to runOnce (every iteration)
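A sketch of the processing loop, assuming (as described above) that maxNumRecords caps the total number of records across all active tasks. ProcessingTask and ProcessSketch are hypothetical; process(long now) replaces the Time parameter and is assumed to return false once a task has no more records to process. Returning the total number of records processed is an assumption based on the int signature.

```java
import java.util.List;

// Hypothetical, simplified active task: process() handles one buffered record
// and returns false when it has nothing left to process.
interface ProcessingTask {
    boolean process(long now);
}

final class ProcessSketch {
    // Assumes a single record budget shared by all active tasks.
    static int process(List<? extends ProcessingTask> activeTasks, int maxNumRecords, long now) {
        int totalProcessed = 0;
        for (ProcessingTask task : activeTasks) {
            // The budget check comes first so no record is processed past the limit.
            while (totalProcessed < maxNumRecords && task.process(now)) {
                totalProcessed++;
            }
            if (totalProcessed >= maxNumRecords) {
                break;  // budget exhausted, remaining tasks wait for the next iteration
            }
        }
        return totalProcessed;
    }
}
```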
tryToCompleteRestoration
boolean tryToCompleteRestoration(
long now,
java.util.function.Consumer<Set<TopicPartition>> offsetResetter)
tryToCompleteRestoration ...FIXME
tryToCompleteRestoration is used when:
- StreamThread is requested to initializeAndRestorePhase
Adding Records to Active StreamTasks
void addRecordsToTasks(
ConsumerRecords<byte[], byte[]> records)
For every partition (in the given records registry of ConsumerRecords), addRecordsToTasks finds the active StreamTask that handles records of this partition and passes the records (for the partition) to it.
Note
Every TopicPartition is handled by a single active Task (while a single Task may handle more than one TopicPartition).
addRecordsToTasks is used when:
- StreamThread is requested to poll for records
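A sketch of the per-partition routing, with hypothetical types: partitions are plain Strings, a Map<String, List<String>> stands in for ConsumerRecords, and BufferingTask stands in for an active StreamTask. Failing fast when no active task owns a partition is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified active task that buffers the records it is given.
final class BufferingTask {
    final List<String> buffered = new ArrayList<>();

    void addRecords(String partition, List<String> records) {
        buffered.addAll(records);
    }
}

final class AddRecordsSketch {
    // recordsPerPartition stands in for ConsumerRecords; taskPerPartition maps each
    // input partition to the one active task that owns it.
    static void addRecordsToTasks(Map<String, List<String>> recordsPerPartition,
                                  Map<String, BufferingTask> taskPerPartition) {
        for (Map.Entry<String, List<String>> entry : recordsPerPartition.entrySet()) {
            BufferingTask task = taskPerPartition.get(entry.getKey());
            if (task == null) {
                throw new IllegalStateException("No active task for partition " + entry.getKey());
            }
            task.addRecords(entry.getKey(), entry.getValue());
        }
    }
}
```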
needsInitializationOrRestoration
boolean needsInitializationOrRestoration()
needsInitializationOrRestoration returns true if any of the tasks requires initialization or restoration.
needsInitializationOrRestoration is used when:
- StreamThread is requested to initializeAndRestorePhase
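The check amounts to an anyMatch over the tasks, as in this minimal sketch with a hypothetical RestorableTask interface standing in for Task.

```java
import java.util.Collection;

// Hypothetical, simplified task exposing a single readiness check.
interface RestorableTask {
    boolean needsInitializationOrRestoration();
}

final class RestorationCheckSketch {
    // true as soon as any task still needs initialization or restoration.
    static boolean needsInitializationOrRestoration(Collection<? extends RestorableTask> tasks) {
        return tasks.stream().anyMatch(RestorableTask::needsInitializationOrRestoration);
    }
}
```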
Logging
Enable ALL logging level for org.apache.kafka.streams.processor.internals.TaskManager logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.org.apache.kafka.streams.processor.internals.TaskManager=ALL
Refer to Logging.