ZkPartitionStateMachine

ZkPartitionStateMachine is a PartitionStateMachine of a KafkaController.

ZkPartitionStateMachine and KafkaController

When requested to handle partition state changes, ZkPartitionStateMachine uses the ControllerBrokerRequestBatch to propagate them to all brokers in a cluster.

Creating Instance

ZkPartitionStateMachine takes the following to be created:

ZkPartitionStateMachine is created along with a KafkaController.

Handling State Changes of Partitions

handleStateChanges(
  partitions: Seq[TopicPartition],
  targetState: PartitionState,
  partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]

handleStateChanges is part of the PartitionStateMachine abstraction.


handleStateChanges does nothing and returns an empty collection when executed with no partitions.

handleStateChanges requests the ControllerBrokerRequestBatch to prepare a new batch.

handleStateChanges then executes doHandleStateChanges (which may produce errors that are returned in the end).

In the end, handleStateChanges requests the ControllerBrokerRequestBatch to send controller requests to brokers.
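The flow above can be sketched as follows. This is a minimal, self-contained illustration rather than the Kafka source: `RequestBatch`, `RecordingBatch` and the simplified `handleStateChanges` signature (partitions as plain strings, a `doHandle` function in place of doHandleStateChanges) are hypothetical stand-ins.

```scala
// Hypothetical stand-in for ControllerBrokerRequestBatch (not the Kafka class).
trait RequestBatch {
  def newBatch(): Unit
  def sendRequestsToBrokers(): Unit
}

// Records the calls it receives so the order of the steps can be inspected.
class RecordingBatch extends RequestBatch {
  val calls = scala.collection.mutable.ListBuffer.empty[String]
  def newBatch(): Unit = calls += "newBatch"
  def sendRequestsToBrokers(): Unit = calls += "sendRequestsToBrokers"
}

// Simplified handleStateChanges: doHandle plays the role of doHandleStateChanges.
def handleStateChanges[R](
    partitions: Seq[String],
    batch: RequestBatch)(
    doHandle: Seq[String] => Map[String, R]): Map[String, R] =
  if (partitions.isEmpty) Map.empty // no partitions: nothing is prepared or sent
  else {
    batch.newBatch()                  // 1. prepare a new batch
    val result = doHandle(partitions) // 2. apply the state changes
    batch.sendRequestsToBrokers()     // 3. send controller requests to brokers
    result                            // errors (if any) are part of the result
  }
```

With an empty `partitions` the batch is never touched, which mirrors the "does nothing" case described above.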

doHandleStateChanges

doHandleStateChanges(
  partitions: Seq[TopicPartition],
  targetState: PartitionState,
  partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]

doHandleStateChanges requests the ControllerContext to putPartitionStateIfNotExists to NonExistentPartition for every partition (in partitions).

doHandleStateChanges requests the ControllerContext to checkValidPartitionStateChange with the given target PartitionState (that splits the partitions into valid and invalid partitions).

doHandleStateChanges executes logInvalidTransition for every invalid partition.
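The validation step can be illustrated with a small model. Everything here is a sketch under assumptions: `StateRegistry` is a hypothetical, simplified counterpart of the ControllerContext bookkeeping, partitions are plain strings, and the table of valid previous states reflects my reading of the Kafka sources, so verify it against the version you use.

```scala
sealed trait PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState
case object NonExistentPartition extends PartitionState

// Assumption: states a partition may be in before moving to the target state.
def validPreviousStates(target: PartitionState): Set[PartitionState] = target match {
  case NewPartition         => Set(NonExistentPartition)
  case OnlinePartition      => Set(NewPartition, OnlinePartition, OfflinePartition)
  case OfflinePartition     => Set(NewPartition, OnlinePartition, OfflinePartition)
  case NonExistentPartition => Set(OfflinePartition)
}

// Hypothetical registry of partition states (not the Kafka ControllerContext).
class StateRegistry {
  private val states = scala.collection.mutable.Map.empty[String, PartitionState]

  // Registers the state only if the partition is not known yet.
  def putIfNotExists(partition: String, state: PartitionState): Unit =
    if (!states.contains(partition)) states.update(partition, state)

  def put(partition: String, state: PartitionState): Unit =
    states.update(partition, state)

  // Splits the partitions into (valid, invalid) for the given target state.
  def checkValid(
      partitions: Seq[String],
      target: PartitionState): (Seq[String], Seq[String]) =
    partitions.partition { p =>
      validPreviousStates(target).contains(states.getOrElse(p, NonExistentPartition))
    }
}
```

A partition that is seen for the first time starts as NonExistentPartition, so it is only valid for a transition to NewPartition.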

doHandleStateChanges branches off per the target state:

NewPartition

For NewPartition target state, doHandleStateChanges goes over the valid partitions and for every partition prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to NewPartition state.

Changed partition [partition] state from [state] to NewPartition with assigned replicas [partitionReplicaAssignment]

OnlinePartition

doHandleStateChanges finds uninitialized partitions (among the valid partitions with NewPartition state).

doHandleStateChanges finds partitions to elect a leader (among the valid partitions with OfflinePartition or OnlinePartition state).

For uninitialized partitions, doHandleStateChanges executes initializeLeaderAndIsrForPartitions, prints out the following INFO message to the logs and requests the ControllerContext to putPartitionState to OnlinePartition state.

Changed partition [partition] from [state] to OnlinePartition with state [leaderAndIsr]

For partitions to elect a leader, doHandleStateChanges executes electLeaderForPartitions with the input PartitionLeaderElectionStrategy.

For every partition with leader election successful, doHandleStateChanges prints out the following INFO message to the logs and requests the ControllerContext to putPartitionState to OnlinePartition state.

Changed partition [partition] from [state] to OnlinePartition with state [leaderAndIsr]

In the end, doHandleStateChanges returns the partitions with election failed.
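How the valid partitions are divided for the OnlinePartition target state can be sketched as below. The helper `splitForOnline`, the string partition names and the `current` lookup function are hypothetical and only illustrate the two filters described above.

```scala
sealed trait PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState
case object NonExistentPartition extends PartitionState

// Returns (uninitialized partitions, partitions to elect a leader for).
def splitForOnline(
    validPartitions: Seq[String],
    current: String => PartitionState): (Seq[String], Seq[String]) = {
  // Partitions in NewPartition state have no leader and ISR yet.
  val uninitialized = validPartitions.filter(p => current(p) == NewPartition)
  // Partitions that are already offline or online go through leader election.
  val toElect = validPartitions.filter { p =>
    current(p) == OfflinePartition || current(p) == OnlinePartition
  }
  (uninitialized, toElect)
}
```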

OfflinePartition or NonExistentPartition

For OfflinePartition target state, doHandleStateChanges goes over the valid partitions and for every partition prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to OfflinePartition state.

Changed partition [partition] state from [state] to OfflinePartition

For NonExistentPartition target state, doHandleStateChanges goes over the valid partitions and for every partition prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to NonExistentPartition state.

Changed partition [partition] state from [state] to NonExistentPartition

electLeaderForPartitions

electLeaderForPartitions(
  partitions: Seq[TopicPartition],
  partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]

electLeaderForPartitions executes doElectLeaderForPartitions repeatedly until every given partition has a final result, that is, its leader election either succeeded or failed (with no partitions left to retry).

For any partition to retry a leader election, electLeaderForPartitions prints out the following INFO message to the logs:

Retrying leader election with strategy [partitionLeaderElectionStrategy] for partitions [remaining]
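The retry loop can be sketched as follows. This is an illustration under assumptions, not the Kafka implementation: `electWithRetries` is a hypothetical name, partitions are plain strings, and `attempt` stands in for doElectLeaderForPartitions (it returns the finished elections together with the partitions to retry).

```scala
import scala.collection.mutable

def electWithRetries[L](
    partitions: Seq[String])(
    attempt: Seq[String] => (Map[String, Either[Throwable, L]], Seq[String])
): Map[String, Either[Throwable, L]] = {
  // Elections that are finished, successfully (Right) or not (Left).
  val finished = mutable.Map.empty[String, Either[Throwable, L]]
  var remaining = partitions
  while (remaining.nonEmpty) {
    val (done, toRetry) = attempt(remaining)
    finished ++= done
    remaining = toRetry
    if (remaining.nonEmpty)
      println(s"Retrying leader election for partitions ${remaining.mkString(",")}")
  }
  finished.toMap
}
```

The loop ends only when an attempt reports no partitions to retry, so the sketch relies on `attempt` eventually finishing every partition.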

doElectLeaderForPartitions

doElectLeaderForPartitions(
  partitions: Seq[TopicPartition],
  partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
): (Map[TopicPartition, Either[Exception, LeaderAndIsr]], Seq[TopicPartition])

doElectLeaderForPartitions requests the KafkaZkClient for the partition states (with LeaderAndIsr information).

For every partition state response, doElectLeaderForPartitions decodes the response (if possible) and adds it either to the validLeaderAndIsrs internal registry (of (TopicPartition, LeaderAndIsr) pairs) or to the failed elections (of (TopicPartition, Either[Exception, LeaderAndIsr]) pairs).

doElectLeaderForPartitions branches off per the input PartitionLeaderElectionStrategy that gives partitions with and without leaders elected.

doElectLeaderForPartitions adds the partitions with no leader elected to failed elections.

doElectLeaderForPartitions requests the KafkaZkClient to updateLeaderAndIsr (with the adjusted leader and ISRs).

For every successfully-updated partition (in Zookeeper), doElectLeaderForPartitions requests the following:

  1. The ControllerContext to partitionFullReplicaAssignment and record the partition leadership

  2. The ControllerBrokerRequestBatch to addLeaderAndIsrRequestForBrokers to every live replica broker (with isNew flag off)

doElectLeaderForPartitions prints out the following DEBUG message to the logs for every partition with no leader elected:

Controller failed to elect leader for partition [partition].
Attempted to write state [partition], but failed with bad ZK version.
This will be retried.

Logging

Enable ALL logging level for kafka.controller.ZkPartitionStateMachine logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.controller.ZkPartitionStateMachine=ALL

Refer to Logging.

logIdent

ZkPartitionStateMachine uses the following logging prefix (with the broker.id):

[PartitionStateMachine controllerId=[brokerId]]

Review Me

initializeLeaderAndIsrForPartitions

initializeLeaderAndIsrForPartitions(
  partitions: Seq[TopicPartition]): Seq[TopicPartition]

initializeLeaderAndIsrForPartitions starts by requesting the ControllerContext for the partition replica assignment of every partition (in the given partitions).

From the partition replica assignments, initializeLeaderAndIsrForPartitions keeps online replicas only (per the ControllerContext), so all other replicas are filtered out (excluded).

initializeLeaderAndIsrForPartitions splits the partitions (with online replicas only) into two sets with and without replicas (partitionsWithLiveReplicas and partitionsWithoutLiveReplicas, respectively).
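The filtering and splitting can be sketched like this. The function name `splitByLiveReplicas`, the string partition names, the `Int` broker ids and the `isOnline` predicate are all hypothetical and reflect one reading of the description above.

```scala
// Returns (partitionsWithLiveReplicas, partitionsWithoutLiveReplicas).
def splitByLiveReplicas(
    assignment: Map[String, Seq[Int]],
    isOnline: Int => Boolean
): (Map[String, Seq[Int]], Map[String, Seq[Int]]) = {
  // Keep only the online replicas of every partition.
  val liveReplicasPerPartition =
    assignment.map { case (partition, replicas) => partition -> replicas.filter(isOnline) }
  // Partitions left with no replicas at all cannot be initialized.
  val (partitionsWithoutLiveReplicas, partitionsWithLiveReplicas) =
    liveReplicasPerPartition.partition { case (_, liveReplicas) => liveReplicas.isEmpty }
  (partitionsWithLiveReplicas, partitionsWithoutLiveReplicas)
}
```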

For every partition without live (online) replicas, initializeLeaderAndIsrForPartitions logs the failed state change:

Controller [controllerId] epoch [epoch] failed to change state for partition [partition] from NewPartition to OnlinePartition
Controller [controllerId] epoch [epoch] encountered error during state change of partition [partition] from New to Online, assigned replicas are [[replicas]], live brokers are [[liveBrokerIds]]. No assigned replica is alive.

initializeLeaderAndIsrForPartitions converts the partitions with live (online) replicas into leaderIsrAndControllerEpochs (LeaderIsrAndControllerEpoch with LeaderAndIsr) and for every pair initializeLeaderAndIsrForPartitions requests the KafkaZkClient to create the partition state (in Zookeeper).

For every successful response (from Zookeeper), initializeLeaderAndIsrForPartitions requests the following:

  1. The ControllerContext to record the leaderIsrAndControllerEpoch for the partition (in its internal registry)

  2. The ControllerBrokerRequestBatch to addLeaderAndIsrRequestForBrokers (with isNew flag on)

In the end, initializeLeaderAndIsrForPartitions returns the partitions that were successfully initialized.

In case of ControllerMovedException (while...FIXME), initializeLeaderAndIsrForPartitions...FIXME

In case of any other error (Exception) (while...FIXME), initializeLeaderAndIsrForPartitions...FIXME

NOTE: initializeLeaderAndIsrForPartitions is used exclusively when ZkPartitionStateMachine is requested to doHandleStateChanges (for OnlinePartition target state).