
KafkaConsumer

KafkaConsumer is a Consumer.

Creating Instance

KafkaConsumer takes the following to be created:

  • Configuration (ConsumerConfig or Map<String, Object> or Properties)
  • Deserializer<K>
  • Deserializer<V>
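
As an illustration of the configuration argument, the following is a minimal sketch that builds a Properties object using only the JDK. The class ConsumerConfigSketch and its build helper are hypothetical names made up for this example; the property keys (bootstrap.servers, group.id, isolation.level) are standard consumer settings, and the broker address and group name are placeholder values.

```java
import java.util.Properties;

// A sketch only: builds a configuration that a KafkaConsumer could be created with.
// ConsumerConfigSketch and build(...) are hypothetical helpers, not part of Kafka.
final class ConsumerConfigSketch {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // Where the consumer finds the cluster (placeholder value supplied by the caller)
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Consumer group the instance joins
        props.setProperty("group.id", groupId);
        // Only read records of committed transactions (the alternative is read_uncommitted)
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "demo-group");
        // With kafka-clients on the classpath, the properties and two deserializers
        // could then be handed over to the constructor, for example:
        //   new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The deserializers are left out of the properties here because they are listed above as separate constructor arguments; they could presumably also be configured by class name in the same Properties object.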

Group ID

KafkaConsumer can be given a group ID when created, using the group.id configuration property (passed in indirectly, as part of the configuration).

IsolationLevel

KafkaConsumer can be given an IsolationLevel when created, using the isolation.level configuration property (passed in indirectly, as part of the configuration).

KafkaConsumer uses the IsolationLevel for the following:

Fetcher

KafkaConsumer creates a Fetcher when created.

ConsumerCoordinator

KafkaConsumer creates a ConsumerCoordinator when created, provided the group.id is specified.

enforceRebalance

void enforceRebalance()

enforceRebalance requests the ConsumerCoordinator to requestRejoin with the following reason:

rebalance enforced by user

enforceRebalance is part of the Consumer abstraction.

groupMetadata

ConsumerGroupMetadata groupMetadata()

groupMetadata...FIXME

groupMetadata is part of the Consumer abstraction.

Polling for Records

ConsumerRecords<K, V> poll(
  Duration timeout) // (1)
ConsumerRecords<K, V> poll(
  Timer timer,
  boolean includeMetadataInTimeout) // (2)
  1. Uses includeMetadataInTimeout enabled (true)
  2. A private method

poll...FIXME

poll is part of the Consumer abstraction.

Polling for Fetches

Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(
  Timer timer)

pollForFetches requests the Fetcher for fetched records and returns them immediately if available.

Otherwise, pollForFetches requests the Fetcher to sendFetches.

pollForFetches prints out the following TRACE message to the logs:

Polling for fetches with timeout [pollTimeout]

pollForFetches requests the ConsumerNetworkClient to poll, waiting until the pollTimeout expires or the Fetcher has fetches available.

In the end, pollForFetches requests the Fetcher for the fetched records again.
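
The steps above can be sketched as a toy model in plain Java. This is not Kafka's code: FetcherLike, NetworkClientLike and FakeBroker are hypothetical stand-ins invented for the example, records are plain strings, and the timer is reduced to a timeout in milliseconds. It is only meant to show the order of the calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the pollForFetches steps; none of these types are Kafka's own classes.
final class PollForFetchesSketch {

    // Hypothetical stand-in for the Fetcher
    interface FetcherLike {
        List<String> fetchedRecords(); // drains the records that are ready (possibly none)
        void sendFetches();            // issues fetch requests
    }

    // Hypothetical stand-in for the ConsumerNetworkClient
    interface NetworkClientLike {
        void poll(long timeoutMs);     // performs network I/O for up to the timeout
    }

    static List<String> pollForFetches(
            FetcherLike fetcher, NetworkClientLike client, long pollTimeoutMs) {
        // 1. Return immediately if records have already been fetched
        List<String> records = fetcher.fetchedRecords();
        if (!records.isEmpty()) {
            return records;
        }
        // 2. Otherwise, send new fetch requests
        fetcher.sendFetches();
        // 3. Poll the network client (the real method logs a TRACE message before this)
        client.poll(pollTimeoutMs);
        // 4. Ask for the fetched records again
        return fetcher.fetchedRecords();
    }

    // In-memory fake: records become ready only after a fetch was sent and the network polled
    static final class FakeBroker implements FetcherLike, NetworkClientLike {
        private final List<String> onBroker = new ArrayList<>(Arrays.asList("r1", "r2"));
        private final List<String> ready = new ArrayList<>();
        private boolean fetchSent = false;
        final List<String> calls = new ArrayList<>(); // records the order of invocations

        public List<String> fetchedRecords() {
            calls.add("fetchedRecords");
            List<String> out = new ArrayList<>(ready);
            ready.clear();
            return out;
        }

        public void sendFetches() {
            calls.add("sendFetches");
            fetchSent = true;
        }

        public void poll(long timeoutMs) {
            calls.add("poll");
            if (fetchSent) {
                ready.addAll(onBroker);
                onBroker.clear();
                fetchSent = false;
            }
        }
    }

    public static void main(String[] args) {
        FakeBroker fake = new FakeBroker();
        System.out.println(pollForFetches(fake, fake, 100L));
        System.out.println(fake.calls);
    }
}
```

In this model the first call finds nothing ready, so it goes through all four steps; a call made while records are already ready would return after the first step without touching the network client.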

Subscribing to Topics

void subscribe(
  Collection<String> topics) // (1)
void subscribe(
  Collection<String> topics,
  ConsumerRebalanceListener listener)
void subscribe(
  Pattern pattern) // (2)
void subscribe(
  Pattern pattern,
  ConsumerRebalanceListener callback)
  1. Uses NoOpConsumerRebalanceListener
  2. Uses NoOpConsumerRebalanceListener

subscribe...FIXME

subscribe is part of the Consumer abstraction.

Waking Up

void wakeup()

wakeup...FIXME

wakeup is part of the Consumer abstraction.