GroupCoordinator¶
GroupCoordinator
is elected as the group coordinator for every partition to handle consumer groups that are "assigned" to this partition. There are going to be as many GroupCoordinator
s as there are offsets.topic.num.partitions.
Creating Instance¶
GroupCoordinator
takes the following to be created:
- broker.id
- GroupConfig
- OffsetConfig
- GroupMetadataManager
-
DelayedOperationPurgatory[DelayedHeartbeat]
-
DelayedOperationPurgatory[DelayedRebalance]
-
Time
- Metrics
GroupCoordinator
is created using apply factory.
GroupMetadataManager¶
GroupCoordinator
is given a GroupMetadataManager when created.
GroupConfig¶
GroupCoordinator
is given a GroupConfig
when created.
GroupConfig
is a collection of the configuration properties:
Creating GroupCoordinator Instance¶
apply(
config: KafkaConfig,
replicaManager: ReplicaManager,
time: Time,
metrics: Metrics): GroupCoordinator // (1)!
apply(
config: KafkaConfig,
replicaManager: ReplicaManager,
heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance],
time: Time,
metrics: Metrics): GroupCoordinator
- Creates
DelayedOperationPurgatory
s and calls the otherapply
Note
All GroupCoordinator
really needs for work is ReplicaManager.
apply
creates an OffsetConfig (based on the given KafkaConfig).
apply
creates a GroupConfig.
apply
creates a GroupMetadataManager based on the following configuration properties (in the KafkaConfig):
In the end, apply
creates a GroupCoordinator.
apply
is used when:
Creating OffsetConfig¶
offsetConfig(
config: KafkaConfig): OffsetConfig
offsetConfig
uses the KafkaConfig for the following configuration properties to create an OffsetConfig:
- offsetMetadataMaxSize
- offsetsLoadBufferSize
- offsetsRetentionMinutes
- offsetsRetentionCheckIntervalMs
- offsets.topic.num.partitions
- offsetsTopicSegmentBytes
- offsetsTopicReplicationFactor
- offsetsTopicCompressionCodec
- offsetCommitTimeoutMs
- offsetCommitRequiredAcks
onElection¶
onElection(
offsetTopicPartitionId: Int,
coordinatorEpoch: Int): Unit
onElection
prints out the following INFO message to the logs:
Elected as the group coordinator for partition [offsetTopicPartitionId] in epoch [coordinatorEpoch]
onElection
requests the GroupMetadataManager to scheduleLoadGroupAndOffsets.
onElection
is used when:
RequestHandlerHelper
is requested toonLeadershipChange
for __consumer_offsetsBrokerMetadataPublisher
is requested topublish
metadata for __consumer_offsets
Starting Up¶
startup(
retrieveGroupMetadataTopicPartitionCount: () => Int,
enableMetadataExpiration: Boolean = true): Unit
startup
prints out the following INFO message to the logs:
Starting up.
startup
requests the GroupMetadataManager to start up.
In the end, startup
prints out the following INFO message to the logs:
Startup complete.
startup
is used when:
KafkaServer
is requested to start upBrokerMetadataPublisher
is requested toinitializeManagers
Logging¶
Enable ALL
logging level for kafka.coordinator.group.GroupCoordinator
logger to see what happens inside.
Add the following line to config/log4j.properties
:
log4j.logger.kafka.coordinator.group.GroupCoordinator=ALL
Refer to Logging.
logIdent¶
GroupCoordinator
uses the following logging prefix (with the broker.id):
[GroupCoordinator [brokerId]]