
StreamTask

StreamTask is a concrete AbstractTask.

StreamTask creates exactly one PartitionGroup to handle all the input partitions.

Creating Instance

StreamTask takes the following to be created:

StreamTask is created when:

RecordQueueCreator

When created, StreamTask creates a RecordQueueCreator based on the following configuration properties:

StreamTask uses the RecordQueueCreator when:

buffered.records.per.partition

StreamTask uses the buffered.records.per.partition configuration property to decide when to pause record consumption on a partition (when requested to add records) and when to resume it (when requested to process a record).

Main Kafka Consumer

StreamTask is given a Consumer<byte[], byte[]> (Apache Kafka) when created.

StreamTask uses the Consumer for the following:

RecordCollector

StreamTask is given a RecordCollector when created.

PartitionGroup

When created, StreamTask creates a PartitionGroup with the following:

createPartitionQueues

Map<TopicPartition, RecordQueue> createPartitionQueues()

createPartitionQueues requests the RecordQueueCreator to create one RecordQueue for every partition in the input partitions.
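The one-queue-per-partition mapping can be sketched as follows. This is a simplified, self-contained model rather than the Kafka Streams source: the PartitionQueues class and its generic createQueue factory parameter are hypothetical stand-ins for StreamTask and the RecordQueueCreator.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical model; P stands in for TopicPartition, Q for RecordQueue.
class PartitionQueues {
    // Builds exactly one queue for every input partition using the given factory.
    static <P, Q> Map<P, Q> createPartitionQueues(Set<P> inputPartitions,
                                                  Function<P, Q> createQueue) {
        Map<P, Q> queues = new HashMap<>();
        for (P partition : inputPartitions) {
            queues.put(partition, createQueue.apply(partition));
        }
        return queues;
    }
}
```

Every partition gets its own queue instance, so buffering on one partition does not affect another.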

updateInputPartitions

void updateInputPartitions(
  Set<TopicPartition> topicPartitions,
  Map<String, List<String>> allTopologyNodesToSourceTopics)

updateInputPartitions first requests the parent AbstractTask to updateInputPartitions.

In the end, updateInputPartitions requests the PartitionGroup to updatePartitions (with the given TopicPartitions and createQueue factory).

updateInputPartitions is part of the Task abstraction.

streamTime

long streamTime()

streamTime requests the PartitionGroup for the streamTime.

streamTime is used when:

Scheduling Recurring Action

Cancellable schedule(
  long startTime,
  long interval,
  PunctuationType type,
  Punctuator punctuator)
Cancellable schedule(
  long interval,
  PunctuationType type,
  Punctuator punctuator)

schedule creates a PunctuationSchedule (for the current ProcessorNode) and requests the stream-time or system-time PunctuationQueues to schedule the PunctuationSchedule based on the given PunctuationType (STREAM_TIME or WALL_CLOCK_TIME, respectively).

schedule is used when:

Stream- and System-Time PunctuationQueues

When created, StreamTask creates two PunctuationQueues, one per PunctuationType:

  1. streamTimePunctuationQueue for STREAM_TIME-type punctuators
  2. systemTimePunctuationQueue for WALL_CLOCK_TIME-type punctuators

Punctuators are added to a corresponding PunctuationQueue when StreamTask is requested to schedule a recurring action.

The actions are executed on every maybePunctuateStreamTime or maybePunctuateSystemTime (when TaskManager is requested to punctuate).
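The scheduling and triggering of recurring actions can be illustrated with a minimal model of a punctuation queue. This is a sketch under stated assumptions, not the Kafka Streams implementation: SimplePunctuationQueue, its Schedule class, and the rule of skipping missed intervals are hypothetical, and a LongConsumer stands in for a Punctuator.

```java
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

// Hypothetical model of a punctuation queue; not the Kafka Streams source.
class SimplePunctuationQueue {
    // One recurring action together with the next time it is due.
    private static final class Schedule implements Comparable<Schedule> {
        long nextTime;
        final long interval;
        final LongConsumer action;

        Schedule(long nextTime, long interval, LongConsumer action) {
            this.nextTime = nextTime;
            this.interval = interval;
            this.action = action;
        }

        @Override
        public int compareTo(Schedule other) {
            return Long.compare(nextTime, other.nextTime);
        }
    }

    private final PriorityQueue<Schedule> queue = new PriorityQueue<>();

    // Registers a recurring action that is first due at startTime.
    void schedule(long startTime, long interval, LongConsumer action) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        queue.add(new Schedule(startTime, interval, action));
    }

    // Runs every action that is due at the given timestamp.
    // Returns true when at least one action was executed.
    boolean mayPunctuate(long timestamp) {
        boolean punctuated = false;
        while (!queue.isEmpty() && queue.peek().nextTime <= timestamp) {
            Schedule due = queue.poll();
            due.action.accept(timestamp);
            punctuated = true;
            // Assumption of this model: missed intervals are skipped, not replayed.
            long next = due.nextTime + due.interval;
            due.nextTime = next <= timestamp ? timestamp + due.interval : next;
            queue.add(due);
        }
        return punctuated;
    }
}
```

Because the rescheduled time is always later than the current timestamp, each action runs at most once per mayPunctuate call in this model.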

maybePunctuateStreamTime

boolean maybePunctuateStreamTime()

maybePunctuateStreamTime returns true when at least one STREAM_TIME punctuator has been executed.


maybePunctuateStreamTime requests the PartitionGroup for the stream time.

Note

Stream time for STREAM_TIME punctuators is determined using the PartitionGroup.

maybePunctuateStreamTime returns false when the stream time is RecordQueue.UNKNOWN (that is, the stream time has not been determined yet).

maybePunctuateStreamTime requests the stream-time PunctuationQueue to mayPunctuate (with the stream time, STREAM_TIME punctuation type and this StreamTask). If there was at least one recurring action triggered (punctuated), maybePunctuateStreamTime marks this StreamTask as commitNeeded.

maybePunctuateStreamTime is part of the Task abstraction.
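The control flow of maybePunctuateStreamTime can be summarized in a small model. This is a sketch, not the Kafka Streams source: the StreamTimePunctuation class is hypothetical, a LongPredicate stands in for the stream-time PunctuationQueue, and the value -1 for UNKNOWN is an assumption made for the model.

```java
import java.util.function.LongPredicate;

// Hypothetical model of the stream-time punctuation flow.
class StreamTimePunctuation {
    // Assumed sentinel; stands in for RecordQueue.UNKNOWN.
    static final long UNKNOWN = -1L;

    // Stands in for the stream-time PunctuationQueue: true when an action ran.
    private final LongPredicate punctuationQueue;
    private boolean commitNeeded = false;

    StreamTimePunctuation(LongPredicate punctuationQueue) {
        this.punctuationQueue = punctuationQueue;
    }

    boolean maybePunctuateStreamTime(long streamTime) {
        // No stream time yet, so no punctuator can be due.
        if (streamTime == UNKNOWN) {
            return false;
        }
        boolean punctuated = punctuationQueue.test(streamTime);
        // A triggered action may have changed state, so a commit is needed.
        if (punctuated) {
            commitNeeded = true;
        }
        return punctuated;
    }

    boolean commitNeeded() {
        return commitNeeded;
    }
}
```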

maybePunctuateSystemTime

boolean maybePunctuateSystemTime()

maybePunctuateSystemTime returns true when at least one WALL_CLOCK_TIME punctuator has been executed.


maybePunctuateSystemTime requests the Time for the current time (in ms).

Note

System time for WALL_CLOCK_TIME punctuators is determined using the Time.

maybePunctuateSystemTime requests the system-time PunctuationQueue to mayPunctuate (with the current system time, WALL_CLOCK_TIME punctuation type and this StreamTask). If there was at least one recurring action triggered (punctuated), maybePunctuateSystemTime marks this StreamTask as commitNeeded.

maybePunctuateSystemTime is part of the Task abstraction.

prepareCommit

Map<TopicPartition, OffsetAndMetadata> prepareCommit()

prepareCommit...FIXME

prepareCommit is part of the Task abstraction.

committableOffsetsAndMetadata

Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata()

committableOffsetsAndMetadata...FIXME

Adding Records

void addRecords(
  TopicPartition partition,
  Iterable<ConsumerRecord<byte[], byte[]>> records)

addRecords requests the PartitionGroup to add the given ConsumerRecords (for the given TopicPartition).

addRecords prints out the following TRACE message to the logs:

Added records into the buffered queue of partition [partition], new queue size is [newQueueSize]

addRecords requests the main Consumer to pause (suspend) fetching records from the partition if the queue size for the partition (after adding the new raw records) exceeds the buffered.records.per.partition threshold.

addRecords is part of the Task abstraction.
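The pause decision in addRecords can be illustrated with a simplified, self-contained model. This is a sketch, not the Kafka Streams source: BufferedPartitions and its members are hypothetical, strings stand in for partitions and records, and a set of paused partitions stands in for the main Consumer. The resume condition in poll is an assumption of this model.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of per-partition buffering with pause and resume.
class BufferedPartitions {
    // Stands in for the buffered.records.per.partition value.
    private final int maxBufferedSize;
    private final Map<String, Deque<String>> queues = new HashMap<>();
    // Stands in for the partitions paused on the main Consumer.
    private final Set<String> paused = new HashSet<>();

    BufferedPartitions(int maxBufferedSize) {
        this.maxBufferedSize = maxBufferedSize;
    }

    // Buffers the records and pauses the partition once the new
    // queue size exceeds the threshold. Returns the new queue size.
    int addRecords(String partition, List<String> records) {
        Deque<String> queue = queues.computeIfAbsent(partition, p -> new ArrayDeque<>());
        queue.addAll(records);
        int newQueueSize = queue.size();
        if (newQueueSize > maxBufferedSize) {
            paused.add(partition);
        }
        return newQueueSize;
    }

    // Takes one buffered record (or null if none is buffered).
    // Assumption: the partition resumes once the queue is back at the threshold.
    String poll(String partition) {
        Deque<String> queue = queues.get(partition);
        if (queue == null || queue.isEmpty()) {
            return null;
        }
        String record = queue.poll();
        if (queue.size() <= maxBufferedSize) {
            paused.remove(partition);
        }
        return record;
    }

    boolean isPaused(String partition) {
        return paused.contains(partition);
    }
}
```

Pausing only the partition that is over the threshold lets the remaining partitions keep fetching while the task catches up on the buffered records.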

addPartitionsForOffsetReset

void addPartitionsForOffsetReset(
  Set<TopicPartition> partitionsForOffsetReset)

addPartitionsForOffsetReset...FIXME

addPartitionsForOffsetReset is part of the Task abstraction.

Processing One Record

boolean process(
  long wallClockTime)

process...FIXME

process is part of the Task abstraction.

punctuate

void punctuate(
  ProcessorNode<?, ?, ?, ?> node,
  long timestamp,
  PunctuationType type,
  Punctuator punctuator)

punctuate...FIXME

punctuate is part of the ProcessorNodePunctuator abstraction.

completeRestoration

void completeRestoration(
  java.util.function.Consumer<Set<TopicPartition>> offsetResetter)

completeRestoration...FIXME

completeRestoration is part of the Task abstraction.

resetOffsetsIfNeededAndInitializeMetadata

void resetOffsetsIfNeededAndInitializeMetadata(
  java.util.function.Consumer<Set<TopicPartition>> offsetResetter)

resetOffsetsIfNeededAndInitializeMetadata...FIXME

Logging

Enable the ALL logging level for the org.apache.kafka.streams.processor.internals.StreamTask logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.StreamTask=ALL

Refer to Logging.
