ZkReplicaStateMachine¶
ZkReplicaStateMachine is a ReplicaStateMachine to handle changes of the state of partition replicas.
ZkReplicaStateMachine uses ControllerBrokerRequestBatch to propagate replica state changes to all brokers in a Kafka cluster.
Creating Instance¶
ZkReplicaStateMachine takes the following to be created:
- KafkaConfig
-
StateChangeLogger - ControllerContext
- KafkaZkClient
- ControllerBrokerRequestBatch
ZkReplicaStateMachine is created along with a KafkaController.
Handling Replica State Changes¶
handleStateChanges(
replicas: Seq[PartitionAndReplica],
targetState: ReplicaState): Unit
handleStateChanges is part of the ReplicaStateMachine abstraction.
Note
handleStateChanges is a noop and does nothing when the input replicas collection is empty.
handleStateChanges requests the ControllerBrokerRequestBatch for a new batch.
handleStateChanges groups the replicas by the replica ID and doHandleStateChanges for every replica ID (with the ReplicaState).
In the end, handleStateChanges requests the ControllerBrokerRequestBatch to sendRequestsToBrokers.
doHandleStateChanges¶
doHandleStateChanges(
replicaId: Int,
replicas: Seq[PartitionAndReplica],
targetState: ReplicaState): Unit
For every replica (in the replicas), doHandleStateChanges requests the ControllerBrokerRequestBatch to putReplicaStateIfNotExists (with NonExistentReplica state)
doHandleStateChanges requests the ControllerBrokerRequestBatch to checkValidReplicaStateChange (that gives valid and invalid replicas).
For every invalid replica, doHandleStateChanges logInvalidTransition.
doHandleStateChanges branches off per the input target state (ReplicaState):
NewReplicaOnlineReplicaOfflineReplicaReplicaDeletionStartedReplicaDeletionIneligibleReplicaDeletionSuccessfulNonExistentReplica
Logging¶
Enable ALL logging level for kafka.controller.ZkReplicaStateMachine logger to see what happens inside.
Add the following line to config/log4j.properties:
log4j.logger.kafka.controller.ZkReplicaStateMachine=ALL
Refer to Logging.
logIdent¶
ZkReplicaStateMachine uses the following logging prefix (with the broker.id):
[ReplicaStateMachine controllerId=[brokerId]]