ZkPartitionStateMachine¶
ZkPartitionStateMachine is a PartitionStateMachine of a KafkaController.

When requested to handle partition state changes, ZkPartitionStateMachine uses the ControllerBrokerRequestBatch to propagate them to all brokers in a cluster.
Creating Instance¶
ZkPartitionStateMachine takes the following to be created:
- KafkaConfig
- StateChangeLogger
- ControllerContext
- KafkaZkClient
- ControllerBrokerRequestBatch
ZkPartitionStateMachine is created along with a KafkaController.
Handling State Changes of Partitions¶
handleStateChanges(
partitions: Seq[TopicPartition],
targetState: PartitionState,
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
handleStateChanges is part of the PartitionStateMachine abstraction.
handleStateChanges does nothing and returns an empty collection when executed with no partitions.
handleStateChanges requests the ControllerBrokerRequestBatch to prepare a new batch.
handleStateChanges then calls doHandleStateChanges (which may report errors; these are returned in the end).
In the end, handleStateChanges requests the ControllerBrokerRequestBatch to send controller requests to brokers.
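The flow above (prepare a batch, apply the state changes, send the requests) can be sketched as follows. This is a simplified model and not Kafka's code: RequestBatch, RecordingBatch, the String-typed partitions and results, and the doHandle parameter are hypothetical stand-ins introduced for illustration. Only the ordering of the steps and the early return for an empty input come from the description above.

```scala
object HandleStateChangesSketch {
  // Hypothetical stand-in for ControllerBrokerRequestBatch.
  trait RequestBatch {
    def newBatch(): Unit
    def sendRequestsToBrokers(): Unit
  }

  // Records every call so the ordering can be inspected afterwards.
  final class RecordingBatch extends RequestBatch {
    val calls = scala.collection.mutable.ListBuffer.empty[String]
    def newBatch(): Unit = calls += "newBatch"
    def sendRequestsToBrokers(): Unit = calls += "sendRequestsToBrokers"
  }

  // Partition name -> error or (simplified) leader description.
  type Result = Map[String, Either[Throwable, String]]

  def handleStateChanges(
      partitions: Seq[String],
      batch: RequestBatch)(
      doHandle: Seq[String] => Result): Result =
    if (partitions.isEmpty) Map.empty // nothing to do, the batch is not touched
    else {
      batch.newBatch()                // 1. prepare a new batch
      val result = doHandle(partitions) // 2. apply the state changes
      batch.sendRequestsToBrokers()   // 3. propagate to brokers
      result
    }
}
```

With a RecordingBatch, calling handleStateChanges on a non-empty input should leave calls holding newBatch followed by sendRequestsToBrokers, while an empty input records nothing.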
doHandleStateChanges¶
doHandleStateChanges(
partitions: Seq[TopicPartition],
targetState: PartitionState,
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
doHandleStateChanges requests the ControllerContext to putPartitionStateIfNotExists to NonExistentPartition for every partition (in partitions).
doHandleStateChanges requests the ControllerContext to checkValidPartitionStateChange with the given target PartitionState (that splits the partitions into valid and invalid partitions).
doHandleStateChanges calls logInvalidTransition for every invalid partition.
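The split into valid and invalid partitions can be illustrated with a small model. The state objects below are re-declared locally rather than imported from Kafka, partitions are plain strings, and the table of valid previous states is an assumption about how the transitions are constrained (it is not stated on this page), so treat it as illustrative only.

```scala
object ValidTransitionSketch {
  // Local stand-ins for the partition states; validPreviousStates is an
  // ASSUMED transition table used only for this sketch.
  sealed trait PartitionState { def validPreviousStates: Set[PartitionState] }
  case object NewPartition extends PartitionState {
    def validPreviousStates: Set[PartitionState] = Set(NonExistentPartition)
  }
  case object OnlinePartition extends PartitionState {
    def validPreviousStates: Set[PartitionState] =
      Set(NewPartition, OnlinePartition, OfflinePartition)
  }
  case object OfflinePartition extends PartitionState {
    def validPreviousStates: Set[PartitionState] =
      Set(NewPartition, OnlinePartition, OfflinePartition)
  }
  case object NonExistentPartition extends PartitionState {
    def validPreviousStates: Set[PartitionState] = Set(OfflinePartition)
  }

  // Returns (valid, invalid). A partition with no recorded state is treated
  // as NonExistentPartition, mirroring putPartitionStateIfNotExists.
  def checkValidPartitionStateChange(
      current: Map[String, PartitionState],
      partitions: Seq[String],
      target: PartitionState): (Seq[String], Seq[String]) =
    partitions.partition { p =>
      target.validPreviousStates.contains(current.getOrElse(p, NonExistentPartition))
    }
}
```

Under this assumed table, a request to move partitions to OnlinePartition keeps those currently in NewPartition, OnlinePartition or OfflinePartition and rejects the rest.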
doHandleStateChanges branches off per the target state:
NewPartition¶
For NewPartition target state, doHandleStateChanges goes over the valid partitions and, for every partition, prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to NewPartition state.
Changed partition [partition] state from [state] to NewPartition with assigned replicas [partitionReplicaAssignment]
OnlinePartition¶
doHandleStateChanges finds uninitialized partitions (among the valid partitions with NewPartition state).
doHandleStateChanges finds partitions to elect a leader (among the valid partitions with OfflinePartition or OnlinePartition state).
For uninitialized partitions, doHandleStateChanges calls initializeLeaderAndIsrForPartitions and, for every partition initialized successfully, prints out the following INFO message to the logs and requests the ControllerContext to putPartitionState to OnlinePartition state.
Changed partition [partition] from [state] to OnlinePartition with state [leaderAndIsr]
For partitions to elect a leader, doHandleStateChanges calls electLeaderForPartitions with the input PartitionLeaderElectionStrategy.
For every partition with leader election successful, doHandleStateChanges prints out the following INFO message to the logs and requests the ControllerContext to putPartitionState to OnlinePartition state.
Changed partition [partition] from [state] to OnlinePartition with state [leaderAndIsr]
In the end, doHandleStateChanges returns the partitions whose leader election failed.
OfflinePartition or NonExistentPartition¶
For OfflinePartition target state, doHandleStateChanges goes over the valid partitions and, for every partition, prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to OfflinePartition state.
Changed partition [partition] state from [state] to OfflinePartition
For NonExistentPartition target state, doHandleStateChanges goes over the valid partitions and, for every partition, prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to NonExistentPartition state.
Changed partition [partition] state from [state] to NonExistentPartition
electLeaderForPartitions¶
electLeaderForPartitions(
partitions: Seq[TopicPartition],
partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
electLeaderForPartitions calls doElectLeaderForPartitions repeatedly until every given partition has either had a leader elected or failed its election (no partitions are left to retry).
For any partition to retry a leader election, electLeaderForPartitions prints out the following INFO message to the logs:
Retrying leader election with strategy [partitionLeaderElectionStrategy] for partitions [remaining]
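The retry behaviour can be sketched as a loop that keeps calling a single-attempt function until nothing is left to retry. This is a minimal model, not Kafka's implementation: electWithRetries and attempt are hypothetical names, partitions are strings, and an Int leader id stands in for LeaderAndIsr. The attempt function mirrors the shape of doElectLeaderForPartitions' return type (finished results plus partitions to retry).

```scala
import scala.annotation.tailrec

object ElectionRetrySketch {
  // Partition name -> error or elected leader id (simplified LeaderAndIsr).
  type Finished = Map[String, Either[Throwable, Int]]

  def electWithRetries(
      partitions: Seq[String])(
      attempt: Seq[String] => (Finished, Seq[String])): Finished = {
    @tailrec
    def loop(remaining: Seq[String], acc: Finished): Finished =
      if (remaining.isEmpty) acc
      else {
        // One attempt: some partitions finish (success or failure),
        // the others have to be retried.
        val (finished, toRetry) = attempt(remaining)
        if (toRetry.nonEmpty)
          println(s"Retrying leader election for partitions ${toRetry.mkString(",")}")
        loop(toRetry, acc ++ finished)
      }
    loop(partitions, Map.empty)
  }
}
```

Note that the loop only terminates once an attempt returns no partitions to retry, so the single-attempt function has to make progress eventually.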
doElectLeaderForPartitions¶
doElectLeaderForPartitions(
partitions: Seq[TopicPartition],
partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
): (Map[TopicPartition, Either[Exception, LeaderAndIsr]], Seq[TopicPartition])
doElectLeaderForPartitions requests the KafkaZkClient for the partition states (with LeaderAndIsr information).
For every partition state response, doElectLeaderForPartitions decodes the response (if possible) and adds it either to the validLeaderAndIsrs internal registry (of (TopicPartition, LeaderAndIsr) pairs) or to the failed elections (of (TopicPartition, Either[Exception, LeaderAndIsr]) pairs).
doElectLeaderForPartitions branches off per the input PartitionLeaderElectionStrategy that gives partitions with and without leaders elected.
- For OfflinePartitionLeaderElectionStrategy, doElectLeaderForPartitions calls collectUncleanLeaderElectionState with the valid partitions for election, followed by leaderForOffline.
- For ReassignPartitionLeaderElectionStrategy, doElectLeaderForPartitions calls leaderForReassign.
- For PreferredReplicaPartitionLeaderElectionStrategy, doElectLeaderForPartitions calls leaderForPreferredReplica.
- For ControlledShutdownPartitionLeaderElectionStrategy, doElectLeaderForPartitions calls leaderForControlledShutdown.
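The per-strategy branching above maps naturally onto a pattern match over a sealed hierarchy. The sketch below re-declares the four strategies locally as plain case objects (a simplification; the real definitions may carry parameters) and returns only the names of the helpers listed above instead of calling them. helpersFor is a hypothetical function introduced for illustration.

```scala
object StrategyDispatchSketch {
  // Local, simplified stand-ins for the election strategies.
  sealed trait PartitionLeaderElectionStrategy
  case object OfflinePartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
  case object ReassignPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
  case object PreferredReplicaPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
  case object ControlledShutdownPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy

  // Names of the helpers used for each strategy, in call order.
  def helpersFor(strategy: PartitionLeaderElectionStrategy): Seq[String] =
    strategy match {
      case OfflinePartitionLeaderElectionStrategy =>
        Seq("collectUncleanLeaderElectionState", "leaderForOffline")
      case ReassignPartitionLeaderElectionStrategy =>
        Seq("leaderForReassign")
      case PreferredReplicaPartitionLeaderElectionStrategy =>
        Seq("leaderForPreferredReplica")
      case ControlledShutdownPartitionLeaderElectionStrategy =>
        Seq("leaderForControlledShutdown")
    }
}
```

Because the trait is sealed, the compiler can warn when a strategy is missing from the match, which is one reason this shape suits a closed set of election strategies.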
doElectLeaderForPartitions adds the partitions with no leader elected to failed elections.
doElectLeaderForPartitions requests the KafkaZkClient to updateLeaderAndIsr (with the adjusted leader and ISRs).
For every successfully-updated partition (in Zookeeper), doElectLeaderForPartitions requests the following:
- The ControllerContext to partitionFullReplicaAssignment and record the partition leadership
- The ControllerBrokerRequestBatch to addLeaderAndIsrRequestForBrokers to every live replica broker (with isNew flag off)
doElectLeaderForPartitions prints out the following DEBUG message to the logs for every partition with no leader elected:
Controller failed to elect leader for partition [partition].
Attempted to write state [partition], but failed with bad ZK version.
This will be retried.
Logging¶
Enable the ALL logging level for the kafka.controller.ZkPartitionStateMachine logger to see what happens inside.
Add the following line to config/log4j.properties:
log4j.logger.kafka.controller.ZkPartitionStateMachine=ALL
Refer to Logging.
logIdent¶
ZkPartitionStateMachine uses the following logging prefix (with the broker.id):
[PartitionStateMachine controllerId=[brokerId]]
Review Me¶
initializeLeaderAndIsrForPartitions¶
initializeLeaderAndIsrForPartitions(
partitions: Seq[TopicPartition]): Seq[TopicPartition]
initializeLeaderAndIsrForPartitions starts by requesting the <partitions).
From the partition replica assignments, initializeLeaderAndIsrForPartitions makes sure that the replicas are all <
initializeLeaderAndIsrForPartitions splits the partitions (with online replicas only) into two sets with and without replicas (partitionsWithLiveReplicas and partitionsWithoutLiveReplicas, respectively).
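The split into partitions with and without live replicas can be sketched with a filter followed by a partition. This is a simplified model under stated assumptions: partitions are strings, replicas are broker ids, and "live" is reduced to membership in a liveBrokers set (the real check may also depend on the partition). splitByLiveReplicas is a hypothetical helper name.

```scala
object LiveReplicaSplitSketch {
  // Returns (partitionsWithLiveReplicas, partitionsWithoutLiveReplicas).
  // Each value holds only the replicas that are live for that partition.
  def splitByLiveReplicas(
      assignments: Map[String, Seq[Int]],
      liveBrokers: Set[Int]): (Map[String, Seq[Int]], Map[String, Seq[Int]]) = {
    // Keep only the live replicas of every partition (a Set[Int] is an Int => Boolean).
    val liveReplicas = assignments.map { case (p, replicas) =>
      p -> replicas.filter(liveBrokers)
    }
    val (withoutLive, withLive) =
      liveReplicas.partition { case (_, live) => live.isEmpty }
    (withLive, withoutLive)
  }
}
```

For example, with replicas 1 and 2 assigned to one partition, replica 3 assigned to another, and only broker 2 alive, the first partition ends up in the "with live replicas" map with the single replica 2, and the second one in the "without live replicas" map.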
For every partition without live (online) replicas, initializeLeaderAndIsrForPartitions <
Controller [controllerId] epoch [epoch] failed to change state for partition [partition] from NewPartition to OnlinePartition
Controller [controllerId] epoch [epoch] encountered error during state change of partition [partition] from New to Online, assigned replicas are [[replicas]], live brokers are [[liveBrokerIds]]. No assigned replica is alive.
initializeLeaderAndIsrForPartitions converts the partitions with live (online) replicas into leaderIsrAndControllerEpochs (LeaderIsrAndControllerEpoch with LeaderAndIsr) and for every pair initializeLeaderAndIsrForPartitions requests the <
For every successful response (from <initializeLeaderAndIsrForPartitions requests the following:
. The <leaderIsrAndControllerEpoch for the partition (in the <
. The <isNew flag on)
In the end, initializeLeaderAndIsrForPartitions returns the partitions that were successfully initialized.
In case of ControllerMovedException (while...FIXME), initializeLeaderAndIsrForPartitions...FIXME
In case of any other error (Exception) (while...FIXME), initializeLeaderAndIsrForPartitions...FIXME
NOTE: initializeLeaderAndIsrForPartitions is used exclusively when ZkPartitionStateMachine is requested to <