StreamTask is a concrete AbstractTask.

StreamTask creates exactly one PartitionGroup to handle all the input partitions.

Creating Instance

StreamTask takes the following to be created:

StreamTask is created when:


When created, StreamTask creates a RecordQueueCreator based on the following configuration properties:

StreamTask uses the RecordQueueCreator when:


StreamTask uses the buffered.records.per.partition configuration property as the threshold for pausing and resuming record consumption on a partition: consumption can be paused when StreamTask is requested to add records and resumed when it is requested to process records.
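As a minimal sketch of how the threshold might be supplied, the snippet below sets the property by name on plain java.util.Properties. The helper class, the application id and the broker address are hypothetical placeholders chosen for illustration; only the buffered.records.per.partition key comes from the text above.

```java
import java.util.Properties;

class BufferedRecordsConfigSketch {
    // Builds application properties with a custom per-partition buffer threshold.
    static Properties streamsProperties(int maxBufferedRecordsPerPartition) {
        Properties props = new Properties();
        props.setProperty("application.id", "stream-task-demo");   // hypothetical application name
        props.setProperty("bootstrap.servers", "localhost:9092");  // hypothetical broker address
        props.setProperty("buffered.records.per.partition",
            Integer.toString(maxBufferedRecordsPerPartition));
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProperties(500);
        System.out.println(props.getProperty("buffered.records.per.partition"));
    }
}
```

The resulting Properties would then be passed to the Kafka Streams application when it is created.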

Main Kafka Consumer

StreamTask is given a Consumer<byte[], byte[]> (Apache Kafka) when created.

StreamTask uses the Consumer for the following:


StreamTask is given a RecordCollector when created.


When created, StreamTask creates a PartitionGroup with the following:


Map<TopicPartition, RecordQueue> createPartitionQueues()

createPartitionQueues requests the RecordQueueCreator to create one RecordQueue for each of the input partitions.
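The one-queue-per-partition idea can be sketched in plain Java as follows. This is a simplified model, not the Kafka Streams code: partitions are represented as strings, the queue type is left generic, and the queueCreator function is a hypothetical stand-in for the RecordQueueCreator.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

class PartitionQueuesSketch {
    // Creates exactly one queue per input partition using the given factory
    // (a stand-in for the RecordQueueCreator).
    static <P, Q> Map<P, Q> createPartitionQueues(Set<P> inputPartitions, Function<P, Q> queueCreator) {
        Map<P, Q> queues = new HashMap<>();
        for (P partition : inputPartitions) {
            queues.put(partition, queueCreator.apply(partition));
        }
        return queues;
    }

    public static void main(String[] args) {
        Map<String, Deque<String>> queues =
            createPartitionQueues(Set.of("orders-0", "orders-1"), p -> new ArrayDeque<String>());
        System.out.println(queues.size());
    }
}
```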


void updateInputPartitions(
  Set<TopicPartition> topicPartitions,
  Map<String, List<String>> allTopologyNodesToSourceTopics)

updateInputPartitions first requests the parent AbstractTask to updateInputPartitions (with the given arguments).

In the end, updateInputPartitions requests the PartitionGroup to updatePartitions (with the given TopicPartitions and createQueue factory).

updateInputPartitions is part of the Task abstraction.


long streamTime()

streamTime requests the PartitionGroup for the streamTime.

streamTime is used when:

Scheduling Recurring Action

Cancellable schedule(
  long startTime,
  long interval,
  PunctuationType type,
  Punctuator punctuator)
Cancellable schedule(
  long interval,
  PunctuationType type,
  Punctuator punctuator)

schedule creates a PunctuationSchedule (for the current ProcessorNode) and requests the stream-time or system-time PunctuationQueues to schedule the PunctuationSchedule based on the given PunctuationType (STREAM_TIME or WALL_CLOCK_TIME, respectively).
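The routing by punctuation type can be illustrated with a small, self-contained model. The Type enum is a local stand-in for PunctuationType, and the two lists stand in for the stream-time and system-time PunctuationQueues; none of the names below are the actual Kafka Streams classes.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

class ScheduleRoutingSketch {
    // Local stand-in for Kafka Streams' PunctuationType.
    enum Type { STREAM_TIME, WALL_CLOCK_TIME }

    // One list per punctuation type, standing in for the two PunctuationQueues.
    private final Map<Type, List<Runnable>> queues = new EnumMap<>(Type.class);

    ScheduleRoutingSketch() {
        for (Type type : Type.values()) {
            queues.put(type, new ArrayList<>());
        }
    }

    // Adds the action to the queue that matches the requested type.
    void schedule(Type type, Runnable punctuator) {
        queues.get(type).add(punctuator);
    }

    // Number of actions currently registered for the given type.
    int scheduledCount(Type type) {
        return queues.get(type).size();
    }
}
```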

schedule is used when:

Stream- and System-Time PunctuationQueues

When created, StreamTask creates two PunctuationQueues, one per punctuation type:

  1. streamTimePunctuationQueue for STREAM_TIME-type punctuators
  2. systemTimePunctuationQueue for WALL_CLOCK_TIME-type punctuators

Punctuators are added to a corresponding PunctuationQueue when StreamTask is requested to schedule a recurring action.

Due actions are executed every time StreamTask is requested to maybePunctuateStreamTime or maybePunctuateSystemTime (when TaskManager is requested to punctuate).
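To make the idea of a queue of recurring actions concrete, here is a minimal sketch built on java.util.PriorityQueue. It is a simplified model under stated assumptions, not the Kafka Streams PunctuationQueue: actions are plain Runnables, and an executed action is rescheduled one interval after the timestamp at which it ran.

```java
import java.util.PriorityQueue;

class PunctuationQueueSketch {
    // One recurring action: when it is next due and how often it repeats.
    static final class Schedule implements Comparable<Schedule> {
        final long nextTimestamp;
        final long interval;
        final Runnable action;

        Schedule(long nextTimestamp, long interval, Runnable action) {
            this.nextTimestamp = nextTimestamp;
            this.interval = interval;
            this.action = action;
        }

        @Override
        public int compareTo(Schedule other) {
            return Long.compare(nextTimestamp, other.nextTimestamp);
        }
    }

    // Schedules ordered by the time they are next due.
    private final PriorityQueue<Schedule> queue = new PriorityQueue<>();

    void schedule(long startTime, long interval, Runnable action) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        queue.add(new Schedule(startTime, interval, action));
    }

    // Runs every action that is due at the given timestamp and reschedules it
    // (assumption of this sketch: the next run is timestamp + interval).
    // Returns true when at least one action ran.
    boolean mayPunctuate(long timestamp) {
        boolean punctuated = false;
        while (!queue.isEmpty() && queue.peek().nextTimestamp <= timestamp) {
            Schedule due = queue.poll();
            due.action.run();
            queue.add(new Schedule(timestamp + due.interval, due.interval, due.action));
            punctuated = true;
        }
        return punctuated;
    }
}
```

Because the rescheduled timestamp is always later than the timestamp passed in, each action runs at most once per mayPunctuate call in this model.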


boolean maybePunctuateStreamTime()

maybePunctuateStreamTime returns true when at least one STREAM_TIME punctuator has been executed.

maybePunctuateStreamTime requests the PartitionGroup for the stream time.


Stream time for STREAM_TIME punctuators is determined using the PartitionGroup.

maybePunctuateStreamTime returns false when the stream time is RecordQueue.UNKNOWN (that is, the stream time is yet to be determined).

maybePunctuateStreamTime requests the stream-time PunctuationQueue to mayPunctuate (with the stream time, STREAM_TIME punctuation type and this StreamTask). If there was at least one recurring action triggered (punctuated), maybePunctuateStreamTime marks this StreamTask as commitNeeded.
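The control flow described above (skip when the stream time is unknown, otherwise punctuate and mark the task as needing a commit) can be sketched as follows. The class is a hypothetical model: the LongPredicate stands in for the stream-time PunctuationQueue, and the UNKNOWN constant is a local stand-in for RecordQueue.UNKNOWN whose concrete value is an assumption of this sketch.

```java
import java.util.function.LongPredicate;

class StreamTimePunctuationSketch {
    // Stand-in for RecordQueue.UNKNOWN; the concrete value is an assumption of this sketch.
    static final long UNKNOWN = -1L;

    // Returns true if any recurring action ran at the given timestamp.
    private final LongPredicate punctuationQueue;
    private boolean commitNeeded = false;

    StreamTimePunctuationSketch(LongPredicate punctuationQueue) {
        this.punctuationQueue = punctuationQueue;
    }

    boolean maybePunctuateStreamTime(long streamTime) {
        if (streamTime == UNKNOWN) {
            return false; // stream time not determined yet, nothing to punctuate
        }
        boolean punctuated = punctuationQueue.test(streamTime);
        if (punctuated) {
            commitNeeded = true; // an action ran, so the task has something to commit
        }
        return punctuated;
    }

    boolean commitNeeded() {
        return commitNeeded;
    }
}
```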

maybePunctuateStreamTime is part of the Task abstraction.


boolean maybePunctuateSystemTime()

maybePunctuateSystemTime returns true when at least one WALL_CLOCK_TIME punctuator has been executed.

maybePunctuateSystemTime requests the Time for the current time (in ms).


System time for WALL_CLOCK_TIME punctuators is determined using the Time.

maybePunctuateSystemTime requests the system-time PunctuationQueue to mayPunctuate (with the current system time, WALL_CLOCK_TIME punctuation type and this StreamTask). If there was at least one recurring action triggered (punctuated), maybePunctuateSystemTime marks this StreamTask as commitNeeded.

maybePunctuateSystemTime is part of the Task abstraction.


Map<TopicPartition, OffsetAndMetadata> prepareCommit()


prepareCommit is part of the Task abstraction.


Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata()


Adding Records

void addRecords(
  TopicPartition partition,
  Iterable<ConsumerRecord<byte[], byte[]>> records)

addRecords requests the PartitionGroup to add the given ConsumerRecords (for the given TopicPartition).

addRecords prints out the following TRACE message to the logs:

Added records into the buffered queue of partition [partition],
new queue size is [newQueueSize]

addRecords requests the main Consumer to pause (suspend) fetching and consuming records from the partition if the queue size for the partition (after adding the new raw records) exceeds the buffered.records.per.partition threshold.

addRecords is part of the Task abstraction.
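The buffering and pausing behaviour of addRecords can be modelled with plain collections. This is a sketch under assumptions, not the StreamTask implementation: partitions and records are strings, the pausedPartitions set stands in for pausing and resuming on the main Consumer, and the exact resume condition in processOne (queue size back within the threshold) is chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class BufferedRecordsSketch {
    private final int maxBufferedSize; // plays the role of buffered.records.per.partition
    private final Map<String, Deque<String>> queues = new HashMap<>();
    private final Set<String> pausedPartitions = new HashSet<>(); // stand-in for the consumer's paused set

    BufferedRecordsSketch(int maxBufferedSize) {
        this.maxBufferedSize = maxBufferedSize;
    }

    // Buffers the records and pauses the partition once its queue exceeds the threshold.
    int addRecords(String partition, List<String> records) {
        Deque<String> queue = queues.computeIfAbsent(partition, p -> new ArrayDeque<>());
        queue.addAll(records);
        int newQueueSize = queue.size();
        if (newQueueSize > maxBufferedSize) {
            pausedPartitions.add(partition);
        }
        return newQueueSize;
    }

    // Takes one buffered record and resumes the partition once the queue
    // is back within the threshold (assumed resume condition).
    String processOne(String partition) {
        Deque<String> queue = queues.get(partition);
        if (queue == null || queue.isEmpty()) {
            return null;
        }
        String record = queue.poll();
        if (queue.size() <= maxBufferedSize) {
            pausedPartitions.remove(partition);
        }
        return record;
    }

    boolean isPaused(String partition) {
        return pausedPartitions.contains(partition);
    }
}
```

With a threshold of 2, adding four records pauses the partition, and processing two of them brings the queue back to the threshold and resumes it.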


void addPartitionsForOffsetReset(
  Set<TopicPartition> partitionsForOffsetReset)


addPartitionsForOffsetReset is part of the Task abstraction.

Processing One Record

boolean process(
  long wallClockTime)


process is part of the Task abstraction.


void punctuate(
  ProcessorNode<?, ?, ?, ?> node,
  long timestamp,
  PunctuationType type,
  Punctuator punctuator)


punctuate is part of the ProcessorNodePunctuator abstraction.


void completeRestoration(
  java.util.function.Consumer<Set<TopicPartition>> offsetResetter)


completeRestoration is part of the Task abstraction.


void resetOffsetsIfNeededAndInitializeMetadata(
  java.util.function.Consumer<Set<TopicPartition>> offsetResetter)



Enable the ALL logging level for the org.apache.kafka.streams.processor.internals.StreamTask logger to see what happens inside.

Add the following line to conf/

Refer to Logging.
