StreamTask¶
StreamTask is a concrete AbstractTask.
StreamTask creates exactly one PartitionGroup to handle all the input partitions.
Creating Instance¶
StreamTask takes the following to be created:
- TaskId
- Input TopicPartitions
- ProcessorTopology
- Kafka Consumer
- StreamsConfig
- StreamsMetricsImpl
- StateDirectory
- ThreadCache
- Time
- ProcessorStateManager
- RecordCollector
- InternalProcessorContext
- LogContext
StreamTask is created when:
- ActiveTaskCreator is requested to create an active task
- TopologyTestDriver is requested to set up a task
RecordQueueCreator¶
When created, StreamTask creates a RecordQueueCreator based on the following configuration properties:
StreamTask uses the RecordQueueCreator when:
- createPartitionQueues
- updateInputPartitions
buffered.records.per.partition¶
StreamTask uses the buffered.records.per.partition configuration property to control when to pause and resume record consumption (on a partition) when requested to add records and to process records, respectively.
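The pause/resume rule can be modeled in plain Java as below. This is a simplified sketch, not StreamTask itself: PausingConsumer and BufferedTaskModel are hypothetical names, partitions are plain strings, and the exact thresholds (pause once a partition queue grows above buffered.records.per.partition, resume once processing drains it back down to that value) are an assumption about how the check is done.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the main Consumer: it only tracks which partitions are paused.
class PausingConsumer {
    final Set<String> paused = new HashSet<>();

    void pause(String partition) { paused.add(partition); }

    void resume(String partition) { paused.remove(partition); }
}

// Simplified model of per-partition buffering with back-pressure (not the real StreamTask).
class BufferedTaskModel {
    private final int maxBufferedSize; // models buffered.records.per.partition
    private final PausingConsumer consumer;
    private final Map<String, Deque<String>> queues = new HashMap<>();

    BufferedTaskModel(int maxBufferedSize, PausingConsumer consumer) {
        this.maxBufferedSize = maxBufferedSize;
        this.consumer = consumer;
    }

    // Models adding records: buffer them, then pause the partition once its queue exceeds the threshold.
    int addRecords(String partition, List<String> records) {
        Deque<String> queue = queues.computeIfAbsent(partition, p -> new ArrayDeque<>());
        queue.addAll(records);
        if (queue.size() > maxBufferedSize) {
            consumer.pause(partition);
        }
        return queue.size();
    }

    // Models processing one record: dequeue it, then resume once the queue is back at the threshold.
    String processOne(String partition) {
        Deque<String> queue = queues.get(partition);
        if (queue == null || queue.isEmpty()) {
            return null;
        }
        String record = queue.poll();
        if (queue.size() == maxBufferedSize) {
            consumer.resume(partition);
        }
        return record;
    }
}
```

With a threshold of 2, adding a third record to a partition pauses it, and processing one record resumes it.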
Main Kafka Consumer¶
StreamTask is given a Consumer<byte[], byte[]> (Apache Kafka) when created.
StreamTask uses the Consumer for the following:
- Adding Records
- addPartitionsForOffsetReset
- prepareCommit (to committableOffsetsAndMetadata)
- Processing One Record
- completeRestoration (to resetOffsetsIfNeededAndInitializeMetadata)
RecordCollector¶
StreamTask is given a RecordCollector when created.
PartitionGroup¶
When created, StreamTask creates a PartitionGroup with the following:
- Partition Queues
- currentLag function (Apache Kafka) of the main Consumer
- recordLatenessSensor
- enforcedProcessingSensor
- max.task.idle.ms configuration property
createPartitionQueues¶
Map<TopicPartition, RecordQueue> createPartitionQueues()
createPartitionQueues requests the RecordQueueCreator to create one RecordQueue for every input partition.
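A minimal sketch of createPartitionQueues, assuming the RecordQueueCreator can be treated as a plain factory function from a partition to a new queue. SimpleRecordQueue and PartitionQueuesSketch are hypothetical, and partitions are modeled as strings rather than TopicPartitions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical minimal RecordQueue: it only remembers the partition it buffers records for.
class SimpleRecordQueue {
    final String partition;

    SimpleRecordQueue(String partition) { this.partition = partition; }
}

class PartitionQueuesSketch {
    // Builds one queue per input partition, each produced by the given factory
    // (standing in for the RecordQueueCreator).
    static Map<String, SimpleRecordQueue> createPartitionQueues(
            Set<String> inputPartitions,
            Function<String, SimpleRecordQueue> createQueue) {
        Map<String, SimpleRecordQueue> queues = new HashMap<>();
        for (String partition : inputPartitions) {
            queues.put(partition, createQueue.apply(partition));
        }
        return queues;
    }
}
```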
updateInputPartitions¶
void updateInputPartitions(
Set<TopicPartition> topicPartitions,
Map<String, List<String>> allTopologyNodesToSourceTopics)
updateInputPartitions first executes the parent AbstractTask's updateInputPartitions.
In the end, updateInputPartitions requests the PartitionGroup to updatePartitions (with the given TopicPartitions and the createQueue factory).
updateInputPartitions is part of the Task abstraction.
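The PartitionGroup side of updateInputPartitions can be pictured as reconciling the existing queues against the new set of partitions. The sketch below assumes that updatePartitions keeps the queues of partitions that stay assigned, drops the queues of removed partitions and uses the factory only for newly added ones. PartitionGroupSketch is hypothetical, and it ignores any cleanup a real implementation may need for dropped queues.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in for a PartitionGroup's per-partition queues.
class PartitionGroupSketch<Q> {
    final Map<String, Q> partitionQueues = new HashMap<>();

    // Reconciles the queues with the new input partitions and returns the removed partitions.
    Set<String> updatePartitions(Set<String> inputPartitions, Function<String, Q> createQueue) {
        Set<String> removed = new HashSet<>();
        // Drop the queues of partitions that are no longer assigned.
        Iterator<Map.Entry<String, Q>> it = partitionQueues.entrySet().iterator();
        while (it.hasNext()) {
            String partition = it.next().getKey();
            if (!inputPartitions.contains(partition)) {
                it.remove();
                removed.add(partition);
            }
        }
        // Create queues only for partitions that do not have one yet; existing queues are kept.
        for (String partition : inputPartitions) {
            partitionQueues.computeIfAbsent(partition, createQueue);
        }
        return removed;
    }
}
```

Keeping the queues of retained partitions means records already buffered for them survive the update.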
streamTime¶
long streamTime()
streamTime requests the PartitionGroup for the streamTime.
streamTime is used when:
- ProcessorContextImpl is requested for the current stream time
Scheduling Recurring Action¶
Cancellable schedule(
long startTime,
long interval,
PunctuationType type,
Punctuator punctuator)
Cancellable schedule(
long interval,
PunctuationType type,
Punctuator punctuator)
schedule creates a PunctuationSchedule (for the current ProcessorNode) and requests the stream-time or system-time PunctuationQueue to schedule the PunctuationSchedule based on the given PunctuationType (STREAM_TIME or WALL_CLOCK_TIME, respectively).
schedule is used when:
- ProcessorContextImpl is requested to schedule a recurring action
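The relationship between the two schedule overloads can be sketched as follows. It assumes that the overload without a startTime derives one from the PunctuationType (aligned to 0 for STREAM_TIME, one interval from now for WALL_CLOCK_TIME) and then delegates to the overload with a startTime, which routes the schedule to the matching queue. SketchPunctuationType, ScheduleSketch and the LongSupplier clock are hypothetical stand-ins for PunctuationType, StreamTask and Time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical mirror of PunctuationType.
enum SketchPunctuationType { STREAM_TIME, WALL_CLOCK_TIME }

// Hypothetical model of the two schedule overloads; each queue records {startTime, interval} pairs.
class ScheduleSketch {
    final List<long[]> streamTimeQueue = new ArrayList<>();
    final List<long[]> systemTimeQueue = new ArrayList<>();
    private final LongSupplier clock; // stands in for the task's Time (ms)

    ScheduleSketch(LongSupplier clock) { this.clock = clock; }

    // Overload without startTime: derives a start time from the type (assumed alignment).
    void schedule(long interval, SketchPunctuationType type) {
        switch (type) {
            case STREAM_TIME:
                schedule(0L, interval, type); // assumption: aligned to 0
                break;
            case WALL_CLOCK_TIME:
                schedule(clock.getAsLong() + interval, interval, type); // assumption: one interval from now
                break;
            default:
                throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
        }
    }

    // Overload with startTime: routes the schedule to the queue matching the type.
    void schedule(long startTime, long interval, SketchPunctuationType type) {
        long[] entry = {startTime, interval};
        if (type == SketchPunctuationType.STREAM_TIME) {
            streamTimeQueue.add(entry);
        } else {
            systemTimeQueue.add(entry);
        }
    }
}
```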
Stream- and System-Time PunctuationQueues¶
StreamTask creates two PunctuationQueues when created, one for each type of Punctuators:
- streamTimePunctuationQueue for STREAM_TIME-type punctuators
- systemTimePunctuationQueue for WALL_CLOCK_TIME-type punctuators
Punctuators are added to the corresponding PunctuationQueue when StreamTask is requested to schedule a recurring action.
The actions are executed on every maybePunctuateStreamTime or maybePunctuateSystemTime (when TaskManager is requested to punctuate).
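A PunctuationQueue can be thought of as a priority queue of schedules ordered by their next fire time. The sketch below is an assumption about that structure, not the Kafka class: SketchSchedule and SketchPunctuationQueue are hypothetical, and mayPunctuate runs every schedule that is due at the given timestamp, then reschedules it past that timestamp so that missed intervals are skipped rather than replayed. It expects a positive interval.

```java
import java.util.PriorityQueue;

// Hypothetical punctuation schedule: fires every `interval` ms, next at `nextTime`.
class SketchSchedule {
    long nextTime;
    final long interval;
    final Runnable action;

    SketchSchedule(long startTime, long interval, Runnable action) {
        this.nextTime = startTime;
        this.interval = interval;
        this.action = action;
    }
}

// Hypothetical punctuation queue ordered by next fire time.
class SketchPunctuationQueue {
    private final PriorityQueue<SketchSchedule> pq =
        new PriorityQueue<>((a, b) -> Long.compare(a.nextTime, b.nextTime));

    void schedule(SketchSchedule s) { pq.add(s); }

    // Runs every schedule due at `timestamp`; returns true if at least one action ran.
    boolean mayPunctuate(long timestamp) {
        boolean punctuated = false;
        SketchSchedule top = pq.peek();
        while (top != null && top.nextTime <= timestamp) {
            pq.poll();
            top.action.run();
            punctuated = true;
            // Advance past `timestamp` in whole intervals, skipping any missed ones.
            long intervalsToAdvance = (timestamp - top.nextTime) / top.interval + 1;
            top.nextTime += intervalsToAdvance * top.interval;
            pq.add(top);
            top = pq.peek();
        }
        return punctuated;
    }
}
```

The same queue type can serve both punctuation types; only the timestamp passed to mayPunctuate differs (stream time or system time).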
maybePunctuateStreamTime¶
boolean maybePunctuateStreamTime()
maybePunctuateStreamTime returns true when at least one STREAM_TIME punctuator has been executed.
maybePunctuateStreamTime requests the PartitionGroup for the stream time.
Note
Stream time for STREAM_TIME punctuators is determined using the PartitionGroup.
maybePunctuateStreamTime returns false when the stream time is RecordQueue.UNKNOWN (i.e., the stream time has not been determined yet).
Otherwise, maybePunctuateStreamTime requests the stream-time PunctuationQueue to mayPunctuate (with the stream time, the STREAM_TIME punctuation type and this StreamTask). If at least one recurring action was triggered (punctuated), maybePunctuateStreamTime marks this StreamTask as commitNeeded.
maybePunctuateStreamTime is part of the Task abstraction.
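The control flow of maybePunctuateStreamTime can be sketched as below. StreamTimePunctuationSketch is hypothetical; the stream-time PunctuationQueue is reduced to a LongPredicate that answers whether anything was punctuated at a given time, and UNKNOWN is assumed to be -1 (mirroring RecordQueue.UNKNOWN).

```java
import java.util.function.LongPredicate;

// Hypothetical model of maybePunctuateStreamTime (not the real StreamTask).
class StreamTimePunctuationSketch {
    static final long UNKNOWN = -1L; // assumption: mirrors RecordQueue.UNKNOWN

    private final LongPredicate streamTimeQueue; // stands in for mayPunctuate on the stream-time queue
    long streamTime = UNKNOWN;                   // stands in for the PartitionGroup's stream time
    boolean commitNeeded = false;

    StreamTimePunctuationSketch(LongPredicate streamTimeQueue) {
        this.streamTimeQueue = streamTimeQueue;
    }

    boolean maybePunctuateStreamTime() {
        // No stream time yet, so no STREAM_TIME punctuator can be due.
        if (streamTime == UNKNOWN) {
            return false;
        }
        boolean punctuated = streamTimeQueue.test(streamTime);
        if (punctuated) {
            commitNeeded = true; // mark the task as needing a commit
        }
        return punctuated;
    }
}
```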
maybePunctuateSystemTime¶
boolean maybePunctuateSystemTime()
maybePunctuateSystemTime returns true when at least one WALL_CLOCK_TIME punctuator has been executed.
maybePunctuateSystemTime requests the Time for the current time (in ms).
Note
System time for WALL_CLOCK_TIME punctuators is determined using the Time.
maybePunctuateSystemTime requests the system-time PunctuationQueue to mayPunctuate (with the current system time, the WALL_CLOCK_TIME punctuation type and this StreamTask). If at least one recurring action was triggered (punctuated), maybePunctuateSystemTime marks this StreamTask as commitNeeded.
maybePunctuateSystemTime is part of the Task abstraction.
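maybePunctuateSystemTime follows the same pattern, except that the time comes from the task's Time and, as described above, there is no UNKNOWN check. In the hypothetical sketch below, a LongSupplier stands in for Time and a LongPredicate for the system-time PunctuationQueue; injecting the clock is what lets a test drive wall-clock punctuation deterministically.

```java
import java.util.function.LongPredicate;
import java.util.function.LongSupplier;

// Hypothetical model of maybePunctuateSystemTime with an injectable clock.
class SystemTimePunctuationSketch {
    private final LongSupplier time;             // stands in for the task's Time (current time in ms)
    private final LongPredicate systemTimeQueue; // stands in for mayPunctuate on the system-time queue
    boolean commitNeeded = false;

    SystemTimePunctuationSketch(LongSupplier time, LongPredicate systemTimeQueue) {
        this.time = time;
        this.systemTimeQueue = systemTimeQueue;
    }

    boolean maybePunctuateSystemTime() {
        // Wall-clock time is always available, so it is passed straight to the queue.
        boolean punctuated = systemTimeQueue.test(time.getAsLong());
        if (punctuated) {
            commitNeeded = true; // mark the task as needing a commit
        }
        return punctuated;
    }
}
```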
prepareCommit¶
Map<TopicPartition, OffsetAndMetadata> prepareCommit()
prepareCommit
...FIXME
prepareCommit is part of the Task abstraction.
committableOffsetsAndMetadata¶
Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata()
committableOffsetsAndMetadata
...FIXME
Adding Records¶
void addRecords(
TopicPartition partition,
Iterable<ConsumerRecord<byte[], byte[]>> records)
addRecords requests the PartitionGroup to add the given ConsumerRecords (for the given TopicPartition).
addRecords prints out the following TRACE message to the logs:
Added records into the buffered queue of partition [partition], new queue size is [newQueueSize]
addRecords requests the main Consumer to pause (suspend) fetching and consuming records from the partition when the queue size for the partition (after adding the new raw records) exceeds the buffered.records.per.partition threshold.
addRecords is part of the Task abstraction.
addPartitionsForOffsetReset¶
void addPartitionsForOffsetReset(
Set<TopicPartition> partitionsForOffsetReset)
addPartitionsForOffsetReset
...FIXME
addPartitionsForOffsetReset is part of the Task abstraction.
Processing One Record¶
boolean process(
long wallClockTime)
process
...FIXME
process is part of the Task abstraction.
punctuate¶
void punctuate(
ProcessorNode<?, ?, ?, ?> node,
long timestamp,
PunctuationType type,
Punctuator punctuator)
punctuate
...FIXME
punctuate is part of the ProcessorNodePunctuator abstraction.
completeRestoration¶
void completeRestoration(
java.util.function.Consumer<Set<TopicPartition>> offsetResetter)
completeRestoration
...FIXME
completeRestoration is part of the Task abstraction.
resetOffsetsIfNeededAndInitializeMetadata¶
void resetOffsetsIfNeededAndInitializeMetadata(
java.util.function.Consumer<Set<TopicPartition>> offsetResetter)
resetOffsetsIfNeededAndInitializeMetadata
...FIXME
Logging¶
Enable ALL logging level for org.apache.kafka.streams.processor.internals.StreamTask logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.kafka.streams.processor.internals.StreamTask=ALL
Refer to Logging.