RecordQueue is a queue of ConsumerRecords (with the head as a deserialized record and the tail of raw records).

RecordQueue is created for an input partition (for the PartitionGroup of a StreamTask).

RecordQueue is given a TimestampExtractor when created.

fifoQueue (of ConsumerRecords)

RecordQueue defines fifoQueue internal registry as a ArrayDeque (Java) of serialized (raw) ConsumerRecord<byte[], byte[]>s.

RecordQueue creates an empty ArrayDeque when created.

ConsumerRecords are added in addRawRecords.


RecordQueue defines headRecord internal registry of StampedRecord (with a ConsumerRecord<Object, Object> and the extracted timestamp).

A StampedRecord is retrieved (and removed) from the fifoQueue when RecordQueue is requested to updateHead.


Updating Head Record

void updateHead()

updateHead does its work until the headRecord is found in the fifoQueue.

In other words, updateHead makes sure that the headRecord is available (for the follow-up operations) if there are timestamp-valid records in the fifoQueue.

updateHead takes (and removes) the first ConsumerRecord from the fifoQueue and requests the RecordDeserializer to deserialize the record (with the ProcessorContext).

updateHead skips this record if the RecordDeserializer returns null (to announce to skip the record) and starts over.

updateHead requests the TimestampExtractor to extract the timestamp (with the partitionTime).

updateHead prints out the following TRACE message to the logs:

Source node [name] extracted timestamp [timestamp] for record [record]

updateHead creates a new StampedRecord (with the deserialized ConsumerRecord and the timestamp).

If the extracted timestamp from the deserialized ConsumerRecord is negative, updateHead prints out the following WARN message to the logs, requests the droppedRecords sensor to record the event and starts over.

Skipping record due to negative extracted timestamp. topic=[[topic]] partition=[[partition]] offset=[[offset]] extractedTimestamp=[[timestamp]] extractor=[[timestampExtractor]]

Enable ALL logging level for org.apache.kafka.streams.processor.internals.RecordQueue logger to see what happens inside.

Refer to Logging.

