GroupCoordinator¶

GroupCoordinator is elected as the group coordinator for every partition to handle consumer groups that are "assigned" to this partition. There are going to be as many GroupCoordinators as there are offsets.topic.num.partitions.
Creating Instance¶
GroupCoordinator takes the following to be created:
- broker.id
- GroupConfig
- OffsetConfig
- GroupMetadataManager
-
DelayedOperationPurgatory[DelayedHeartbeat] -
DelayedOperationPurgatory[DelayedRebalance] -
Time - Metrics
GroupCoordinator is created using apply factory.
GroupMetadataManager¶
GroupCoordinator is given a GroupMetadataManager when created.
GroupConfig¶
GroupCoordinator is given a GroupConfig when created.
GroupConfig is a collection of the configuration properties:
Creating GroupCoordinator Instance¶
apply(
config: KafkaConfig,
replicaManager: ReplicaManager,
time: Time,
metrics: Metrics): GroupCoordinator // (1)!
apply(
config: KafkaConfig,
replicaManager: ReplicaManager,
heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance],
time: Time,
metrics: Metrics): GroupCoordinator
- Creates
DelayedOperationPurgatorys and calls the otherapply
Note
All GroupCoordinator really needs for work is ReplicaManager.
apply creates an OffsetConfig (based on the given KafkaConfig).
apply creates a GroupConfig.
apply creates a GroupMetadataManager based on the following configuration properties (in the KafkaConfig):
In the end, apply creates a GroupCoordinator.
apply is used when:
Creating OffsetConfig¶
offsetConfig(
config: KafkaConfig): OffsetConfig
offsetConfig uses the KafkaConfig for the following configuration properties to create an OffsetConfig:
- offsetMetadataMaxSize
- offsetsLoadBufferSize
- offsetsRetentionMinutes
- offsetsRetentionCheckIntervalMs
- offsets.topic.num.partitions
- offsetsTopicSegmentBytes
- offsetsTopicReplicationFactor
- offsetsTopicCompressionCodec
- offsetCommitTimeoutMs
- offsetCommitRequiredAcks
onElection¶
onElection(
offsetTopicPartitionId: Int,
coordinatorEpoch: Int): Unit
onElection prints out the following INFO message to the logs:
Elected as the group coordinator for partition [offsetTopicPartitionId] in epoch [coordinatorEpoch]
onElection requests the GroupMetadataManager to scheduleLoadGroupAndOffsets.
onElection is used when:
RequestHandlerHelperis requested toonLeadershipChangefor __consumer_offsetsBrokerMetadataPublisheris requested topublishmetadata for __consumer_offsets
Starting Up¶
startup(
retrieveGroupMetadataTopicPartitionCount: () => Int,
enableMetadataExpiration: Boolean = true): Unit
startup prints out the following INFO message to the logs:
Starting up.
startup requests the GroupMetadataManager to start up.
In the end, startup prints out the following INFO message to the logs:
Startup complete.
startup is used when:
KafkaServeris requested to start upBrokerMetadataPublisheris requested toinitializeManagers
Logging¶
Enable ALL logging level for kafka.coordinator.group.GroupCoordinator logger to see what happens inside.
Add the following line to config/log4j.properties:
log4j.logger.kafka.coordinator.group.GroupCoordinator=ALL
Refer to Logging.
logIdent¶
GroupCoordinator uses the following logging prefix (with the broker.id):
[GroupCoordinator [brokerId]]