PartitionStateMachine¶
PartitionStateMachine is an abstraction of partition state machines that can handleStateChanges.
Contract¶
handleStateChanges¶
handleStateChanges(
partitions: Seq[TopicPartition],
targetState: PartitionState
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] // (1)!
handleStateChanges(
partitions: Seq[TopicPartition],
targetState: PartitionState,
leaderElectionStrategy: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
- Uses an undefined
leaderElectionStrategy(None)
Handles state changes of partitions (partition state changes)
Used when:
KafkaControlleris requested to onNewPartitionCreation, onReplicasBecomeOffline, onReplicaElection, moveReassignedPartitionLeaderIfRequired, doControlledShutdownPartitionStateMachineis requested to triggerOnlineStateChangeForPartitionsTopicDeletionManageris requested to onTopicDeletion
Implementations¶
Creating Instance¶
PartitionStateMachine takes the following to be created:
Abstract Class
PartitionStateMachine is an abstract class and cannot be created directly. It is created indirectly for the concrete PartitionStateMachines.
Starting Up (on Active Controller)¶
startup(): Unit
startup prints out the following INFO message to the logs:
Initializing partition state
startup initializePartitionState.
startup prints out the following INFO message to the logs:
Triggering online partition state changes
startup triggerOnlinePartitionStateChange.
In the end, startup prints out the following DEBUG message to the logs:
Started partition state machine with initial state -> [partitionStates]
startup is used when:
KafkaControlleris requested to onControllerFailover (when a broker is successfully elected as the controller)
initializePartitionState¶
initializePartitionState(): Unit
initializePartitionState...FIXME
triggerOnlinePartitionStateChange¶
triggerOnlinePartitionStateChange(): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
triggerOnlinePartitionStateChange(
topic: String): Unit // (1)!
- Uses the partitions of the given topic only
triggerOnlinePartitionStateChange requests the ControllerContext for all the partitions in the following states (possibly limited to the given topic):
NewPartitionOfflinePartition
In the end, triggerOnlinePartitionStateChange triggers online state change for the partitions.
triggerOnlinePartitionStateChange is used when:
KafkaControlleris requested to onBrokerStartup, onReplicasBecomeOffline, processUncleanLeaderElectionEnable, processTopicUncleanLeaderElectionEnablePartitionStateMachineis requested to start up
triggerOnlineStateChangeForPartitions¶
triggerOnlineStateChangeForPartitions(
partitions: Set[TopicPartition]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
triggerOnlineStateChangeForPartitions filters out the partitions of the topics to be deleted from the given partitions.
triggerOnlineStateChangeForPartitions tries to move the partitions to OnlinePartition state with OfflinePartitionLeaderElectionStrategy (with allowUnclean flag off).
Review Me¶
=== [[PartitionLeaderElectionStrategy]] PartitionLeaderElectionStrategy
.PartitionLeaderElectionStrategies [cols="30m,70",options="header",width="100%"] |=== | Name | Description
| ControlledShutdownPartitionLeaderElectionStrategy a| [[ControlledShutdownPartitionLeaderElectionStrategy]]
| OfflinePartitionLeaderElectionStrategy a| [[OfflinePartitionLeaderElectionStrategy]] Accepts allowUnclean flag
Handled by ZkPartitionStateMachine when requested to link:kafka-controller-ZkPartitionStateMachine.adoc#doElectLeaderForPartitions[doElectLeaderForPartitions]
Used when:
-
KafkaControlleris requested to link:kafka-controller-KafkaController.adoc#onNewPartitionCreation[onNewPartitionCreation] (with theallowUncleanflag off), link:kafka-controller-KafkaController.adoc#onReplicaElection[onReplicaElection] (with theallowUncleanflag on for the admin client) -
PartitionStateMachineis requested to <> (with the allowUncleanflag off)
| PreferredReplicaPartitionLeaderElectionStrategy a| [[PreferredReplicaPartitionLeaderElectionStrategy]] KafkaController is requested for <ZkPartitionStateMachine to <
| ReassignPartitionLeaderElectionStrategy a| [[ReassignPartitionLeaderElectionStrategy]]
|===
=== [[shutdown]] Shutting Down -- shutdown Method
[source, scala]¶
shutdown(): Unit¶
shutdown simply prints out the following INFO message to the logs:
Stopped partition state machine
NOTE: shutdown is used exclusively when is requested to <
=== [[initializePartitionState]] initializePartitionState Internal Method
[source, scala]¶
initializePartitionState(): Unit¶
initializePartitionState requests the <
For every TopicPartition, initializePartitionState requests the <LeaderIsrAndControllerEpoch metadata (using the <
initializePartitionState <TopicPartition as follows:
-
OnlinePartitionwhen the <> says that the < > (for the leader ISR and the TopicPartition) -
OfflinePartitionwhen the <> says that the < > (for the leader ISR and the TopicPartition) -
NewPartitionwhen the <> has no metadata about the TopicPartition
NOTE: initializePartitionState is used exclusively when PartitionStateMachine is requested to <