ZkReplicaStateMachine¶
ZkReplicaStateMachine
is a ReplicaStateMachine to handle changes of the state of partition replicas.
ZkReplicaStateMachine
uses ControllerBrokerRequestBatch to propagate replica state changes to all brokers in a Kafka cluster.
Creating Instance¶
ZkReplicaStateMachine
takes the following to be created:
- KafkaConfig
-
StateChangeLogger
- ControllerContext
- KafkaZkClient
- ControllerBrokerRequestBatch
ZkReplicaStateMachine
is created along with a KafkaController.
Handling Replica State Changes¶
handleStateChanges(
replicas: Seq[PartitionAndReplica],
targetState: ReplicaState): Unit
handleStateChanges
is part of the ReplicaStateMachine abstraction.
Note
handleStateChanges
is a noop and does nothing when the input replicas
collection is empty.
handleStateChanges
requests the ControllerBrokerRequestBatch for a new batch.
handleStateChanges
groups the replicas
by the replica ID and doHandleStateChanges for every replica ID (with the ReplicaState
).
In the end, handleStateChanges
requests the ControllerBrokerRequestBatch to sendRequestsToBrokers.
doHandleStateChanges¶
doHandleStateChanges(
replicaId: Int,
replicas: Seq[PartitionAndReplica],
targetState: ReplicaState): Unit
For every replica (in the replicas
), doHandleStateChanges
requests the ControllerBrokerRequestBatch to putReplicaStateIfNotExists (with NonExistentReplica
state)
doHandleStateChanges
requests the ControllerBrokerRequestBatch to checkValidReplicaStateChange (that gives valid and invalid replicas).
For every invalid replica, doHandleStateChanges
logInvalidTransition.
doHandleStateChanges
branches off per the input target state (ReplicaState
):
NewReplica
OnlineReplica
OfflineReplica
ReplicaDeletionStarted
ReplicaDeletionIneligible
ReplicaDeletionSuccessful
NonExistentReplica
Logging¶
Enable ALL
logging level for kafka.controller.ZkReplicaStateMachine
logger to see what happens inside.
Add the following line to config/log4j.properties
:
log4j.logger.kafka.controller.ZkReplicaStateMachine=ALL
Refer to Logging.
logIdent¶
ZkReplicaStateMachine
uses the following logging prefix (with the broker.id):
[ReplicaStateMachine controllerId=[brokerId]]