AbstractCoordinator¶
AbstractCoordinator
is an abstraction of consumer group coordination managers.
Contract¶
metadata¶
JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata()
Used when:
AbstractCoordinator
is requested to ensureActiveGroup (and sendJoinGroupRequest)
onJoinComplete¶
void onJoinComplete(
int generation,
String memberId,
String protocol,
ByteBuffer memberAssignment)
Used when:
AbstractCoordinator
is requested to ensureActiveGroup (and joinGroupIfNeeded)
onJoinPrepare¶
void onJoinPrepare(
int generation,
String memberId)
Used when:
AbstractCoordinator
is requested to ensureActiveGroup (and joinGroupIfNeeded)
performAssignment¶
Map<String, ByteBuffer> performAssignment(
String leaderId,
String protocol,
List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata)
Used when:
AbstractCoordinator
is requested to handle a JoinGroup response (having joined the group as the leader)
protocolType¶
String protocolType()
Used when:
AbstractCoordinator
is requested to sendJoinGroupRequest, onJoinFollower, onJoinLeader, isProtocolTypeInconsistent
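The contract above can be pictured as a template-method class: the base class drives the join, and implementations supply the five callbacks. What follows is only a minimal sketch under stated assumptions, not Kafka's code. `SketchCoordinator`, `RecordingCoordinator` and `joinAsSoleLeader` are hypothetical names, Kafka's request and response types are replaced with plain JDK types, and `"consumer"` and `"range"` are sample values.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical stand-in for AbstractCoordinator; not Kafka's class. */
abstract class SketchCoordinator {
    // The five contract methods, with parameter types simplified to JDK types.
    abstract String protocolType();
    abstract List<String> metadata();
    abstract void onJoinPrepare(int generation, String memberId);
    abstract Map<String, ByteBuffer> performAssignment(
        String leaderId, String protocol, List<String> allMemberIds);
    abstract void onJoinComplete(
        int generation, String memberId, String protocol, ByteBuffer memberAssignment);

    /** Hypothetical driver: a single member joins and becomes the group leader. */
    final void joinAsSoleLeader(int generation, String memberId) {
        onJoinPrepare(generation, memberId);
        String protocol = metadata().get(0); // pick the first supported protocol
        Map<String, ByteBuffer> assignments =
            performAssignment(memberId, protocol, Collections.singletonList(memberId));
        onJoinComplete(generation, memberId, protocol, assignments.get(memberId));
    }
}

/** Toy implementation that records the order in which the callbacks run. */
final class RecordingCoordinator extends SketchCoordinator {
    final List<String> calls = new ArrayList<>();

    @Override String protocolType() { return "consumer"; }

    @Override List<String> metadata() {
        calls.add("metadata");
        return Collections.singletonList("range");
    }

    @Override void onJoinPrepare(int generation, String memberId) {
        calls.add("onJoinPrepare");
    }

    @Override Map<String, ByteBuffer> performAssignment(
            String leaderId, String protocol, List<String> allMemberIds) {
        calls.add("performAssignment");
        Map<String, ByteBuffer> result = new LinkedHashMap<>();
        for (String id : allMemberIds) {
            byte[] bytes = ("assignment-for-" + id).getBytes(StandardCharsets.UTF_8);
            result.put(id, ByteBuffer.wrap(bytes));
        }
        return result;
    }

    @Override void onJoinComplete(
            int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
        calls.add("onJoinComplete");
    }
}
```

Calling `new RecordingCoordinator().joinAsSoleLeader(1, "member-1")` leaves the callback order in `calls`, which makes it easy to see where each contract method fits in the join.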
Implementations¶
- ConsumerCoordinator
- WorkerCoordinator (Kafka Connect)
Handling JoinGroup Response¶
Group Leader¶
RequestFuture<ByteBuffer> onJoinLeader(
JoinGroupResponse joinResponse)
onJoinLeader
calls performAssignment (with the leader ID, the group protocol and the members).
onJoinLeader
prints out the following DEBUG message to the logs:
Sending leader SyncGroup to coordinator [coordinator] at generation [generation]: [SyncGroupRequest]
In the end, onJoinLeader
sends a SyncGroupRequest
to the coordinator.
onJoinLeader
is used when:
- JoinGroupResponseHandler is requested to handle a
JoinGroup
response (after joining group successfully as the leader)
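The three leader steps (perform the assignment, log the DEBUG message, send the SyncGroup request) can be sketched as below. This is an illustration under assumptions, not Kafka's implementation. `LeaderJoinSketch` and `SyncRequest` are hypothetical, the round-robin `performAssignment` is a toy strategy rather than one of Kafka's assignors, and "sending" is reduced to returning the request object.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LeaderJoinSketch {
    /** Hypothetical stand-in for the payload of a SyncGroupRequest. */
    static final class SyncRequest {
        final int generation;
        final String memberId;
        final Map<String, ByteBuffer> assignments;

        SyncRequest(int generation, String memberId, Map<String, ByteBuffer> assignments) {
            this.generation = generation;
            this.memberId = memberId;
            this.assignments = assignments;
        }
    }

    /** Toy round-robin assignment; memberIds must not be empty (the leader is a member). */
    static Map<String, ByteBuffer> performAssignment(List<String> memberIds, int partitionCount) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String id : memberIds) {
            owned.put(id, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            owned.get(memberIds.get(p % memberIds.size())).add(p);
        }
        // Encode each member's partition list as UTF-8 text, e.g. "[0, 2, 4]".
        Map<String, ByteBuffer> encoded = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> e : owned.entrySet()) {
            byte[] bytes = e.getValue().toString().getBytes(StandardCharsets.UTF_8);
            encoded.put(e.getKey(), ByteBuffer.wrap(bytes));
        }
        return encoded;
    }

    /** Leader path: assign, log, then hand back the request that would be sent. */
    static SyncRequest onJoinLeader(
            int generation, String leaderId, List<String> memberIds,
            int partitionCount, String coordinator, List<String> debugLog) {
        Map<String, ByteBuffer> assignments = performAssignment(memberIds, partitionCount);
        SyncRequest request = new SyncRequest(generation, leaderId, assignments);
        debugLog.add("Sending leader SyncGroup to coordinator " + coordinator
            + " at generation " + generation + ": " + assignments.keySet());
        return request; // a real coordinator would send this over the network
    }

    /** Decodes an assignment without disturbing the buffer's position. */
    static String decode(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }
}
```

The point of the sketch is the shape of the data: only the leader computes a `Map<String, ByteBuffer>` with one opaque assignment per member, and that whole map travels in the leader's SyncGroup request.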
Group Follower¶
RequestFuture<ByteBuffer> onJoinFollower()
onJoinFollower
...FIXME
onJoinFollower
is used when:
- JoinGroupResponseHandler is requested to handle a
JoinGroup
response (after joining group successfully as a follower)
ensureActiveGroup¶
void ensureActiveGroup()
boolean ensureActiveGroup(
Timer timer)
ensureActiveGroup
calls ensureCoordinatorReady (and returns false
if the coordinator is not ready).
ensureActiveGroup
...FIXME
ensureActiveGroup
is used when:
- ConsumerCoordinator is requested to poll
- WorkerCoordinator (Kafka Connect) is requested to poll
joinGroupIfNeeded¶
boolean joinGroupIfNeeded(
Timer timer)
joinGroupIfNeeded
...FIXME
initiateJoinGroup¶
RequestFuture<ByteBuffer> initiateJoinGroup()
initiateJoinGroup
...FIXME
sendJoinGroupRequest¶
RequestFuture<ByteBuffer> sendJoinGroupRequest()
sendJoinGroupRequest
...FIXME
Waiting for Consumer Group Coordinator Known and Ready¶
boolean ensureCoordinatorReady(
Timer timer)
ensureCoordinatorReady
returns true
immediately when the consumer group coordinator is known and available.
Otherwise, ensureCoordinatorReady
keeps looking up the group coordinator (by sending FindCoordinator
requests to the least loaded broker) until the coordinator is available or the timer expires.
In the end, ensureCoordinatorReady
returns whether the coordinator is known and available or not.
ensureCoordinatorReady
is used when:
- AbstractCoordinator is requested to ensureActiveGroup (and joinGroupIfNeeded)
- ConsumerCoordinator is requested to poll, fetchCommittedOffsets, close, commitOffsetsSync
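The lookup loop described in this section can be sketched as follows. This is a simplified model under assumptions, not Kafka's implementation: `CoordinatorReadySketch` is a hypothetical class, the `Timer` is replaced by a millisecond clock plus a timeout, and the FindCoordinator round trip is reduced to a `Runnable`.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

final class CoordinatorReadySketch {
    /**
     * Returns true immediately when the coordinator is already known.
     * Otherwise repeats the lookup until the coordinator becomes known
     * or the deadline passes, and reports the final state.
     */
    static boolean ensureCoordinatorReady(
            BooleanSupplier coordinatorKnown,
            Runnable lookupCoordinator,
            LongSupplier clockMs,
            long timeoutMs) {
        if (coordinatorKnown.getAsBoolean()) {
            return true; // fast path: nothing to look up
        }
        long deadline = clockMs.getAsLong() + timeoutMs;
        do {
            lookupCoordinator.run(); // stands in for a FindCoordinator request
        } while (!coordinatorKnown.getAsBoolean() && clockMs.getAsLong() < deadline);
        return coordinatorKnown.getAsBoolean();
    }
}
```

Passing the clock in as a `LongSupplier` is a design choice of this sketch: it lets a caller substitute a fake clock and exercise the timeout path without sleeping.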
JoinGroupResponseHandler¶
JoinGroupResponseHandler
is a CoordinatorResponseHandler
to handle responses from the group coordinator after sendJoinGroupRequest.
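How a successful response is routed to onJoinLeader or onJoinFollower can be sketched as below. The sketch assumes that a member recognizes itself as the leader by comparing its own member ID with the leader ID carried in the response. `JoinResponseDispatchSketch` and its `JoinResponse` type are hypothetical, and the two handlers only return a label instead of sending a SyncGroup request.

```java
final class JoinResponseDispatchSketch {
    /** Hypothetical, trimmed-down view of a successful JoinGroup response. */
    static final class JoinResponse {
        final String memberId;
        final String leaderId;
        final int generation;

        JoinResponse(String memberId, String leaderId, int generation) {
            this.memberId = memberId;
            this.leaderId = leaderId;
            this.generation = generation;
        }

        // Assumption of this sketch: the leader is the member whose ID matches leaderId.
        boolean isLeader() {
            return memberId.equals(leaderId);
        }
    }

    /** Routes the response to the leader or the follower path. */
    static String handle(JoinResponse response) {
        return response.isLeader() ? onJoinLeader(response) : onJoinFollower(response);
    }

    static String onJoinLeader(JoinResponse response) {
        return "onJoinLeader(generation=" + response.generation + ")";
    }

    static String onJoinFollower(JoinResponse response) {
        return "onJoinFollower(generation=" + response.generation + ")";
    }
}
```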