Fetcher
Fetcher<K, V> is used by KafkaConsumer for fetching records.
Creating Instance
Fetcher takes the following to be created:
- LogContext
- ConsumerNetworkClient
- fetch.min.bytes
- fetch.max.bytes
- fetch.max.wait.ms
- max.partition.fetch.bytes
- max.poll.records
- check.crcs
- client.rack
- Key Deserializer
- Value Deserializer
- ConsumerMetadata
- SubscriptionState
- Metrics
- FetcherMetricsRegistry
- Time
- retry.backoff.ms
- request.timeout.ms
- IsolationLevel
- ApiVersions
Fetcher is created along with KafkaConsumer.
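Most of the parameters above come from consumer configuration properties. As an illustration only, the following Java sketch collects those properties in a plain java.util.Properties object of the kind one would pass to a KafkaConsumer. The values (including the rack id rack-a) are example values, not recommendations, and FetcherConfigSketch is a hypothetical helper.

```java
import java.util.Properties;

// Consumer configuration properties that end up as Fetcher constructor arguments.
// Property names are real consumer settings; the values are examples only.
class FetcherConfigSketch {
    static Properties fetcherProperties() {
        Properties props = new Properties();
        props.setProperty("fetch.min.bytes", "1");                 // minimum data the broker should return per fetch
        props.setProperty("fetch.max.bytes", "52428800");          // maximum data the broker should return per fetch
        props.setProperty("fetch.max.wait.ms", "500");             // max broker wait while fetch.min.bytes is not met
        props.setProperty("max.partition.fetch.bytes", "1048576"); // maximum data per partition per fetch
        props.setProperty("max.poll.records", "500");              // maximum records returned by one poll
        props.setProperty("check.crcs", "true");                   // verify CRC32 of fetched records
        props.setProperty("client.rack", "rack-a");                // example rack id (hypothetical)
        props.setProperty("retry.backoff.ms", "100");              // backoff before retrying a failed request
        props.setProperty("request.timeout.ms", "30000");          // how long to wait for a response
        props.setProperty("isolation.level", "read_committed");    // becomes Fetcher's IsolationLevel
        return props;
    }

    public static void main(String[] args) {
        fetcherProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```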
IsolationLevel
Fetcher is given an IsolationLevel when created (based on the isolation.level configuration property).
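A minimal sketch of turning the isolation.level property value into an isolation level, assuming the client upper-cases the value and looks up the matching enum constant. Level is a hypothetical local enum mirroring the two values read_uncommitted and read_committed; it is not Kafka's IsolationLevel class.

```java
import java.util.Locale;

// Hypothetical sketch: map the isolation.level property value to an enum constant.
class IsolationLevelParsing {
    // Local stand-in mirroring the two isolation levels; not Kafka's IsolationLevel class.
    enum Level { READ_UNCOMMITTED, READ_COMMITTED }

    // "read_committed" -> READ_COMMITTED; unknown values make valueOf throw IllegalArgumentException.
    static Level fromConfig(String isolationLevelConfig) {
        return Level.valueOf(isolationLevelConfig.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(fromConfig("read_committed")); // prints READ_COMMITTED
    }
}
```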
Fetcher uses the IsolationLevel for the following:
Sending Fetch Requests
int sendFetches()
sendFetches prepares fetch requests (prepareFetchRequests) for the nodes that host the assigned partitions (preferred read replicas or leaders).
For every fetch request, sendFetches prints out the following DEBUG message to the logs:
Sending [isolationLevel] [data] to broker [fetchTarget]
sendFetches requests the ConsumerNetworkClient to send the fetch request (to the fetchTarget node) and registers the node in the nodesWithPendingFetchRequests registry.
On a successful response, for every partition in the response, sendFetches prints out the following DEBUG message to the logs and adds a new CompletedFetch to the completedFetches registry (the CompletedFetch Queue):
Fetch [isolationLevel] at offset [fetchOffset] for partition [partition] returned fetch data [partitionData]
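In the end (whether the request succeeded or failed), sendFetches removes the fetchTarget node from the nodesWithPendingFetchRequests registry. sendFetches returns the number of fetch requests sent.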
sendFetches is used when:
- KafkaConsumer is requested to poll (and pollForFetches)
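The request and response bookkeeping of sendFetches can be sketched as follows. This is a simplified model under stated assumptions, not Kafka's implementation: nodes are int ids, partitions are String names, records are Strings, and the hypothetical client function stands in for ConsumerNetworkClient. Building the requests (prepareFetchRequests), the DEBUG logging and fetch session handling are left out.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.BiFunction;

// Simplified model of sendFetches. All types are hypothetical stand-ins, not Kafka's API.
class SendFetchesSketch {
    static final class CompletedFetch {
        final String partition;
        final long fetchOffset;
        final List<String> records;
        CompletedFetch(String partition, long fetchOffset, List<String> records) {
            this.partition = partition;
            this.fetchOffset = fetchOffset;
            this.records = records;
        }
    }

    final Set<Integer> nodesWithPendingFetchRequests = ConcurrentHashMap.newKeySet();
    final Queue<CompletedFetch> completedFetches = new ConcurrentLinkedQueue<>();

    // requests: node id -> (partition -> fetch offset), i.e. the output of a prepare step.
    // client: hypothetical asynchronous client sending one fetch request to a node.
    int sendFetches(Map<Integer, Map<String, Long>> requests,
                    BiFunction<Integer, Map<String, Long>, CompletableFuture<Map<String, List<String>>>> client) {
        for (Map.Entry<Integer, Map<String, Long>> entry : requests.entrySet()) {
            int node = entry.getKey();
            Map<String, Long> data = entry.getValue();
            nodesWithPendingFetchRequests.add(node);            // register the node before sending
            client.apply(node, data).whenComplete((response, error) -> {
                try {
                    if (error == null) {
                        // one CompletedFetch per partition of a successful response
                        response.forEach((partition, records) -> completedFetches.add(
                            new CompletedFetch(partition, data.getOrDefault(partition, -1L), records)));
                    }
                } finally {
                    nodesWithPendingFetchRequests.remove(node); // deregister on success and on failure
                }
            });
        }
        return requests.size();                                 // number of fetch requests sent
    }
}
```

Removing the node in a finally block means a failed request does not leave the node registered as pending.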
prepareFetchRequests
Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
prepareFetchRequests...FIXME
Preferred Read Replica
Node selectReadReplica(
TopicPartition partition,
Node leaderReplica,
long currentTimeMs)
selectReadReplica requests the SubscriptionState for the preferredReadReplica of the given TopicPartition.
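A sketch of the replica selection, assuming (beyond what is described above) that the leader is used when no preferred read replica is known, when the entry has expired, or when the replica is not online according to the cluster metadata. ReadReplicaSketch, Preferred and onlineNodes are hypothetical stand-ins for SubscriptionState and ConsumerMetadata; nodes are int ids and partitions are String names.

```java
import java.util.*;

// Hypothetical model of selectReadReplica with a leader fallback (an assumption of this sketch).
class ReadReplicaSketch {
    static final class Preferred {
        final int nodeId;
        final long expireTimeMs;
        Preferred(int nodeId, long expireTimeMs) {
            this.nodeId = nodeId;
            this.expireTimeMs = expireTimeMs;
        }
    }

    final Map<String, Preferred> preferredReadReplicas = new HashMap<>(); // plays SubscriptionState's role
    final Set<Integer> onlineNodes = new HashSet<>();                     // plays the cluster metadata's role

    int selectReadReplica(String partition, int leader, long currentTimeMs) {
        Preferred preferred = preferredReadReplicas.get(partition);
        if (preferred == null || currentTimeMs > preferred.expireTimeMs) {
            preferredReadReplicas.remove(partition); // none known or expired: fetch from the leader
            return leader;
        }
        if (onlineNodes.contains(preferred.nodeId)) {
            return preferred.nodeId;                 // fetch from the preferred read replica
        }
        preferredReadReplicas.remove(partition);     // replica offline or unknown: forget it, use the leader
        return leader;
    }
}
```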
offsetsForTimes
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
Map<TopicPartition, Long> timestampsToSearch,
Timer timer)
offsetsForTimes...FIXME
offsetsForTimes is used when:
- KafkaConsumer is requested to offsetsForTimes
beginningOffsets
Map<TopicPartition, Long> beginningOffsets(
Collection<TopicPartition> partitions,
Timer timer)
beginningOffsets...FIXME
beginningOffsets is used when:
- KafkaConsumer is requested to beginningOffsets
endOffsets
Map<TopicPartition, Long> endOffsets(
Collection<TopicPartition> partitions,
Timer timer)
endOffsets...FIXME
endOffsets is used when:
- KafkaConsumer is requested to endOffsets and currentLag
beginningOrEndOffset
Map<TopicPartition, Long> beginningOrEndOffset(
Collection<TopicPartition> partitions,
long timestamp,
Timer timer)
beginningOrEndOffset...FIXME
beginningOrEndOffset is used when:
- Fetcher is requested to beginningOffsets and endOffsets
fetchOffsetsByTimes
ListOffsetResult fetchOffsetsByTimes(
Map<TopicPartition, Long> timestampsToSearch,
Timer timer,
boolean requireTimestamps)
fetchOffsetsByTimes...FIXME
fetchOffsetsByTimes is used when:
- Fetcher is requested to offsetsForTimes and beginningOrEndOffset
sendListOffsetsRequests
RequestFuture<ListOffsetResult> sendListOffsetsRequests(
Map<TopicPartition, Long> timestampsToSearch,
boolean requireTimestamps)
sendListOffsetsRequests...FIXME
Fetched Records
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords()
fetchedRecords returns up to max.poll.records number of records from the CompletedFetch Queue.
If the nextInLineFetch is not set or has already been consumed, fetchedRecords peeks at the next CompletedFetch in the CompletedFetch Queue (and stops when the queue is empty). If that CompletedFetch has not been initialized yet, fetchedRecords initializes it (initializeCompletedFetch). fetchedRecords saves the CompletedFetch as the nextInLineFetch and only then takes it off the CompletedFetch Queue.
If the partition of the nextInLineFetch is paused, fetchedRecords prints out the following DEBUG message to the logs and resets the nextInLineFetch (to null).
Skipping fetching records for assigned partition [p] because it is paused
Otherwise, fetchedRecords fetches records (fetchRecords) out of the nextInLineFetch, up to the number of records still left to return.
In the end, fetchedRecords returns the ConsumerRecords per TopicPartition (out of the CompletedFetch Queue).
fetchedRecords is used when:
- KafkaConsumer is requested to pollForFetches
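The fetchedRecords loop can be modelled in a few lines. This is a simplified sketch, not Kafka's code: CompletedFetch is a hypothetical holder of String records, initialization is reduced to a flag, paused partitions are a plain set of names, and maxPollRecords plays the role of max.poll.records.

```java
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of fetchedRecords draining the CompletedFetch Queue.
class FetchedRecordsSketch {
    static final class CompletedFetch {
        final String partition;
        final Deque<String> records;
        boolean initialized;
        CompletedFetch(String partition, List<String> records) {
            this.partition = partition;
            this.records = new ArrayDeque<>(records);
        }
        boolean isConsumed() { return records.isEmpty(); }
    }

    final Queue<CompletedFetch> completedFetches = new ConcurrentLinkedQueue<>();
    final Set<String> pausedPartitions = new HashSet<>();
    final int maxPollRecords;
    CompletedFetch nextInLineFetch;

    FetchedRecordsSketch(int maxPollRecords) { this.maxPollRecords = maxPollRecords; }

    Map<String, List<String>> fetchedRecords() {
        Map<String, List<String>> fetched = new LinkedHashMap<>();
        int recordsRemaining = maxPollRecords;
        while (recordsRemaining > 0) {
            if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {
                CompletedFetch next = completedFetches.peek(); // look without removing
                if (next == null) break;                       // nothing buffered
                if (!next.initialized) next.initialized = true; // stands in for initializeCompletedFetch
                nextInLineFetch = next;
                completedFetches.poll();                       // only now take it off the queue
            } else if (pausedPartitions.contains(nextInLineFetch.partition)) {
                nextInLineFetch = null;                        // skip a paused partition
            } else {
                List<String> records = fetched.computeIfAbsent(nextInLineFetch.partition, k -> new ArrayList<>());
                while (recordsRemaining > 0 && !nextInLineFetch.isConsumed()) {
                    records.add(nextInLineFetch.records.poll());
                    recordsRemaining--;
                }
            }
        }
        return fetched;
    }
}
```

Records left over in the nextInLineFetch (here, when max.poll.records is reached) are returned by the next call.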
fetchRecords
List<ConsumerRecord<K, V>> fetchRecords(
CompletedFetch completedFetch,
int maxRecords)
fetchRecords...FIXME
initializeCompletedFetch
CompletedFetch initializeCompletedFetch(
CompletedFetch nextCompletedFetch)
initializeCompletedFetch returns the given CompletedFetch if there were no errors. initializeCompletedFetch updates the SubscriptionState with the current metadata about the partition.
initializeCompletedFetch takes the partition, the PartitionData and the fetch offset from the given CompletedFetch.
initializeCompletedFetch prints out the following TRACE message to the logs:
Preparing to read [n] bytes of data for partition [p] with offset [o]
initializeCompletedFetch takes the RecordBatches from the PartitionData.
With a high watermark given, initializeCompletedFetch prints out the following TRACE message to the logs and requests the SubscriptionState to updateHighWatermark for the partition.
Updating high watermark for partition [p] to [highWatermark]
With a log start offset given, initializeCompletedFetch prints out the following TRACE message to the logs and requests the SubscriptionState to updateLogStartOffset for the partition.
Updating log start offset for partition [p] to [logStartOffset]
With a last stable offset given, initializeCompletedFetch prints out the following TRACE message to the logs and requests the SubscriptionState to updateLastStableOffset for the partition.
Updating last stable offset for partition [p] to [lastStableOffset]
With a preferred read replica given, initializeCompletedFetch prints out the following DEBUG message to the logs and requests the SubscriptionState to updatePreferredReadReplica for the partition.
Updating preferred read replica for partition [p] to [preferredReadReplica] set to expire at [expireTimeMs]
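The optional metadata updates above follow one pattern: a value present in the fetch response overwrites what SubscriptionState knows about the partition, and an absent value leaves it unchanged. The sketch below assumes hypothetical PartitionData and PartitionState holders, and a hypothetical replicaLeaseMs used to compute the expiry time of the preferred read replica.

```java
import java.util.OptionalInt;
import java.util.OptionalLong;

// Hypothetical model of passing partition metadata from a fetch response to the subscription state.
class PartitionMetadataSketch {
    static final class PartitionData {           // what a fetch response may carry
        OptionalLong highWatermark = OptionalLong.empty();
        OptionalLong logStartOffset = OptionalLong.empty();
        OptionalLong lastStableOffset = OptionalLong.empty();
        OptionalInt preferredReadReplica = OptionalInt.empty();
    }

    static final class PartitionState {          // what is tracked per partition
        Long highWatermark, logStartOffset, lastStableOffset;
        Integer preferredReadReplica;
        Long preferredReadReplicaExpireTimeMs;
    }

    // Only values present in the response overwrite the tracked state.
    static void update(PartitionState state, PartitionData data, long nowMs, long replicaLeaseMs) {
        data.highWatermark.ifPresent(hw -> state.highWatermark = hw);
        data.logStartOffset.ifPresent(lso -> state.logStartOffset = lso);
        data.lastStableOffset.ifPresent(lso -> state.lastStableOffset = lso);
        data.preferredReadReplica.ifPresent(id -> {
            state.preferredReadReplica = id;
            state.preferredReadReplicaExpireTimeMs = nowMs + replicaLeaseMs; // hypothetical lease length
        });
    }
}
```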
For errors like NOT_LEADER_OR_FOLLOWER, REPLICA_NOT_AVAILABLE, FENCED_LEADER_EPOCH, KAFKA_STORAGE_ERROR, OFFSET_NOT_AVAILABLE, initializeCompletedFetch prints out the following DEBUG message to the logs and requests the ConsumerMetadata to requestUpdate.
Error in fetch for partition [p]: [exceptionName]
For OFFSET_OUT_OF_RANGE error, initializeCompletedFetch requests the SubscriptionState to clearPreferredReadReplica for the partition. With no preferred read replica, it is assumed that the fetch came from the leader.
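The error handling above amounts to a classification of partition-level errors. In this sketch FetchError lists only the error codes named in this section and Action is a hypothetical summary of the reaction; neither type is part of Kafka.

```java
// Hypothetical classification of the partition-level fetch errors described above.
class FetchErrorSketch {
    enum FetchError {
        NONE, NOT_LEADER_OR_FOLLOWER, REPLICA_NOT_AVAILABLE, FENCED_LEADER_EPOCH,
        KAFKA_STORAGE_ERROR, OFFSET_NOT_AVAILABLE, OFFSET_OUT_OF_RANGE
    }

    enum Action { RETURN_FETCH, REQUEST_METADATA_UPDATE, CLEAR_PREFERRED_READ_REPLICA }

    static Action onError(FetchError error) {
        switch (error) {
            case NONE:
                return Action.RETURN_FETCH;                 // no error: hand the CompletedFetch back
            case NOT_LEADER_OR_FOLLOWER:
            case REPLICA_NOT_AVAILABLE:
            case FENCED_LEADER_EPOCH:
            case KAFKA_STORAGE_ERROR:
            case OFFSET_NOT_AVAILABLE:
                return Action.REQUEST_METADATA_UPDATE;      // ask ConsumerMetadata for a refresh
            case OFFSET_OUT_OF_RANGE:
                return Action.CLEAR_PREFERRED_READ_REPLICA; // next fetch goes to the leader
            default:
                throw new IllegalArgumentException("unhandled error: " + error);
        }
    }
}
```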
resetOffsetsIfNeeded
void resetOffsetsIfNeeded()
resetOffsetsIfNeeded...FIXME
resetOffsetsIfNeeded is used when:
- KafkaConsumer is requested to updateFetchPositions
resetOffsetsAsync
void resetOffsetsAsync(
Map<TopicPartition, Long> partitionResetTimestamps)
resetOffsetsAsync...FIXME
sendListOffsetRequest
RequestFuture<ListOffsetResult> sendListOffsetRequest(
Node node,
Map<TopicPartition, ListOffsetsPartition> timestampsToSearch,
boolean requireTimestamp)
sendListOffsetRequest...FIXME
sendListOffsetRequest is used when:
- Fetcher is requested to resetOffsetsIfNeeded (via resetOffsetsAsync) and fetchOffsetsByTimes (via sendListOffsetsRequests)
clearBufferedDataForUnassignedTopics
void clearBufferedDataForUnassignedTopics(
Collection<String> assignedTopics)
clearBufferedDataForUnassignedTopics...FIXME
clearBufferedDataForUnassignedTopics is used when:
- KafkaConsumer is requested to subscribe
CompletedFetch Queue
Fetcher creates an empty ConcurrentLinkedQueue (Java) of CompletedFetches when created.
New CompletedFetches (one per partition) are added to the queue in sendFetches (on successful receipt of a fetch response from a broker).
Logging
Enable ALL logging level for the org.apache.kafka.clients.consumer.internals.Fetcher logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.org.apache.kafka.clients.consumer.internals.Fetcher=ALL
Refer to Logging.