Fetcher
Fetcher<K, V> is used by KafkaConsumer for fetching records.
Creating Instance
Fetcher takes the following to be created:
- LogContext
- ConsumerNetworkClient
- fetch.min.bytes
- fetch.max.bytes
- fetch.max.wait.ms
- max.partition.fetch.bytes
- max.poll.records
- check.crcs
- client.rack
- Key Deserializer
- Value Deserializer
- ConsumerMetadata
- SubscriptionState
- Metrics
- FetcherMetricsRegistry
- Time
- retry.backoff.ms
- request.timeout.ms
- IsolationLevel
- ApiVersions
Fetcher is created along with KafkaConsumer.
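User code does not create a Fetcher directly. The sketch below shows a consumer configuration whose properties become the constructor arguments listed above. It uses only the property names in that list plus the standard bootstrap.servers and deserializer settings. The broker address and rack id are made up, and the values are illustrative, not recommendations.

```java
import java.util.Properties;

// Illustrative consumer configuration; the fetch-related properties end up
// as Fetcher constructor arguments. Values are examples only.
final class FetcherConfigSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // hypothetical broker address
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("fetch.min.bytes", "1");                  // minimum data a broker should return
        props.put("fetch.max.bytes", "52428800");           // 50 MiB (soft) limit per fetch response
        props.put("fetch.max.wait.ms", "500");              // how long a broker may wait for fetch.min.bytes
        props.put("max.partition.fetch.bytes", "1048576");  // 1 MiB per partition
        props.put("max.poll.records", "500");               // cap on records returned by fetchedRecords
        props.put("check.crcs", "true");                    // verify CRCs of fetched records
        props.put("client.rack", "rack-a");                 // hypothetical rack id
        props.put("isolation.level", "read_committed");     // becomes the Fetcher's IsolationLevel
        return props;
    }
}
```

If kafka-clients is on the classpath, passing these properties to new KafkaConsumer<>(props) creates the Fetcher together with the consumer.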
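IsolationLevel
Fetcher is given an IsolationLevel when created (based on the isolation.level configuration property, i.e. read_uncommitted or read_committed).
Fetcher uses the IsolationLevel for the following:
- Sending Fetch Requests (sendFetches)
- sendListOffsetRequest (ListOffsets requests)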
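A visible effect of the isolation level is how far a consumer can read. With read_committed, records are returned only up to the last stable offset. With read_uncommitted, the consumer reads up to the high watermark. The sketch below uses IsolationLevelSketch, a hypothetical stand-in for Kafka's IsolationLevel enum (it is not a Kafka class).

```java
import java.util.Locale;

// Hypothetical stand-in for Kafka's IsolationLevel enum.
enum IsolationLevelSketch {
    READ_UNCOMMITTED, READ_COMMITTED;

    // Maps an isolation.level value (read_uncommitted / read_committed) to the enum.
    static IsolationLevelSketch fromConfig(String value) {
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    // Exclusive upper bound of the offsets the consumer may read:
    // the last stable offset for read_committed, the high watermark otherwise.
    long visibleEndOffset(long highWatermark, long lastStableOffset) {
        return this == READ_COMMITTED ? lastStableOffset : highWatermark;
    }
}
```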
Sending Fetch Requests
int sendFetches()
sendFetches prepares fetch requests (prepareFetchRequests) for the nodes that host the assigned partitions (the preferred read replicas or the leaders).
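The choice of fetch target can be sketched as a grouping step. This is a simplified model: partitions are plain strings and brokers are int ids. The hypothetical FetchTargetGrouping class stands in for prepareFetchRequests and ignores fetch sessions, fetch positions and node availability.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: partitions are strings, brokers are int ids.
final class FetchTargetGrouping {
    // Groups assigned partitions by the broker to fetch from:
    // the preferred read replica when known, the leader otherwise.
    static Map<Integer, List<String>> groupByFetchTarget(
            List<String> assignedPartitions,
            Map<String, Integer> leaders,
            Map<String, Integer> preferredReadReplicas) {
        Map<Integer, List<String>> requests = new TreeMap<>();
        for (String partition : assignedPartitions) {
            Integer target = preferredReadReplicas.get(partition);
            if (target == null) {
                target = leaders.get(partition);
            }
            if (target == null) {
                continue; // assumption: no known leader, so skip the partition this round
            }
            requests.computeIfAbsent(target, id -> new ArrayList<>()).add(partition);
        }
        return requests;
    }
}
```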
For every fetch request, sendFetches prints out the following DEBUG message to the logs:
Sending [isolationLevel] [data] to broker [fetchTarget]
sendFetches requests the ConsumerNetworkClient to send the fetch request (to the fetchTarget node) and registers the node in the nodesWithPendingFetchRequests registry.
On a successful response, for every partition in the response, sendFetches prints out the following DEBUG message to the logs and adds a new CompletedFetch to the completedFetches registry (the CompletedFetch Queue).
Fetch [isolationLevel] at offset [fetchOffset] for partition [partition] returned fetch data [partitionData]
In the end, sendFetches removes the node from the nodesWithPendingFetchRequests registry. This also happens when the request fails.
sendFetches returns the number of fetch requests sent.
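The bookkeeping around nodesWithPendingFetchRequests and the CompletedFetch Queue could be modelled as follows. SendFetchesSketch and its nested CompletedFetch are hypothetical simplifications. The rule that a node with a pending request gets no second fetch request is inferred from the purpose of the registry and is not stated in the text above.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of sendFetches' bookkeeping; brokers are int ids, partitions strings.
final class SendFetchesSketch {
    // Stand-in for CompletedFetch: one partition's data from one fetch response.
    static final class CompletedFetch {
        final String partition;
        final List<String> records;

        CompletedFetch(String partition, List<String> records) {
            this.partition = partition;
            this.records = records;
        }
    }

    private final Set<Integer> nodesWithPendingFetchRequests = new HashSet<>();
    final Queue<CompletedFetch> completedFetches = new ConcurrentLinkedQueue<>();

    // Registers the node before sending; false means a request to it is still in flight
    // (assumption: no second fetch request is sent to that node until then).
    synchronized boolean markSent(int nodeId) {
        return nodesWithPendingFetchRequests.add(nodeId);
    }

    // Successful response: one CompletedFetch per partition, then the node is released.
    synchronized void onSuccess(int nodeId, Map<String, List<String>> responseData) {
        try {
            for (Map.Entry<String, List<String>> entry : responseData.entrySet()) {
                completedFetches.add(new CompletedFetch(entry.getKey(), entry.getValue()));
            }
        } finally {
            nodesWithPendingFetchRequests.remove(nodeId);
        }
    }

    // Failed request: nothing is buffered, but the node is released as well.
    synchronized void onFailure(int nodeId) {
        nodesWithPendingFetchRequests.remove(nodeId);
    }
}
```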
sendFetches is used when:
- KafkaConsumer is requested to poll (and pollForFetches)
prepareFetchRequests
Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
prepareFetchRequests ...FIXME
Preferred Read Replica
Node selectReadReplica(
TopicPartition partition,
Node leaderReplica,
long currentTimeMs)
selectReadReplica requests the SubscriptionState for the preferredReadReplica of the given TopicPartition (as of currentTimeMs). If there is none, it falls back to the given leaderReplica.
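Here is a minimal sketch of the selection. It assumes, as the DEBUG message in initializeCompletedFetch suggests, that a preferred read replica comes with an expiry time. ReadReplicaSelector is a hypothetical class and does not consider node availability at all.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: partitions are strings, brokers are int ids.
final class ReadReplicaSelector {
    private static final class PreferredReplica {
        final int nodeId;
        final long expireTimeMs;

        PreferredReplica(int nodeId, long expireTimeMs) {
            this.nodeId = nodeId;
            this.expireTimeMs = expireTimeMs;
        }
    }

    private final Map<String, PreferredReplica> preferredReadReplicas = new HashMap<>();

    // Records a preferred read replica (as a fetch response carrying one would).
    void updatePreferredReadReplica(String partition, int nodeId, long expireTimeMs) {
        preferredReadReplicas.put(partition, new PreferredReplica(nodeId, expireTimeMs));
    }

    // Preferred read replica while it has not expired; the leader otherwise.
    int selectReadReplica(String partition, int leaderNodeId, long currentTimeMs) {
        PreferredReplica replica = preferredReadReplicas.get(partition);
        if (replica == null) {
            return leaderNodeId;
        }
        if (currentTimeMs > replica.expireTimeMs) {
            preferredReadReplicas.remove(partition); // expired: forget it
            return leaderNodeId;
        }
        return replica.nodeId;
    }
}
```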
offsetsForTimes
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
Map<TopicPartition, Long> timestampsToSearch,
Timer timer)
offsetsForTimes ...FIXME
offsetsForTimes is used when:
- KafkaConsumer is requested to offsetsForTimes
beginningOffsets
Map<TopicPartition, Long> beginningOffsets(
Collection<TopicPartition> partitions,
Timer timer)
beginningOffsets ...FIXME
beginningOffsets is used when:
- KafkaConsumer is requested to beginningOffsets
endOffsets
Map<TopicPartition, Long> endOffsets(
Collection<TopicPartition> partitions,
Timer timer)
endOffsets ...FIXME
endOffsets is used when:
- KafkaConsumer is requested to endOffsets and currentLag
beginningOrEndOffset
Map<TopicPartition, Long> beginningOrEndOffset(
Collection<TopicPartition> partitions,
long timestamp,
Timer timer)
beginningOrEndOffset ...FIXME
beginningOrEndOffset is used when:
- Fetcher is requested to beginningOffsets and endOffsets
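beginningOrEndOffset takes a single timestamp for all the partitions. In the Kafka ListOffsets protocol, the special timestamp -2 asks for the earliest offset and -1 asks for the latest one. The sketch below assumes that beginningOffsets and endOffsets pass these sentinels. ListOffsetsSketch is a hypothetical helper that only builds the timestampsToSearch map.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper building timestampsToSearch maps for beginning/end offsets.
final class ListOffsetsSketch {
    static final long LATEST_TIMESTAMP = -1L;   // ListOffsets sentinel: end (latest) offset
    static final long EARLIEST_TIMESTAMP = -2L; // ListOffsets sentinel: beginning (earliest) offset

    // Same timestamp for every partition, as in beginningOrEndOffset(partitions, timestamp, timer).
    static Map<String, Long> timestampsToSearch(Collection<String> partitions, long timestamp) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String partition : partitions) {
            result.put(partition, timestamp);
        }
        return result;
    }

    static Map<String, Long> forBeginningOffsets(Collection<String> partitions) {
        return timestampsToSearch(partitions, EARLIEST_TIMESTAMP);
    }

    static Map<String, Long> forEndOffsets(Collection<String> partitions) {
        return timestampsToSearch(partitions, LATEST_TIMESTAMP);
    }
}
```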
fetchOffsetsByTimes
ListOffsetResult fetchOffsetsByTimes(
Map<TopicPartition, Long> timestampsToSearch,
Timer timer,
boolean requireTimestamps)
fetchOffsetsByTimes ...FIXME
fetchOffsetsByTimes is used when:
- Fetcher is requested to offsetsForTimes and beginningOrEndOffset
sendListOffsetsRequests
RequestFuture<ListOffsetResult> sendListOffsetsRequests(
Map<TopicPartition, Long> timestampsToSearch,
boolean requireTimestamps)
sendListOffsetsRequests ...FIXME
Fetched Records
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords()
fetchedRecords returns up to max.poll.records records from the CompletedFetch Queue. It repeats the following steps until it has that many records or runs out of CompletedFetches.
When the nextInLineFetch internal registry is uninitialized or already consumed, fetchedRecords peeks at the next CompletedFetch in the CompletedFetch Queue. If that CompletedFetch is not initialized yet, fetchedRecords initializes it (initializeCompletedFetch). fetchedRecords then saves the CompletedFetch to the nextInLineFetch internal registry and takes it off the CompletedFetch Queue.
When the partition of the nextInLineFetch is paused, fetchedRecords prints out the following DEBUG message to the logs and sets the nextInLineFetch registry to null.
Skipping fetching records for assigned partition [p] because it is paused
In all other cases, fetchedRecords fetches records (fetchRecords) out of the nextInLineFetch, up to the number of records still left to fetch.
In the end, fetchedRecords returns the ConsumerRecords per TopicPartition (out of the CompletedFetch Queue).
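The loop can be sketched with simplified stand-ins. FetchedRecordsSketch and its CompletedFetch are hypothetical: records are strings and initializeCompletedFetch is skipped. The sketch is single-threaded, so a plain ArrayDeque replaces the ConcurrentLinkedQueue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Simplified model of the fetchedRecords loop; all types are hypothetical stand-ins.
final class FetchedRecordsSketch {
    static final class CompletedFetch {
        final String partition;
        final List<String> records;
        private int position = 0;

        CompletedFetch(String partition, List<String> records) {
            this.partition = partition;
            this.records = records;
        }

        boolean isConsumed() {
            return position >= records.size();
        }

        // Like fetchRecords: hand out at most maxRecords records and advance the position.
        List<String> fetchRecords(int maxRecords) {
            int end = Math.min(records.size(), position + maxRecords);
            List<String> batch = new ArrayList<>(records.subList(position, end));
            position = end;
            return batch;
        }
    }

    final Queue<CompletedFetch> completedFetches = new ArrayDeque<>();
    final Set<String> pausedPartitions = new HashSet<>();
    private final int maxPollRecords;
    private CompletedFetch nextInLineFetch;

    FetchedRecordsSketch(int maxPollRecords) {
        this.maxPollRecords = maxPollRecords;
    }

    Map<String, List<String>> fetchedRecords() {
        Map<String, List<String>> fetched = new LinkedHashMap<>();
        int recordsRemaining = maxPollRecords;
        while (recordsRemaining > 0) {
            if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {
                CompletedFetch next = completedFetches.peek();
                if (next == null) {
                    break; // nothing buffered
                }
                nextInLineFetch = next; // initializeCompletedFetch is left out of this sketch
                completedFetches.poll();
            } else if (pausedPartitions.contains(nextInLineFetch.partition)) {
                nextInLineFetch = null; // paused partition: skip its records
            } else {
                List<String> batch = nextInLineFetch.fetchRecords(recordsRemaining);
                fetched.computeIfAbsent(nextInLineFetch.partition, p -> new ArrayList<>()).addAll(batch);
                recordsRemaining -= batch.size();
            }
        }
        return fetched;
    }
}
```

Take max.poll.records of 4 and two buffered fetches of three records each. The first call returns all three records of the first partition and one record of the second. The next call returns the remaining two.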
fetchedRecords is used when:
- KafkaConsumer is requested to pollForFetches
fetchRecords
List<ConsumerRecord<K, V>> fetchRecords(
CompletedFetch completedFetch,
int maxRecords)
fetchRecords ...FIXME
initializeCompletedFetch
CompletedFetch initializeCompletedFetch(
CompletedFetch nextCompletedFetch)
initializeCompletedFetch returns the given CompletedFetch if there were no errors. initializeCompletedFetch updates the SubscriptionState with the current metadata about the partition.
initializeCompletedFetch takes the partition, the PartitionData and the fetch offset from the given CompletedFetch.
initializeCompletedFetch prints out the following TRACE message to the logs:
Preparing to read [n] bytes of data for partition [p] with offset [o]
initializeCompletedFetch takes the RecordBatches from the PartitionData.
With a high watermark given, initializeCompletedFetch prints out the following TRACE message to the logs and requests the SubscriptionState to updateHighWatermark for the partition.
Updating high watermark for partition [p] to [highWatermark]
With a log start offset given, initializeCompletedFetch prints out the following TRACE message to the logs and requests the SubscriptionState to updateLogStartOffset for the partition.
Updating log start offset for partition [p] to [logStartOffset]
With a last stable offset given, initializeCompletedFetch prints out the following TRACE message to the logs and requests the SubscriptionState to updateLastStableOffset for the partition.
Updating last stable offset for partition [p] to [lastStableOffset]
With a preferred read replica given, initializeCompletedFetch prints out the following DEBUG message to the logs and requests the SubscriptionState to updatePreferredReadReplica for the partition.
Updating preferred read replica for partition [p] to [preferredReadReplica] set to expire at [expireTimeMs]
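The four updates amount to conditional assignments. PartitionStateSketch is a hypothetical class. Two details are assumptions made for the sketch: a negative value means "not given", and the replica expiry is computed as now plus a replicaTtlMs parameter.

```java
// Hypothetical per-partition state, mirroring what initializeCompletedFetch pushes into SubscriptionState.
final class PartitionStateSketch {
    long highWatermark = -1L;
    long logStartOffset = -1L;
    long lastStableOffset = -1L;
    int preferredReadReplica = -1;             // -1: no preferred read replica
    long preferredReadReplicaExpireTimeMs = -1L;

    // Assumption: negative values in the response mean "not given" and leave the state unchanged.
    void applyFetchResponse(long hw, long logStart, long lastStable,
                            int preferredReplica, long nowMs, long replicaTtlMs) {
        if (hw >= 0) {
            highWatermark = hw;                // updateHighWatermark
        }
        if (logStart >= 0) {
            logStartOffset = logStart;         // updateLogStartOffset
        }
        if (lastStable >= 0) {
            lastStableOffset = lastStable;     // updateLastStableOffset
        }
        if (preferredReplica >= 0) {           // updatePreferredReadReplica
            preferredReadReplica = preferredReplica;
            preferredReadReplicaExpireTimeMs = nowMs + replicaTtlMs; // hypothetical TTL parameter
        }
    }
}
```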
For errors like NOT_LEADER_OR_FOLLOWER, REPLICA_NOT_AVAILABLE, FENCED_LEADER_EPOCH, KAFKA_STORAGE_ERROR and OFFSET_NOT_AVAILABLE, initializeCompletedFetch prints out the following DEBUG message to the logs and requests the ConsumerMetadata to requestUpdate.
Error in fetch for partition [p]: [exceptionName]
For an OFFSET_OUT_OF_RANGE error, initializeCompletedFetch requests the SubscriptionState to clearPreferredReadReplica for the partition. If there was no preferred read replica to clear, the fetch is assumed to have come from the leader.
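The error handling above reduces to a small decision table. The FetchError and Action enums below are hypothetical. They are not Kafka's Errors class and only cover the errors named in this section.

```java
import java.util.EnumSet;

// Hypothetical decision table for the fetch errors named above (not Kafka's Errors class).
final class FetchErrorHandlingSketch {
    enum FetchError {
        NONE, NOT_LEADER_OR_FOLLOWER, REPLICA_NOT_AVAILABLE, FENCED_LEADER_EPOCH,
        KAFKA_STORAGE_ERROR, OFFSET_NOT_AVAILABLE, OFFSET_OUT_OF_RANGE
    }

    enum Action {
        USE_RECORDS,                  // no error: hand the CompletedFetch back
        REQUEST_METADATA_UPDATE,      // log at DEBUG and request a ConsumerMetadata update
        CLEAR_PREFERRED_READ_REPLICA, // a preferred read replica was cleared
        HANDLE_AS_LEADER_ERROR        // nothing to clear: the fetch came from the leader
    }

    private static final EnumSet<FetchError> METADATA_ERRORS = EnumSet.of(
        FetchError.NOT_LEADER_OR_FOLLOWER, FetchError.REPLICA_NOT_AVAILABLE,
        FetchError.FENCED_LEADER_EPOCH, FetchError.KAFKA_STORAGE_ERROR,
        FetchError.OFFSET_NOT_AVAILABLE);

    static Action decide(FetchError error, boolean hadPreferredReadReplica) {
        if (error == FetchError.NONE) {
            return Action.USE_RECORDS;
        }
        if (METADATA_ERRORS.contains(error)) {
            return Action.REQUEST_METADATA_UPDATE;
        }
        if (error == FetchError.OFFSET_OUT_OF_RANGE) {
            return hadPreferredReadReplica ? Action.CLEAR_PREFERRED_READ_REPLICA : Action.HANDLE_AS_LEADER_ERROR;
        }
        throw new IllegalArgumentException("not covered by this sketch: " + error);
    }
}
```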
resetOffsetsIfNeeded
void resetOffsetsIfNeeded()
resetOffsetsIfNeeded ...FIXME
resetOffsetsIfNeeded is used when:
- KafkaConsumer is requested to updateFetchPositions
resetOffsetsAsync
void resetOffsetsAsync(
Map<TopicPartition, Long> partitionResetTimestamps)
resetOffsetsAsync ...FIXME
sendListOffsetRequest
RequestFuture<ListOffsetResult> sendListOffsetRequest(
Node node,
Map<TopicPartition, ListOffsetsPartition> timestampsToSearch,
boolean requireTimestamp)
sendListOffsetRequest ...FIXME
sendListOffsetRequest is used when:
- Fetcher is requested to resetOffsetsIfNeeded (via resetOffsetsAsync) and fetchOffsetsByTimes (via sendListOffsetsRequests)
clearBufferedDataForUnassignedTopics
void clearBufferedDataForUnassignedTopics(
Collection<String> assignedTopics)
clearBufferedDataForUnassignedTopics ...FIXME
clearBufferedDataForUnassignedTopics is used when:
- KafkaConsumer is requested to subscribe
CompletedFetch Queue
Fetcher creates an empty ConcurrentLinkedQueue (Java) of CompletedFetches when created.
New CompletedFetches (one per partition) are added to the queue in sendFetches (on successful receipt of a response from a Kafka cluster).
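Here is a minimal sketch of the queue usage with java.util.concurrent.ConcurrentLinkedQueue. It assumes the response handler may run on a different thread than the code that drains the queue. Partition names stand in for CompletedFetch objects (a hypothetical simplification). The draining side peeks first and polls only after taking the head, as fetchedRecords does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Partition names stand in for CompletedFetch objects (hypothetical simplification).
final class CompletedFetchQueueSketch {
    static List<String> fillAndDrain(List<String> partitionsInResponse) {
        Queue<String> completedFetches = new ConcurrentLinkedQueue<>();

        // A response handler on another thread adds one entry per partition.
        Thread responseHandler = new Thread(() -> {
            for (String partition : partitionsInResponse) {
                completedFetches.add(partition);
            }
        });
        responseHandler.start();
        try {
            responseHandler.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // The draining side peeks first and removes the head only after taking it (FIFO order).
        List<String> drained = new ArrayList<>();
        for (String head = completedFetches.peek(); head != null; head = completedFetches.peek()) {
            drained.add(head);
            completedFetches.poll();
        }
        return drained;
    }
}
```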
Logging
Enable ALL logging level for org.apache.kafka.clients.consumer.internals.Fetcher logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.org.apache.kafka.clients.consumer.internals.Fetcher=ALL
Refer to Logging.