PartitionStateMachine¶
PartitionStateMachine
is an abstraction of partition state machines that can handleStateChanges.
Contract¶
handleStateChanges¶
handleStateChanges(
partitions: Seq[TopicPartition],
targetState: PartitionState
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] // (1)!
handleStateChanges(
partitions: Seq[TopicPartition],
targetState: PartitionState,
leaderElectionStrategy: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
- Uses an undefined
leaderElectionStrategy
(None
)
Handles state changes of partitions (partition state changes)
Used when:
KafkaController
is requested to onNewPartitionCreation, onReplicasBecomeOffline, onReplicaElection, moveReassignedPartitionLeaderIfRequired, doControlledShutdownPartitionStateMachine
is requested to triggerOnlineStateChangeForPartitionsTopicDeletionManager
is requested to onTopicDeletion
Implementations¶
Creating Instance¶
PartitionStateMachine
takes the following to be created:
Abstract Class
PartitionStateMachine
is an abstract class and cannot be created directly. It is created indirectly for the concrete PartitionStateMachines.
Starting Up (on Active Controller)¶
startup(): Unit
startup
prints out the following INFO message to the logs:
Initializing partition state
startup
initializePartitionState.
startup
prints out the following INFO message to the logs:
Triggering online partition state changes
startup
triggerOnlinePartitionStateChange.
In the end, startup
prints out the following DEBUG message to the logs:
Started partition state machine with initial state -> [partitionStates]
startup
is used when:
KafkaController
is requested to onControllerFailover (when a broker is successfully elected as the controller)
initializePartitionState¶
initializePartitionState(): Unit
initializePartitionState
...FIXME
triggerOnlinePartitionStateChange¶
triggerOnlinePartitionStateChange(): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
triggerOnlinePartitionStateChange(
topic: String): Unit // (1)!
- Uses the partitions of the given topic only
triggerOnlinePartitionStateChange
requests the ControllerContext for all the partitions in the following states (possibly limited to the given topic):
NewPartition
OfflinePartition
In the end, triggerOnlinePartitionStateChange
triggers online state change for the partitions.
triggerOnlinePartitionStateChange
is used when:
KafkaController
is requested to onBrokerStartup, onReplicasBecomeOffline, processUncleanLeaderElectionEnable, processTopicUncleanLeaderElectionEnablePartitionStateMachine
is requested to start up
triggerOnlineStateChangeForPartitions¶
triggerOnlineStateChangeForPartitions(
partitions: Set[TopicPartition]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
triggerOnlineStateChangeForPartitions
filters out the partitions of the topics to be deleted from the given partitions
.
triggerOnlineStateChangeForPartitions
tries to move the partitions to OnlinePartition
state with OfflinePartitionLeaderElectionStrategy (with allowUnclean
flag off).
Review Me¶
=== [[PartitionLeaderElectionStrategy]] PartitionLeaderElectionStrategy
.PartitionLeaderElectionStrategies [cols="30m,70",options="header",width="100%"] |=== | Name | Description
| ControlledShutdownPartitionLeaderElectionStrategy a| [[ControlledShutdownPartitionLeaderElectionStrategy]]
| OfflinePartitionLeaderElectionStrategy a| [[OfflinePartitionLeaderElectionStrategy]] Accepts allowUnclean
flag
Handled by ZkPartitionStateMachine
when requested to link:kafka-controller-ZkPartitionStateMachine.adoc#doElectLeaderForPartitions[doElectLeaderForPartitions]
Used when:
-
KafkaController
is requested to link:kafka-controller-KafkaController.adoc#onNewPartitionCreation[onNewPartitionCreation] (with theallowUnclean
flag off), link:kafka-controller-KafkaController.adoc#onReplicaElection[onReplicaElection] (with theallowUnclean
flag on for the admin client) -
PartitionStateMachine
is requested to <> (with the allowUnclean
flag off)
| PreferredReplicaPartitionLeaderElectionStrategy a| [[PreferredReplicaPartitionLeaderElectionStrategy]] KafkaController
is requested for <ZkPartitionStateMachine
to <
| ReassignPartitionLeaderElectionStrategy a| [[ReassignPartitionLeaderElectionStrategy]]
|===
=== [[shutdown]] Shutting Down -- shutdown
Method
[source, scala]¶
shutdown(): Unit¶
shutdown
simply prints out the following INFO message to the logs:
Stopped partition state machine
NOTE: shutdown
is used exclusively when is requested to <
=== [[initializePartitionState]] initializePartitionState
Internal Method
[source, scala]¶
initializePartitionState(): Unit¶
initializePartitionState
requests the <
For every TopicPartition
, initializePartitionState
requests the <LeaderIsrAndControllerEpoch
metadata (using the <
initializePartitionState
<TopicPartition
as follows:
-
OnlinePartition
when the <> says that the < > (for the leader ISR and the TopicPartition
) -
OfflinePartition
when the <> says that the < > (for the leader ISR and the TopicPartition
) -
NewPartition
when the <> has no metadata about the TopicPartition
NOTE: initializePartitionState
is used exclusively when PartitionStateMachine
is requested to <