ActiveTaskCreator

ActiveTaskCreator is used by TaskManager.

Creating Instance

ActiveTaskCreator takes the following to be created:

ActiveTaskCreator is created when:

StreamsProducer

ActiveTaskCreator creates a StreamsProducer when created.

createTasks

Collection<Task> createTasks(
  Consumer<byte[], byte[]> consumer,
  Map<TaskId, Set<TopicPartition>> tasksToBeCreated)

For every pair of a TaskId and its TopicPartitions (in the given tasksToBeCreated collection), createTasks requests the InternalTopologyBuilder to buildSubtopology. createTasks then calls createActiveTask (with the ProcessorTopology, a new ProcessorStateManager and a new ProcessorContextImpl).

In the end, createTasks returns the newly-created StreamTasks.
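The loop described above can be sketched with plain Java collections. This is a minimal illustration only, not the Kafka Streams source: CreateTasksSketch, SketchTask and the string-based buildSubtopology are hypothetical stand-ins for ActiveTaskCreator, StreamTask and InternalTopologyBuilder.buildSubtopology, and task IDs and partitions are modelled as strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CreateTasksSketch {

    // Hypothetical stand-in for StreamTask: keeps only what the sketch needs.
    public static final class SketchTask {
        public final String taskId;
        public final Set<String> inputPartitions;
        public final String topology;

        SketchTask(String taskId, Set<String> inputPartitions, String topology) {
            this.taskId = taskId;
            this.inputPartitions = inputPartitions;
            this.topology = topology;
        }
    }

    // Hypothetical stand-in for InternalTopologyBuilder.buildSubtopology.
    static String buildSubtopology(String taskId) {
        return "topology-for-" + taskId;
    }

    // Hypothetical stand-in for createActiveTask.
    static SketchTask createActiveTask(String taskId, Set<String> partitions, String topology) {
        return new SketchTask(taskId, partitions, topology);
    }

    // One task per (task ID, partitions) entry, collected and returned at the end.
    public static List<SketchTask> createTasks(Map<String, Set<String>> tasksToBeCreated) {
        List<SketchTask> createdTasks = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : tasksToBeCreated.entrySet()) {
            String taskId = entry.getKey();
            Set<String> partitions = entry.getValue();
            String topology = buildSubtopology(taskId);
            createdTasks.add(createActiveTask(taskId, partitions, topology));
        }
        return createdTasks;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> assignment = new LinkedHashMap<>();
        assignment.put("0_0", Collections.singleton("input-0"));
        assignment.put("0_1", Collections.singleton("input-1"));
        for (SketchTask task : createTasks(assignment)) {
            System.out.println(task.taskId + " -> " + task.inputPartitions);
        }
    }
}
```

The sketch leaves out the state manager and processor context that the real method also builds for each task.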

createTasks is used when:

createActiveTaskFromStandby

StreamTask createActiveTaskFromStandby(
  StandbyTask standbyTask,
  Set<TopicPartition> inputPartitions,
  Consumer<byte[], byte[]> consumer)

createActiveTaskFromStandby...FIXME

createActiveTaskFromStandby is used when:

Creating Active StreamTask

StreamTask createActiveTask(
  TaskId taskId,
  Set<TopicPartition> inputPartitions,
  Consumer<byte[], byte[]> consumer,
  LogContext logContext,
  ProcessorTopology topology,
  ProcessorStateManager stateManager,
  InternalProcessorContext context)

createActiveTask checks the ProcessingMode to decide which StreamsProducer to use: with ProcessingMode.EXACTLY_ONCE_ALPHA it creates a new StreamsProducer for the task, and otherwise it uses the existing one.

createActiveTask creates a RecordCollectorImpl.

createActiveTask creates a StreamTask.

createActiveTask prints out the following TRACE message to the logs:

Created task [taskId] with assigned partitions [inputPartitions]

In the end, createActiveTask requests the createTaskSensor to record this occurrence.
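The producer choice made at the start of createActiveTask can be illustrated as follows. This is a sketch under stated assumptions, not the actual implementation: ProducerSelectionSketch, its Mode enum and SketchProducer are hypothetical stand-ins for ActiveTaskCreator, ProcessingMode and StreamsProducer, and the sketch only encodes the rule "new producer for EXACTLY_ONCE_ALPHA, existing producer otherwise".

```java
public class ProducerSelectionSketch {

    // Hypothetical stand-in for ProcessingMode; only the distinction
    // between EXACTLY_ONCE_ALPHA and everything else matters here.
    public enum Mode { AT_LEAST_ONCE, EXACTLY_ONCE_ALPHA }

    // Hypothetical stand-in for StreamsProducer.
    public static final class SketchProducer {
        public final String owner;

        SketchProducer(String owner) {
            this.owner = owner;
        }
    }

    // The "existing" producer, created once together with the creator.
    private final SketchProducer existingProducer = new SketchProducer("shared");

    // EXACTLY_ONCE_ALPHA gets a fresh producer per task;
    // any other mode reuses the existing one.
    public SketchProducer producerFor(Mode mode, String taskId) {
        if (mode == Mode.EXACTLY_ONCE_ALPHA) {
            return new SketchProducer("task-" + taskId);
        }
        return existingProducer;
    }

    public static void main(String[] args) {
        ProducerSelectionSketch creator = new ProducerSelectionSketch();
        System.out.println(creator.producerFor(Mode.AT_LEAST_ONCE, "0_0").owner);
        System.out.println(creator.producerFor(Mode.EXACTLY_ONCE_ALPHA, "0_0").owner);
    }
}
```

In this sketch, two tasks created in a mode other than EXACTLY_ONCE_ALPHA receive the same producer instance, while each EXACTLY_ONCE_ALPHA task receives its own.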

createActiveTask is used when:

Logging

Enable the ALL logging level for the org.apache.kafka.streams.processor.internals.ActiveTaskCreator logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.ActiveTaskCreator=ALL

Refer to Logging.
