ActiveTaskCreator¶
ActiveTaskCreator is used by TaskManager.
Creating Instance¶
ActiveTaskCreator takes the following to be created:
- InternalTopologyBuilder
- StreamsConfig
- StreamsMetricsImpl
- StateDirectory
- ChangelogReader
- ThreadCache
- Time
- KafkaClientSupplier
- Thread ID
- Process ID
- Logger
ActiveTaskCreator is created when:
- The StreamThread utility is used to create a StreamThread (for a TaskManager)
StreamsProducer¶
ActiveTaskCreator creates a StreamsProducer when created.
createTasks¶
Collection<Task> createTasks(
Consumer<byte[], byte[]> consumer,
Map<TaskId, Set<TopicPartition>> tasksToBeCreated)
For every pair of a TaskId and its TopicPartitions (in the given tasksToBeCreated collection), createTasks requests the InternalTopologyBuilder to buildSubtopology. createTasks then calls createActiveTask (with the ProcessorTopology, a new ProcessorStateManager and a new ProcessorContextImpl).
In the end, createTasks returns the newly-created StreamTasks.
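The loop described above can be sketched roughly as follows. This is a simplified, self-contained illustration and not the Kafka Streams source: `CreateTasksSketch` is a hypothetical class, plain strings stand in for `TaskId`, `TopicPartition`, `ProcessorTopology` and `StreamTask`, and the stand-in `buildSubtopology` is assumed to take the task ID as its only input.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CreateTasksSketch {

    // Hypothetical stand-in for InternalTopologyBuilder.buildSubtopology.
    public static String buildSubtopology(String taskId) {
        return "topology-for-" + taskId;
    }

    // Hypothetical stand-in for createActiveTask; returns a description instead of a StreamTask.
    public static String createActiveTask(String taskId, Set<String> inputPartitions, String topology) {
        return "task " + taskId + " " + inputPartitions + " using " + topology;
    }

    // Creates one task per (task ID, partitions) entry, as the text describes for createTasks.
    public static List<String> createTasks(Map<String, Set<String>> tasksToBeCreated) {
        List<String> createdTasks = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : tasksToBeCreated.entrySet()) {
            String taskId = entry.getKey();
            Set<String> partitions = entry.getValue();
            String topology = buildSubtopology(taskId);
            createdTasks.add(createActiveTask(taskId, partitions, topology));
        }
        return createdTasks;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> tasksToBeCreated = new LinkedHashMap<>();
        tasksToBeCreated.put("0_0", Set.of("input-0"));
        tasksToBeCreated.put("0_1", Set.of("input-1"));
        createTasks(tasksToBeCreated).forEach(System.out::println);
    }
}
```

The sketch leaves out the per-task ProcessorStateManager and ProcessorContextImpl to keep the control flow visible.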
createTasks is used when:
- Tasks is requested to createTasks
createActiveTaskFromStandby¶
StreamTask createActiveTaskFromStandby(
StandbyTask standbyTask,
Set<TopicPartition> inputPartitions,
Consumer<byte[], byte[]> consumer)
createActiveTaskFromStandby ...FIXME
createActiveTaskFromStandby is used when:
- Tasks is requested to convertStandbyToActive
Creating Active StreamTask¶
StreamTask createActiveTask(
TaskId taskId,
Set<TopicPartition> inputPartitions,
Consumer<byte[], byte[]> consumer,
LogContext logContext,
ProcessorTopology topology,
ProcessorStateManager stateManager,
InternalProcessorContext context)
createActiveTask determines which StreamsProducer to use based on the ProcessingMode: with ProcessingMode.EXACTLY_ONCE_ALPHA it creates a new StreamsProducer, otherwise it uses the existing one.
createActiveTask creates a RecordCollectorImpl.
createActiveTask creates a StreamTask.
createActiveTask prints out the following TRACE message to the logs:
Created task [taskId] with assigned partitions [inputPartitions]
In the end, createActiveTask requests the createTaskSensor to record this occurrence.
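The producer selection at the start of createActiveTask could be sketched along these lines. This is a minimal sketch under stated assumptions, not the actual implementation: `ProducerSelectionSketch`, its `Producer` class and `producerFor` method are hypothetical names, the enum lists only the mode named in the text plus `AT_LEAST_ONCE` for contrast, and the shared producer is created up front as the StreamsProducer section states.

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerSelectionSketch {

    // Simplified enum; only the modes needed for this illustration.
    public enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE_ALPHA }

    // Hypothetical stand-in for StreamsProducer.
    public static final class Producer {
        public final String name;
        public Producer(String name) { this.name = name; }
    }

    private final ProcessingMode processingMode;
    // The existing producer, created together with the creator itself.
    private final Producer sharedProducer = new Producer("shared-producer");
    // Producers created per task (used under EXACTLY_ONCE_ALPHA only).
    private final Map<String, Producer> taskProducers = new HashMap<>();

    public ProducerSelectionSketch(ProcessingMode processingMode) {
        this.processingMode = processingMode;
    }

    // New producer for EXACTLY_ONCE_ALPHA, the existing one otherwise.
    public Producer producerFor(String taskId) {
        if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA) {
            Producer producer = new Producer("producer-for-" + taskId);
            taskProducers.put(taskId, producer);
            return producer;
        }
        return sharedProducer;
    }

    public static void main(String[] args) {
        ProducerSelectionSketch alo = new ProducerSelectionSketch(ProcessingMode.AT_LEAST_ONCE);
        System.out.println(alo.producerFor("0_0").name);
        ProducerSelectionSketch eos = new ProducerSelectionSketch(ProcessingMode.EXACTLY_ONCE_ALPHA);
        System.out.println(eos.producerFor("0_0").name);
    }
}
```

Under this model, tasks outside EXACTLY_ONCE_ALPHA all share one producer, while each EXACTLY_ONCE_ALPHA task gets its own.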
createActiveTask is used when:
- ActiveTaskCreator is requested to createTasks and createActiveTaskFromStandby
Logging¶
Enable the ALL logging level for the org.apache.kafka.streams.processor.internals.ActiveTaskCreator logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.org.apache.kafka.streams.processor.internals.ActiveTaskCreator=ALL
Refer to Logging.