ControllerContext is the context of an active <> (and is <> right when KafkaController is <>).

[[creating-instance]] ControllerContext takes no input arguments to be created.

=== [[allPartitions]] allPartitions Method

[source, scala]
----
allPartitions: Set[TopicPartition]
----

allPartitions converts the <> into TopicPartitions, i.e. allPartitions takes every partition of every topic and creates a new TopicPartition for each.
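The conversion can be sketched as follows. This is a minimal illustration, not the Kafka source: the `TopicPartition` case class is a hypothetical stand-in for `org.apache.kafka.common.TopicPartition`, and the nested `topic -> (partition -> replicas)` shape of the registry and the sample data are assumptions.

```scala
object AllPartitionsSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  // Assumed registry shape: topic -> (partition id -> replica broker IDs).
  val assignments: Map[String, Map[Int, Seq[Int]]] = Map(
    "orders" -> Map(0 -> Seq(1, 2), 1 -> Seq(2, 3)),
    "events" -> Map(0 -> Seq(3, 1))
  )

  // Flatten the nested registry into one TopicPartition per (topic, partition) pair.
  def allPartitions: Set[TopicPartition] =
    assignments.flatMap { case (topic, partitions) =>
      partitions.keys.map(partition => TopicPartition(topic, partition))
    }.toSet
}
```

With the sample data above, `AllPartitionsSketch.allPartitions` holds three TopicPartitions, two for `orders` and one for `events`.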


allPartitions is used when:

* KafkaController is requested to <>, <>, and <>

* PartitionStateMachine is requested to <>

* ReplicaStateMachine is requested to <>

=== [[updatePartitionReplicaAssignment]] updatePartitionReplicaAssignment Method

[source, scala]
----
updatePartitionReplicaAssignment(topicPartition: TopicPartition, newReplicas: Seq[Int]): Unit
----

updatePartitionReplicaAssignment simply updates the <> registry with newReplicas for the topic and the partition (of a given TopicPartition).
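A minimal sketch of such an update, assuming the registry is a mutable map of topics to per-partition replica lists (the registry shape, the `assignments` name and the `TopicPartition` stand-in are assumptions for illustration):

```scala
import scala.collection.mutable

object UpdateAssignmentSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  // Assumed registry shape: topic -> (partition id -> replica broker IDs).
  val assignments: mutable.Map[String, mutable.Map[Int, Seq[Int]]] = mutable.Map.empty

  def updatePartitionReplicaAssignment(topicPartition: TopicPartition, newReplicas: Seq[Int]): Unit = {
    // Create the per-topic map on first use, then overwrite the partition's replicas.
    val partitions = assignments.getOrElseUpdate(topicPartition.topic, mutable.Map.empty[Int, Seq[Int]])
    partitions(topicPartition.partition) = newReplicas
  }
}
```

Calling the method twice for the same TopicPartition keeps only the most recent replica list.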


updatePartitionReplicaAssignment is used when:

* KafkaController is requested to <>, <>, <>, and at <> and <> controller events

* ReplicaStateMachine is requested to <>

=== [[partitionReplicaAssignment]] partitionReplicaAssignment Method

[source, scala]
----
partitionReplicaAssignment(
  topicPartition: TopicPartition): Seq[Int]
----

partitionReplicaAssignment finds the brokers with the replicas of the given partition (aka partition replica assignment).

Internally, partitionReplicaAssignment finds broker IDs of the replicas of the given partition (TopicPartition) in the <> internal registry.

partitionReplicaAssignment returns an empty collection when the topic or the partition is not found.
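The lookup with its empty fallback could look like the sketch below. The nested registry shape, the sample data and the `TopicPartition` stand-in are assumptions made for the example.

```scala
object ReplicaLookupSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  // Assumed registry shape: topic -> (partition id -> replica broker IDs).
  val assignments: Map[String, Map[Int, Seq[Int]]] =
    Map("orders" -> Map(0 -> Seq(1, 2)))

  // Fall back to an empty collection when either the topic or the partition is unknown.
  def partitionReplicaAssignment(topicPartition: TopicPartition): Seq[Int] =
    assignments
      .getOrElse(topicPartition.topic, Map.empty[Int, Seq[Int]])
      .getOrElse(topicPartition.partition, Seq.empty[Int])
}
```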

NOTE: partitionReplicaAssignment is used when...FIXME

=== [[putReplicaStateIfNotExists]] putReplicaStateIfNotExists Method

[source, scala]
----
putReplicaStateIfNotExists(
  replica: PartitionAndReplica,
  state: ReplicaState): Unit
----

putReplicaStateIfNotExists simply adds the replica to the <> internal registry unless available already.
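The "add unless available already" behaviour maps naturally onto `getOrElseUpdate` of a mutable map. The sketch below assumes a simplified `PartitionAndReplica` and a two-value `ReplicaState`; both are hypothetical stand-ins, not Kafka's types.

```scala
import scala.collection.mutable

object ReplicaStateRegistrySketch {
  // Simplified, hypothetical stand-ins for Kafka's PartitionAndReplica and ReplicaState.
  final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState

  val replicaStates: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty

  def putReplicaStateIfNotExists(replica: PartitionAndReplica, state: ReplicaState): Unit = {
    // getOrElseUpdate stores `state` only when the replica has no entry yet.
    replicaStates.getOrElseUpdate(replica, state)
    ()
  }
}
```

A second call for the same replica leaves the state recorded by the first call untouched.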

NOTE: putReplicaStateIfNotExists is used exclusively when ZkReplicaStateMachine is requested to <>.

=== [[checkValidReplicaStateChange]] checkValidReplicaStateChange Method

[source, scala]
----
checkValidReplicaStateChange(
  replicas: Seq[PartitionAndReplica],
  targetState: ReplicaState): (Seq[PartitionAndReplica], Seq[PartitionAndReplica])
----

For every replica (in the given replicas), checkValidReplicaStateChange <> with the target state (ReplicaState).
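The pair in the return type suggests that the replicas are split into those that can move to the target state and those that cannot. The sketch below illustrates that reading under several assumptions: the transition table, the default of `NonExistentReplica` for unknown replicas, the sample registry and the simplified types are all hypothetical.

```scala
object ReplicaStateChangeSketch {
  // Simplified, hypothetical stand-ins for Kafka's PartitionAndReplica and ReplicaState.
  final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  sealed trait ReplicaState
  case object NonExistentReplica extends ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState

  // Assumed transition table: target state -> states it may be entered from.
  val validPreviousStates: Map[ReplicaState, Set[ReplicaState]] = Map(
    NewReplica -> Set[ReplicaState](NonExistentReplica),
    OnlineReplica -> Set[ReplicaState](NewReplica, OnlineReplica, OfflineReplica),
    OfflineReplica -> Set[ReplicaState](NewReplica, OnlineReplica, OfflineReplica)
  )

  // Sample registry of current replica states (assumed data).
  val replicaStates: Map[PartitionAndReplica, ReplicaState] = Map(
    PartitionAndReplica("orders", 0, 1) -> OnlineReplica,
    PartitionAndReplica("orders", 0, 2) -> NonExistentReplica
  )

  // Returns (valid, invalid): replicas whose current state allows the transition, and the rest.
  def checkValidReplicaStateChange(
      replicas: Seq[PartitionAndReplica],
      targetState: ReplicaState): (Seq[PartitionAndReplica], Seq[PartitionAndReplica]) = {
    val allowed = validPreviousStates.getOrElse(targetState, Set.empty[ReplicaState])
    replicas.partition { replica =>
      // Assumption: a replica missing from the registry counts as NonExistentReplica.
      allowed.contains(replicaStates.getOrElse(replica, NonExistentReplica))
    }
  }
}
```

Using `partition` keeps the input order within both result sequences, so a caller can process the valid replicas and report the invalid ones separately.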

NOTE: checkValidReplicaStateChange is used exclusively when ZkReplicaStateMachine is requested to <>.

=== [[checkValidPartitionStateChange]] checkValidPartitionStateChange Method

[source, scala]
----
checkValidPartitionStateChange(
  partitions: Seq[TopicPartition],
  targetState: PartitionState): (Seq[TopicPartition], Seq[TopicPartition])
----

For every partition (in the given partitions), checkValidPartitionStateChange <> with the target state (PartitionState).

NOTE: checkValidPartitionStateChange is used exclusively when ZkPartitionStateMachine is requested to <>.

=== [[partitionsInStates]] Finding Partitions by Given States -- partitionsInStates Method

[source, scala]
----
partitionsInStates(
  states: Set[PartitionState]): Set[TopicPartition]
partitionsInStates(
  topic: String,
  states: Set[PartitionState]): Set[TopicPartition]
----

partitionsInStates uses the <> internal registry to find all of the TopicPartitions (of the topic if defined) in the given PartitionStates.
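Both variants amount to filtering the registry by state, with the second also narrowing by topic. A minimal sketch, assuming a flat `TopicPartition -> PartitionState` registry and hypothetical stand-in types and sample data:

```scala
object PartitionsInStatesSketch {
  // Hypothetical stand-ins for Kafka's TopicPartition and PartitionState.
  final case class TopicPartition(topic: String, partition: Int)

  sealed trait PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // Sample registry of current partition states (assumed data).
  val partitionStates: Map[TopicPartition, PartitionState] = Map(
    TopicPartition("orders", 0) -> OnlinePartition,
    TopicPartition("orders", 1) -> OfflinePartition,
    TopicPartition("events", 0) -> NewPartition
  )

  // All partitions whose current state is one of the given states.
  def partitionsInStates(states: Set[PartitionState]): Set[TopicPartition] =
    partitionStates.filter { case (_, state) => states.contains(state) }.keySet

  // Same as above, restricted to the partitions of a single topic.
  def partitionsInStates(topic: String, states: Set[PartitionState]): Set[TopicPartition] =
    partitionsInStates(states).filter(_.topic == topic)
}
```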

NOTE: partitionsInStates is used when PartitionStateMachine is requested to triggerOnlinePartitionStateChange.


stats is a ControllerStats with the UncleanLeaderElectionsPerSec meter metric and KafkaTimers for every ControllerState (except the Idle state).

stats is used exclusively to create the <> (of <>) that is then used to collect the times (metrics) of <> (except <>).

* Every ControllerState has the <> metric defined (except <> state)

The timer metric name pattern is kafka.controller:type=ControllerStats,name=.