
StreamsPartitionAssignor

StreamsPartitionAssignor is a ConsumerPartitionAssignor (Apache Kafka) and a Configurable (Apache Kafka).

Supported Rebalance Protocols

List<RebalanceProtocol> supportedProtocols()

supportedProtocols returns the following RebalanceProtocols:

  1. RebalanceProtocol.EAGER
  2. RebalanceProtocol.COOPERATIVE (unless upgrade.from selects a version that requires the eager protocol)

supportedProtocols is part of the ConsumerPartitionAssignor (Apache Kafka) abstraction.
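
The selection rule can be sketched with plain Java. This is a minimal illustration, not the Kafka Streams source: the Protocol enum stands in for RebalanceProtocol, and the set of upgrade.from values treated as eager-only is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class SupportedProtocolsSketch {
    // Stand-in for Kafka's ConsumerPartitionAssignor.RebalanceProtocol.
    enum Protocol { EAGER, COOPERATIVE }

    // Assumption: upgrade.from values that keep the assignor on the eager
    // protocol (illustrative only, not an exhaustive or authoritative list).
    static final Set<String> EAGER_ONLY_UPGRADE_FROM = Set.of("2.0", "2.1", "2.2", "2.3");

    // upgradeFrom is null when the upgrade.from property is not set.
    static List<Protocol> supportedProtocols(String upgradeFrom) {
        List<Protocol> protocols = new ArrayList<>();
        protocols.add(Protocol.EAGER); // always advertised
        boolean eagerOnly = upgradeFrom != null && EAGER_ONLY_UPGRADE_FROM.contains(upgradeFrom);
        if (!eagerOnly) {
            protocols.add(Protocol.COOPERATIVE);
        }
        return protocols;
    }

    public static void main(String[] args) {
        System.out.println(supportedProtocols(null));  // prints [EAGER, COOPERATIVE]
        System.out.println(supportedProtocols("2.3")); // prints [EAGER]
    }
}
```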

Name

String name()

name is stream.

name is part of the ConsumerPartitionAssignor (Apache Kafka) abstraction.

configure

void configure(
  Map<String, ?> configs)

configure creates a new AssignorConfiguration (with the given configs).

configure...FIXME

configure is part of the Configurable (Apache Kafka) abstraction.

StreamsMetadataState

StreamsPartitionAssignor is given a StreamsMetadataState (from a ReferenceContainer) when requested to configure.

The StreamsMetadataState is used to handle a partition assignment change when StreamsPartitionAssignor is requested to handle a task and partition assignment (onAssignment).

Consumer Group Assignment

GroupAssignment assign(
  Cluster metadata,
  GroupSubscription groupSubscription)

assign...FIXME

assign prints out the following DEBUG message to the logs:

Constructed client metadata [clientMetadata] from the member subscriptions.

assign calls prepareRepartitionTopics with the given cluster metadata, which returns a Map<TopicPartition, PartitionInfo> (allRepartitionTopicPartitions).

assign prints out the following DEBUG message to the logs:

Created repartition topics [allRepartitionTopicPartitions] from the parsed topology.

assign...FIXME

assign is part of the ConsumerPartitionAssignor (Apache Kafka) abstraction.

prepareRepartitionTopics

Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(
  Cluster metadata)

prepareRepartitionTopics creates a new RepartitionTopics, requests it to set up, and then returns its topicPartitionsInfo.

assignTasksToClients

boolean assignTasksToClients(
  Cluster fullMetadata,
  Set<String> allSourceTopics,
  Map<Subtopology, TopicsInfo> topicGroups,
  Map<UUID, ClientMetadata> clientMetadataMap,
  Map<TaskId, Set<TopicPartition>> partitionsForTask,
  Set<TaskId> statefulTasks)

assignTasksToClients creates two local collections, taskForPartition (Map<TopicPartition, TaskId>) and tasksForTopicGroup (Map<Subtopology, Set<TaskId>>), and fills them using populateTasksForMaps.

assignTasksToClients creates a ChangelogTopics (with the tasksForTopicGroup local collection) and requests it to set up.

assignTasksToClients calls populateClientStatesMap.

assignTasksToClients prints out the following INFO message to the logs:

All members participating in this rebalance:
 [UUID]: [consumers].

assignTasksToClients prints out the following DEBUG message to the logs:

Assigning tasks [allTasks] including stateful [statefulTasks] to clients [clientStates] with number of replicas [numStandbyReplicas]

assignTasksToClients creates a TaskAssignor that is in turn requested to assign.

assignTasksToClients prints out the following INFO message to the logs:

Assigned tasks [allTasks] including stateful [statefulTasks] to clients as:
 [UUID]=[currentAssignment].

In the end, assignTasksToClients returns whether the generated assignment requires a follow-up probing rebalance (as reported by the TaskAssignor).

populateTasksForMaps

void populateTasksForMaps(
  Map<TopicPartition, TaskId> taskForPartition,
  Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
  Set<String> allSourceTopics,
  Map<TaskId, Set<TopicPartition>> partitionsForTask,
  Cluster fullMetadata)

populateTasksForMaps...FIXME

populateClientStatesMap

boolean populateClientStatesMap(
  Map<UUID, ClientState> clientStates,
  Map<UUID, ClientMetadata> clientMetadataMap,
  Map<TopicPartition, TaskId> taskForPartition,
  ChangelogTopics changelogTopics)

populateClientStatesMap...FIXME

createTaskAssignor

TaskAssignor createTaskAssignor(
  boolean lagComputationSuccessful)

createTaskAssignor creates a TaskAssignor (using the taskAssignorSupplier function).

Handling Task and Partition Assignment

void onAssignment(
  Assignment assignment,
  ConsumerGroupMetadata metadata)

onAssignment is part of the ConsumerPartitionAssignor (Apache Kafka) abstraction.

onAssignment validates the encoding of the active tasks (validateActiveTaskEncoding).

onAssignment gets the active tasks (from the partitions and the AssignmentInfo of the given Assignment).

onAssignment calls maybeScheduleFollowupRebalance.

onAssignment creates an empty (fake) Cluster metadata (with the partitions by host, i.e. Map<HostInfo, Set<TopicPartition>>) and requests the StreamsMetadataState to handle the assignment change.

In the end, onAssignment requests the TaskManager to handle the task and partition assignment (with the active and standby tasks).

validateActiveTaskEncoding

void validateActiveTaskEncoding(
  List<TopicPartition> partitions,
  AssignmentInfo info,
  String logPrefix)

validateActiveTaskEncoding throws a TaskAssignmentException when the number of partitions is not the same as the number of active tasks (of the given AssignmentInfo):

[logPrefix]Number of assigned partitions [partitions]
is not equal to the number of active taskIds [activeTasks], assignmentInfo=[info]
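
A minimal sketch of this size check, using only the JDK. The nested TaskAssignmentException is a stand-in for the Kafka Streams exception of the same name, partitions and task ids are modeled as strings, and the assignmentInfo suffix of the message is left out for brevity.

```java
import java.util.List;

final class ValidateActiveTaskEncodingSketch {
    // Stand-in for Kafka Streams' TaskAssignmentException (assumption for this sketch).
    static final class TaskAssignmentException extends RuntimeException {
        TaskAssignmentException(String message) {
            super(message);
        }
    }

    // Throws when the two lists cannot be paired one-to-one by position.
    static void validate(List<String> partitions, List<String> activeTasks, String logPrefix) {
        if (partitions.size() != activeTasks.size()) {
            throw new TaskAssignmentException(String.format(
                "%sNumber of assigned partitions %d is not equal to the number of active taskIds %d",
                logPrefix, partitions.size(), activeTasks.size()));
        }
    }

    public static void main(String[] args) {
        validate(List.of("orders-0"), List.of("0_0"), "demo "); // sizes match, no exception
        try {
            validate(List.of("orders-0", "orders-1"), List.of("0_0"), "demo ");
        } catch (TaskAssignmentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```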

Active Tasks

Map<TaskId, Set<TopicPartition>> getActiveTasks(
  List<TopicPartition> partitions,
  AssignmentInfo info)

getActiveTasks returns TaskIds and the associated TopicPartitions (from the partitions).

getActiveTasks finds the TaskIds among the activeTasks in the given AssignmentInfo.

getActiveTasks assumes that the position of the TopicPartition in the given partitions is the position of the corresponding TaskId in the activeTasks in the given AssignmentInfo.
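
The positional pairing can be sketched as follows. This is an illustration rather than the actual implementation: partitions and task ids are plain strings here, and the two lists are assumed to have the same length (which validateActiveTaskEncoding checks beforehand).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class GetActiveTasksSketch {
    // Pairs partitions.get(i) with activeTasks.get(i) and groups partitions by task id.
    static Map<String, Set<String>> getActiveTasks(List<String> partitions, List<String> activeTasks) {
        Map<String, Set<String>> partitionsByTask = new HashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            partitionsByTask
                .computeIfAbsent(activeTasks.get(i), taskId -> new HashSet<>())
                .add(partitions.get(i));
        }
        return partitionsByTask;
    }

    public static void main(String[] args) {
        // Task 0_0 reads two co-partitioned topics, so its id appears twice.
        List<String> partitions = List.of("orders-0", "payments-0", "orders-1");
        List<String> activeTasks = List.of("0_0", "0_0", "0_1");
        System.out.println(getActiveTasks(partitions, activeTasks));
    }
}
```

A task id that appears more than once in activeTasks collects every partition at the matching positions, which is how a single task ends up with several input partitions.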

maybeScheduleFollowupRebalance

void maybeScheduleFollowupRebalance(
  long encodedNextScheduledRebalanceMs,
  int receivedAssignmentMetadataVersion,
  int latestCommonlySupportedVersion,
  Set<HostInfo> groupHostInfo)

maybeScheduleFollowupRebalance...FIXME

subscriptionUserData

ByteBuffer subscriptionUserData(
  Set<String> topics)

subscriptionUserData...FIXME

subscriptionUserData is part of the ConsumerPartitionAssignor (Apache Kafka) abstraction.

Logging

Enable the ALL logging level for the org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor=ALL

Refer to Logging.
