Skip to content

GroupCoordinator

GroupCoordinator's Startup

GroupCoordinator is elected as the group coordinator for every partition to handle consumer groups that are "assigned" to this partition. There are going to be as many GroupCoordinators as there are offsets.topic.num.partitions.

Creating Instance

GroupCoordinator takes the following to be created:

GroupCoordinator is created using apply factory.

GroupMetadataManager

GroupCoordinator is given a GroupMetadataManager when created.

GroupConfig

GroupCoordinator is given a GroupConfig when created.

GroupConfig is a collection of the configuration properties:

Creating GroupCoordinator Instance

apply(
  config: KafkaConfig,
  replicaManager: ReplicaManager,
  time: Time,
  metrics: Metrics): GroupCoordinator // (1)!
apply(
  config: KafkaConfig,
  replicaManager: ReplicaManager,
  heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
  rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance],
  time: Time,
  metrics: Metrics): GroupCoordinator
  1. Creates DelayedOperationPurgatorys and calls the other apply

Note

All GroupCoordinator really needs for work is ReplicaManager.

apply creates an OffsetConfig (based on the given KafkaConfig).

apply creates a GroupConfig.

apply creates a GroupMetadataManager based on the following configuration properties (in the KafkaConfig):

In the end, apply creates a GroupCoordinator.


apply is used when:

Creating OffsetConfig

offsetConfig(
  config: KafkaConfig): OffsetConfig

offsetConfig uses the KafkaConfig for the following configuration properties to create an OffsetConfig:

onElection

onElection(
  offsetTopicPartitionId: Int,
  coordinatorEpoch: Int): Unit

onElection prints out the following INFO message to the logs:

Elected as the group coordinator for partition [offsetTopicPartitionId] in epoch [coordinatorEpoch]

onElection requests the GroupMetadataManager to scheduleLoadGroupAndOffsets.


onElection is used when:

Starting Up

startup(
  retrieveGroupMetadataTopicPartitionCount: () => Int,
  enableMetadataExpiration: Boolean = true): Unit

startup prints out the following INFO message to the logs:

Starting up.

startup requests the GroupMetadataManager to start up.

In the end, startup prints out the following INFO message to the logs:

Startup complete.

startup is used when:

  • KafkaServer is requested to start up
  • BrokerMetadataPublisher is requested to initializeManagers

Logging

Enable ALL logging level for kafka.coordinator.group.GroupCoordinator logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.coordinator.group.GroupCoordinator=ALL

Refer to Logging.

logIdent

GroupCoordinator uses the following logging prefix (with the broker.id):

[GroupCoordinator [brokerId]]