ZkPartitionStateMachine
ZkPartitionStateMachine is a PartitionStateMachine of a KafkaController.
When requested to handle partition state changes, ZkPartitionStateMachine uses the ControllerBrokerRequestBatch to propagate them to all brokers in a cluster.
Creating Instance
ZkPartitionStateMachine takes the following to be created:
- KafkaConfig
- StateChangeLogger
- ControllerContext
- KafkaZkClient
- ControllerBrokerRequestBatch
ZkPartitionStateMachine is created along with a KafkaController.
Handling State Changes of Partitions
handleStateChanges(
partitions: Seq[TopicPartition],
targetState: PartitionState,
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
handleStateChanges is part of the PartitionStateMachine abstraction.
handleStateChanges does nothing and returns an empty collection when executed with no partitions.
Otherwise, handleStateChanges requests the ControllerBrokerRequestBatch to prepare a new batch.
handleStateChanges then calls doHandleStateChanges, whose per-partition results (possibly including errors) become the result of handleStateChanges.
Finally, handleStateChanges requests the ControllerBrokerRequestBatch to send the batched controller requests to brokers.
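The flow above can be sketched in Scala as follows. This is a minimal sketch under stated assumptions: TopicPartition, LeaderAndIsr and RequestBatch are simplified stand-ins (not Kafka's classes), doHandleStateChanges is passed in as a function, and error handling is left out.

```scala
// Minimal sketch of the handleStateChanges flow: open a batch, delegate, flush.
// TopicPartition, LeaderAndIsr and RequestBatch are simplified stand-ins,
// not Kafka's own classes.
object HandleStateChangesSketch {
  final case class TopicPartition(topic: String, partition: Int)
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  type Result = Map[TopicPartition, Either[Throwable, LeaderAndIsr]]

  // Hypothetical batch that only records which calls were made on it
  final class RequestBatch {
    val calls = scala.collection.mutable.Buffer.empty[String]
    def newBatch(): Unit = calls += "newBatch"
    def sendRequestsToBrokers(): Unit = calls += "sendRequestsToBrokers"
  }

  def handleStateChanges(
      partitions: Seq[TopicPartition],
      batch: RequestBatch,
      doHandleStateChanges: Seq[TopicPartition] => Result
  ): Result =
    if (partitions.isEmpty) Map.empty // no partitions: nothing to batch or send
    else {
      batch.newBatch()                              // 1. start a fresh batch
      val result = doHandleStateChanges(partitions) // 2. state changes; errors come back as Left
      batch.sendRequestsToBrokers()                 // 3. flush batched requests to brokers
      result
    }
}
```

Passing the per-partition step as a function keeps the sketch focused on ordering: the batch is opened before any state change and flushed once after all of them, so brokers receive one round of requests per call.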
doHandleStateChanges
doHandleStateChanges(
partitions: Seq[TopicPartition],
targetState: PartitionState,
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
doHandleStateChanges requests the ControllerContext to putPartitionStateIfNotExists with NonExistentPartition state for every given partition (so partitions the controller has not tracked yet start in NonExistentPartition).
doHandleStateChanges requests the ControllerContext to checkValidPartitionStateChange with the given target PartitionState (that splits the partitions into valid and invalid partitions).
doHandleStateChanges calls logInvalidTransition for every invalid partition.
doHandleStateChanges branches off per the target state:
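A minimal sketch of how the valid/invalid split might work. The transition table in isValidTransition is an assumption modeled on the valid previous states of Kafka 2.x's PartitionState (check it against your Kafka version); the types are simplified stand-ins, and partitions with no recorded state default to NonExistentPartition, mirroring putPartitionStateIfNotExists.

```scala
// Sketch of splitting partitions into valid and invalid state transitions.
// The transition table is an assumption modeled on Kafka 2.x PartitionState.
object ValidTransitionSketch {
  sealed trait PartitionState
  case object NonExistentPartition extends PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  final case class TopicPartition(topic: String, partition: Int)

  // Assumed valid (from, to) pairs
  def isValidTransition(from: PartitionState, to: PartitionState): Boolean = (from, to) match {
    case (NonExistentPartition, NewPartition) => true
    case (NewPartition | OnlinePartition | OfflinePartition, OnlinePartition | OfflinePartition) => true
    case (OfflinePartition, NonExistentPartition) => true
    case _ => false
  }

  // Returns (valid, invalid); untracked partitions count as NonExistentPartition
  def checkValidPartitionStateChange(
      states: Map[TopicPartition, PartitionState],
      partitions: Seq[TopicPartition],
      target: PartitionState
  ): (Seq[TopicPartition], Seq[TopicPartition]) =
    partitions.partition { tp =>
      isValidTransition(states.getOrElse(tp, NonExistentPartition), target)
    }
}
```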
NewPartition
For NewPartition target state, doHandleStateChanges goes over the valid partitions and, for every partition, prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to NewPartition state.
Changed partition [partition] state from [state] to NewPartition with assigned replicas [partitionReplicaAssignment]
OnlinePartition
For OnlinePartition target state, doHandleStateChanges finds the uninitialized partitions (the valid partitions in NewPartition state).
doHandleStateChanges also finds the partitions to elect a leader for (the valid partitions in OfflinePartition or OnlinePartition state).
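The split of the valid partitions for OnlinePartition target state can be sketched as below. The types are simplified stand-ins, and currentState is a hypothetical parameter standing for a lookup of the partition state in the ControllerContext.

```scala
// Sketch: split valid partitions into (uninitialized, toElectLeader) by current state
object OnlineSplitSketch {
  sealed trait PartitionState
  case object NonExistentPartition extends PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  final case class TopicPartition(topic: String, partition: Int)

  def splitForOnline(
      validPartitions: Seq[TopicPartition],
      currentState: TopicPartition => PartitionState
  ): (Seq[TopicPartition], Seq[TopicPartition]) = {
    // NewPartition: no leader and ISR yet, so they need initialization
    val uninitialized = validPartitions.filter(tp => currentState(tp) == NewPartition)
    // OfflinePartition or OnlinePartition: they need a (new) leader elected
    val toElectLeader = validPartitions.filter { tp =>
      val s = currentState(tp)
      s == OfflinePartition || s == OnlinePartition
    }
    (uninitialized, toElectLeader)
  }
}
```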
For uninitialized partitions, doHandleStateChanges calls initializeLeaderAndIsrForPartitions and, for every successfully initialized partition, prints out the following INFO message to the logs and requests the ControllerContext to putPartitionState to OnlinePartition state.
Changed partition [partition] from [state] to OnlinePartition with state [leaderAndIsr]
For partitions to elect a leader for, doHandleStateChanges calls electLeaderForPartitions with the given PartitionLeaderElectionStrategy.
For every partition with a successful leader election, doHandleStateChanges prints out the following INFO message to the logs and requests the ControllerContext to putPartitionState to OnlinePartition state.
Changed partition [partition] from [state] to OnlinePartition with state [leaderAndIsr]
In the end, doHandleStateChanges returns the leader election results, in which every partition whose election failed carries its error.
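A minimal sketch of how election results could be applied to partition states: only the partitions with a successful election (Right) move to OnlinePartition, while the ones that failed (Left) keep their current state. applyElectionResults and the types are hypothetical stand-ins.

```scala
// Sketch: apply leader election results to partition states
object ElectionResultsSketch {
  sealed trait PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  final case class TopicPartition(topic: String, partition: Int)
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  // Right (elected) moves to OnlinePartition; Left (failed) keeps the current state
  def applyElectionResults(
      states: Map[TopicPartition, PartitionState],
      results: Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
  ): Map[TopicPartition, PartitionState] =
    results.foldLeft(states) {
      case (acc, (tp, Right(_))) => acc.updated(tp, OnlinePartition)
      case (acc, (_, Left(_)))   => acc
    }
}
```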
OfflinePartition or NonExistentPartition
For OfflinePartition target state, doHandleStateChanges goes over the valid partitions and, for every partition, prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to OfflinePartition state.
Changed partition [partition] state from [state] to OfflinePartition
For NonExistentPartition target state, doHandleStateChanges goes over the valid partitions and, for every partition, prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to NonExistentPartition state.
Changed partition [partition] state from [state] to NonExistentPartition
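For NewPartition, OfflinePartition and NonExistentPartition target states the branch only logs and records the new state. A minimal sketch with hypothetical names (moveValidPartitions, and a trace callback in place of the state-change logger); unlike the NewPartition message, it leaves out the assigned replicas.

```scala
// Sketch of the "log and record" branches (no leader election involved)
object SimpleTransitionSketch {
  sealed trait PartitionState
  case object NonExistentPartition extends PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  final case class TopicPartition(topic: String, partition: Int)

  def moveValidPartitions(
      states: Map[TopicPartition, PartitionState],
      validPartitions: Seq[TopicPartition],
      target: PartitionState,
      trace: String => Unit
  ): Map[TopicPartition, PartitionState] = {
    require(target != OnlinePartition, "OnlinePartition needs leader initialization or election")
    validPartitions.foldLeft(states) { (acc, tp) =>
      val current = acc.getOrElse(tp, NonExistentPartition)
      trace(s"Changed partition $tp state from $current to $target") // TRACE-level message
      acc.updated(tp, target)                                       // putPartitionState equivalent
    }
  }
}
```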
electLeaderForPartitions
electLeaderForPartitions(
partitions: Seq[TopicPartition],
partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
electLeaderForPartitions calls doElectLeaderForPartitions repeatedly as long as some of the given partitions are left to retry, i.e. until every partition has either had a leader elected or its election has failed.
Whenever some partitions are left to retry a leader election, electLeaderForPartitions prints out the following INFO message to the logs:
Retrying leader election with strategy [partitionLeaderElectionStrategy] for partitions [remaining]
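A minimal sketch of the retry loop, assuming attempt stands in for doElectLeaderForPartitions and returns the finished elections together with the partitions to retry (as its signature suggests). The names and types are simplified stand-ins.

```scala
// Sketch of the electLeaderForPartitions retry loop
object RetryLoopSketch {
  final case class TopicPartition(topic: String, partition: Int)
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  type Finished = Map[TopicPartition, Either[Throwable, LeaderAndIsr]]

  def electLeaderForPartitions(
      partitions: Seq[TopicPartition],
      attempt: Seq[TopicPartition] => (Finished, Seq[TopicPartition]),
      info: String => Unit
  ): Finished = {
    var remaining = partitions
    var finished: Finished = Map.empty
    while (remaining.nonEmpty) {
      val (done, toRetry) = attempt(remaining)
      finished ++= done // successful and failed elections are both "finished"
      remaining = toRetry
      if (remaining.nonEmpty)
        info(s"Retrying leader election for partitions $remaining")
    }
    finished
  }
}
```

The loop ends only once attempt stops returning partitions to retry; like the retry message, it relies on conflicting writes eventually succeeding.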
doElectLeaderForPartitions
doElectLeaderForPartitions(
partitions: Seq[TopicPartition],
partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
): (Map[TopicPartition, Either[Exception, LeaderAndIsr]], Seq[TopicPartition])
doElectLeaderForPartitions requests the KafkaZkClient for the partition states (with LeaderAndIsr information).
For every partition state response, doElectLeaderForPartitions decodes the response (if possible) and adds it either to the validLeaderAndIsrs internal registry (of (TopicPartition, LeaderAndIsr) pairs) or to the failed elections (of (TopicPartition, Either[Exception, LeaderAndIsr]) pairs).
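The response handling can be sketched as follows. PartitionStateResponse is a hypothetical, already-decoded stand-in for a ZooKeeper read (None meaning no znode or undecodable data). The controller epoch check, which fails the election when a newer controller has already written the state, is an assumption based on Kafka's implementation and is not described above.

```scala
// Sketch: split partition state responses into valid LeaderAndIsrs and failed elections
object DecodeSketch {
  final case class TopicPartition(topic: String, partition: Int)
  final case class LeaderAndIsr(leader: Int, isr: List[Int])
  final case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)

  // Hypothetical decoded read result: None = no znode or undecodable data
  final case class PartitionStateResponse(partition: TopicPartition, decoded: Option[LeaderIsrAndControllerEpoch])

  final class StateChangeFailedException(msg: String) extends Exception(msg)

  def splitResponses(
      responses: Seq[PartitionStateResponse],
      controllerEpoch: Int
  ): (Seq[(TopicPartition, LeaderAndIsr)], Map[TopicPartition, Either[Exception, LeaderAndIsr]]) = {
    val valid = Seq.newBuilder[(TopicPartition, LeaderAndIsr)]
    val failed = Map.newBuilder[TopicPartition, Either[Exception, LeaderAndIsr]]
    responses.foreach {
      case PartitionStateResponse(tp, Some(state)) if state.controllerEpoch > controllerEpoch =>
        // Assumed check: a newer controller already wrote this state
        failed += tp -> Left(new StateChangeFailedException(s"State of $tp written by a newer controller"))
      case PartitionStateResponse(tp, Some(state)) =>
        valid += tp -> state.leaderAndIsr
      case PartitionStateResponse(tp, None) =>
        failed += tp -> Left(new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $tp"))
    }
    (valid.result(), failed.result())
  }
}
```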
doElectLeaderForPartitions branches off per the given PartitionLeaderElectionStrategy, which splits the valid partitions into partitions with and without a leader elected:
- For OfflinePartitionLeaderElectionStrategy, doElectLeaderForPartitions calls collectUncleanLeaderElectionState with the valid partitions for election, followed by leaderForOffline
- For ReassignPartitionLeaderElectionStrategy, doElectLeaderForPartitions calls leaderForReassign
- For PreferredReplicaPartitionLeaderElectionStrategy, doElectLeaderForPartitions calls leaderForPreferredReplica
- For ControlledShutdownPartitionLeaderElectionStrategy, doElectLeaderForPartitions calls leaderForControlledShutdown
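The leader-selection rules behind the four strategies can be sketched as pure functions over replica ids. They are modeled on Kafka's PartitionLeaderElectionAlgorithms object as of Kafka 2.x (an assumption worth checking against your version) and only pick the leader; computing the new ISR and recording unclean election metrics are left out.

```scala
// Sketch of leader selection per strategy (replica ids as Ints)
object LeaderSelectionSketch {
  // Offline: first live in-sync replica; if unclean election is enabled,
  // fall back to the first live replica even when it is out of sync
  def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int],
      uncleanLeaderElectionEnabled: Boolean): Option[Int] =
    assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
      if (uncleanLeaderElectionEnabled) assignment.find(liveReplicas.contains) else None
    }

  // Reassign: first live in-sync replica of the target replica list
  def reassignPartitionLeaderElection(reassignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] =
    reassignment.find(id => liveReplicas.contains(id) && isr.contains(id))

  // Preferred replica: only the first assigned replica, if it is live and in sync
  def preferredReplicaPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] =
    assignment.headOption.filter(id => liveReplicas.contains(id) && isr.contains(id))

  // Controlled shutdown: first live in-sync replica that is not shutting down
  def controlledShutdownPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int],
      shuttingDownBrokers: Set[Int]): Option[Int] =
    assignment.find(id => liveReplicas.contains(id) && isr.contains(id) && !shuttingDownBrokers.contains(id))
}
```

A None result corresponds to a partition with no leader elected, which ends up among the failed elections.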
doElectLeaderForPartitions adds the partitions with no leader elected to the failed elections.
doElectLeaderForPartitions requests the KafkaZkClient to updateLeaderAndIsr (with the adjusted leader and ISRs).
For every partition successfully updated in ZooKeeper, doElectLeaderForPartitions requests the following:
- The ControllerContext for the partitionFullReplicaAssignment, and to record the new partition leadership
- The ControllerBrokerRequestBatch to addLeaderAndIsrRequestForBrokers for every live replica broker (with the isNew flag off)
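To illustrate what adding a request to the batch amounts to, here is a hypothetical LeaderAndIsrBatch that accumulates partition states per broker until it is flushed. It is not Kafka's ControllerBrokerRequestBatch, whose requests carry much more (controller epoch, replica assignments, and so on).

```scala
// Hypothetical per-broker batch of partition states (not Kafka's ControllerBrokerRequestBatch)
object RequestBatchSketch {
  final case class TopicPartition(topic: String, partition: Int)
  final case class LeaderAndIsr(leader: Int, isr: List[Int])
  final case class PartitionStateEntry(leaderAndIsr: LeaderAndIsr, isNew: Boolean)

  final class LeaderAndIsrBatch {
    private var pending = Map.empty[Int, Map[TopicPartition, PartitionStateEntry]]

    // Records the partition state for every recipient broker
    def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], tp: TopicPartition,
        leaderAndIsr: LeaderAndIsr, isNew: Boolean): Unit =
      brokerIds.foreach { id =>
        val forBroker = pending.getOrElse(id, Map.empty[TopicPartition, PartitionStateEntry])
        pending = pending.updated(id, forBroker.updated(tp, PartitionStateEntry(leaderAndIsr, isNew)))
      }

    // Hands one request per broker to `send`, then starts over with an empty batch
    def sendRequestsToBrokers(send: (Int, Map[TopicPartition, PartitionStateEntry]) => Unit): Unit = {
      pending.foreach { case (brokerId, states) => send(brokerId, states) }
      pending = Map.empty
    }
  }
}
```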
For every partition whose ZooKeeper update failed with a bad ZK version (i.e. every partition to retry), doElectLeaderForPartitions prints out the following DEBUG message to the logs:
Controller failed to elect leader for partition [partition]. Attempted to write state [leaderAndIsr], but failed with bad ZK version. This will be retried.
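The bad ZK version failure comes from ZooKeeper's conditional writes: a write carries the znode version the state was read at and is rejected when that version has moved on. A minimal sketch with a hypothetical in-memory FakeZk (not KafkaZkClient) that splits updates into successful ones and partitions to retry.

```scala
// Sketch of versioned (compare-and-set) writes and the resulting retry split
object ConditionalUpdateSketch {
  final case class TopicPartition(topic: String, partition: Int)
  // zkVersion: the znode version this state was read at (used as the expected version)
  final case class LeaderAndIsr(leader: Int, isr: List[Int], zkVersion: Int)

  // Hypothetical in-memory stand-in for the partition state znodes: (state, version)
  final class FakeZk(initial: Map[TopicPartition, (LeaderAndIsr, Int)]) {
    private var nodes = initial

    def version(tp: TopicPartition): Option[Int] = nodes.get(tp).map(_._2)

    // Conditional write: applied only if the stored version equals the expected one
    def setIfVersion(tp: TopicPartition, state: LeaderAndIsr, expected: Int): Option[Int] =
      nodes.get(tp) match {
        case Some((_, v)) if v == expected =>
          nodes = nodes.updated(tp, (state, v + 1))
          Some(v + 1)
        case Some(_) => None // bad version: another write happened in between
        case None    => throw new IllegalStateException(s"No state znode for $tp")
      }
  }

  // Successful writes come back with their new version; bad-version writes come back to retry
  def updateLeaderAndIsr(
      zk: FakeZk,
      adjusted: Map[TopicPartition, LeaderAndIsr]
  ): (Map[TopicPartition, LeaderAndIsr], Seq[TopicPartition]) = {
    val attempts = adjusted.toSeq.map { case (tp, state) =>
      tp -> zk.setIfVersion(tp, state, state.zkVersion).map(v => state.copy(zkVersion = v))
    }
    val updated = attempts.collect { case (tp, Some(state)) => tp -> state }.toMap
    val toRetry = attempts.collect { case (tp, None) => tp }
    (updated, toRetry)
  }
}
```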
Logging
Enable ALL logging level for kafka.controller.ZkPartitionStateMachine logger to see what happens inside.
Add the following line to config/log4j.properties:
log4j.logger.kafka.controller.ZkPartitionStateMachine=ALL
Refer to Logging.
logIdent
ZkPartitionStateMachine uses the following logging prefix (with the broker.id):
[PartitionStateMachine controllerId=[brokerId]]
Review Me
initializeLeaderAndIsrForPartitions
initializeLeaderAndIsrForPartitions(
partitions: Seq[TopicPartition]): Seq[TopicPartition]
initializeLeaderAndIsrForPartitions starts by requesting the ControllerContext for the partition replica assignments (of the given partitions).
From the partition replica assignments, initializeLeaderAndIsrForPartitions keeps only the replicas that are online.
initializeLeaderAndIsrForPartitions splits the partitions (with online replicas only) into two sets, with and without live replicas (partitionsWithLiveReplicas and partitionsWithoutLiveReplicas, respectively).
For every partition without live (online) replicas, initializeLeaderAndIsrForPartitions logs the following failed state change message followed by its cause:
Controller [controllerId] epoch [epoch] failed to change state for partition [partition] from NewPartition to OnlinePartition
Controller [controllerId] epoch [epoch] encountered error during state change of partition [partition] from New to Online, assigned replicas are [[replicas]], live brokers are [[liveBrokerIds]]. No assigned replica is alive.
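Building the initial leader and ISR can be sketched as below. The rule that the first live replica (in assignment order) becomes the leader and all live replicas form the ISR is an assumption based on Kafka's implementation; initialStates and the types are hypothetical stand-ins.

```scala
// Sketch: initial LeaderIsrAndControllerEpoch for new partitions with live replicas
object InitialLeaderAndIsrSketch {
  final case class TopicPartition(topic: String, partition: Int)
  final case class LeaderAndIsr(leader: Int, isr: List[Int])
  final case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)

  // Returns (states for partitions with live replicas, partitions without live replicas)
  def initialStates(
      partitions: Seq[TopicPartition],
      assignment: TopicPartition => Seq[Int],
      isReplicaOnline: (Int, TopicPartition) => Boolean,
      controllerEpoch: Int
  ): (Map[TopicPartition, LeaderIsrAndControllerEpoch], Seq[TopicPartition]) = {
    val liveReplicasPerPartition = partitions.map { tp =>
      tp -> assignment(tp).filter(replica => isReplicaOnline(replica, tp))
    }
    val (withLive, withoutLive) = liveReplicasPerPartition.partition { case (_, live) => live.nonEmpty }
    val states = withLive.map { case (tp, live) =>
      // Assumed rule: first live replica leads, all live replicas are in sync
      tp -> LeaderIsrAndControllerEpoch(LeaderAndIsr(live.head, live.toList), controllerEpoch)
    }.toMap
    (states, withoutLive.map(_._1))
  }
}
```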
initializeLeaderAndIsrForPartitions converts the partitions with live (online) replicas into leaderIsrAndControllerEpochs (LeaderIsrAndControllerEpoch with LeaderAndIsr) and requests the KafkaZkClient to create the partition states in ZooKeeper.
For every successful response, initializeLeaderAndIsrForPartitions requests the following:
- The ControllerContext to record the leaderIsrAndControllerEpoch for the partition (as the partition leadership)
- The ControllerBrokerRequestBatch to addLeaderAndIsrRequestForBrokers (with the isNew flag on)
In the end, initializeLeaderAndIsrForPartitions returns the partitions that were successfully initialized.
In case of ControllerMovedException (while...FIXME), initializeLeaderAndIsrForPartitions ...FIXME
In case of any other error (Exception) (while...FIXME), initializeLeaderAndIsrForPartitions ...FIXME
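NOTE: initializeLeaderAndIsrForPartitions is used exclusively when ZkPartitionStateMachine is requested to handle state changes to OnlinePartition target state (for the uninitialized partitions in doHandleStateChanges).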