StreamsPartitionAssignor¶
StreamsPartitionAssignor is a ConsumerPartitionAssignor (Apache Kafka) and a Configurable (Apache Kafka).
Supported Rebalance Protocols¶
List<RebalanceProtocol> supportedProtocols()
supportedProtocols returns the following RebalanceProtocols:
RebalanceProtocol.EAGER
RebalanceProtocol.COOPERATIVE (based on upgrade.from)
supportedProtocols is part of the ConsumerPartitionAssignor (Apache Kafka) abstraction.
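The selection described above can be pictured with a minimal sketch. It uses no Kafka classes: the Protocol enum is a local stand-in for RebalanceProtocol, and the cooperativeAllowed flag is a hypothetical placeholder for whatever the assignor derives from upgrade.from (the exact mapping of upgrade.from values is not shown on this page).

```java
import java.util.ArrayList;
import java.util.List;

class SupportedProtocolsSketch {
    // Local stand-in for ConsumerPartitionAssignor.RebalanceProtocol (not the Kafka type).
    enum Protocol { EAGER, COOPERATIVE }

    // cooperativeAllowed is a hypothetical flag that stands in for the
    // decision the assignor makes based on upgrade.from.
    static List<Protocol> supportedProtocols(boolean cooperativeAllowed) {
        List<Protocol> protocols = new ArrayList<>();
        protocols.add(Protocol.EAGER);            // listed unconditionally
        if (cooperativeAllowed) {
            protocols.add(Protocol.COOPERATIVE);  // listed only when upgrade.from permits it
        }
        return protocols;
    }

    public static void main(String[] args) {
        System.out.println(supportedProtocols(true));
        System.out.println(supportedProtocols(false));
    }
}
```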
Name¶
String name()
name is stream.
name is part of the ConsumerPartitionAssignor (Apache Kafka) abstraction.
configure¶
void configure(
  Map<String, ?> configs)
configure creates a new AssignorConfiguration (with the given configs).
configure ...FIXME
configure is part of the Configurable (Apache Kafka) abstraction.
StreamsMetadataState¶
StreamsPartitionAssignor is given a StreamsMetadataState (from a ReferenceContainer) when requested to configure.
The StreamsMetadataState is used (to handle a partition assignment change) when StreamsPartitionAssignor is requested to handle task and partition assignment (onAssignment).
Consumer Group Assignment¶
GroupAssignment assign(
  Cluster metadata,
  GroupSubscription groupSubscription)
assign ...FIXME
assign prints out the following DEBUG message to the logs:
Constructed client metadata [clientMetadata] from the member subscriptions.
assign prepareRepartitionTopics with the given cluster metadata (that gives a Map<TopicPartition, PartitionInfo> as allRepartitionTopicPartitions).
assign prints out the following DEBUG message to the logs:
Created repartition topics [allRepartitionTopicPartitions] from the parsed topology.
assign ...FIXME
assign is part of the ConsumerPartitionAssignor (Apache Kafka) abstraction.
prepareRepartitionTopics¶
Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(
  Cluster metadata)
prepareRepartitionTopics creates a new RepartitionTopics, requests it to setup and then returns its topicPartitionsInfo.
assignTasksToClients¶
boolean assignTasksToClients(
  Cluster fullMetadata,
  Set<String> allSourceTopics,
  Map<Subtopology, TopicsInfo> topicGroups,
  Map<UUID, ClientMetadata> clientMetadataMap,
  Map<TaskId, Set<TopicPartition>> partitionsForTask,
  Set<TaskId> statefulTasks)
assignTasksToClients creates local taskForPartition (Map<TopicPartition, TaskId>) and tasksForTopicGroup (Map<Subtopology, Set<TaskId>>) collections that are used to populate tasks.
assignTasksToClients creates a ChangelogTopics (with the tasksForTopicGroup local collection) that is in turn requested to setup.
assignTasksToClients populateClientStatesMap.
assignTasksToClients prints out the following INFO message to the logs:
All members participating in this rebalance:
[UUID]: [consumers].
assignTasksToClients prints out the following DEBUG message to the logs:
Assigning tasks [allTasks] including stateful [statefulTasks] to clients [clientStates] with number of replicas [numStandbyReplicas]
assignTasksToClients creates a TaskAssignor that is in turn requested to assign.
assignTasksToClients prints out the following INFO message to the logs:
Assigned tasks [allTasks] including stateful [statefulTasks] to clients as:
[UUID]=[currentAssignment].
In the end, assignTasksToClients returns whether the generated assignment requires a followup probing rebalance (from the TaskAssignor).
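The two local collections that assignTasksToClients creates first (taskForPartition and tasksForTopicGroup) can be pictured with a minimal sketch. This is an illustration under stated assumptions, not the assignor's code: plain strings stand in for TaskId and TopicPartition, an Integer stands in for Subtopology, and a task id is assumed to have the form subtopology_partition (e.g. 0_1).

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class TaskMapsSketch {
    // Inverts partitionsForTask (task -> partitions) into taskForPartition (partition -> task).
    static Map<String, String> taskForPartition(Map<String, Set<String>> partitionsForTask) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, Set<String>> entry : partitionsForTask.entrySet()) {
            for (String partition : entry.getValue()) {
                result.put(partition, entry.getKey());
            }
        }
        return result;
    }

    // Groups task ids by subtopology. Assumption of this sketch: the subtopology
    // id is the number in front of '_' in the task id.
    static Map<Integer, Set<String>> tasksForTopicGroup(Set<String> taskIds) {
        Map<Integer, Set<String>> result = new TreeMap<>();
        for (String taskId : taskIds) {
            int subtopology = Integer.parseInt(taskId.substring(0, taskId.indexOf('_')));
            result.computeIfAbsent(subtopology, k -> new TreeSet<>()).add(taskId);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> partitionsForTask = new TreeMap<>();
        partitionsForTask.put("0_0", Set.of("input-0"));
        partitionsForTask.put("0_1", Set.of("input-1"));
        partitionsForTask.put("1_0", Set.of("repartition-0"));
        System.out.println(taskForPartition(partitionsForTask));
        System.out.println(tasksForTopicGroup(partitionsForTask.keySet()));
    }
}
```

Sorted maps and sets are used only to make the printed output stable; nothing on this page says which collection types the assignor uses.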
populateTasksForMaps¶
void populateTasksForMaps(
  Map<TopicPartition, TaskId> taskForPartition,
  Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
  Set<String> allSourceTopics,
  Map<TaskId, Set<TopicPartition>> partitionsForTask,
  Cluster fullMetadata)
populateTasksForMaps ...FIXME
populateClientStatesMap¶
boolean populateClientStatesMap(
  Map<UUID, ClientState> clientStates,
  Map<UUID, ClientMetadata> clientMetadataMap,
  Map<TopicPartition, TaskId> taskForPartition,
  ChangelogTopics changelogTopics)
populateClientStatesMap ...FIXME
createTaskAssignor¶
TaskAssignor createTaskAssignor(
  boolean lagComputationSuccessful)
createTaskAssignor creates a TaskAssignor (using the taskAssignorSupplier function).
Handling Task and Partition Assignment¶
void onAssignment(
  Assignment assignment,
  ConsumerGroupMetadata metadata)
onAssignment is part of the ConsumerPartitionAssignor (Apache Kafka) abstraction.
onAssignment validateActiveTaskEncoding.
onAssignment gets the active tasks (from the partitions and the AssignmentInfo of the given Assignment).
onAssignment maybeScheduleFollowupRebalance.
onAssignment creates an empty (fake) Cluster metadata (with the partitions by host, i.e. Map<HostInfo, Set<TopicPartition>>) and requests the StreamsMetadataState to handle the assignment change.
In the end, onAssignment requests the TaskManager to handle the task and partition assignment (with the active and standby tasks).
validateActiveTaskEncoding¶
void validateActiveTaskEncoding(
  List<TopicPartition> partitions,
  AssignmentInfo info,
  String logPrefix)
validateActiveTaskEncoding throws a TaskAssignmentException when the number of partitions is not the same as the number of active tasks (of the given AssignmentInfo):
[logPrefix]Number of assigned partitions [partitions] is not equal to the number of active taskIds [activeTasks], assignmentInfo=[info]
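The size check can be sketched in a few lines. This is a hedged illustration without Kafka classes: strings stand in for TopicPartition and TaskId, the activeTasks list stands in for the AssignmentInfo, and IllegalStateException replaces TaskAssignmentException. Filling the message placeholders with the list sizes is also an assumption of the sketch.

```java
import java.util.List;

class ValidateEncodingSketch {
    // Fails when the assigned partitions and the encoded active tasks differ in number.
    static void validateActiveTaskEncoding(
            List<String> partitions,
            List<String> activeTasks,
            String logPrefix) {
        if (partitions.size() != activeTasks.size()) {
            // Stand-in for TaskAssignmentException, which needs the Kafka Streams library.
            throw new IllegalStateException(String.format(
                "%sNumber of assigned partitions %d is not equal to the number of active taskIds %d, assignmentInfo=%s",
                logPrefix, partitions.size(), activeTasks.size(), activeTasks));
        }
    }

    public static void main(String[] args) {
        // Same sizes: passes silently.
        validateActiveTaskEncoding(List.of("input-0"), List.of("0_0"), "demo ");
        try {
            // Two partitions but one task id: rejected.
            validateActiveTaskEncoding(List.of("input-0", "input-1"), List.of("0_0"), "demo ");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```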
Active Tasks¶
Map<TaskId, Set<TopicPartition>> getActiveTasks(
  List<TopicPartition> partitions,
  AssignmentInfo info)
getActiveTasks returns TaskIds and the associated TopicPartitions (from the partitions).
getActiveTasks finds the TaskIds among the activeTasks in the given AssignmentInfo.
getActiveTasks assumes that the position of the TopicPartition in the given partitions is the position of the corresponding TaskId in the activeTasks in the given AssignmentInfo.
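The positional pairing described above can be sketched as follows. It is a minimal illustration, not the assignor's code: strings stand in for TopicPartition and TaskId, and the activeTasks list stands in for the one carried by the AssignmentInfo. A task id appears once per partition it owns, which is why the two lists have the same length.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ActiveTasksSketch {
    // Pairs partitions.get(i) with activeTasks.get(i) and groups the partitions per task id.
    static Map<String, Set<String>> getActiveTasks(List<String> partitions, List<String> activeTasks) {
        Map<String, Set<String>> result = new LinkedHashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            String taskId = activeTasks.get(i); // same position as the partition
            result.computeIfAbsent(taskId, k -> new LinkedHashSet<>()).add(partitions.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("a-0", "b-0", "a-1");
        List<String> activeTasks = List.of("0_0", "0_0", "0_1");
        System.out.println(getActiveTasks(partitions, activeTasks));
    }
}
```

The sketch relies on the lists having equal length, which is what validateActiveTaskEncoding checks before the active tasks are read.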
maybeScheduleFollowupRebalance¶
void maybeScheduleFollowupRebalance(
  long encodedNextScheduledRebalanceMs,
  int receivedAssignmentMetadataVersion,
  int latestCommonlySupportedVersion,
  Set<HostInfo> groupHostInfo)
maybeScheduleFollowupRebalance ...FIXME
subscriptionUserData¶
ByteBuffer subscriptionUserData(
  Set<String> topics)
subscriptionUserData ...FIXME
subscriptionUserData is part of the ConsumerPartitionAssignor (Apache Kafka) abstraction.
Logging¶
Enable the ALL logging level for the org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor=ALL
Refer to Logging.