PartitionStateMachine

PartitionStateMachine is an abstraction of partition state machines that can handleStateChanges.

Contract

handleStateChanges

handleStateChanges(
  partitions: Seq[TopicPartition],
  targetState: PartitionState
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] // (1)!
handleStateChanges(
  partitions: Seq[TopicPartition],
  targetState: PartitionState,
  leaderElectionStrategy: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
  1. Uses an undefined leaderElectionStrategy (None)
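
The relationship between the two variants can be sketched as follows. This is a minimal illustration, not Kafka's code: the case classes are simplified stand-ins for Kafka's types and RecordingStateMachine is a hypothetical implementation that only remembers the strategy it was handed.

```scala
object HandleStateChangesSketch {
  // Simplified stand-ins for Kafka's types (assumptions, kept minimal on purpose).
  final case class TopicPartition(topic: String, partition: Int)
  final case class LeaderAndIsr(leader: Int, isr: List[Int])
  sealed trait PartitionState
  case object OnlinePartition extends PartitionState
  sealed trait PartitionLeaderElectionStrategy
  final case class OfflinePartitionLeaderElectionStrategy(allowUnclean: Boolean)
    extends PartitionLeaderElectionStrategy

  type Result = Map[TopicPartition, Either[Throwable, LeaderAndIsr]]

  abstract class PartitionStateMachine {
    // Two-argument variant: delegates with no election strategy (None).
    def handleStateChanges(
        partitions: Seq[TopicPartition],
        targetState: PartitionState): Result =
      handleStateChanges(partitions, targetState, None)

    // Three-argument variant: left to the concrete state machines.
    def handleStateChanges(
        partitions: Seq[TopicPartition],
        targetState: PartitionState,
        leaderElectionStrategy: Option[PartitionLeaderElectionStrategy]): Result
  }

  // Hypothetical implementation that only records the strategy it was given.
  final class RecordingStateMachine extends PartitionStateMachine {
    var lastStrategy: Option[PartitionLeaderElectionStrategy] =
      Some(OfflinePartitionLeaderElectionStrategy(allowUnclean = false))

    override def handleStateChanges(
        partitions: Seq[TopicPartition],
        targetState: PartitionState,
        leaderElectionStrategy: Option[PartitionLeaderElectionStrategy]): Result = {
      lastStrategy = leaderElectionStrategy
      Map.empty
    }
  }
}
```

Calling the two-argument variant on RecordingStateMachine leaves lastStrategy empty, which is the None mentioned in the annotation.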

Handles state changes of the given partitions (to the given target state)

Used when:

Implementations

Creating Instance

PartitionStateMachine takes the following to be created:

Abstract Class

PartitionStateMachine is an abstract class and cannot be created directly. It is created indirectly for the concrete PartitionStateMachines.

Starting Up (on Active Controller)

startup(): Unit

startup prints out the following INFO message to the logs:

Initializing partition state

startup then runs initializePartitionState.

startup prints out the following INFO message to the logs:

Triggering online partition state changes

startup then runs triggerOnlinePartitionStateChange.

In the end, startup prints out the following DEBUG message to the logs:

Started partition state machine with initial state -> [partitionStates]

startup is used when:

  • KafkaController is requested to onControllerFailover (when a broker is successfully elected as the controller)
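
The order of the steps in startup can be sketched as follows. StartupOrder is a hypothetical skeleton (not Kafka's class): a buffer stands in for the logger and the two steps are placeholders that only record that they ran.

```scala
import scala.collection.mutable.ListBuffer

object StartupSketch {
  // Hypothetical skeleton: `events` stands in for the logs.
  class StartupOrder {
    val events: ListBuffer[String] = ListBuffer.empty

    // Placeholder steps that only record that they were called.
    protected def initializePartitionState(): Unit =
      events += "initializePartitionState"

    protected def triggerOnlinePartitionStateChange(): Unit =
      events += "triggerOnlinePartitionStateChange"

    def startup(): Unit = {
      events += "INFO Initializing partition state"
      initializePartitionState()
      events += "INFO Triggering online partition state changes"
      triggerOnlinePartitionStateChange()
      // The actual message also lists the partition states; left out of this sketch.
      events += "DEBUG Started partition state machine"
    }
  }
}
```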

initializePartitionState

initializePartitionState(): Unit

initializePartitionState...FIXME

triggerOnlinePartitionStateChange

triggerOnlinePartitionStateChange(): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
triggerOnlinePartitionStateChange(
  topic: String
): Unit // (1)!
  1. Uses the partitions of the given topic only

triggerOnlinePartitionStateChange requests the ControllerContext for all the partitions in the following states (possibly limited to the given topic):

  • NewPartition
  • OfflinePartition

In the end, triggerOnlinePartitionStateChange triggers online state change for the partitions.
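
The selection of partitions can be sketched as follows. The types are simplified stand-ins and candidates is a hypothetical helper (a plain Map replaces the ControllerContext lookup), so take it as an illustration of the filtering only.

```scala
object TriggerOnlineSketch {
  // Simplified stand-ins for Kafka's types (assumptions).
  final case class TopicPartition(topic: String, partition: Int)
  sealed trait PartitionState
  case object NewPartition extends PartitionState
  case object OfflinePartition extends PartitionState
  case object OnlinePartition extends PartitionState

  // Hypothetical helper: keeps the partitions in the NewPartition or
  // OfflinePartition state, optionally limited to a single topic.
  def candidates(
      states: Map[TopicPartition, PartitionState],
      topic: Option[String] = None): Set[TopicPartition] =
    states.collect {
      case (tp, NewPartition | OfflinePartition) if topic.forall(_ == tp.topic) => tp
    }.toSet
}
```

With no topic given, all the NewPartition and OfflinePartition partitions are selected. With Some(topic), only those of that topic are.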


triggerOnlinePartitionStateChange is used when:

triggerOnlineStateChangeForPartitions

triggerOnlineStateChangeForPartitions(
  partitions: Set[TopicPartition]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]

triggerOnlineStateChangeForPartitions filters out the partitions of the topics to be deleted from the given partitions.

triggerOnlineStateChangeForPartitions then tries to move the remaining partitions to the OnlinePartition state using OfflinePartitionLeaderElectionStrategy (with the allowUnclean flag off).
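
The two steps can be sketched as follows. Request and plan are hypothetical names introduced for illustration only: plan computes what would be handed over to handleStateChanges (together with the OnlinePartition target state) and does not perform any state change itself.

```scala
object OnlineStateChangeSketch {
  // Simplified stand-ins for Kafka's types (assumptions).
  final case class TopicPartition(topic: String, partition: Int)
  final case class OfflinePartitionLeaderElectionStrategy(allowUnclean: Boolean)

  // Hypothetical value describing the state-change request to be made.
  final case class Request(
      partitions: Set[TopicPartition],
      strategy: OfflinePartitionLeaderElectionStrategy)

  def plan(
      partitions: Set[TopicPartition],
      topicsToBeDeleted: Set[String]): Request =
    Request(
      // Step 1: skip the partitions of the topics queued up for deletion.
      partitions.filterNot(tp => topicsToBeDeleted.contains(tp.topic)),
      // Step 2: offline-partition leader election with the allowUnclean flag off.
      OfflinePartitionLeaderElectionStrategy(allowUnclean = false))
}
```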

Review Me

=== [[PartitionLeaderElectionStrategy]] PartitionLeaderElectionStrategy

.PartitionLeaderElectionStrategies
[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| ControlledShutdownPartitionLeaderElectionStrategy a| [[ControlledShutdownPartitionLeaderElectionStrategy]]

| OfflinePartitionLeaderElectionStrategy a| [[OfflinePartitionLeaderElectionStrategy]] Accepts allowUnclean flag

Handled by ZkPartitionStateMachine when requested to link:kafka-controller-ZkPartitionStateMachine.adoc#doElectLeaderForPartitions[doElectLeaderForPartitions]

Used when:

  • KafkaController is requested to link:kafka-controller-KafkaController.adoc#onNewPartitionCreation[onNewPartitionCreation] (with the allowUnclean flag off) and link:kafka-controller-KafkaController.adoc#onReplicaElection[onReplicaElection] (with the allowUnclean flag on for the admin client)

  • PartitionStateMachine is requested to <> (with the allowUnclean flag off)

| PreferredReplicaPartitionLeaderElectionStrategy a| [[PreferredReplicaPartitionLeaderElectionStrategy]] KafkaController is requested for <> that in turn triggers ZkPartitionStateMachine to <>

| ReassignPartitionLeaderElectionStrategy a| [[ReassignPartitionLeaderElectionStrategy]]

|===
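
The four strategies in the table can be modelled as a closed set of alternatives. The following is a sketch under the assumption that they form a sealed hierarchy in which only OfflinePartitionLeaderElectionStrategy carries a parameter (the allowUnclean flag). describe is a hypothetical helper that is not part of Kafka.

```scala
object ElectionStrategySketch {
  sealed trait PartitionLeaderElectionStrategy

  // The only strategy with a parameter (per the table above).
  final case class OfflinePartitionLeaderElectionStrategy(allowUnclean: Boolean)
    extends PartitionLeaderElectionStrategy
  case object ReassignPartitionLeaderElectionStrategy
    extends PartitionLeaderElectionStrategy
  case object PreferredReplicaPartitionLeaderElectionStrategy
    extends PartitionLeaderElectionStrategy
  case object ControlledShutdownPartitionLeaderElectionStrategy
    extends PartitionLeaderElectionStrategy

  // Hypothetical helper: the sealed trait lets the compiler check
  // that every strategy is handled.
  def describe(strategy: PartitionLeaderElectionStrategy): String = strategy match {
    case OfflinePartitionLeaderElectionStrategy(allowUnclean) =>
      s"offline (allowUnclean=$allowUnclean)"
    case ReassignPartitionLeaderElectionStrategy => "reassign"
    case PreferredReplicaPartitionLeaderElectionStrategy => "preferred replica"
    case ControlledShutdownPartitionLeaderElectionStrategy => "controlled shutdown"
  }
}
```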

=== [[shutdown]] Shutting Down -- shutdown Method

[source, scala]
----
shutdown(): Unit
----

shutdown simply prints out the following INFO message to the logs:

Stopped partition state machine

NOTE: shutdown is used exclusively when is requested to <>

=== [[initializePartitionState]] initializePartitionState Internal Method

[source, scala]
----
initializePartitionState(): Unit
----

initializePartitionState requests the <> for <> (across all the brokers in the Kafka cluster).

For every TopicPartition, initializePartitionState requests the <> for the LeaderIsrAndControllerEpoch metadata (using the <> internal registry).

initializePartitionState <> of a TopicPartition as follows:

  • OnlinePartition when the <> says that the <> (for the leader ISR and the TopicPartition)

  • OfflinePartition when the <> says that the <> (for the leader ISR and the TopicPartition)

  • NewPartition when the <> has no metadata about the TopicPartition

NOTE: initializePartitionState is used exclusively when PartitionStateMachine is requested to <>.
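
The three-way decision above can be sketched as follows. The exact conditions are not spelled out in the text, so this sketch assumes that a partition is online when the leader recorded in its LeaderIsrAndControllerEpoch is an online replica. The types are simplified stand-ins, and initialState and isLeaderOnline are hypothetical names.

```scala
object InitializeStateSketch {
  // Simplified stand-ins for Kafka's types (assumptions).
  final case class LeaderIsrAndControllerEpoch(
      leader: Int,
      isr: List[Int],
      controllerEpoch: Int)
  sealed trait PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // Hypothetical helper: `leadership` is the metadata known for a partition
  // (None when there is none) and `isLeaderOnline` answers whether a broker
  // hosting the leader replica is online (an assumed condition).
  def initialState(
      leadership: Option[LeaderIsrAndControllerEpoch],
      isLeaderOnline: Int => Boolean): PartitionState =
    leadership match {
      case Some(info) if isLeaderOnline(info.leader) => OnlinePartition
      case Some(_) => OfflinePartition
      case None => NewPartition
    }
}
```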