KafkaController¶
KafkaController is created and immediately started alongside KafkaServer.

KafkaController uses listeners as a notification system to react to changes in Zookeeper.
KafkaController is a state machine (using controller events).
Creating Instance¶
KafkaController takes the following to be created:
- KafkaConfig
- KafkaZkClient
- Time
- Metrics
- BrokerInfo
- initialBrokerEpoch
- DelegationTokenManager
- BrokerFeatures
- FinalizedFeatureCache
- Thread name prefix (default: undefined)
KafkaController is created when:
- KafkaServer is requested to start up
ControllerContext¶
KafkaController creates a ControllerContext when created.
ControllerChannelManager¶
KafkaController creates a ControllerChannelManager when created.
ControllerChannelManager is used to create separate ControllerBrokerRequestBatches of the KafkaController itself, the ZkReplicaStateMachine and ZkPartitionStateMachine.
ControllerChannelManager is requested to start up when KafkaController is requested to start controller election (and a broker is successfully elected as the active controller).
KafkaController uses the ControllerChannelManager to add or remove brokers when processing broker changes in Zookeeper (a new or updated znode under /brokers/ids path).
ControllerChannelManager is requested to shut down when KafkaController is requested to resign as the active controller.
Performance Metrics¶
KafkaController is a KafkaMetricsGroup.
ActiveControllerCount¶
1 if isActive. 0 otherwise.
OfflinePartitionsCount¶
PreferredReplicaImbalanceCount¶
ControllerState¶
GlobalTopicCount¶
GlobalPartitionCount¶
TopicsToDeleteCount¶
ReplicasToDeleteCount¶
TopicsIneligibleToDeleteCount¶
ReplicasIneligibleToDeleteCount¶
ActiveBrokerCount¶
FencedBrokerCount¶
ControllerEventProcessor¶
KafkaController is a ControllerEventProcessor to process and preempt controller events.
Resigning As Active Controller¶
onControllerResignation(): Unit
onControllerResignation starts by printing out the following DEBUG message to the logs:
Resigning
onControllerResignation unsubscribes from intercepting Zookeeper events for the following znodes in order:
- Child changes to /isr_change_notification znode
- Data changes to /admin/reassign_partitions znode
- Data changes to /admin/preferred_replica_election znode
- Child changes to /log_dir_event_notification znode
onControllerResignation unregisterBrokerModificationsHandler.
onControllerResignation requests KafkaScheduler to shutdown.
onControllerResignation resets internal counters.
onControllerResignation unregisterPartitionReassignmentIsrChangeHandlers.
onControllerResignation requests the PartitionStateMachine to shutdown.
onControllerResignation unsubscribes from intercepting Zookeeper events for the following znode:
onControllerResignation unregisterPartitionModificationsHandlers.
onControllerResignation unsubscribes from intercepting Zookeeper events for the following znode:
onControllerResignation requests the ReplicaStateMachine to shutdown.
onControllerResignation unsubscribes from intercepting Zookeeper events for the following znode:
onControllerResignation requests the ControllerChannelManager to shutdown.
onControllerResignation requests the ControllerContext to resetContext.
In the end, onControllerResignation prints out the following DEBUG message to the logs:
Resigned
onControllerResignation is used when:
- KafkaController is requested to shutdown, triggerControllerMove, maybeResign, processExpire
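The ordering of the teardown steps can be sketched as follows. This is a minimal illustration, not Kafka's code: `Recorder` is a hypothetical stand-in for the ZooKeeper client, scheduler and state machines, and the three unsubscribe steps whose znode is not named in the text above are left out.

```scala
import scala.collection.mutable.ListBuffer

object ResignationSketch {
  // Hypothetical recorder standing in for the real ZooKeeper client and state machines.
  final class Recorder {
    val steps: ListBuffer[String] = ListBuffer.empty
    def record(step: String): Unit = { steps += step }
  }

  // Walks through the teardown steps in the order the text lists them.
  def onControllerResignation(r: Recorder): Unit = {
    r.record("debug: Resigning")
    // Unsubscribe from the four znodes, in the documented order.
    Seq(
      "/isr_change_notification",
      "/admin/reassign_partitions",
      "/admin/preferred_replica_election",
      "/log_dir_event_notification"
    ).foreach(path => r.record(s"unregister: $path"))
    r.record("unregisterBrokerModificationsHandler")
    r.record("kafkaScheduler.shutdown")
    r.record("reset counters")
    r.record("unregisterPartitionReassignmentIsrChangeHandlers")
    r.record("partitionStateMachine.shutdown")
    r.record("unregisterPartitionModificationsHandlers")
    r.record("replicaStateMachine.shutdown")
    r.record("controllerChannelManager.shutdown")
    r.record("controllerContext.resetContext")
    r.record("debug: Resigned")
  }
}
```

Recording the steps in a list makes the order easy to inspect: listeners are removed before the schedulers and state machines are shut down, and the context is reset last.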
Processing Expire Event¶
processExpire(): Unit
processExpire sets the activeControllerId to -1 followed by onControllerResignation.
processExpire is used when:
- KafkaController is requested to process Expire event
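A minimal sketch of the two steps of processExpire, assuming a simplified controller whose resignation merely flips a flag. The `Controller` class and its `resigned` field are hypothetical and exist only for illustration.

```scala
object ExpireSketch {
  // Hypothetical, simplified controller state; not Kafka's actual class.
  final class Controller(val brokerId: Int) {
    var activeControllerId: Int = brokerId
    var resigned: Boolean = false

    // Stand-in for the full resignation procedure.
    def onControllerResignation(): Unit = { resigned = true }

    // Forget the active controller first, then run the resignation steps.
    def processExpire(): Unit = {
      activeControllerId = -1
      onControllerResignation()
    }
  }
}
```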
onBrokerStartup¶
onBrokerStartup(
newBrokers: Seq[Int]): Unit
onBrokerStartup prints out the following INFO message to the logs:
New broker startup callback for [comma-separated list of newBrokers]
onBrokerStartup removes the newBrokers from the replicasOnOfflineDirs of the ControllerContext.
onBrokerStartup sends update metadata request to all the existing brokers in the cluster (of the ControllerContext).
onBrokerStartup sends update metadata request to the new brokers with the full set of partition states (for initialization).
onBrokerStartup requests the ControllerContext for the replicas on the given newBrokers (the entire list of partitions that it is supposed to host).
onBrokerStartup requests the ReplicaStateMachine to handleStateChanges with the replicas on the new brokers and OnlineReplica target state.
onBrokerStartup requests the PartitionStateMachine to triggerOnlinePartitionStateChange.
onBrokerStartup checks if topic deletion can be resumed. onBrokerStartup collects replicas (on the new brokers) that are scheduled to be deleted by requesting the TopicDeletionManager to isTopicQueuedUpForDeletion. If there are any, onBrokerStartup prints out the following INFO message to the logs and requests the TopicDeletionManager to resumeDeletionForTopics.
Some replicas [replicasForTopicsToBeDeleted] for topics scheduled for deletion [topicsToBeDeleted]
are on the newly restarted brokers [newBrokers].
Signaling restart of topic deletion for these topics
In the end, onBrokerStartup registerBrokerModificationsHandler with the new brokers.
onBrokerStartup is used when:
- KafkaController is requested to process a BrokerChange controller event
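The topic-deletion check in onBrokerStartup can be sketched as a filter over the replicas on the new brokers. This is an illustration under assumptions: `Replica` and `topicsToResume` are hypothetical names, and the `isTopicQueuedUpForDeletion` predicate stands in for the call to the TopicDeletionManager.

```scala
object BrokerStartupSketch {
  // Hypothetical replica identifier: a topic partition hosted on a broker.
  final case class Replica(topic: String, partition: Int, brokerId: Int)

  // Returns the topics whose deletion could be resumed, i.e. topics queued
  // for deletion that have at least one replica on the new brokers.
  def topicsToResume(
      replicasOnNewBrokers: Set[Replica],
      isTopicQueuedUpForDeletion: String => Boolean): Set[String] =
    replicasOnNewBrokers
      .filter(r => isTopicQueuedUpForDeletion(r.topic))
      .map(_.topic)
}
```

An empty result corresponds to the case where no INFO message is printed and no deletion is resumed.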
ReplicaStateMachine¶
KafkaController creates a ZkReplicaStateMachine when created.
ZkReplicaStateMachine is requested to <
ZkReplicaStateMachine is requested to <
- <> to transition replicas to OnlineReplica state
- <> to transition replicas to OnlineReplica state
- <> to transition replicas to OfflineReplica state
- <> to transition replicas to NewReplica state first and then to OnlineReplica
- <> to transition replicas to OnlineReplica state
- <> to transition replicas to OfflineReplica state first and then to ReplicaDeletionStarted, ReplicaDeletionSuccessful, and NonExistentReplica in the end
- <> to transition replicas to NewReplica state
- <> to transition replicas to OfflineReplica state
KafkaController uses the ZkReplicaStateMachine to create the <
Shutting Down¶
shutdown(): Unit
shutdown requests the ControllerEventManager to close followed by onControllerResignation.
shutdown is used when:
- KafkaServer is requested to shut down
ControllerEventManager¶
KafkaController creates a ControllerEventManager when created (with broker.id configuration property).
The ControllerEventManager is used to create the following services:
- ControllerBrokerRequestBatch
- ZkReplicaStateMachine
- ZkPartitionStateMachine
- ControllerChangeHandler
- BrokerChangeHandler
- TopicChangeHandler
- TopicDeletionHandler
- PartitionReassignmentHandler
- PreferredReplicaElectionHandler
- IsrChangeNotificationHandler
- LogDirEventNotificationHandler
- BrokerModificationsHandler (in registerBrokerModificationsHandler)
- PartitionReassignmentIsrChangeHandler (in updateCurrentReassignment)
- PartitionModificationsHandler (in registerPartitionModificationsHandlers)
Processing Controller Events¶
process(
event: ControllerEvent): Unit
process is part of the ControllerEventProcessor abstraction.
process handles the input ControllerEvent and updates the metrics.
updateMetrics¶
updateMetrics(): Unit
updateMetrics updates the metrics (using the ControllerContext).
ControllerMovedException¶
In case of a ControllerMovedException, process prints out the following INFO message to the logs and maybeResign.
Controller moved to another broker when processing [event].
Throwable¶
In case of any other error (Throwable), process simply prints out the following ERROR message to the logs:
Error processing event [event]
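The error handling around process can be sketched as below. The sketch makes several assumptions: events are plain strings, `ControllerMovedException` is a local stand-in for Kafka's exception, the handler is passed in as a function, and the metrics update runs in a `finally` block so that it happens after both successful and failed events.

```scala
import scala.collection.mutable.ListBuffer

object ProcessSketch {
  // Hypothetical stand-in for Kafka's ControllerMovedException.
  final class ControllerMovedException(msg: String) extends RuntimeException(msg)

  final class Controller {
    val log: ListBuffer[String] = ListBuffer.empty
    var resignChecks: Int = 0
    var metricsUpdates: Int = 0

    def maybeResign(): Unit = { resignChecks += 1 }
    def updateMetrics(): Unit = { metricsUpdates += 1 }

    // Runs the handler for an event, applying the error handling described above.
    def process(event: String)(handler: String => Unit): Unit = {
      try {
        handler(event)
      } catch {
        case _: ControllerMovedException =>
          log += s"INFO Controller moved to another broker when processing $event."
          maybeResign()
        case _: Throwable =>
          log += s"ERROR Error processing event $event"
      } finally {
        // Assumption: metrics are refreshed regardless of the outcome.
        updateMetrics()
      }
    }
  }
}
```

Catching the error inside process keeps a single failing event from stopping the processing of later events.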
PartitionStateMachine¶
KafkaController creates a ZkPartitionStateMachine when created with the following:
KafkaController uses this ZkPartitionStateMachine to create the TopicDeletionManager.
ZkPartitionStateMachine is requested to start up at onControllerFailover (when a broker is successfully elected as the controller) and shut down at controller resignation.
ZkPartitionStateMachine is requested to triggerOnlinePartitionStateChange at the following events:
- onBrokerStartup
- onReplicasBecomeOffline
- processUncleanLeaderElectionEnable
- processTopicUncleanLeaderElectionEnable
ZkPartitionStateMachine is requested to handleStateChanges at the following events:
- onReplicasBecomeOffline
- onNewPartitionCreation
- onReplicaElection
- moveReassignedPartitionLeaderIfRequired
- doControlledShutdown
enableTopicUncleanLeaderElection¶
enableTopicUncleanLeaderElection(
topic: String): Unit
enableTopicUncleanLeaderElection does nothing on an inactive controller.
enableTopicUncleanLeaderElection requests the ControllerEventManager to enqueue a TopicUncleanLeaderElectionEnable event.
enableTopicUncleanLeaderElection is used when:
- TopicConfigHandler is requested to processConfigChanges
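A minimal sketch of the guard-then-enqueue behaviour, assuming a plain in-memory queue in place of the ControllerEventManager. The `Controller` class is hypothetical, and defining `isActive` as "this broker's ID equals the active controller ID" is an assumption of the sketch.

```scala
import scala.collection.mutable

object UncleanElectionSketch {
  sealed trait ControllerEvent
  final case class TopicUncleanLeaderElectionEnable(topic: String) extends ControllerEvent

  // Hypothetical, simplified controller; not Kafka's actual class.
  final class Controller(brokerId: Int, var activeControllerId: Int) {
    val eventQueue: mutable.Queue[ControllerEvent] = mutable.Queue.empty

    // Assumption: active means this broker's ID is the active controller ID.
    def isActive: Boolean = activeControllerId == brokerId

    // Enqueues the event only on the active controller; a no-op otherwise.
    def enableTopicUncleanLeaderElection(topic: String): Unit = {
      if (isActive) {
        eventQueue += TopicUncleanLeaderElectionEnable(topic)
      }
    }
  }
}
```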
processTopicUncleanLeaderElectionEnable¶
processTopicUncleanLeaderElectionEnable(
topic: String): Unit
processTopicUncleanLeaderElectionEnable is only executed on an active controller broker and does nothing otherwise.
processTopicUncleanLeaderElectionEnable prints out the following INFO message to the logs:
Unclean leader election has been enabled for topic [topic]
In the end, processTopicUncleanLeaderElectionEnable requests the ZkPartitionStateMachine to triggerOnlinePartitionStateChange.
processTopicUncleanLeaderElectionEnable is used when:
- KafkaController is requested to process a TopicUncleanLeaderElectionEnable event
Logging¶
Enable ALL logging level for kafka.controller.KafkaController logger to see what happens inside.
Add the following line to config/log4j.properties:
log4j.logger.kafka.controller.KafkaController=ALL
Refer to Logging
Note
Please note that Kafka comes with a preconfigured kafka.controller logger in config/log4j.properties:
log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false
That means that the logs of KafkaController go to the logs/controller.log file at TRACE logging level and are not added to the main logs (per log4j.additivity being off).
logIdent¶
KafkaController uses the following logging prefix (with the broker.id):
[Controller id=[brokerId]]
Review Me¶
[[state]] KafkaController is in one of the <
KafkaController uses the <
Unsubscribing from Child Changes to /isr_change_notification ZNode¶
unregisterZNodeChildChangeHandler(): Unit
unregisterZNodeChildChangeHandler prints out the following DEBUG message to the logs:
De-registering IsrChangeNotificationListener
unregisterZNodeChildChangeHandler requests </isr_change_notification znode with <
unregisterZNodeChildChangeHandler is used when:
- KafkaController is requested to resign as the active controller
Unsubscribing from Child Changes to /log_dir_event_notification ZNode¶
deregisterLogDirEventNotificationListener(): Unit
deregisterLogDirEventNotificationListener prints out the following DEBUG message to the logs:
De-registering logDirEventNotificationListener
deregisterLogDirEventNotificationListener requests </log_dir_event_notification znode with <
deregisterLogDirEventNotificationListener is used when:
- KafkaController is requested to resign as the active controller
Unsubscribing from Data Changes to /admin/preferred_replica_election ZNode¶
deregisterPreferredReplicaElectionListener(): Unit
deregisterPreferredReplicaElectionListener requests </admin/preferred_replica_election znode with <
deregisterPreferredReplicaElectionListener is used when:
- KafkaController is requested to resign as the active controller
Unsubscribing from Data Changes to /admin/reassign_partitions ZNode¶
deregisterPartitionReassignmentListener(): Unit
deregisterPartitionReassignmentListener requests </admin/reassign_partitions znode with <
deregisterPartitionReassignmentListener is used when:
- KafkaController is requested to resign as the active controller
sendUpdateMetadataRequest¶
sendUpdateMetadataRequest(): Unit
sendUpdateMetadataRequest requests the <
In the end, sendUpdateMetadataRequest requests the <
In case of IllegalStateException, sendUpdateMetadataRequest <
sendUpdateMetadataRequest is used when:
- KafkaController is requested to <>, <>, <>, <>, <>, process a <> controller event
- TopicDeletionManager is requested to <>
updateLeaderEpochAndSendRequest¶
updateLeaderEpochAndSendRequest(
partition: TopicPartition,
replicasToReceiveRequest: Seq[Int],
newAssignedReplicas: Seq[Int]): Unit
[[updateLeaderEpochAndSendRequest-updateLeaderEpoch]] updateLeaderEpochAndSendRequest <
NOTE: updateLeaderEpochAndSendRequest is used when KafkaController is requested to <
LeaderIsrAndControllerEpoch¶
When <LeaderIsrAndControllerEpoch, updateLeaderEpochAndSendRequest requests the <updateLeaderEpochAndSendRequest requests the <
In the end, updateLeaderEpochAndSendRequest prints out the following TRACE message to the logs:
Sent LeaderAndIsr request [updatedLeaderIsrAndControllerEpoch] with new assigned replica list [newAssignedReplicas] to leader [leader] for partition being reassigned [partition]
No LeaderIsrAndControllerEpoch¶
When <None, updateLeaderEpochAndSendRequest prints out the following ERROR message to the logs:
Failed to send LeaderAndIsr request with new assigned replica list [newAssignedReplicas] to leader for partition being reassigned [partition]
Controller Election¶
elect(): Unit
elect requests the <-1 if not available) and saves it to the <
elect stops the controller election if there is an active controller ID available and prints out the following DEBUG message to the logs:
Broker [activeControllerId] has been elected as the controller, so stopping the election process.
[[elect-registerControllerAndIncrementControllerEpoch]] Otherwise, with no active controller, elect requests the <
elect saves the controller epoch and the zookeeper epoch as the <
elect saves the <
elect prints out the following INFO message to the logs:
[brokerId] successfully elected as the controller. Epoch incremented to [epoch] and epoch zk version is now [epochZkVersion]
In the end, elect <
NOTE: elect is used when ControllerEventThread is requested to process <
elect and ControllerMovedException¶
In case of a ControllerMovedException, elect <
Broker [activeControllerId] was elected as controller instead of broker [brokerId]
A controller has been elected but just resigned, this will result in another round of election
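The main path of elect can be sketched with an in-memory stand-in for ZooKeeper. Everything here is hypothetical: `FakeZk` replaces the real client, `registerControllerAndIncrementEpoch` only mimics claiming the role and bumping the epoch, and the ControllerMovedException path is not modelled.

```scala
object ElectSketch {
  // Hypothetical in-memory stand-in for the /controller znode and the controller epoch.
  final class FakeZk {
    var controllerId: Option[Int] = None
    var epoch: Int = 0

    // -1 signals that no controller is registered.
    def getControllerId: Int = controllerId.getOrElse(-1)

    // Claims the controller role and bumps the epoch; returns the new epoch.
    def registerControllerAndIncrementEpoch(brokerId: Int): Int = {
      controllerId = Some(brokerId)
      epoch += 1
      epoch
    }
  }

  final class Controller(val brokerId: Int, zk: FakeZk) {
    var activeControllerId: Int = -1
    var epoch: Int = 0

    // Returns true when this broker won the election.
    def elect(): Boolean = {
      activeControllerId = zk.getControllerId
      if (activeControllerId != -1) {
        // A controller is already registered: stop the election.
        false
      } else {
        epoch = zk.registerControllerAndIncrementEpoch(brokerId)
        activeControllerId = brokerId
        true
      }
    }
  }
}
```

With two brokers sharing one `FakeZk`, the first call to `elect` wins and the second stops early, having learned the winner's ID.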
Is Broker The Active Controller?¶
isActive: Boolean
isActive indicates whether the current broker (by the broker ID) hosts the active KafkaController (given the <
NOTE: isActive is on (true) after the KafkaController of a Kafka broker has been <
Note
isActive is used (as a valve to stop processing early) when:
- ControllerEventThread is requested to <> (that should only be processed on the active controller, e.g. AutoPreferredReplicaLeaderElection, UncleanLeaderElectionEnable, ControlledShutdown, LeaderAndIsrResponseReceived, TopicDeletionStopReplicaResponseReceived, BrokerChange, BrokerModifications, TopicChange)
- KafkaController is requested to <>
- KafkaApis is requested to <>, <> and <>
Starting Up¶
startup(): Unit
startup requests the <
- On afterInitializingSession, the StateChangeHandler simply puts a RegisterBrokerAndReelect event on the event queue of the <>
- On beforeInitializingSession, the StateChangeHandler simply puts an Expire event on the event queue of the <>
startup then puts Startup event at the end of the event queue of the <
NOTE: startup is used exclusively when KafkaServer is requested to <
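The interplay of the session callbacks and the event queue can be sketched as follows. It assumes a plain in-memory queue and a trimmed-down `StateChangeHandler` trait with only the two callbacks named above. Both are hypothetical simplifications.

```scala
import scala.collection.mutable

object StartupSketch {
  sealed trait ControllerEvent
  case object RegisterBrokerAndReelect extends ControllerEvent
  case object Expire extends ControllerEvent
  case object Startup extends ControllerEvent

  // Hypothetical, trimmed-down handler with the two callbacks named in the text.
  trait StateChangeHandler {
    def beforeInitializingSession(): Unit
    def afterInitializingSession(): Unit
  }

  final class Controller {
    val queue: mutable.Queue[ControllerEvent] = mutable.Queue.empty
    var handler: Option[StateChangeHandler] = None

    // Registers the handler first, then queues the Startup event.
    def startup(): Unit = {
      handler = Some(new StateChangeHandler {
        def beforeInitializingSession(): Unit = { queue += Expire }
        def afterInitializingSession(): Unit = { queue += RegisterBrokerAndReelect }
      })
      queue += Startup
    }
  }
}
```

Simulating a session re-initialization by invoking the two callbacks in order leaves Expire ahead of RegisterBrokerAndReelect in the queue.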
Registering SessionExpirationListener To Control Session Recreation -- registerSessionExpirationListener Internal Method¶
registerSessionExpirationListener(): Unit
registerSessionExpirationListener requests <SessionExpirationListener (with the KafkaController and <
NOTE: SessionExpirationListener puts <ControllerEventManager every time the Zookeeper session has expired and a new session has been created.
NOTE: registerSessionExpirationListener is used exclusively when <ControllerEventThread is link:kafka-controller-ControllerEventThread.md#doWork[started]).
Registering ControllerChangeListener for /controller ZNode Changes -- registerControllerChangeListener Internal Method¶
registerControllerChangeListener(): Unit
registerControllerChangeListener requests </controller znode with a ControllerChangeListener (with the KafkaController and <
Note
ControllerChangeListener emits:
- <> event with the current controller ID (on the link:kafka-controller-ControllerEventManager.md#queue[event queue] of ControllerEventManager) every time the data of a znode changes
- <> event when the data associated with a znode has been deleted
NOTE: registerControllerChangeListener is used exclusively when <ControllerEventThread is link:kafka-controller-ControllerEventThread.md#doWork[started]).
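The mapping from /controller znode notifications to controller events can be sketched as a pattern match. The event names ControllerChange and Reelect are taken from the controllerChangeHandler entry under Internal Properties. The `ZNodeNotification` type is a hypothetical simplification of the ZooKeeper callbacks, and the controller ID carried by the event is left out.

```scala
object ControllerChangeSketch {
  sealed trait ControllerEvent
  case object ControllerChange extends ControllerEvent
  case object Reelect extends ControllerEvent

  // Hypothetical znode notifications for the /controller znode.
  sealed trait ZNodeNotification
  case object Created extends ZNodeNotification
  case object DataChanged extends ZNodeNotification
  case object Deleted extends ZNodeNotification

  // Creation and data changes signal a (possibly new) controller;
  // deletion means the controller is gone and an election is needed.
  def toEvent(n: ZNodeNotification): ControllerEvent = n match {
    case Created | DataChanged => ControllerChange
    case Deleted               => Reelect
  }
}
```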
registerBrokerChangeListener Internal Method¶
registerBrokerChangeListener(): Option[Seq[String]]
registerBrokerChangeListener requests </brokers/ids path with <
NOTE: registerBrokerChangeListener is used exclusively when KafkaController does <
Getting Active Controller ID (from JSON under /controller znode) -- getControllerID Method¶
getControllerID(): Int
getControllerID returns the ID of the active Kafka controller that is associated with /controller znode in JSON format or -1 otherwise.
Internally, getControllerID requests </controller znode].
If available, getControllerID parses the data (being the current controller info in JSON format) to extract brokerid field.
$ ./bin/zookeeper-shell.sh :2181 get /controller
{"version":1,"brokerid":0,"timestamp":"1543499076007"}
cZxid = 0x60
ctime = Thu Nov 29 14:44:36 CET 2018
mZxid = 0x60
mtime = Thu Nov 29 14:44:36 CET 2018
pZxid = 0x60
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100073f07ba0003
dataLength = 54
numChildren = 0
Otherwise, when no /controller znode is available, getControllerID returns -1.
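Extracting the brokerid field can be sketched as below. This is not how Kafka parses the data: to stay within the Scala standard library, the sketch uses a regular expression instead of a JSON parser, and it models the znode data as an `Option[String]` that is empty when there is no /controller znode.

```scala
object ControllerIdSketch {
  // Matches the numeric "brokerid" field in the controller JSON.
  private val BrokerId = "\"brokerid\"\\s*:\\s*(-?\\d+)".r

  // Returns the broker ID from the znode data, or -1 when there is no data
  // (or no brokerid field can be found).
  def getControllerID(znodeData: Option[String]): Int =
    znodeData
      .flatMap(json => BrokerId.findFirstMatchIn(json))
      .map(_.group(1).toInt)
      .getOrElse(-1)
}
```

Applied to the JSON shown in the zookeeper-shell output above, the function yields 0.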
Note
getControllerID is used when:
- Processing Reelect controller event
- <>
Registering TopicDeletionListener for Child Changes to /admin/delete_topics ZNode -- registerTopicDeletionListener Internal Method¶
registerTopicDeletionListener(): Option[Seq[String]]
registerTopicDeletionListener requests </admin/delete_topics znode with <
NOTE: registerTopicDeletionListener is used exclusively when KafkaController does <
De-Registering TopicDeletionListener for Child Changes to /admin/delete_topics ZNode -- deregisterTopicDeletionListener Internal Method¶
deregisterTopicDeletionListener(): Unit
deregisterTopicDeletionListener requests </admin/delete_topics znode with <
NOTE: deregisterTopicDeletionListener is used exclusively when KafkaController <
Initializing ControllerContext -- initializeControllerContext Internal Method¶
initializeControllerContext(): Unit
initializeControllerContext...FIXME
In the end, initializeControllerContext prints out the following INFO messages to the logs (with the current state based on the <
Currently active brokers in the cluster: [liveBrokerIds]
Currently shutting brokers in the cluster: [shuttingDownBrokerIds]
Current list of topics in the cluster: [allTopics]
NOTE: initializeControllerContext is used exclusively when KafkaController is requested to <
updateLeaderAndIsrCache Internal Method¶
updateLeaderAndIsrCache(partitions: Seq[TopicPartition]): Unit
Unless given, updateLeaderAndIsrCache defaults to <
updateLeaderAndIsrCache requests the <
NOTE: updateLeaderAndIsrCache is used when KafkaController is requested to <
Preferred Replica Leader Election -- onPreferredReplicaElection Internal Method¶
onPreferredReplicaElection(
  partitions: Set[TopicPartition],
  electionType: ElectionType): Map[TopicPartition, Throwable]
onPreferredReplicaElection prints out the following INFO message to the logs:
Starting preferred replica leader election for partitions [partitions]
onPreferredReplicaElection requests the <OnlinePartition target state and <
(only for <onPreferredReplicaElection <
(only for <
Note
onPreferredReplicaElection is used when KafkaController is requested for the following:
- <> (with <> election type)
- <> (with <> election type)
- <> (any election type with <> the default)
onControllerFailover Internal Method¶
onControllerFailover(): Unit
onControllerFailover prints out the following INFO message to the logs:
Registering handlers
onControllerFailover requests the <
- <
> - <
> - <
> - <
> - <
>
onControllerFailover requests the <
- <
> - <
>
onControllerFailover prints out the following INFO message to the logs:
Deleting log dir event notifications
onControllerFailover requests the <
onControllerFailover prints out the following INFO message to the logs:
Deleting isr change notifications
onControllerFailover requests the <
onControllerFailover prints out the following INFO message to the logs:
Initializing controller context
onControllerFailover <
onControllerFailover prints out the following INFO message to the logs:
Fetching topic deletions in progress
onControllerFailover <
onControllerFailover prints out the following INFO message to the logs:
Initializing topic deletion manager
onControllerFailover requests the <
onControllerFailover prints out the following INFO message to the logs:
Sending update metadata request
onControllerFailover <
onControllerFailover requests the <
onControllerFailover requests the <
onControllerFailover prints out the following INFO message to the logs:
Ready to serve as the new controller with epoch [epoch]
onControllerFailover <
onControllerFailover requests the <
onControllerFailover <
onControllerFailover prints out the following INFO message to the logs:
Starting the controller scheduler
onControllerFailover requests the <
With <true), onControllerFailover <
With <(empty)), onControllerFailover prints out the following INFO message to the logs:
starting the token expiry check scheduler
onControllerFailover requests the <
NOTE: onControllerFailover is used when KafkaController is requested to <
scheduleAutoLeaderRebalanceTask Internal Method¶
scheduleAutoLeaderRebalanceTask(
  delay: Long,
  unit: TimeUnit): Unit
scheduleAutoLeaderRebalanceTask simply requests the <
The auto-leader-rebalance-task simply requests the <
NOTE: scheduleAutoLeaderRebalanceTask is used when KafkaController is requested to <
processAutoPreferredReplicaLeaderElection Internal Method¶
processAutoPreferredReplicaLeaderElection(): Unit
NOTE: processAutoPreferredReplicaLeaderElection does nothing (and simply returns) unless the Kafka broker (KafkaController) is an <
processAutoPreferredReplicaLeaderElection prints out the following INFO message to the logs:
Processing automatic preferred replica leader election
processAutoPreferredReplicaLeaderElection <
In the end, processAutoPreferredReplicaLeaderElection <300 seconds).
NOTE: processAutoPreferredReplicaLeaderElection is used exclusively when KafkaController is requested to <
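The check-then-reschedule flow can be sketched as follows. The `Controller` class is hypothetical, the 300-second delay is the value mentioned in the text above, and performing the rescheduling in a `finally` block (so it happens even when the check fails) is an assumption of this sketch.

```scala
import java.util.concurrent.TimeUnit

object AutoRebalanceSketch {
  // Hypothetical, simplified controller; not Kafka's actual class.
  final class Controller(var isActive: Boolean) {
    var checks: Int = 0
    var scheduled: List[(Long, TimeUnit)] = Nil

    def checkAndTriggerAutoLeaderRebalance(): Unit = { checks += 1 }

    def scheduleAutoLeaderRebalanceTask(delay: Long, unit: TimeUnit): Unit = {
      scheduled = (delay, unit) :: scheduled
    }

    // Does nothing on an inactive controller; otherwise runs the check and
    // schedules the next run.
    def processAutoPreferredReplicaLeaderElection(): Unit = {
      if (isActive) {
        try {
          checkAndTriggerAutoLeaderRebalance()
        } finally {
          scheduleAutoLeaderRebalanceTask(300L, TimeUnit.SECONDS)
        }
      }
    }
  }
}
```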
checkAndTriggerAutoLeaderRebalance Internal Method¶
checkAndTriggerAutoLeaderRebalance(): Unit
checkAndTriggerAutoLeaderRebalance prints out the following TRACE message to the logs:
Checking need to trigger auto leader balancing
[[checkAndTriggerAutoLeaderRebalance-preferredReplicasForTopicsByBrokers]] checkAndTriggerAutoLeaderRebalance...FIXME
checkAndTriggerAutoLeaderRebalance prints out the following DEBUG message to the logs:
Preferred replicas by broker [preferredReplicasForTopicsByBrokers]
For every broker with one or more partition leaders, checkAndTriggerAutoLeaderRebalance...FIXME
checkAndTriggerAutoLeaderRebalance prints out the following DEBUG message to the logs:
Topics not in preferred replica for broker [leaderBroker] [topicsNotInPreferredReplica]
[[checkAndTriggerAutoLeaderRebalance-imbalanceRatio]] checkAndTriggerAutoLeaderRebalance calculates an imbalance ratio of the broker which is the number of topicsNotInPreferredReplica divided by the total number of partitions (topicPartitionsForBroker).
checkAndTriggerAutoLeaderRebalance prints out the following TRACE message to the logs:
Leader imbalance ratio for broker [leaderBroker] is [imbalanceRatio]
[[checkAndTriggerAutoLeaderRebalance-candidatePartitions]] With the imbalance ratio greater than the desired ratio (per <10%), checkAndTriggerAutoLeaderRebalance <
NOTE: checkAndTriggerAutoLeaderRebalance is used exclusively when KafkaController is requested to <
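The imbalance ratio and the threshold comparison can be sketched as two small functions. The function names are hypothetical, and the default of 10 for the desired percentage is the value mentioned in the text above.

```scala
object ImbalanceSketch {
  // Ratio of partitions on a broker whose leader is not the preferred replica.
  def imbalanceRatio(topicsNotInPreferredReplica: Int, topicPartitionsForBroker: Int): Double =
    topicsNotInPreferredReplica.toDouble / topicPartitionsForBroker

  // True when the ratio exceeds the desired percentage (10% in the text above).
  def shouldRebalance(ratio: Double, desiredPercentage: Int = 10): Boolean =
    ratio > desiredPercentage.toDouble / 100
}
```

For example, a broker that should lead 4 partitions but has 1 of them led elsewhere has a ratio of 0.25, which is above the 10% threshold.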
Handling Log Directory Failures for Brokers -- onBrokerLogDirFailure Internal Method¶
onBrokerLogDirFailure(
  brokerIds: Seq[Int]): Unit
onBrokerLogDirFailure prints out the following INFO message to the logs:
Handling log directory failure for brokers [brokerIds]
onBrokerLogDirFailure requests the <OnlineReplica state.
NOTE: onBrokerLogDirFailure is used exclusively when KafkaController is requested to <
enableDefaultUncleanLeaderElection Method¶
enableDefaultUncleanLeaderElection(): Unit
NOTE: enableDefaultUncleanLeaderElection does nothing (and simply returns) unless the Kafka broker (KafkaController) is an <
enableDefaultUncleanLeaderElection simply requests the <
NOTE: enableDefaultUncleanLeaderElection is used when DynamicLogConfig is requested to link:kafka-server-DynamicLogConfig.md#reconfigure[reconfigure] (for link:kafka-log-LogConfig.md#unclean.leader.election.enable[unclean.leader.election.enable] configuration property).
Preferred Replica Leader Election -- electPreferredLeaders Method¶
electPreferredLeaders(
  partitions: Set[TopicPartition],
  callback: ElectPreferredLeadersCallback = { (_, _) => }): Unit
electPreferredLeaders simply requests the <
NOTE: electPreferredLeaders is used exclusively when ReplicaManager is requested to <
processUncleanLeaderElectionEnable Internal Method¶
processUncleanLeaderElectionEnable(): Unit
NOTE: processUncleanLeaderElectionEnable does nothing (and simply returns) unless the Kafka broker (KafkaController) is an <
processUncleanLeaderElectionEnable prints out the following INFO message to the logs:
Unclean leader election has been enabled by default
processUncleanLeaderElectionEnable requests the <
NOTE: processUncleanLeaderElectionEnable is used exclusively when KafkaController is requested to <
Processing BrokerChange Controller Event (On controller-event-thread) -- processBrokerChange Internal Method¶
processBrokerChange(): Unit
NOTE: processBrokerChange does nothing (and simply returns) unless the Kafka broker (KafkaController) is an <
processBrokerChange requests the <
At this point in time, processBrokerChange knows what brokers are new, dead or bounced.
processBrokerChange prints out the following INFO message to the logs:
Newly added brokers: [ids], deleted brokers: [ids], bounced brokers: [ids], all live brokers: [ids]
processBrokerChange notifies (updates) the <
- For every newly-added broker, processBrokerChange requests to <>
- For bounced brokers, processBrokerChange requests to <> first followed by <>
- For every deleted broker, processBrokerChange requests to <>
processBrokerChange updates the <
- For newly-added brokers (if there were any), processBrokerChange requests to <> followed by <>
- For bounced brokers (if there were any), processBrokerChange first requests to <> followed by <> and then requests to <> followed by <>
- For deleted brokers (if there were any), processBrokerChange requests to <> followed by <>
In the end, only when there were any updates (new, dead or bounced brokers), processBrokerChange prints out the following INFO message to the logs:
Updated broker epochs cache: [liveBrokerIdAndEpochs]
NOTE: processBrokerChange is used exclusively when KafkaController is requested to <
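One way to classify brokers as new, dead or bounced is to compare two maps from broker ID to broker epoch. This sketch is an assumption about how such a classification could work, not a transcription of Kafka's code: `Changes` and `classify` are hypothetical names, and a broker counts as bounced when it is present in both maps but its epoch in ZooKeeper is newer.

```scala
object BrokerChangeSketch {
  final case class Changes(added: Set[Int], deleted: Set[Int], bounced: Set[Int])

  // Both maps go from broker ID to broker epoch: `inZooKeeper` is the current
  // registration state, `known` is what the controller has cached.
  def classify(inZooKeeper: Map[Int, Long], known: Map[Int, Long]): Changes = {
    val added = inZooKeeper.keySet.diff(known.keySet)
    val deleted = known.keySet.diff(inZooKeeper.keySet)
    // Assumption: a bounced broker is still registered but with a newer epoch.
    val bounced = inZooKeeper.keySet
      .intersect(known.keySet)
      .filter(id => inZooKeeper(id) > known(id))
    Changes(added, deleted, bounced)
  }
}
```

Treating a bounce as its own category lets the controller handle a quick restart as a removal followed by an addition rather than missing it entirely.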
Processing LogDirEventNotification Controller Event (On controller-event-thread) -- processLogDirEventNotification Internal Method¶
processLogDirEventNotification(): Unit
NOTE: processLogDirEventNotification does nothing (and simply returns) unless the Kafka broker (KafkaController) is an <
processLogDirEventNotification requests the <
processLogDirEventNotification requests the <
In the end, processLogDirEventNotification requests the <
NOTE: processLogDirEventNotification is used exclusively when KafkaController is requested to <
processStartup Internal Method¶
processStartup(): Unit
processStartup requests the <
In the end, processStartup starts <
NOTE: processStartup is used exclusively when KafkaController is requested to <
Internal Properties¶
| activeControllerId a| [[activeControllerId]] The ID of the active KafkaController
- Initialized to -1
| brokerRequestBatch a| [[brokerRequestBatch]] <
| controllerChangeHandler a| [[controllerChangeHandler]] A ZNodeChangeHandler (for the KafkaController and the </controller znode.
controllerChangeHandler <
- ControllerChange when the znode is created or the znode data changed
- Reelect when the znode is deleted
| eventManager a| [[eventManager]] <
eventManager is used to create other internal components to allow them for emitting controller events at state changes:
- <
> - <
> - <
> - <
> - <
> - <
> - <
> - <
> - <
> - <
> - <
> - <
> - <
>
eventManager is <KafkaController is requested to <
eventManager is <KafkaController is requested to <
| kafkaScheduler | [[kafkaScheduler]] <
| stateChangeLogger | [[stateChangeLogger]] link:kafka-controller-StateChangeLogger.md[StateChangeLogger] with the <inControllerContext flag enabled
| tokenCleanScheduler | [[tokenCleanScheduler]] <
| topicDeletionManager | [[topicDeletionManager]] <