
Fetcher

Fetcher<K, V> is used by KafkaConsumer for fetching records.

Creating Instance

Fetcher takes the following to be created:

Fetcher is created along with KafkaConsumer.

IsolationLevel

Fetcher is given an IsolationLevel when created (based on the isolation.level configuration property).
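The following is a rough sketch of how the isolation.level string could be resolved to an IsolationLevel. The local enum mirrors the two constants of org.apache.kafka.common.IsolationLevel (READ_UNCOMMITTED and READ_COMMITTED), and the property accepts read_uncommitted (the default) and read_committed. The helper fromConfig is hypothetical and not part of Kafka's API.

```java
import java.util.Locale;
import java.util.Properties;

// Local stand-in for org.apache.kafka.common.IsolationLevel (assumption: same two constants).
enum IsolationLevel {
    READ_UNCOMMITTED,
    READ_COMMITTED
}

class IsolationLevelSketch {
    // Hypothetical helper: resolve isolation.level (default read_uncommitted) to the enum.
    static IsolationLevel fromConfig(Properties props) {
        String value = props.getProperty("isolation.level", "read_uncommitted");
        return IsolationLevel.valueOf(value.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("isolation.level", "read_committed");
        System.out.println(fromConfig(props)); // READ_COMMITTED
    }
}
```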

Fetcher uses the IsolationLevel for the following:

Sending Fetch Requests

int sendFetches()

sendFetches prepares fetch requests for the nodes that host the assigned partitions (preferred read replicas or leaders).
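The grouping step can be sketched as follows, assuming a partition is fetched from its preferred read replica when one is known and from its leader otherwise. Partitions are plain strings and nodes are plain int ids; FetchTargets and groupByTarget are hypothetical names, not Kafka's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FetchTargets {
    // Hypothetical: group partitions by the node to fetch them from.
    // leaders: partition -> leader node id; preferred: partition -> preferred read replica id
    static Map<Integer, List<String>> groupByTarget(List<String> partitions,
                                                    Map<String, Integer> leaders,
                                                    Map<String, Integer> preferred) {
        Map<Integer, List<String>> byNode = new LinkedHashMap<>();
        for (String partition : partitions) {
            // Prefer the read replica; fall back to the leader.
            Integer target = preferred.getOrDefault(partition, leaders.get(partition));
            if (target == null) {
                continue; // no known target: this sketch just skips the partition
            }
            byNode.computeIfAbsent(target, id -> new ArrayList<>()).add(partition);
        }
        return byNode;
    }

    public static void main(String[] args) {
        Map<String, Integer> leaders = Map.of("t-0", 1, "t-1", 2, "t-2", 1);
        Map<String, Integer> preferred = Map.of("t-2", 3);
        System.out.println(groupByTarget(List.of("t-0", "t-1", "t-2"), leaders, preferred));
        // {1=[t-0], 2=[t-1], 3=[t-2]}
    }
}
```

One request per map key is then sent, so a partition with a preferred read replica does not share a request with partitions fetched from its leader.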

For every fetch request, sendFetches prints out the following DEBUG message to the logs:

Sending [isolationLevel] [data] to broker [fetchTarget]

sendFetches requests the ConsumerNetworkClient to send the fetch request (to the fetchTarget node) and registers the node in the nodesWithPendingFetchRequests registry.

On a successful response, for every partition, sendFetches prints out the following DEBUG message to the logs and adds a new CompletedFetch to the completedFetches registry.

Fetch [isolationLevel] at offset [fetchOffset] for partition [partition]
returned fetch data [partitionData]

In the end, sendFetches removes the request from the nodesWithPendingFetchRequests registry.
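The bookkeeping around a single fetch request can be sketched as below. The network call is replaced by explicit onSuccess and onFailure callbacks, and CompletedFetch is a hypothetical record holding only a partition and its records. Releasing the node on failure too is an assumption based on the "in the end" wording above.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

class SendFetchesSketch {
    // Hypothetical stand-in for Fetcher's per-partition CompletedFetch.
    record CompletedFetch(String partition, List<String> records) {}

    final Set<Integer> nodesWithPendingFetchRequests = new HashSet<>();
    final Queue<CompletedFetch> completedFetches = new ConcurrentLinkedQueue<>();

    // Marks one in-flight request per target node; returns how many requests were "sent".
    int sendFetches(Map<Integer, List<String>> partitionsByNode) {
        for (Integer node : partitionsByNode.keySet()) {
            // A real client would hand the request to ConsumerNetworkClient here.
            nodesWithPendingFetchRequests.add(node);
        }
        return partitionsByNode.size();
    }

    // Successful response: one CompletedFetch per partition, then the node is no longer pending.
    void onSuccess(int node, Map<String, List<String>> recordsByPartition) {
        try {
            recordsByPartition.forEach((partition, records) ->
                completedFetches.add(new CompletedFetch(partition, records)));
        } finally {
            nodesWithPendingFetchRequests.remove(node);
        }
    }

    // Failed response: nothing is buffered, but the node is released as well.
    void onFailure(int node) {
        nodesWithPendingFetchRequests.remove(node);
    }

    public static void main(String[] args) {
        SendFetchesSketch fetcher = new SendFetchesSketch();
        fetcher.sendFetches(Map.of(1, List.of("t-0"), 2, List.of("t-1")));
        fetcher.onSuccess(1, Map.of("t-0", List.of("a", "b")));
        fetcher.onFailure(2);
        System.out.println(fetcher.completedFetches.size() + " " + fetcher.nodesWithPendingFetchRequests);
        // 1 []
    }
}
```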

sendFetches is used when:

prepareFetchRequests

Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()

prepareFetchRequests...FIXME

Preferred Read Replica

Node selectReadReplica(
  TopicPartition partition,
  Node leaderReplica,
  long currentTimeMs)

selectReadReplica requests the SubscriptionState for the preferredReadReplica of the given TopicPartition.
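The text only states that selectReadReplica consults the SubscriptionState. The sketch below assumes it falls back to the leader when no preferred read replica is set or when the stored one has expired (modelled on the expireTimeMs in the "Updating preferred read replica" log message). ReadReplicaSketch and PreferredReplica are hypothetical types that stand in for SubscriptionState.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class ReadReplicaSketch {
    // Hypothetical per-partition state: preferred replica id and when it stops being valid.
    record PreferredReplica(int nodeId, long expireTimeMs) {}

    private final Map<String, PreferredReplica> preferred = new HashMap<>();

    void updatePreferredReadReplica(String partition, int nodeId, long expireTimeMs) {
        preferred.put(partition, new PreferredReplica(nodeId, expireTimeMs));
    }

    // Preferred read replica if set and not expired; an expired entry is cleared.
    Optional<Integer> preferredReadReplica(String partition, long currentTimeMs) {
        PreferredReplica p = preferred.get(partition);
        if (p == null) {
            return Optional.empty();
        }
        if (currentTimeMs > p.expireTimeMs()) {
            preferred.remove(partition);
            return Optional.empty();
        }
        return Optional.of(p.nodeId());
    }

    // Assumed fallback: use the leader whenever there is no usable preferred read replica.
    int selectReadReplica(String partition, int leaderReplica, long currentTimeMs) {
        return preferredReadReplica(partition, currentTimeMs).orElse(leaderReplica);
    }

    public static void main(String[] args) {
        ReadReplicaSketch state = new ReadReplicaSketch();
        state.updatePreferredReadReplica("t-0", 3, 1_000L);
        System.out.println(state.selectReadReplica("t-0", 1, 500L));   // 3
        System.out.println(state.selectReadReplica("t-0", 1, 1_500L)); // 1 (expired)
    }
}
```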

offsetsForTimes

Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
  Map<TopicPartition, Long> timestampsToSearch,
  Timer timer)

offsetsForTimes...FIXME

offsetsForTimes is used when:

beginningOffsets

Map<TopicPartition, Long> beginningOffsets(
  Collection<TopicPartition> partitions,
  Timer timer)

beginningOffsets...FIXME

beginningOffsets is used when:

endOffsets

Map<TopicPartition, Long> endOffsets(
  Collection<TopicPartition> partitions,
  Timer timer)

endOffsets...FIXME

endOffsets is used when:

beginningOrEndOffset

Map<TopicPartition, Long> beginningOrEndOffset(
  Collection<TopicPartition> partitions,
  long timestamp,
  Timer timer)

beginningOrEndOffset...FIXME
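Judging by the signatures, beginningOffsets and endOffsets look like thin wrappers that pass a special timestamp to beginningOrEndOffset. This sketch assumes the ListOffsets sentinel values -2 (earliest offset) and -1 (latest offset). The broker round trip is replaced by a hypothetical in-memory map of each partition's log start and end offsets, and the Timer parameter is omitted.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OffsetLookupSketch {
    // Sentinel timestamps (assumed to match ListOffsetsRequest.EARLIEST_TIMESTAMP / LATEST_TIMESTAMP).
    static final long EARLIEST_TIMESTAMP = -2L;
    static final long LATEST_TIMESTAMP = -1L;

    // Hypothetical broker-side view: partition -> {logStartOffset, logEndOffset}.
    private final Map<String, long[]> logRanges;

    OffsetLookupSketch(Map<String, long[]> logRanges) {
        this.logRanges = logRanges;
    }

    Map<String, Long> beginningOffsets(List<String> partitions) {
        return beginningOrEndOffset(partitions, EARLIEST_TIMESTAMP);
    }

    Map<String, Long> endOffsets(List<String> partitions) {
        return beginningOrEndOffset(partitions, LATEST_TIMESTAMP);
    }

    // Resolves each known partition to its start or end offset depending on the sentinel.
    Map<String, Long> beginningOrEndOffset(List<String> partitions, long timestamp) {
        if (timestamp != EARLIEST_TIMESTAMP && timestamp != LATEST_TIMESTAMP) {
            throw new IllegalArgumentException("Expected a sentinel timestamp, got " + timestamp);
        }
        Map<String, Long> result = new HashMap<>();
        for (String partition : partitions) {
            long[] range = logRanges.get(partition);
            if (range == null) {
                continue; // unknown partition: left out of the result
            }
            result.put(partition, timestamp == EARLIEST_TIMESTAMP ? range[0] : range[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        OffsetLookupSketch lookup = new OffsetLookupSketch(Map.of("t-0", new long[] {5L, 42L}));
        System.out.println(lookup.beginningOffsets(List.of("t-0"))); // {t-0=5}
        System.out.println(lookup.endOffsets(List.of("t-0")));       // {t-0=42}
    }
}
```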

beginningOrEndOffset is used when:

fetchOffsetsByTimes

ListOffsetResult fetchOffsetsByTimes(
  Map<TopicPartition, Long> timestampsToSearch,
  Timer timer,
  boolean requireTimestamps)

fetchOffsetsByTimes...FIXME

fetchOffsetsByTimes is used when:

sendListOffsetsRequests

RequestFuture<ListOffsetResult> sendListOffsetsRequests(
  Map<TopicPartition, Long> timestampsToSearch,
  boolean requireTimestamps)

sendListOffsetsRequests...FIXME

Fetched Records

Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords()

fetchedRecords returns up to max.poll.records number of records from the CompletedFetch Queue.


If nextInLineFetch is uninitialized or already consumed, fetchedRecords takes a peek at the next CompletedFetch in the CompletedFetch Queue. If that CompletedFetch has not been initialized yet, fetchedRecords initializes it using initializeCompletedFetch. fetchedRecords then saves the CompletedFetch to the nextInLineFetch internal registry and takes it off the CompletedFetch Queue.

If the partition of the nextInLineFetch is paused, fetchedRecords prints out the following DEBUG message to the logs and sets the nextInLineFetch registry to null.

Skipping fetching records for assigned partition [p] because it is paused

In all other cases, fetchedRecords fetches records out of the nextInLineFetch (up to the number of records still left to fetch).

In the end, fetchedRecords returns the ConsumerRecords per TopicPartition (out of the CompletedFetch Queue).
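The loop described above can be sketched as follows. CompletedFetch is a hypothetical class holding one partition's not-yet-returned records, initializeCompletedFetch is elided, paused partitions are a plain Set, and maxPollRecords stands for max.poll.records. Following the description, a paused fetch is simply dropped here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

class FetchedRecordsSketch {
    // Hypothetical CompletedFetch: remaining records for one partition.
    static final class CompletedFetch {
        final String partition;
        final Queue<String> records;

        CompletedFetch(String partition, List<String> records) {
            this.partition = partition;
            this.records = new ArrayDeque<>(records);
        }

        boolean isConsumed() {
            return records.isEmpty();
        }
    }

    final Queue<CompletedFetch> completedFetches = new ConcurrentLinkedQueue<>();
    final Set<String> paused;
    CompletedFetch nextInLineFetch;

    FetchedRecordsSketch(Set<String> paused) {
        this.paused = paused;
    }

    Map<String, List<String>> fetchedRecords(int maxPollRecords) {
        Map<String, List<String>> fetched = new LinkedHashMap<>();
        int recordsRemaining = maxPollRecords;
        while (recordsRemaining > 0) {
            if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {
                CompletedFetch next = completedFetches.peek();
                if (next == null) {
                    break; // nothing buffered
                }
                nextInLineFetch = next; // initialization elided in this sketch
                completedFetches.poll();
            } else if (paused.contains(nextInLineFetch.partition)) {
                nextInLineFetch = null; // "Skipping fetching records ... because it is paused"
            } else {
                List<String> out = fetched.computeIfAbsent(nextInLineFetch.partition, p -> new ArrayList<>());
                while (recordsRemaining > 0 && !nextInLineFetch.isConsumed()) {
                    out.add(nextInLineFetch.records.poll());
                    recordsRemaining--;
                }
            }
        }
        return fetched;
    }

    public static void main(String[] args) {
        FetchedRecordsSketch fetcher = new FetchedRecordsSketch(Set.of("p-1"));
        fetcher.completedFetches.add(new CompletedFetch("p-0", List.of("a", "b", "c")));
        fetcher.completedFetches.add(new CompletedFetch("p-1", List.of("x")));
        fetcher.completedFetches.add(new CompletedFetch("p-2", List.of("d")));
        System.out.println(fetcher.fetchedRecords(2)); // {p-0=[a, b]}
        System.out.println(fetcher.fetchedRecords(5)); // {p-0=[c], p-2=[d]}
    }
}
```

Keeping a partially consumed fetch in nextInLineFetch is what lets the second call resume at "c" instead of losing it.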

fetchedRecords is used when:

fetchRecords

List<ConsumerRecord<K, V>> fetchRecords(
  CompletedFetch completedFetch,
  int maxRecords)

fetchRecords...FIXME

initializeCompletedFetch

CompletedFetch initializeCompletedFetch(
  CompletedFetch nextCompletedFetch)

initializeCompletedFetch returns the given CompletedFetch if there were no errors. initializeCompletedFetch updates the SubscriptionState with the current metadata about the partition.


initializeCompletedFetch takes the partition, the PartitionData and the fetch offset from the given CompletedFetch.

initializeCompletedFetch prints out the following TRACE message to the logs:

Preparing to read [n] bytes of data for partition [p] with offset [o]

initializeCompletedFetch takes the RecordBatches from the PartitionData.

With a high watermark given, initializeCompletedFetch prints out the following TRACE message to the logs and requests the SubscriptionState to updateHighWatermark for the partition.

Updating high watermark for partition [p] to [highWatermark]

With a log start offset given, initializeCompletedFetch prints out the following TRACE message to the logs and requests the SubscriptionState to updateLogStartOffset for the partition.

Updating log start offset for partition [p] to [logStartOffset]

With a last stable offset given, initializeCompletedFetch prints out the following TRACE message to the logs and requests the SubscriptionState to updateLastStableOffset for the partition.

Updating last stable offset for partition [p] to [lastStableOffset]

With a preferred read replica given, initializeCompletedFetch prints out the following DEBUG message to the logs and requests the SubscriptionState to updatePreferredReadReplica for the partition.

Updating preferred read replica for partition [p] to [preferredReadReplica] set to expire at [expireTimeMs]
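The metadata updates above follow one pattern: each value is applied to the partition's state only when the response carries it. The sketch below uses hypothetical PartitionData and PartitionState types built on OptionalLong and OptionalInt. How expireTimeMs is computed is left to the caller, since the text does not say.

```java
import java.util.OptionalInt;
import java.util.OptionalLong;

class InitializeCompletedFetchSketch {
    // Hypothetical subset of the fetch response for one partition.
    record PartitionData(OptionalLong highWatermark, OptionalLong logStartOffset,
                         OptionalLong lastStableOffset, OptionalInt preferredReadReplica) {}

    // Hypothetical per-partition view kept by SubscriptionState.
    static final class PartitionState {
        Long highWatermark;
        Long logStartOffset;
        Long lastStableOffset;
        Integer preferredReadReplica;
        Long preferredReadReplicaExpireTimeMs;
    }

    // Applies every value the response carries; absent values leave the state untouched.
    static void updateFrom(PartitionState state, PartitionData data, long expireTimeMs) {
        data.highWatermark().ifPresent(hw -> state.highWatermark = hw);
        data.logStartOffset().ifPresent(offset -> state.logStartOffset = offset);
        data.lastStableOffset().ifPresent(offset -> state.lastStableOffset = offset);
        data.preferredReadReplica().ifPresent(id -> {
            state.preferredReadReplica = id;
            state.preferredReadReplicaExpireTimeMs = expireTimeMs;
        });
    }

    public static void main(String[] args) {
        PartitionState state = new PartitionState();
        updateFrom(state, new PartitionData(OptionalLong.of(100L), OptionalLong.empty(),
                OptionalLong.of(90L), OptionalInt.empty()), 5_000L);
        System.out.println(state.highWatermark + " " + state.logStartOffset + " " + state.lastStableOffset);
        // 100 null 90
    }
}
```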

For errors such as NOT_LEADER_OR_FOLLOWER, REPLICA_NOT_AVAILABLE, FENCED_LEADER_EPOCH, KAFKA_STORAGE_ERROR and OFFSET_NOT_AVAILABLE, initializeCompletedFetch prints out the following DEBUG message to the logs and requests the ConsumerMetadata to requestUpdate.

Error in fetch for partition [p]: [exceptionName]

For an OFFSET_OUT_OF_RANGE error, initializeCompletedFetch requests the SubscriptionState to clearPreferredReadReplica for the partition. If there was no preferred read replica to clear, the fetch is assumed to have come from the leader.
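The error branch can be sketched as a simple classification. The error names come from the text; the local Errors enum, the metadataUpdateRequested flag and the preferredReadReplicas map are hypothetical stand-ins for Kafka's Errors, ConsumerMetadata.requestUpdate and SubscriptionState.clearPreferredReadReplica. Any other error is left unhandled in this sketch.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class FetchErrorSketch {
    // Local stand-in for the error codes named in the text.
    enum Errors {
        NONE, NOT_LEADER_OR_FOLLOWER, REPLICA_NOT_AVAILABLE, FENCED_LEADER_EPOCH,
        KAFKA_STORAGE_ERROR, OFFSET_NOT_AVAILABLE, OFFSET_OUT_OF_RANGE
    }

    // Errors that suggest a stale view of the cluster: ask for a metadata refresh.
    static final Set<Errors> METADATA_ERRORS = EnumSet.of(
        Errors.NOT_LEADER_OR_FOLLOWER, Errors.REPLICA_NOT_AVAILABLE, Errors.FENCED_LEADER_EPOCH,
        Errors.KAFKA_STORAGE_ERROR, Errors.OFFSET_NOT_AVAILABLE);

    boolean metadataUpdateRequested = false;
    final Map<String, Integer> preferredReadReplicas = new HashMap<>();

    void handleError(String partition, Errors error) {
        if (METADATA_ERRORS.contains(error)) {
            metadataUpdateRequested = true; // stands in for ConsumerMetadata.requestUpdate
        } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
            // Stands in for clearPreferredReadReplica: later fetches go to the leader.
            Integer cleared = preferredReadReplicas.remove(partition);
            if (cleared == null) {
                System.out.println("No preferred read replica for " + partition + ": fetch came from the leader");
            }
        }
    }

    public static void main(String[] args) {
        FetchErrorSketch handler = new FetchErrorSketch();
        handler.preferredReadReplicas.put("t-0", 3);
        handler.handleError("t-0", Errors.OFFSET_OUT_OF_RANGE);
        handler.handleError("t-1", Errors.NOT_LEADER_OR_FOLLOWER);
        System.out.println(handler.preferredReadReplicas + " " + handler.metadataUpdateRequested); // {} true
    }
}
```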

resetOffsetsIfNeeded

void resetOffsetsIfNeeded()

resetOffsetsIfNeeded...FIXME

resetOffsetsIfNeeded is used when:

resetOffsetsAsync

void resetOffsetsAsync(
  Map<TopicPartition, Long> partitionResetTimestamps)

resetOffsetsAsync...FIXME

sendListOffsetRequest

RequestFuture<ListOffsetResult> sendListOffsetRequest(
  Node node,
  Map<TopicPartition, ListOffsetsPartition> timestampsToSearch,
  boolean requireTimestamp)

sendListOffsetRequest...FIXME

sendListOffsetRequest is used when:

clearBufferedDataForUnassignedTopics

void clearBufferedDataForUnassignedTopics(
  Collection<String> assignedTopics)

clearBufferedDataForUnassignedTopics...FIXME
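Going by the method name alone, clearBufferedDataForUnassignedTopics presumably discards buffered CompletedFetches whose topic is no longer in the given assignment. The sketch below shows only that assumed behavior; the CompletedFetch record (topic plus partition number) is hypothetical.

```java
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class ClearBufferedDataSketch {
    // Hypothetical buffered fetch identified by topic and partition number.
    record CompletedFetch(String topic, int partition) {}

    final Queue<CompletedFetch> completedFetches = new ConcurrentLinkedQueue<>();

    // Assumed behavior: drop buffered fetches for topics that are no longer assigned.
    void clearBufferedDataForUnassignedTopics(Collection<String> assignedTopics) {
        completedFetches.removeIf(fetch -> !assignedTopics.contains(fetch.topic()));
    }

    public static void main(String[] args) {
        ClearBufferedDataSketch fetcher = new ClearBufferedDataSketch();
        fetcher.completedFetches.add(new CompletedFetch("orders", 0));
        fetcher.completedFetches.add(new CompletedFetch("payments", 0));
        fetcher.clearBufferedDataForUnassignedTopics(List.of("orders"));
        System.out.println(fetcher.completedFetches); // [CompletedFetch[topic=orders, partition=0]]
    }
}
```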

clearBufferedDataForUnassignedTopics is used when:

CompletedFetch Queue

Fetcher creates an empty java.util.concurrent.ConcurrentLinkedQueue of CompletedFetches when created.

New CompletedFetches (one per partition) are added to the queue in sendFetches (on successful receipt of a response from a Kafka cluster).
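A concurrent queue is a natural fit here, presumably because response handlers that add CompletedFetches may run on a different thread than the one that drains them. The sketch below shows that hand-off with the JDK's ConcurrentLinkedQueue: a separate thread offers one entry per partition, and the consuming side uses peek to inspect the head and poll to take it off, as fetchedRecords does. The CompletedFetch record and the drain helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class CompletedFetchQueueSketch {
    // Hypothetical stand-in for a buffered per-partition fetch result.
    record CompletedFetch(String partition, int recordCount) {}

    // Consumer side: peek at the head, then poll it off the queue.
    static List<String> drain(Queue<CompletedFetch> completedFetches) {
        List<String> drained = new ArrayList<>();
        while (completedFetches.peek() != null) {
            drained.add(completedFetches.poll().partition());
        }
        return drained;
    }

    public static void main(String[] args) throws InterruptedException {
        Queue<CompletedFetch> completedFetches = new ConcurrentLinkedQueue<>();

        // Stand-in for a response handler on another thread: one entry per partition.
        Thread responseHandler = new Thread(() -> {
            completedFetches.offer(new CompletedFetch("t-0", 3));
            completedFetches.offer(new CompletedFetch("t-1", 5));
        });
        responseHandler.start();
        responseHandler.join();

        System.out.println(drain(completedFetches)); // [t-0, t-1]
    }
}
```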

Logging

Enable ALL logging level for the org.apache.kafka.clients.consumer.internals.Fetcher logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.clients.consumer.internals.Fetcher=ALL

Refer to Logging.