Skip to content

RecordQueue

RecordQueue is a queue of ConsumerRecords (with the head as a deserialized record and the tail of raw records).

RecordQueue is created for an input partition (for the PartitionGroup of a StreamTask).

Creating Instance

RecordQueue takes the following to be created:

RecordQueue is created when:

TimestampExtractor

RecordQueue is given a TimestampExtractor when created.

fifoQueue (of ConsumerRecords)

RecordQueue defines fifoQueue internal registry as a ArrayDeque (Java) of serialized (raw) ConsumerRecord<byte[], byte[]>s.

RecordQueue creates an empty ArrayDeque when created.

ConsumerRecords are added in addRawRecords.

StampedRecord

RecordQueue defines headRecord internal registry of StampedRecord (with a ConsumerRecord<Object, Object> and the extracted timestamp).

A StampedRecord is retrieved (and removed) from the fifoQueue when RecordQueue is requested to updateHead.

addRawRecords

int addRawRecords(
  Iterable<ConsumerRecord<byte[], byte[]>> rawRecords)

addRawRecords...FIXME

addRawRecords is used when:

poll

StampedRecord poll()

poll...FIXME

poll is used when:

Updating Head Record

void updateHead()

updateHead does its work until the headRecord is found in the fifoQueue.


In other words, updateHead makes sure that the headRecord is available (for the follow-up operations) if there are timestamp-valid records in the fifoQueue.


updateHead takes (and removes) the first ConsumerRecord from the fifoQueue and requests the RecordDeserializer to deserialize the record (with the ProcessorContext).

updateHead skips this record if the RecordDeserializer returns null (to announce to skip the record) and starts over.

updateHead requests the TimestampExtractor to extract the timestamp (with the partitionTime).

updateHead prints out the following TRACE message to the logs:

Source node [name] extracted timestamp [timestamp] for record [record]

updateHead creates a new StampedRecord (with the deserialized ConsumerRecord and the timestamp).


If the extracted timestamp from the deserialized ConsumerRecord is negative, updateHead prints out the following WARN message to the logs, requests the droppedRecords sensor to record the event and starts over.

Skipping record due to negative extracted timestamp. topic=[[topic]] partition=[[partition]] offset=[[offset]] extractedTimestamp=[[timestamp]] extractor=[[timestampExtractor]]

updateHead is used when:

Logging

Enable ALL logging level for org.apache.kafka.streams.processor.internals.RecordQueue logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.RecordQueue=ALL

Refer to Logging.

Back to top