ZkReplicaStateMachine

ZkReplicaStateMachine is a ReplicaStateMachine that handles state changes of partition replicas.

ZkReplicaStateMachine uses ControllerBrokerRequestBatch to propagate replica state changes to all brokers in a Kafka cluster.

Creating Instance

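ZkReplicaStateMachine takes the following to be created:

  • KafkaConfig
  • StateChangeLogger
  • ControllerContext
  • KafkaZkClient
  • ControllerBrokerRequestBatch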

ZkReplicaStateMachine is created along with a KafkaController.

Handling Replica State Changes

handleStateChanges(
  replicas: Seq[PartitionAndReplica],
  targetState: ReplicaState): Unit

handleStateChanges is part of the ReplicaStateMachine abstraction.


Note

handleStateChanges does nothing when the input replicas collection is empty.

handleStateChanges requests the ControllerBrokerRequestBatch for a new batch.

handleStateChanges groups the replicas by replica ID and calls doHandleStateChanges for every replica ID (with the replicas of that ID and the target ReplicaState).

In the end, handleStateChanges requests the ControllerBrokerRequestBatch to sendRequestsToBrokers.
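The flow above can be sketched as follows. This is a minimal illustration, not the Kafka implementation: RecordingBatch and SketchStateMachine are hypothetical stand-ins, PartitionAndReplica and ReplicaState are simplified re-declarations, and error handling is left out.

```scala
import scala.collection.mutable.ListBuffer

object HandleStateChangesSketch {
  // Simplified re-declarations of the Kafka types (assumption, not the real classes).
  sealed trait ReplicaState
  case object OnlineReplica extends ReplicaState

  final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  // Hypothetical stand-in for ControllerBrokerRequestBatch that only records calls.
  class RecordingBatch {
    val events: ListBuffer[String] = ListBuffer.empty[String]
    def newBatch(): Unit = events += "newBatch"
    def sendRequestsToBrokers(): Unit = events += "sendRequestsToBrokers"
  }

  class SketchStateMachine(batch: RecordingBatch) {
    def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {
      // Nothing happens for an empty input.
      if (replicas.nonEmpty) {
        batch.newBatch()
        // One doHandleStateChanges call per replica (broker) ID.
        replicas.groupBy(_.replica).foreach { case (replicaId, group) =>
          doHandleStateChanges(replicaId, group, targetState)
        }
        batch.sendRequestsToBrokers()
      }
    }

    // Placeholder: the real method validates and applies the transition.
    private def doHandleStateChanges(
        replicaId: Int,
        replicas: Seq[PartitionAndReplica],
        targetState: ReplicaState): Unit =
      batch.events += s"replica $replicaId: ${replicas.size} partition(s) -> $targetState"
  }
}
```

Grouping by replica ID means that all partitions hosted on one broker are handled in a single doHandleStateChanges call, and every request queued along the way goes out in one send at the end.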

doHandleStateChanges

doHandleStateChanges(
  replicaId: Int,
  replicas: Seq[PartitionAndReplica],
  targetState: ReplicaState): Unit

For every replica (in the replicas), doHandleStateChanges requests the ControllerContext to putReplicaStateIfNotExists (with NonExistentReplica state).

doHandleStateChanges requests the ControllerContext to checkValidReplicaStateChange (that gives valid and invalid replicas for the target state).

For every invalid replica, doHandleStateChanges calls logInvalidTransition.
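The initialization and validation steps can be sketched as below. SketchContext is a hypothetical stand-in for the component that tracks replica states, and the validPreviousStates table is an assumption based on a reading of the Kafka sources, so it should be checked against the Kafka version in use.

```scala
import scala.collection.mutable

object ReplicaValidationSketch {
  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object NonExistentReplica extends ReplicaState

  final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  // Assumed transition table: the states a replica may be in before moving to `target`.
  def validPreviousStates(target: ReplicaState): Set[ReplicaState] = target match {
    case NewReplica => Set(NonExistentReplica)
    case OnlineReplica | OfflineReplica =>
      Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible)
    case ReplicaDeletionStarted => Set(OfflineReplica)
    case ReplicaDeletionSuccessful => Set(ReplicaDeletionStarted)
    case ReplicaDeletionIneligible => Set(OfflineReplica, ReplicaDeletionStarted)
    case NonExistentReplica => Set(ReplicaDeletionSuccessful)
  }

  // Hypothetical stand-in for the replica state bookkeeping.
  class SketchContext {
    val replicaStates: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty

    // Registers a state only for replicas that have none yet.
    def putReplicaStateIfNotExists(replica: PartitionAndReplica, state: ReplicaState): Unit =
      if (!replicaStates.contains(replica)) replicaStates(replica) = state

    // Splits the replicas into (valid, invalid) for the requested target state.
    // Assumes every replica was registered beforehand.
    def checkValidReplicaStateChange(
        replicas: Seq[PartitionAndReplica],
        target: ReplicaState): (Seq[PartitionAndReplica], Seq[PartitionAndReplica]) =
      replicas.partition(r => validPreviousStates(target).contains(replicaStates(r)))
  }
}
```

Because every replica is first registered as NonExistentReplica when it has no state, the validity check always finds a current state to compare against, and a brand-new replica can only move to NewReplica.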

doHandleStateChanges branches off per the input target state (ReplicaState):

  • NewReplica
  • OnlineReplica
  • OfflineReplica
  • ReplicaDeletionStarted
  • ReplicaDeletionIneligible
  • ReplicaDeletionSuccessful
  • NonExistentReplica

Logging

Enable the ALL logging level for the kafka.controller.ZkReplicaStateMachine logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.controller.ZkReplicaStateMachine=ALL

Refer to Logging.

logIdent

ZkReplicaStateMachine uses the following logging prefix (with the broker.id):

[ReplicaStateMachine controllerId=[brokerId]]
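As a small illustration, the prefix can be produced with string interpolation. The object and the logIdentFor helper below are hypothetical, and the exact whitespace of the real prefix may differ.

```scala
object LogIdentSketch {
  // Hypothetical helper: builds the logging prefix from the controller's broker.id.
  def logIdentFor(brokerId: Int): String =
    s"[ReplicaStateMachine controllerId=$brokerId]"
}
```

For a controller running on broker 0, log lines from the state machine would then start with [ReplicaStateMachine controllerId=0].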