RecordQueue
RecordQueue is a queue of ConsumerRecords for a single input partition: the head is kept as a deserialized record, while the tail holds raw (still serialized) records.
RecordQueue is created for an input partition of the PartitionGroup of a StreamTask.
Creating Instance
RecordQueue takes the following to be created:
- TopicPartition
- SourceNode
- TimestampExtractor
- DeserializationExceptionHandler
- InternalProcessorContext
- LogContext
RecordQueue is created when:
- RecordQueueCreator is requested for a new RecordQueue
TimestampExtractor
RecordQueue is given a TimestampExtractor when created and uses it (in updateHead) to extract the timestamps of deserialized records.
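The extractor's contract can be illustrated without Kafka on the classpath. The following is a minimal, hypothetical model: SimpleRecord, Extractor, RECORD_TIME and FALLBACK_TO_PARTITION_TIME are illustrative names, and Extractor only mirrors the shape of Kafka's `long extract(ConsumerRecord<Object, Object> record, long partitionTime)`. It shows why the partitionTime argument matters: an extractor may return the record's own (possibly negative) timestamp or fall back to the partition time.

```java
// Hypothetical, Kafka-free model of the TimestampExtractor contract.
// SimpleRecord, Extractor, RECORD_TIME and FALLBACK_TO_PARTITION_TIME are
// illustrative names only; no Kafka classes are used.
final class TimestampExtractorSketch {

    /** Hypothetical stand-in for a deserialized ConsumerRecord<Object, Object>. */
    static final class SimpleRecord {
        final Object key;
        final Object value;
        final long timestamp; // the timestamp carried by the record itself

        SimpleRecord(Object key, Object value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Mirrors the shape of extract(record, partitionTime). */
    interface Extractor {
        long extract(SimpleRecord record, long partitionTime);
    }

    /** Returns the record's own timestamp, even if it is negative. */
    static final Extractor RECORD_TIME = (record, partitionTime) -> record.timestamp;

    /** Falls back to the partition time when the record's own timestamp is negative. */
    static final Extractor FALLBACK_TO_PARTITION_TIME =
        (record, partitionTime) -> record.timestamp >= 0 ? record.timestamp : partitionTime;

    public static void main(String[] args) {
        SimpleRecord invalid = new SimpleRecord("k", "v", -1L);
        long partitionTime = 100L;
        // -1: a negative timestamp, which updateHead would drop
        System.out.println(RECORD_TIME.extract(invalid, partitionTime));
        // 100: the fallback extractor keeps the record at the partition time
        System.out.println(FALLBACK_TO_PARTITION_TIME.extract(invalid, partitionTime));
    }
}
```

A negative result, as returned by RECORD_TIME above, is exactly the case that updateHead drops (see Updating Head Record).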
fifoQueue (of ConsumerRecords)
RecordQueue defines the fifoQueue internal registry as an ArrayDeque (Java) of serialized (raw) ConsumerRecord<byte[], byte[]>s.
RecordQueue creates an empty ArrayDeque when created.
ConsumerRecords are added to the fifoQueue in addRawRecords and taken (and removed) from it in updateHead.
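The first-in, first-out behaviour of the fifoQueue can be sketched with a plain java.util.ArrayDeque. This is a minimal model, assuming the usual addLast (append to the tail) and pollFirst (take from the head) pattern; RawRecord, append and takeOldest are hypothetical names standing in for ConsumerRecord<byte[], byte[]> and the RecordQueue internals.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;

// Hypothetical model of fifoQueue: raw (still serialized) records kept in
// arrival order in a java.util.ArrayDeque.
final class FifoQueueSketch {

    /** Hypothetical stand-in for ConsumerRecord<byte[], byte[]>. */
    static final class RawRecord {
        final long offset;
        final byte[] key;
        final byte[] value;

        RawRecord(long offset, byte[] key, byte[] value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    // Starts empty, as the fifoQueue does when RecordQueue is created.
    final ArrayDeque<RawRecord> fifoQueue = new ArrayDeque<>();

    /** New raw records go to the tail. */
    void append(RawRecord record) {
        fifoQueue.addLast(record);
    }

    /** The oldest raw record is taken (and removed) from the head; null if empty. */
    RawRecord takeOldest() {
        return fifoQueue.pollFirst();
    }

    public static void main(String[] args) {
        FifoQueueSketch queue = new FifoQueueSketch();
        for (long offset = 0; offset < 3; offset++) {
            queue.append(new RawRecord(offset, null, ("v" + offset).getBytes(StandardCharsets.UTF_8)));
        }
        System.out.println(queue.takeOldest().offset); // 0 (first in, first out)
        System.out.println(queue.fifoQueue.size());    // 2
    }
}
```

ArrayDeque.pollFirst returns null on an empty deque instead of throwing, which suits a loop that simply stops when nothing is left.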
StampedRecord
RecordQueue defines the headRecord internal registry of type StampedRecord (a deserialized ConsumerRecord<Object, Object> with its extracted timestamp).
The headRecord is created from a raw record that is taken (and removed) from the fifoQueue when RecordQueue is requested to updateHead.
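What a StampedRecord pairs together can be shown with a small value class. This is a hypothetical sketch: Record and StampedRecord below are illustrative stand-ins (not the Kafka classes), and the only point is that the head is a deserialized record plus the timestamp the extractor returned, with headRecord staying null until one is available.

```java
// Hypothetical, Kafka-free model of StampedRecord and the headRecord slot.
// Record and StampedRecord here are illustrative stand-ins, not Kafka classes.
final class StampedRecordSketch {

    /** Hypothetical stand-in for a deserialized ConsumerRecord<Object, Object>. */
    static final class Record {
        final String topic;
        final int partition;
        final long offset;
        final Object key;
        final Object value;

        Record(String topic, int partition, long offset, Object key, Object value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    /** A deserialized record paired with the timestamp extracted for it. */
    static final class StampedRecord {
        final Record record;  // the deserialized record
        final long timestamp; // as returned by the TimestampExtractor

        StampedRecord(Record record, long timestamp) {
            this.record = record;
            this.timestamp = timestamp;
        }

        long offset() {
            return record.offset;
        }
    }

    // Stays null until a record has been deserialized and stamped.
    StampedRecord headRecord;

    public static void main(String[] args) {
        StampedRecordSketch queue = new StampedRecordSketch();
        System.out.println(queue.headRecord == null); // true
        queue.headRecord = new StampedRecord(new Record("input", 0, 42L, "k", "v"), 1_000L);
        System.out.println(queue.headRecord.offset() + " @ " + queue.headRecord.timestamp); // 42 @ 1000
    }
}
```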
addRawRecords
int addRawRecords(
Iterable<ConsumerRecord<byte[], byte[]>> rawRecords)
addRawRecords ...FIXME
addRawRecords is used when:
- PartitionGroup is requested to add ConsumerRecords
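The body of addRawRecords is still a FIXME here, so the following is only a minimal sketch under stated assumptions: it assumes addRawRecords appends every raw record to the tail of the fifoQueue, calls updateHead (which, per Updating Head Record, is used by addRawRecords), and returns the number of buffered records, counted as the tail plus the head if any. RawRecord, size and the greatly simplified updateHead (no deserialization or timestamp checks) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.List;

// Hypothetical sketch of addRawRecords. Assumption: append to fifoQueue,
// refresh the head, and return the number of buffered records.
final class AddRawRecordsSketch {

    /** Hypothetical stand-in for ConsumerRecord<byte[], byte[]>. */
    static final class RawRecord {
        final long offset;

        RawRecord(long offset) {
            this.offset = offset;
        }
    }

    final ArrayDeque<RawRecord> fifoQueue = new ArrayDeque<>();
    RawRecord headRecord; // simplified: no deserialization or StampedRecord

    int addRawRecords(Iterable<RawRecord> rawRecords) {
        for (RawRecord raw : rawRecords) {
            fifoQueue.addLast(raw); // raw records go to the tail
        }
        updateHead();
        return size();
    }

    /** Greatly simplified updateHead: promote the oldest raw record if there is no head. */
    private void updateHead() {
        if (headRecord == null && !fifoQueue.isEmpty()) {
            headRecord = fifoQueue.pollFirst();
        }
    }

    /** Records in the tail plus the head record (if any). */
    int size() {
        return fifoQueue.size() + (headRecord == null ? 0 : 1);
    }

    public static void main(String[] args) {
        AddRawRecordsSketch queue = new AddRawRecordsSketch();
        System.out.println(queue.addRawRecords(List.of(new RawRecord(0), new RawRecord(1)))); // 2
        System.out.println(queue.headRecord.offset); // 0
        System.out.println(queue.addRawRecords(List.of(new RawRecord(2)))); // 3
    }
}
```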
poll
StampedRecord poll()
poll ...FIXME
poll is used when:
- PartitionGroup is requested for the next record
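The body of poll is also a FIXME, so this is a hedged sketch only. It assumes poll hands out the current headRecord, clears it, and calls updateHead (which, per Updating Head Record, is used by poll) to promote the next record. Records are modelled as already stamped, and the add helper and simplified updateHead are hypothetical.

```java
import java.util.ArrayDeque;

// Hypothetical sketch of poll(). Records are modelled as already stamped to
// keep the focus on how the head is handed out and refilled.
final class PollSketch {

    /** Hypothetical stand-in for StampedRecord. */
    static final class StampedRecord {
        final long offset;
        final long timestamp;

        StampedRecord(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    final ArrayDeque<StampedRecord> fifoQueue = new ArrayDeque<>();
    StampedRecord headRecord;

    /** Simplified add: append to the tail and make sure there is a head. */
    void add(StampedRecord record) {
        fifoQueue.addLast(record);
        updateHead();
    }

    /** Hands out the current head (null if none) and promotes the next record. */
    StampedRecord poll() {
        StampedRecord recordToReturn = headRecord;
        headRecord = null;
        updateHead();
        return recordToReturn;
    }

    /** Simplified updateHead: no deserialization or timestamp checks. */
    private void updateHead() {
        if (headRecord == null) {
            headRecord = fifoQueue.pollFirst(); // null when the queue is empty
        }
    }

    public static void main(String[] args) {
        PollSketch queue = new PollSketch();
        queue.add(new StampedRecord(0L, 10L));
        queue.add(new StampedRecord(1L, 20L));
        System.out.println(queue.poll().offset); // 0
        System.out.println(queue.poll().offset); // 1
        System.out.println(queue.poll());        // null
    }
}
```

Refilling the head right after handing it out keeps a deserialized record ready for the next caller.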
Updating Head Record¶
void updateHead()
updateHead loops as long as there is no headRecord and the fifoQueue is not empty.
In other words, updateHead makes sure that the headRecord is available (for follow-up operations) whenever the fifoQueue still holds a record that can be deserialized and has a valid (non-negative) timestamp.
In every iteration, updateHead takes (and removes) the first ConsumerRecord from the fifoQueue and requests the RecordDeserializer to deserialize it (with the ProcessorContext).
If the RecordDeserializer returns null (to signal that the record should be skipped), updateHead skips the record and starts over.
updateHead requests the TimestampExtractor to extract the timestamp of the deserialized record (with the partitionTime) and prints out the following TRACE message to the logs:
Source node [name] extracted timestamp [timestamp] for record [record]
If the extracted timestamp is negative, updateHead prints out the following WARN message to the logs, requests the droppedRecords sensor to record the event, and starts over:
Skipping record due to negative extracted timestamp. topic=[[topic]] partition=[[partition]] offset=[[offset]] extractedTimestamp=[[timestamp]] extractor=[[timestampExtractor]]
Otherwise, updateHead creates a new StampedRecord (with the deserialized ConsumerRecord and the timestamp) as the headRecord, which ends the loop.
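The updateHead loop can be sketched end to end without Kafka. This is a minimal, hypothetical model: RawRecord, Record, StampedRecord, the Deserializer and Extractor interfaces, the demo factory, the initial partitionTime of -1 and the printed TRACE/WARN lines (stand-ins for the real log messages) are all illustrative. A null from the deserializer means "skip", a negative timestamp increments a droppedRecords counter (standing in for the sensor), and the first valid record becomes the head.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;

// Hypothetical, Kafka-free sketch of the updateHead loop. All type names, the
// Deserializer/Extractor interfaces and the printed log lines are illustrative.
final class UpdateHeadSketch {

    static final class RawRecord {            // stand-in for ConsumerRecord<byte[], byte[]>
        final long offset;
        final byte[] value;
        RawRecord(long offset, String value) {
            this.offset = offset;
            this.value = value.getBytes(StandardCharsets.UTF_8);
        }
    }

    static final class Record {               // stand-in for ConsumerRecord<Object, Object>
        final long offset;
        final String value;
        Record(long offset, String value) { this.offset = offset; this.value = value; }
    }

    static final class StampedRecord {
        final Record record;
        final long timestamp;
        StampedRecord(Record record, long timestamp) { this.record = record; this.timestamp = timestamp; }
    }

    interface Deserializer { Record deserialize(RawRecord raw); }             // null means "skip"
    interface Extractor { long extract(Record record, long partitionTime); }

    final ArrayDeque<RawRecord> fifoQueue = new ArrayDeque<>();
    final Deserializer deserializer;
    final Extractor timestampExtractor;
    long partitionTime = -1L;   // hypothetical "unknown" starting value
    int droppedRecords = 0;     // stand-in for the droppedRecords sensor
    StampedRecord headRecord;

    UpdateHeadSketch(Deserializer deserializer, Extractor timestampExtractor) {
        this.deserializer = deserializer;
        this.timestampExtractor = timestampExtractor;
    }

    void updateHead() {
        // Keep going while there is no head and raw records are left.
        while (headRecord == null && !fifoQueue.isEmpty()) {
            RawRecord raw = fifoQueue.pollFirst();                    // take (and remove) the oldest
            Record deserialized = deserializer.deserialize(raw);
            if (deserialized == null) {
                continue;                                             // asked to skip: start over
            }
            long timestamp = timestampExtractor.extract(deserialized, partitionTime);
            System.out.println("TRACE extracted timestamp " + timestamp + " for offset " + deserialized.offset);
            if (timestamp < 0) {
                System.out.println("WARN Skipping record due to negative extracted timestamp. offset=["
                    + deserialized.offset + "] extractedTimestamp=[" + timestamp + "]");
                droppedRecords++;                                     // record the drop, then start over
                continue;
            }
            headRecord = new StampedRecord(deserialized, timestamp);  // a head is found: loop ends
        }
    }

    /** Demo queue: "corrupt" is skipped by the deserializer; otherwise the value is the timestamp. */
    static UpdateHeadSketch demo() {
        UpdateHeadSketch queue = new UpdateHeadSketch(
            raw -> {
                String text = new String(raw.value, StandardCharsets.UTF_8);
                return text.equals("corrupt") ? null : new Record(raw.offset, text);
            },
            (record, partitionTime) -> Long.parseLong(record.value));
        queue.fifoQueue.addLast(new RawRecord(0L, "corrupt")); // skipped by the deserializer
        queue.fifoQueue.addLast(new RawRecord(1L, "-5"));      // dropped: negative timestamp
        queue.fifoQueue.addLast(new RawRecord(2L, "42"));      // becomes the head
        queue.fifoQueue.addLast(new RawRecord(3L, "50"));      // stays in the fifoQueue
        return queue;
    }

    public static void main(String[] args) {
        UpdateHeadSketch queue = demo();
        queue.updateHead();
        System.out.println(queue.headRecord.record.offset + " @ " + queue.headRecord.timestamp); // 2 @ 42
        System.out.println(queue.droppedRecords + " dropped, " + queue.fifoQueue.size() + " left"); // 1 dropped, 1 left
    }
}
```

Calling updateHead again while a head is set does nothing, which is why addRawRecords and poll can both call it unconditionally.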
updateHead is used when:
- RecordQueue is requested to addRawRecords and poll
Logging
Enable ALL logging level for the org.apache.kafka.streams.processor.internals.RecordQueue logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.kafka.streams.processor.internals.RecordQueue=ALL
Refer to Logging.