AbstractCoordinator¶
AbstractCoordinator is an abstraction of consumer group coordination managers.
Contract¶
metadata¶
JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata()
Used when:
- AbstractCoordinator is requested to ensureActiveGroup (and sendJoinGroupRequest)
onJoinComplete¶
void onJoinComplete(
int generation,
String memberId,
String protocol,
ByteBuffer memberAssignment)
Used when:
- AbstractCoordinator is requested to ensureActiveGroup (and joinGroupIfNeeded)
onJoinPrepare¶
void onJoinPrepare(
int generation,
String memberId)
Used when:
- AbstractCoordinator is requested to ensureActiveGroup (and joinGroupIfNeeded)
performAssignment¶
Map<String, ByteBuffer> performAssignment(
String leaderId,
String protocol,
List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata)
Used when:
- AbstractCoordinator is requested to handle a JoinGroup response (having joined the group as the leader)
protocolType¶
String protocolType()
Used when:
- AbstractCoordinator is requested to sendJoinGroupRequest, onJoinFollower, onJoinLeader, isProtocolTypeInconsistent
Implementations¶
- ConsumerCoordinator
- WorkerCoordinator (Kafka Connect)
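To make the contract more concrete, here is a minimal sketch of how the five methods fit together. It does not use the Kafka classes: `SketchCoordinator`, `SlotCoordinator` and `Member` are hypothetical names, `Member` stands in for `JoinGroupResponseData.JoinGroupResponseMember`, and `metadata()` returns plain protocol names instead of a `JoinGroupRequestProtocolCollection`. The assignment logic (one numbered slot per member) is invented purely for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CoordinatorContractSketch {

    // Hypothetical stand-in for JoinGroupResponseData.JoinGroupResponseMember.
    static final class Member {
        final String memberId;
        final ByteBuffer metadata;

        Member(String memberId, ByteBuffer metadata) {
            this.memberId = memberId;
            this.metadata = metadata;
        }
    }

    // Simplified mirror of the contract (assumption: types are reduced to JDK ones).
    abstract static class SketchCoordinator {
        abstract String protocolType();
        abstract List<String> metadata();
        abstract void onJoinPrepare(int generation, String memberId);
        abstract Map<String, ByteBuffer> performAssignment(
            String leaderId, String protocol, List<Member> allMemberMetadata);
        abstract void onJoinComplete(
            int generation, String memberId, String protocol, ByteBuffer memberAssignment);
    }

    // Toy implementation: the leader hands every member a numbered slot.
    static final class SlotCoordinator extends SketchCoordinator {
        final List<String> events = new ArrayList<>();

        @Override String protocolType() { return "sketch"; }

        @Override List<String> metadata() { return List.of("slots"); }

        @Override void onJoinPrepare(int generation, String memberId) {
            events.add("prepare gen=" + generation + " member=" + memberId);
        }

        @Override Map<String, ByteBuffer> performAssignment(
                String leaderId, String protocol, List<Member> allMemberMetadata) {
            Map<String, ByteBuffer> assignment = new LinkedHashMap<>();
            int slot = 0;
            for (Member m : allMemberMetadata) {
                assignment.put(m.memberId, encode("slot-" + slot++));
            }
            return assignment;
        }

        @Override void onJoinComplete(
                int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
            events.add("complete gen=" + generation + " member=" + memberId
                + " assignment=" + decode(memberAssignment));
        }
    }

    static ByteBuffer encode(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    static String decode(ByteBuffer b) {
        // duplicate() leaves the caller's buffer position untouched
        return StandardCharsets.UTF_8.decode(b.duplicate()).toString();
    }

    public static void main(String[] args) {
        SlotCoordinator c = new SlotCoordinator();
        c.onJoinPrepare(0, "m1");
        Map<String, ByteBuffer> a = c.performAssignment("m1", "slots",
            List.of(new Member("m1", encode("")), new Member("m2", encode(""))));
        c.onJoinComplete(1, "m1", "slots", a.get("m1"));
        c.events.forEach(System.out::println);
    }
}
```

In this sketch only the leader would call `performAssignment`, while every member receives its own slice of the result through `onJoinComplete`.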
Handling JoinGroup Response¶
Group Leader¶
RequestFuture<ByteBuffer> onJoinLeader(
JoinGroupResponse joinResponse)
onJoinLeader performs the assignment (performAssignment with the leader ID, the group protocol and the members).
onJoinLeader prints out the following DEBUG message to the logs:
Sending leader SyncGroup to coordinator [coordinator] at generation [generation]: [SyncGroupRequest]
In the end, onJoinLeader sends a SyncGroupRequest to the coordinator.
onJoinLeader is used when:
- JoinGroupResponseHandler is requested to handle a JoinGroup response (after joining the group successfully as the leader)
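The three steps of the leader path (assign, log, send) can be sketched as follows. This is a simplified model, not the Kafka code: `SyncRequest` is a hypothetical stand-in for `SyncGroupRequest`, `Assignor` stands in for the `performAssignment` contract method (taking member IDs only), and the sketch returns the request instead of sending it over the network.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LeaderSyncSketch {

    // Hypothetical, trimmed-down stand-in for a SyncGroupRequest.
    static final class SyncRequest {
        final int generation;
        final String memberId;
        final Map<String, ByteBuffer> assignments;

        SyncRequest(int generation, String memberId, Map<String, ByteBuffer> assignments) {
            this.generation = generation;
            this.memberId = memberId;
            this.assignments = assignments;
        }

        @Override public String toString() {
            return "SyncRequest(generation=" + generation + ", memberId=" + memberId
                + ", members=" + assignments.keySet() + ")";
        }
    }

    // Stand-in for the performAssignment contract method.
    interface Assignor {
        Map<String, ByteBuffer> performAssignment(
            String leaderId, String protocol, List<String> memberIds);
    }

    // Illustrative assignor: each member gets a one-byte assignment holding its index.
    static final Assignor INDEX_ASSIGNOR = (leaderId, protocol, memberIds) -> {
        Map<String, ByteBuffer> out = new LinkedHashMap<>();
        for (int i = 0; i < memberIds.size(); i++) {
            out.put(memberIds.get(i), ByteBuffer.wrap(new byte[] {(byte) i}));
        }
        return out;
    };

    static SyncRequest onJoinLeader(
            String coordinator, int generation, String leaderId, String protocol,
            List<String> memberIds, Assignor assignor, List<String> debugLog) {
        // Step 1: the leader computes the assignment for all members.
        Map<String, ByteBuffer> assignments =
            assignor.performAssignment(leaderId, protocol, memberIds);
        SyncRequest request = new SyncRequest(generation, leaderId, assignments);
        // Step 2: DEBUG message, in the format quoted in this section.
        debugLog.add("Sending leader SyncGroup to coordinator " + coordinator
            + " at generation " + generation + ": " + request);
        // Step 3: the real method sends the request; the sketch just returns it.
        return request;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        onJoinLeader("broker-1:9092", 5, "m1", "range", List.of("m1", "m2"), INDEX_ASSIGNOR, log);
        log.forEach(System.out::println);
    }
}
```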
Group Follower¶
RequestFuture<ByteBuffer> onJoinFollower()
onJoinFollower...FIXME
onJoinFollower is used when:
- JoinGroupResponseHandler is requested to handle a JoinGroup response (after joining the group successfully as a follower)
ensureActiveGroup¶
void ensureActiveGroup()
boolean ensureActiveGroup(
Timer timer)
ensureActiveGroup waits until the group coordinator is ready (ensureCoordinatorReady) and returns false if it is not.
ensureActiveGroup...FIXME
ensureActiveGroup is used when:
- ConsumerCoordinator is requested to poll
- WorkerCoordinator (Kafka Connect) is requested to poll
joinGroupIfNeeded¶
boolean joinGroupIfNeeded(
Timer timer)
joinGroupIfNeeded...FIXME
initiateJoinGroup¶
RequestFuture<ByteBuffer> initiateJoinGroup()
initiateJoinGroup...FIXME
sendJoinGroupRequest¶
RequestFuture<ByteBuffer> sendJoinGroupRequest()
sendJoinGroupRequest...FIXME
Waiting for Consumer Group Coordinator Known and Ready¶
boolean ensureCoordinatorReady(
Timer timer)
ensureCoordinatorReady returns true immediately when the consumer group coordinator is known and available.
Otherwise, ensureCoordinatorReady keeps looking up the group coordinator (by sending FindCoordinator requests to the least loaded broker) until the coordinator is available or the timer expires.
In the end, ensureCoordinatorReady returns whether the coordinator is known and available or not.
ensureCoordinatorReady is used when:
- AbstractCoordinator is requested to ensureActiveGroup (and joinGroupIfNeeded)
- ConsumerCoordinator is requested to poll, fetchCommittedOffsets, close, commitOffsetsSync
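The timer-bounded lookup loop described in this section can be sketched roughly as below. It is only a model of the control flow: `SimpleTimer` is a hypothetical stand-in for Kafka's `Timer`, `coordinatorKnown` stands for the "known and available" check, and `lookup` stands for one round of sending a FindCoordinator request and waiting for the response.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class CoordinatorReadySketch {

    // Hypothetical deadline-based timer (not Kafka's Timer class).
    static final class SimpleTimer {
        private final long deadlineNanos;

        SimpleTimer(long timeoutMs) {
            this.deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
        }

        boolean isExpired() {
            return System.nanoTime() - deadlineNanos >= 0;
        }
    }

    static boolean ensureCoordinatorReady(
            BooleanSupplier coordinatorKnown, Runnable lookup, SimpleTimer timer) {
        // Fast path: nothing to do when the coordinator is already known.
        if (coordinatorKnown.getAsBoolean()) {
            return true;
        }
        // Keep looking up the coordinator until it is found or time runs out.
        do {
            lookup.run();
        } while (!coordinatorKnown.getAsBoolean() && !timer.isExpired());
        // Report the final state, whichever condition ended the loop.
        return coordinatorKnown.getAsBoolean();
    }

    public static void main(String[] args) {
        AtomicInteger lookups = new AtomicInteger();
        // Assumption for the demo: the coordinator is discovered on the third lookup.
        boolean ready = ensureCoordinatorReady(
            () -> lookups.get() >= 3, lookups::incrementAndGet, new SimpleTimer(5_000));
        System.out.println("ready=" + ready + " lookups=" + lookups.get());
    }
}
```

Returning the re-evaluated check at the end, rather than a flag set inside the loop, keeps the result consistent with the statement that the method returns whether the coordinator is known and available.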
JoinGroupResponseHandler¶
JoinGroupResponseHandler is a CoordinatorResponseHandler to handle responses from the group coordinator after sendJoinGroupRequest.
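As described under Handling JoinGroup Response, a successful response is routed to onJoinLeader or onJoinFollower depending on the member's role. A minimal sketch of that dispatch follows. The `JoinResponse` class and its fields are hypothetical, and the rule used here (a member is the leader when the leader ID in the response equals its own member ID) is an assumption of the sketch rather than something this page states.

```java
public class JoinResponseDispatchSketch {

    // Hypothetical, trimmed-down view of a successful JoinGroup response.
    static final class JoinResponse {
        final String leaderId;
        final String memberId;
        final int generationId;

        JoinResponse(String leaderId, String memberId, int generationId) {
            this.leaderId = leaderId;
            this.memberId = memberId;
            this.generationId = generationId;
        }

        // Assumption: this member leads the group when the IDs match.
        boolean isLeader() {
            return leaderId.equals(memberId);
        }
    }

    // Returns the name of the method that would handle the response.
    static String dispatch(JoinResponse response) {
        return response.isLeader() ? "onJoinLeader" : "onJoinFollower";
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new JoinResponse("m1", "m1", 7)));
        System.out.println(dispatch(new JoinResponse("m1", "m2", 7)));
    }
}
```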