StreamTask
StreamTask is a concrete AbstractTask.
StreamTask creates exactly one PartitionGroup to handle all the input partitions.
Creating Instance
StreamTask takes the following to be created:
- TaskId
- Input TopicPartitions
- ProcessorTopology
- Kafka Consumer
- StreamsConfig
- StreamsMetricsImpl
- StateDirectory
- ThreadCache
- Time
- ProcessorStateManager
- RecordCollector
- InternalProcessorContext
- LogContext
StreamTask is created when:
- ActiveTaskCreator is requested to create an active task
- TopologyTestDriver is requested to set up a task
RecordQueueCreator
When created, StreamTask creates a RecordQueueCreator based on the following configuration properties:
- default.timestamp.extractor
- default.deserialization.exception.handler
StreamTask uses the RecordQueueCreator when:
- createPartitionQueues
- updateInputPartitions
buffered.records.per.partition
StreamTask uses the buffered.records.per.partition configuration property to decide when to pause and resume fetching records from a partition: it can pause the partition when requested to add records and resume it when requested to process a record.
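The following sketch models this back-pressure for a single task. It is a simplified, stdlib-only illustration, not Kafka Streams code: the class BufferThrottle and its methods are hypothetical, partitions are plain strings, and a set of paused partitions stands in for the main Consumer's pause and resume calls. It assumes consumption is paused once a queue holds more than buffered.records.per.partition records and resumed once processing brings the queue back down to that threshold.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of per-partition back-pressure; not Kafka Streams code.
class BufferThrottle {
    private final int maxBufferedSize; // stands in for buffered.records.per.partition
    private final Map<String, Deque<Long>> queues = new HashMap<>();
    private final Set<String> paused = new HashSet<>(); // stands in for Consumer pause/resume

    BufferThrottle(int maxBufferedSize) {
        this.maxBufferedSize = maxBufferedSize;
    }

    // Like addRecords: buffer the records, then pause if the queue grew past the threshold.
    void addRecords(String partition, List<Long> records) {
        Deque<Long> queue = queues.computeIfAbsent(partition, p -> new ArrayDeque<>());
        queue.addAll(records);
        if (queue.size() > maxBufferedSize) {
            paused.add(partition);
        }
    }

    // Like processing one record: take it, then resume once the queue is back at the threshold.
    Long processOne(String partition) {
        Deque<Long> queue = queues.get(partition);
        if (queue == null || queue.isEmpty()) {
            return null;
        }
        Long record = queue.poll();
        if (queue.size() == maxBufferedSize) {
            paused.remove(partition);
        }
        return record;
    }

    boolean isPaused(String partition) {
        return paused.contains(partition);
    }
}
```

With a threshold of 2, adding a third record pauses the partition, and processing a single record resumes it.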
Main Kafka Consumer
StreamTask is given a Consumer<byte[], byte[]> (Apache Kafka) when created.
StreamTask uses the Consumer for the following:
- Adding Records
- addPartitionsForOffsetReset
- prepareCommit (to committableOffsetsAndMetadata)
- Processing One Record
- completeRestoration (to resetOffsetsIfNeededAndInitializeMetadata)
RecordCollector
StreamTask is given a RecordCollector when created.
PartitionGroup
When created, StreamTask creates a PartitionGroup with the following:
- Partition Queues
- currentLag function (Apache Kafka) of the main Consumer
- recordLatenessSensor
- enforcedProcessingSensor
- max.task.idle.ms configuration property
createPartitionQueues
Map<TopicPartition, RecordQueue> createPartitionQueues()
createPartitionQueues requests the RecordQueueCreator to create one RecordQueue for every input partition.
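A minimal sketch of the one-queue-per-partition layout, with hypothetical names: SimpleRecordQueue stands in for RecordQueue, partitions are plain strings instead of TopicPartitions, and the queueFactory parameter plays the role of the RecordQueueCreator.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in for RecordQueue: a per-partition buffer of record timestamps.
class SimpleRecordQueue {
    final String partition;
    final ArrayDeque<Long> buffer = new ArrayDeque<>();

    SimpleRecordQueue(String partition) {
        this.partition = partition;
    }
}

class PartitionQueues {
    // Builds exactly one queue per input partition using the given factory.
    static Map<String, SimpleRecordQueue> createPartitionQueues(
            Set<String> inputPartitions,
            Function<String, SimpleRecordQueue> queueFactory) {
        Map<String, SimpleRecordQueue> queues = new LinkedHashMap<>();
        for (String partition : inputPartitions) {
            queues.put(partition, queueFactory.apply(partition));
        }
        return queues;
    }
}
```

Passing the factory in, rather than constructing queues inline, lets the same creation logic be reused for partitions added later (updateInputPartitions hands a createQueue factory to the PartitionGroup).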
updateInputPartitions
void updateInputPartitions(
Set<TopicPartition> topicPartitions,
Map<String, List<String>> allTopologyNodesToSourceTopics)
updateInputPartitions first calls the parent updateInputPartitions (of AbstractTask).
In the end, updateInputPartitions requests the PartitionGroup to updatePartitions (with the given TopicPartitions and createQueue factory).
updateInputPartitions is part of the Task abstraction.
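A sketch of how the reconciliation in updatePartitions might work, under the assumption that queues of retained partitions (and their buffered records) are kept, queues of removed partitions are dropped, and the createQueue factory is invoked only for newly added partitions. QueueRegistry is a hypothetical name, and partitions are plain strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical, simplified model of reconciling partition queues; not Kafka Streams code.
class QueueRegistry<Q> {
    final Map<String, Q> queues = new HashMap<>();

    void updatePartitions(Set<String> newPartitions, Function<String, Q> createQueue) {
        // Drop queues for partitions that are no longer input partitions.
        queues.keySet().removeIf(p -> !newPartitions.contains(p));
        // Keep existing queues as-is; create queues only for partitions not seen before.
        for (String p : newPartitions) {
            queues.computeIfAbsent(p, createQueue);
        }
    }
}
```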
streamTime
long streamTime()
streamTime requests the PartitionGroup for the streamTime.
streamTime is used when:
- ProcessorContextImpl is requested for the current stream time
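A simplified model of stream time as this page uses it: it starts as RecordQueue.UNKNOWN (assumed here to be -1) and then tracks the largest record timestamp processed so far, so late, out-of-order records never move it backwards. StreamClock and onRecordProcessed are hypothetical names used only for illustration.

```java
// Hypothetical model of stream time tracking; not the PartitionGroup implementation.
class StreamClock {
    static final long UNKNOWN = -1L; // assumed value of RecordQueue.UNKNOWN
    private long streamTime = UNKNOWN;

    // Called for every record taken off the buffers for processing.
    void onRecordProcessed(long recordTimestamp) {
        // Stream time is non-decreasing: an older timestamp leaves it unchanged.
        if (recordTimestamp > streamTime) {
            streamTime = recordTimestamp;
        }
    }

    long streamTime() {
        return streamTime;
    }
}
```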
Scheduling Recurring Action
Cancellable schedule(
long startTime,
long interval,
PunctuationType type,
Punctuator punctuator)
Cancellable schedule(
long interval,
PunctuationType type,
Punctuator punctuator)
schedule creates a PunctuationSchedule (for the current ProcessorNode) and requests the stream-time or system-time PunctuationQueue to schedule it, based on the given PunctuationType (STREAM_TIME or WALL_CLOCK_TIME, respectively).
schedule is used when:
- ProcessorContextImpl is requested to schedule a recurring action
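The three-argument schedule overload has no startTime, so StreamTask has to choose one. The sketch below assumes it aligns STREAM_TIME punctuations to 0 (so they fire as soon as a stream time is known) and WALL_CLOCK_TIME punctuations to the current time plus one interval. FirstPunctuation and firstPunctuationTime are hypothetical names.

```java
// Hypothetical helper: picks the first due time for the overload without a startTime.
class FirstPunctuation {
    enum Type { STREAM_TIME, WALL_CLOCK_TIME }

    static long firstPunctuationTime(Type type, long interval, long nowMs) {
        switch (type) {
            case STREAM_TIME:
                // Aligned to 0: due as soon as the task has a known stream time.
                return 0L;
            case WALL_CLOCK_TIME:
                // Aligned to "now": due once a full interval of wall-clock time has elapsed.
                return nowMs + interval;
            default:
                throw new IllegalArgumentException("Unrecognized punctuation type: " + type);
        }
    }
}
```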
Stream- and System-Time PunctuationQueues
When created, StreamTask creates two PunctuationQueues, one per punctuation type:
- streamTimePunctuationQueue for STREAM_TIME-type punctuators
- systemTimePunctuationQueue for WALL_CLOCK_TIME-type punctuators
Punctuators are added to a corresponding PunctuationQueue when StreamTask is requested to schedule a recurring action.
The actions are executed in maybePunctuateStreamTime and maybePunctuateSystemTime, respectively (when TaskManager is requested to punctuate).
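A stdlib-only sketch of what a PunctuationQueue does on every mayPunctuate call: it fires all schedules whose due time has been reached and re-enqueues each one for its next due time. SimplePunctuationQueue is hypothetical, omits cancellation and ProcessorNodes, and assumes that intervals missed entirely (for example, after a long pause) are skipped rather than replayed.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

// Hypothetical, simplified PunctuationQueue; schedules are ordered by next due time.
// Assumes every interval is > 0.
class SimplePunctuationQueue {
    static final class Schedule {
        final long due;      // next (stream or wall-clock) time this action is due
        final long interval;
        final LongConsumer action;

        Schedule(long due, long interval, LongConsumer action) {
            this.due = due;
            this.interval = interval;
            this.action = action;
        }

        // Next due time after firing at 'now'; wholly missed intervals are skipped,
        // so a late punctuation fires once rather than once per missed interval.
        Schedule next(long now) {
            long nextDue = due + interval;
            if (now >= nextDue) {
                long missed = (now - due) / interval;
                nextDue = due + (missed + 1) * interval;
            }
            return new Schedule(nextDue, interval, action);
        }
    }

    private final PriorityQueue<Schedule> pq =
        new PriorityQueue<>(Comparator.comparingLong((Schedule s) -> s.due));

    void schedule(long startTime, long interval, LongConsumer action) {
        pq.add(new Schedule(startTime, interval, action));
    }

    // Runs every schedule due at 'now' and reports whether anything ran.
    boolean mayPunctuate(long now) {
        boolean punctuated = false;
        while (!pq.isEmpty() && pq.peek().due <= now) {
            Schedule s = pq.poll();
            s.action.accept(now);
            pq.add(s.next(now)); // the next due time is always > now, so the loop terminates
            punctuated = true;
        }
        return punctuated;
    }
}
```

With a schedule starting at 0 and an interval of 10, a first call at time 35 fires the action once and moves its next due time to 40.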
maybePunctuateStreamTime
boolean maybePunctuateStreamTime()
maybePunctuateStreamTime returns true when at least one STREAM_TIME punctuator has been executed.
maybePunctuateStreamTime requests the PartitionGroup for the stream time.
Note
Stream time for STREAM_TIME punctuators is determined using the PartitionGroup.
maybePunctuateStreamTime returns false when the stream time is RecordQueue.UNKNOWN (i.e., the stream time has not been determined yet).
Otherwise, maybePunctuateStreamTime requests the stream-time PunctuationQueue to mayPunctuate (with the stream time, the STREAM_TIME punctuation type and this StreamTask). If at least one recurring action was triggered (punctuated), maybePunctuateStreamTime marks this StreamTask as commitNeeded.
maybePunctuateStreamTime is part of the Task abstraction.
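The control flow of maybePunctuateStreamTime can be modeled as follows. StreamTimePunctuation is a hypothetical class, a LongPredicate stands in for the stream-time PunctuationQueue's mayPunctuate, and UNKNOWN is assumed to equal RecordQueue.UNKNOWN (-1).

```java
import java.util.function.LongPredicate;

// Hypothetical model of maybePunctuateStreamTime's decision flow; not Kafka Streams code.
class StreamTimePunctuation {
    static final long UNKNOWN = -1L; // assumed value of RecordQueue.UNKNOWN

    private final LongPredicate mayPunctuate; // stands in for the stream-time PunctuationQueue
    private boolean commitNeeded = false;

    StreamTimePunctuation(LongPredicate mayPunctuate) {
        this.mayPunctuate = mayPunctuate;
    }

    boolean maybePunctuateStreamTime(long streamTime) {
        // No stream time yet: nothing can be due, so skip the queue entirely.
        if (streamTime == UNKNOWN) {
            return false;
        }
        boolean punctuated = mayPunctuate.test(streamTime);
        if (punctuated) {
            // Punctuators can forward records or update state stores, so request a commit.
            commitNeeded = true;
        }
        return punctuated;
    }

    boolean commitNeeded() {
        return commitNeeded;
    }
}
```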
maybePunctuateSystemTime
boolean maybePunctuateSystemTime()
maybePunctuateSystemTime returns true when at least one WALL_CLOCK_TIME punctuator has been executed.
maybePunctuateSystemTime requests the Time for the current time (in ms).
Note
System time for WALL_CLOCK_TIME punctuators is determined using the Time.
maybePunctuateSystemTime requests the system-time PunctuationQueue to mayPunctuate (with the current system time, the WALL_CLOCK_TIME punctuation type and this StreamTask). If at least one recurring action was triggered (punctuated), maybePunctuateSystemTime marks this StreamTask as commitNeeded.
maybePunctuateSystemTime is part of the Task abstraction.
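The wall-clock variant follows the same pattern, except that the time comes from the Time abstraction rather than the PartitionGroup. In the hypothetical sketch below, the clock is injected as a LongSupplier (standing in for Time) and the system-time PunctuationQueue as a LongPredicate, which makes wall-clock punctuation testable with a manually advanced clock.

```java
import java.util.function.LongPredicate;
import java.util.function.LongSupplier;

// Hypothetical model of maybePunctuateSystemTime with an injected clock; not Kafka Streams code.
class WallClockPunctuation {
    private final LongSupplier clock;          // stands in for Time (current time in ms)
    private final LongPredicate mayPunctuate;  // stands in for the system-time PunctuationQueue
    private boolean commitNeeded = false;

    WallClockPunctuation(LongSupplier clock, LongPredicate mayPunctuate) {
        this.clock = clock;
        this.mayPunctuate = mayPunctuate;
    }

    boolean maybePunctuateSystemTime() {
        // Unlike stream time, the system time is always available, so there is no UNKNOWN check.
        boolean punctuated = mayPunctuate.test(clock.getAsLong());
        if (punctuated) {
            commitNeeded = true;
        }
        return punctuated;
    }

    boolean commitNeeded() {
        return commitNeeded;
    }
}
```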
prepareCommit
Map<TopicPartition, OffsetAndMetadata> prepareCommit()
prepareCommit...FIXME
prepareCommit is part of the Task abstraction.
committableOffsetsAndMetadata
Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata()
committableOffsetsAndMetadata...FIXME
Adding Records
void addRecords(
TopicPartition partition,
Iterable<ConsumerRecord<byte[], byte[]>> records)
addRecords requests the PartitionGroup to add the given ConsumerRecords (for the given TopicPartition).
addRecords prints out the following TRACE message to the logs:
Added records into the buffered queue of partition [partition],
new queue size is [newQueueSize]
addRecords requests the main Consumer to pause (suspend) fetching records from the partition when the partition's queue size (after adding the new raw records) exceeds the buffered.records.per.partition threshold.
addRecords is part of the Task abstraction.
addPartitionsForOffsetReset
void addPartitionsForOffsetReset(
Set<TopicPartition> partitionsForOffsetReset)
addPartitionsForOffsetReset...FIXME
addPartitionsForOffsetReset is part of the Task abstraction.
Processing One Record
boolean process(
long wallClockTime)
process...FIXME
process is part of the Task abstraction.
punctuate
void punctuate(
ProcessorNode<?, ?, ?, ?> node,
long timestamp,
PunctuationType type,
Punctuator punctuator)
punctuate...FIXME
punctuate is part of the ProcessorNodePunctuator abstraction.
completeRestoration
void completeRestoration(
java.util.function.Consumer<Set<TopicPartition>> offsetResetter)
completeRestoration...FIXME
completeRestoration is part of the Task abstraction.
resetOffsetsIfNeededAndInitializeMetadata
void resetOffsetsIfNeededAndInitializeMetadata(
java.util.function.Consumer<Set<TopicPartition>> offsetResetter)
resetOffsetsIfNeededAndInitializeMetadata...FIXME
Logging
Enable ALL logging level for the org.apache.kafka.streams.processor.internals.StreamTask logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.kafka.streams.processor.internals.StreamTask=ALL
Refer to Logging.