StreamsPartitionAssignor¶
StreamsPartitionAssignor is a ConsumerPartitionAssignor (Apache Kafka) and a Configurable (Apache Kafka).
Supported Rebalance Protocols¶
List<RebalanceProtocol> supportedProtocols()
supportedProtocols returns the following RebalanceProtocols:
RebalanceProtocol.EAGER
RebalanceProtocol.COOPERATIVE (based on upgrade.from)
supportedProtocols is part of the ConsumerPartitionAssignor (Apache Kafka) abstraction.
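The selection above can be pictured with a minimal sketch. It is not the actual Kafka Streams code: the nested enum stands in for ConsumerPartitionAssignor.RebalanceProtocol, and the cooperativeEnabled flag is a hypothetical parameter that stands for "the upgrade.from setting does not rule out COOPERATIVE".

```java
import java.util.ArrayList;
import java.util.List;

final class SupportedProtocolsSketch {
    // Stand-in for ConsumerPartitionAssignor.RebalanceProtocol (only the two constants named above).
    enum RebalanceProtocol { EAGER, COOPERATIVE }

    // cooperativeEnabled is hypothetical: it represents the outcome of inspecting upgrade.from.
    static List<RebalanceProtocol> supportedProtocols(boolean cooperativeEnabled) {
        List<RebalanceProtocol> protocols = new ArrayList<>();
        // Assumption: EAGER is always offered.
        protocols.add(RebalanceProtocol.EAGER);
        if (cooperativeEnabled) {
            protocols.add(RebalanceProtocol.COOPERATIVE);
        }
        return protocols;
    }

    public static void main(String[] args) {
        System.out.println(supportedProtocols(true));
        System.out.println(supportedProtocols(false));
    }
}
```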
Name¶
String name()
name is stream.
name is part of the ConsumerPartitionAssignor (Apache Kafka) abstraction.
configure¶
void configure(
Map<String, ?> configs)
configure creates a new AssignorConfiguration (with the given configs).
configure...FIXME
configure is part of the Configurable (Apache Kafka) abstraction.
StreamsMetadataState¶
StreamsPartitionAssignor is given a StreamsMetadataState (from a ReferenceContainer) when requested to configure.
The StreamsMetadataState is used (to handle partition assignment change) when StreamsPartitionAssignor is requested to handle task and partition assignment (onAssignment).
Consumer Group Assignment¶
GroupAssignment assign(
Cluster metadata,
GroupSubscription groupSubscription)
assign...FIXME
assign prints out the following DEBUG message to the logs:
Constructed client metadata [clientMetadata] from the member subscriptions.
assign calls prepareRepartitionTopics with the given cluster metadata (which returns a Map<TopicPartition, PartitionInfo>, referred to as allRepartitionTopicPartitions).
assign prints out the following DEBUG message to the logs:
Created repartition topics [allRepartitionTopicPartitions] from the parsed topology.
assign...FIXME
assign is part of the ConsumerPartitionAssignor (Apache Kafka) abstraction.
prepareRepartitionTopics¶
Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(
Cluster metadata)
prepareRepartitionTopics creates a new RepartitionTopics, requests it to setup, and then returns its topicPartitionsInfo.
assignTasksToClients¶
boolean assignTasksToClients(
Cluster fullMetadata,
Set<String> allSourceTopics,
Map<Subtopology, TopicsInfo> topicGroups,
Map<UUID, ClientMetadata> clientMetadataMap,
Map<TaskId, Set<TopicPartition>> partitionsForTask,
Set<TaskId> statefulTasks)
assignTasksToClients creates two local collections, taskForPartition (Map<TopicPartition, TaskId>) and tasksForTopicGroup (Map<Subtopology, Set<TaskId>>), and fills them in using populateTasksForMaps.
assignTasksToClients creates a ChangelogTopics (with the tasksForTopicGroup local collection) that is in turn requested to setup.
assignTasksToClients calls populateClientStatesMap.
assignTasksToClients prints out the following INFO message to the logs:
All members participating in this rebalance:
[UUID]: [consumers].
assignTasksToClients prints out the following DEBUG message to the logs:
Assigning tasks [allTasks] including stateful [statefulTasks] to clients [clientStates] with number of replicas [numStandbyReplicas]
assignTasksToClients creates a TaskAssignor that is in turn requested to assign.
assignTasksToClients prints out the following INFO message to the logs:
Assigned tasks [allTasks] including stateful [statefulTasks] to clients as:
[UUID]=[currentAssignment].
In the end, assignTasksToClients returns whether the generated assignment requires a followup probing rebalance (from the TaskAssignor).
populateTasksForMaps¶
void populateTasksForMaps(
Map<TopicPartition, TaskId> taskForPartition,
Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
Set<String> allSourceTopics,
Map<TaskId, Set<TopicPartition>> partitionsForTask,
Cluster fullMetadata)
populateTasksForMaps...FIXME
populateClientStatesMap¶
boolean populateClientStatesMap(
Map<UUID, ClientState> clientStates,
Map<UUID, ClientMetadata> clientMetadataMap,
Map<TopicPartition, TaskId> taskForPartition,
ChangelogTopics changelogTopics)
populateClientStatesMap...FIXME
createTaskAssignor¶
TaskAssignor createTaskAssignor(
boolean lagComputationSuccessful)
createTaskAssignor creates a TaskAssignor (using the taskAssignorSupplier function).
Handling Task and Partition Assignment¶
void onAssignment(
Assignment assignment,
ConsumerGroupMetadata metadata)
onAssignment is part of the ConsumerPartitionAssignor (Apache Kafka) abstraction.
onAssignment calls validateActiveTaskEncoding.
onAssignment gets the active tasks (from the partitions and the AssignmentInfo of the given Assignment).
onAssignment calls maybeScheduleFollowupRebalance.
onAssignment creates an empty (fake) Cluster metadata (with the partitions by host, i.e. Map<HostInfo, Set<TopicPartition>>) and requests the StreamsMetadataState to handle the assignment change.
In the end, onAssignment requests the TaskManager to handle the task and partition assignment (with the active and standby tasks).
validateActiveTaskEncoding¶
void validateActiveTaskEncoding(
List<TopicPartition> partitions,
AssignmentInfo info,
String logPrefix)
validateActiveTaskEncoding throws a TaskAssignmentException when the number of partitions is not the same as the number of active tasks (of the given AssignmentInfo):
[logPrefix]Number of assigned partitions [partitions]
is not equal to the number of active taskIds [activeTasks], assignmentInfo=[info]
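The size check described above can be sketched as follows. This is an illustration only: plain strings stand in for TopicPartition and TaskId, the list of task ids stands in for the activeTasks of an AssignmentInfo, and IllegalStateException is used in place of TaskAssignmentException so the sketch needs no Kafka dependency.

```java
import java.util.Arrays;
import java.util.List;

final class ActiveTaskEncodingSketch {
    // Fails when the partitions and the active task ids cannot be paired one-to-one by position.
    static void validate(List<String> partitions, List<String> activeTaskIds, String logPrefix) {
        if (partitions.size() != activeTaskIds.size()) {
            // IllegalStateException is a stand-in for TaskAssignmentException (an assumption of this sketch).
            throw new IllegalStateException(String.format(
                "%sNumber of assigned partitions %d is not equal to the number of active taskIds %d",
                logPrefix, partitions.size(), activeTaskIds.size()));
        }
    }

    public static void main(String[] args) {
        // Two partitions and two task ids: passes silently.
        validate(Arrays.asList("orders-0", "orders-1"), Arrays.asList("0_0", "0_1"), "sketch: ");
        try {
            // Two partitions but a single task id: rejected.
            validate(Arrays.asList("orders-0", "orders-1"), Arrays.asList("0_0"), "sketch: ");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```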
Active Tasks¶
Map<TaskId, Set<TopicPartition>> getActiveTasks(
List<TopicPartition> partitions,
AssignmentInfo info)
getActiveTasks returns TaskIds and the associated TopicPartitions (from the partitions).
getActiveTasks finds the TaskIds among the activeTasks in the given AssignmentInfo.
getActiveTasks assumes that the position of the TopicPartition in the given partitions is the position of the corresponding TaskId in the activeTasks in the given AssignmentInfo.
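The positional pairing can be sketched as below. This is a simplified illustration, not the actual implementation: strings stand in for TaskId and TopicPartition, and the sketch assumes that a task id may appear more than once in the list when a task reads from several partitions.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ActiveTasksSketch {
    // Strings stand in for TaskId (keys) and TopicPartition (values) to keep the sketch dependency-free.
    static Map<String, Set<String>> activeTasks(List<String> partitions, List<String> activeTaskIds) {
        Map<String, Set<String>> tasks = new HashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            // The i-th partition is taken to belong to the i-th task id.
            tasks.computeIfAbsent(activeTaskIds.get(i), id -> new HashSet<>())
                 .add(partitions.get(i));
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("orders-0", "payments-0", "orders-1");
        List<String> taskIds = Arrays.asList("0_0", "0_0", "0_1");
        // Task 0_0 ends up with two partitions, task 0_1 with one.
        System.out.println(activeTasks(partitions, taskIds));
    }
}
```

Because the pairing relies on positions alone, it only makes sense after the size check in validateActiveTaskEncoding has passed.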
maybeScheduleFollowupRebalance¶
void maybeScheduleFollowupRebalance(
long encodedNextScheduledRebalanceMs,
int receivedAssignmentMetadataVersion,
int latestCommonlySupportedVersion,
Set<HostInfo> groupHostInfo)
maybeScheduleFollowupRebalance...FIXME
subscriptionUserData¶
ByteBuffer subscriptionUserData(
Set<String> topics)
subscriptionUserData...FIXME
subscriptionUserData is part of the ConsumerPartitionAssignor (Apache Kafka) abstraction.
Logging¶
Enable the ALL logging level for the org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor=ALL
Refer to Logging.