KafkaController¶
KafkaController is created and immediately started alongside KafkaServer.
KafkaController uses listeners as a notification system to react to changes in Zookeeper.
KafkaController is a state machine (using controller events).
Creating Instance¶
KafkaController takes the following to be created:
- KafkaConfig
- KafkaZkClient
- Time
- Metrics
- BrokerInfo
- initialBrokerEpoch
- DelegationTokenManager
- BrokerFeatures
- FinalizedFeatureCache
- Thread name prefix (default: undefined)
KafkaController is created when:
- KafkaServer is requested to start up
ControllerContext¶
KafkaController creates a ControllerContext when created.
ControllerChannelManager¶
KafkaController creates a ControllerChannelManager when created.
ControllerChannelManager is used to create separate ControllerBrokerRequestBatches of the KafkaController itself, the ZkReplicaStateMachine and ZkPartitionStateMachine.
ControllerChannelManager is requested to start up when KafkaController is requested to start controller election (and a broker is successfully elected as the active controller).
KafkaController uses the ControllerChannelManager to add or remove brokers when processing broker changes in Zookeeper (a new or updated znode under /brokers/ids path).
ControllerChannelManager is requested to shut down when KafkaController is requested to resign as the active controller.
Performance Metrics¶
KafkaController is a KafkaMetricsGroup.
ActiveControllerCount¶
1 if isActive. 0 otherwise.
OfflinePartitionsCount¶
PreferredReplicaImbalanceCount¶
ControllerState¶
GlobalTopicCount¶
GlobalPartitionCount¶
TopicsToDeleteCount¶
ReplicasToDeleteCount¶
TopicsIneligibleToDeleteCount¶
ReplicasIneligibleToDeleteCount¶
ActiveBrokerCount¶
FencedBrokerCount¶
ControllerEventProcessor¶
KafkaController is a ControllerEventProcessor to process and preempt controller events.
Resigning As Active Controller¶
onControllerResignation(): Unit
onControllerResignation starts by printing out the following DEBUG message to the logs:
Resigning
onControllerResignation unsubscribes from intercepting Zookeeper events for the following znodes in order:
- Child changes to /isr_change_notification znode
- Data changes to /admin/reassign_partitions znode
- Data changes to /admin/preferred_replica_election znode
- Child changes to /log_dir_event_notification znode
onControllerResignation unregisterBrokerModificationsHandler.
onControllerResignation requests KafkaScheduler to shutdown.
onControllerResignation resets internal counters.
onControllerResignation unregisterPartitionReassignmentIsrChangeHandlers.
onControllerResignation requests the PartitionStateMachine to shutdown.
onControllerResignation unsubscribes from intercepting Zookeeper events for the following znode:
onControllerResignation unregisterPartitionModificationsHandlers.
onControllerResignation unsubscribes from intercepting Zookeeper events for the following znode:
onControllerResignation requests the ReplicaStateMachine to shutdown.
onControllerResignation unsubscribes from intercepting Zookeeper events for the following znode:
onControllerResignation requests the ControllerChannelManager to shutdown.
onControllerResignation requests the ControllerContext to resetContext.
In the end, onControllerResignation prints out the following DEBUG message to the logs:
Resigned
onControllerResignation is used when:
- KafkaController is requested to shutdown, triggerControllerMove, maybeResign, processExpire
Processing Expire Event¶
processExpire(): Unit
processExpire sets the activeControllerId to -1 followed by onControllerResignation.
processExpire is used when:
- KafkaController is requested to process Expire event
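The effect of processExpire on the controller's view of itself can be sketched with a tiny model. ControllerModel and its steps log are hypothetical stand-ins (not Kafka classes), and the sketch assumes that isActive compares activeControllerId with the broker's own ID.

```scala
object ExpireSketch {
  // Hypothetical, stripped-down stand-in for KafkaController (not the real class).
  final class ControllerModel(val brokerId: Int) {
    // -1 means that no active controller is known
    var activeControllerId: Int = -1

    // Records the steps taken, for illustration only
    val steps = scala.collection.mutable.ListBuffer.empty[String]

    // Assumption: a broker is active when the active controller ID is its own ID
    def isActive: Boolean = activeControllerId == brokerId

    // The real method unregisters handlers and shuts services down;
    // here it only records that it ran
    def onControllerResignation(): Unit = steps.append("onControllerResignation")

    // Forget the active controller first, then resign
    def processExpire(): Unit = {
      activeControllerId = -1
      onControllerResignation()
    }
  }
}
```

After processExpire the model no longer considers itself active, which is what lets a later election round start from a clean state.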
onBrokerStartup¶
onBrokerStartup(
  newBrokers: Seq[Int]): Unit
onBrokerStartup prints out the following INFO message to the logs:
New broker startup callback for [comma-separated list of newBrokers]
onBrokerStartup removes the newBrokers from the replicasOnOfflineDirs of the ControllerContext.
onBrokerStartup sends update metadata request to all the existing brokers in the cluster (of the ControllerContext).
onBrokerStartup sends update metadata request to the new brokers with the full set of partition states (for initialization).
onBrokerStartup requests the ControllerContext for the replicas on the given newBrokers (the entire list of partitions that they are supposed to host).
onBrokerStartup requests the ReplicaStateMachine to handleStateChanges with the replicas on the new brokers and OnlineReplica target state.
onBrokerStartup requests the PartitionStateMachine to triggerOnlinePartitionStateChange.
onBrokerStartup checks if topic deletion can be resumed. onBrokerStartup collects replicas (on the new brokers) that are scheduled to be deleted by requesting the TopicDeletionManager to isTopicQueuedUpForDeletion. If there are any, onBrokerStartup prints out the following INFO message to the logs and requests the TopicDeletionManager to resumeDeletionForTopics.
Some replicas [replicasForTopicsToBeDeleted] for topics scheduled for deletion [topicsToBeDeleted]
are on the newly restarted brokers [newBrokers].
Signaling restart of topic deletion for these topics
In the end, onBrokerStartup registerBrokerModificationsHandler with the new brokers.
onBrokerStartup is used when:
- KafkaController is requested to process a BrokerChange controller event
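The topic-deletion check in onBrokerStartup boils down to filtering the replicas on the new brokers by topic. The following is a minimal sketch of that filter only. TopicPartition and PartitionAndReplica are simplified local stand-ins, and the isTopicQueuedUpForDeletion predicate is passed in as a plain function rather than coming from a TopicDeletionManager.

```scala
object BrokerStartupSketch {
  // Simplified local stand-ins for the Kafka types of the same names
  final case class TopicPartition(topic: String, partition: Int)
  final case class PartitionAndReplica(topicPartition: TopicPartition, replica: Int)

  // Replicas (on the new brokers) whose topic is queued up for deletion
  def replicasForTopicsToBeDeleted(
      replicasOnNewBrokers: Set[PartitionAndReplica],
      isTopicQueuedUpForDeletion: String => Boolean): Set[PartitionAndReplica] =
    replicasOnNewBrokers.filter(r => isTopicQueuedUpForDeletion(r.topicPartition.topic))

  // Topics for which deletion would be resumed
  def topicsToBeDeleted(replicas: Set[PartitionAndReplica]): Set[String] =
    replicas.map(_.topicPartition.topic)
}
```

Only when the filtered set is non-empty would the INFO message be printed and deletion resumed for the resulting topics.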
ReplicaStateMachine¶
KafkaController creates a ZkReplicaStateMachine when created.
ZkReplicaStateMachine is requested to <
ZkReplicaStateMachine is requested to <
- <> to transition replicas to OnlineReplica state
- <> to transition replicas to OnlineReplica state
- <> to transition replicas to OfflineReplica state
- <> to transition replicas to NewReplica state first and then to OnlineReplica
- <> to transition replicas to OnlineReplica state
- <> to transition replicas to OfflineReplica state first and then to ReplicaDeletionStarted, ReplicaDeletionSuccessful, and NonExistentReplica in the end
- <> to transition replicas to NewReplica state
- <> to transition replicas to OfflineReplica state
KafkaController uses the ZkReplicaStateMachine to create the <
Shutting Down¶
shutdown(): Unit
shutdown requests the ControllerEventManager to close followed by onControllerResignation.
shutdown is used when:
- KafkaServer is requested to shut down.
ControllerEventManager¶
KafkaController creates a ControllerEventManager when created (with broker.id configuration property).
The ControllerEventManager is used to create the following services:
- ControllerBrokerRequestBatch
- ZkReplicaStateMachine
- ZkPartitionStateMachine
- ControllerChangeHandler
- BrokerChangeHandler
- TopicChangeHandler
- TopicDeletionHandler
- PartitionReassignmentHandler
- PreferredReplicaElectionHandler
- IsrChangeNotificationHandler
- LogDirEventNotificationHandler
- BrokerModificationsHandler (in registerBrokerModificationsHandler)
- PartitionReassignmentIsrChangeHandler (in updateCurrentReassignment)
- PartitionModificationsHandler (in registerPartitionModificationsHandlers)
Processing Controller Events¶
process(
  event: ControllerEvent): Unit
process is part of the ControllerEventProcessor abstraction.
process handles the input ControllerEvent and updates the metrics.
updateMetrics¶
updateMetrics(): Unit
updateMetrics updates the metrics (using the ControllerContext).
ControllerMovedException¶
In case of a ControllerMovedException, process prints out the following INFO message to the logs and maybeResign.
Controller moved to another broker when processing [event].
Throwable¶
In case of any other error (Throwable), process simply prints out the following ERROR message to the logs:
Error processing event [event]
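The error handling described above can be sketched as a try/catch/finally around event handling. Everything here is a hypothetical stand-in: the three events, the local ControllerMovedException class and the in-memory log are for illustration only, and placing the metrics update in a finally block is an assumption about how "handles the event and updates the metrics" is arranged.

```scala
object ProcessSketch {
  // Hypothetical stand-ins; the real types live in Kafka
  final class ControllerMovedException(msg: String) extends RuntimeException(msg)

  sealed trait ControllerEvent
  case object Ok extends ControllerEvent
  case object Moved extends ControllerEvent
  case object Broken extends ControllerEvent

  final class Processor {
    val log = scala.collection.mutable.ListBuffer.empty[String]
    var metricsUpdates = 0
    var resignChecks = 0

    // Pretend event handlers: two of the events fail on purpose
    private def handle(event: ControllerEvent): Unit = event match {
      case Ok     => ()
      case Moved  => throw new ControllerMovedException("moved")
      case Broken => throw new IllegalStateException("boom")
    }

    private def maybeResign(): Unit = resignChecks += 1
    private def updateMetrics(): Unit = metricsUpdates += 1

    def process(event: ControllerEvent): Unit = {
      try {
        handle(event)
      } catch {
        case _: ControllerMovedException =>
          log.append(s"INFO Controller moved to another broker when processing $event.")
          maybeResign()
        case _: Throwable =>
          log.append(s"ERROR Error processing event $event")
      } finally {
        // Assumption: metrics are refreshed whether or not handling failed
        updateMetrics()
      }
    }
  }
}
```

The point of the shape is that a failing event never escapes process: a moved controller triggers a resignation check, and any other error is only logged.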
PartitionStateMachine¶
KafkaController creates a ZkPartitionStateMachine when created with the following:
KafkaController uses this ZkPartitionStateMachine to create the TopicDeletionManager.
ZkPartitionStateMachine is requested to start up at onControllerFailover (when a broker is successfully elected as the controller) and shut down at controller resignation.
ZkPartitionStateMachine is requested to triggerOnlinePartitionStateChange at the following events:
- onBrokerStartup
- onReplicasBecomeOffline
- processUncleanLeaderElectionEnable
- processTopicUncleanLeaderElectionEnable
ZkPartitionStateMachine is requested to handleStateChanges at the following events:
- onReplicasBecomeOffline
- onNewPartitionCreation
- onReplicaElection
- moveReassignedPartitionLeaderIfRequired
- doControlledShutdown
enableTopicUncleanLeaderElection¶
enableTopicUncleanLeaderElection(
  topic: String): Unit
enableTopicUncleanLeaderElection does nothing on an inactive controller.
enableTopicUncleanLeaderElection requests the ControllerEventManager to enqueue a TopicUncleanLeaderElectionEnable event.
enableTopicUncleanLeaderElection is used when:
- TopicConfigHandler is requested to processConfigChanges
processTopicUncleanLeaderElectionEnable¶
processTopicUncleanLeaderElectionEnable(
  topic: String): Unit
processTopicUncleanLeaderElectionEnable is only executed on an active controller broker and does nothing otherwise.
processTopicUncleanLeaderElectionEnable prints out the following INFO message to the logs:
Unclean leader election has been enabled for topic [topic]
In the end, processTopicUncleanLeaderElectionEnable requests the ZkPartitionStateMachine to triggerOnlinePartitionStateChange.
processTopicUncleanLeaderElectionEnable is used when:
- KafkaController is requested to process a TopicUncleanLeaderElectionEnable event
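Both methods guard on the controller being active: one before enqueueing the event, the other before acting on it. A minimal sketch of that two-step pattern follows. ControllerModel, its queue and the trigger counter are hypothetical stand-ins for KafkaController, the ControllerEventManager and the ZkPartitionStateMachine.

```scala
object UncleanElectionSketch {
  sealed trait ControllerEvent
  final case class TopicUncleanLeaderElectionEnable(topic: String) extends ControllerEvent

  // Hypothetical model: `active` stands in for isActive
  final class ControllerModel(var active: Boolean) {
    val queue = scala.collection.mutable.Queue.empty[ControllerEvent]
    var onlineStateChangeTriggers = 0

    // Does nothing on an inactive controller, otherwise enqueues the event
    def enableTopicUncleanLeaderElection(topic: String): Unit =
      if (active) queue.enqueue(TopicUncleanLeaderElectionEnable(topic))

    // Checks again at processing time, since the controller may have moved meanwhile
    def processTopicUncleanLeaderElectionEnable(topic: String): Unit =
      if (active) {
        println(s"Unclean leader election has been enabled for topic $topic")
        onlineStateChangeTriggers += 1 // stands in for triggerOnlinePartitionStateChange
      }
  }
}
```

Checking twice is deliberate in this sketch: the event is processed asynchronously, so the broker that enqueued it may no longer be the active controller when the event is handled.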
Logging¶
Enable ALL logging level for kafka.controller.KafkaController logger to see what happens inside.
Add the following line to config/log4j.properties:
log4j.logger.kafka.controller.KafkaController=ALL
Refer to Logging.
Note
Please note that Kafka comes with a preconfigured kafka.controller logger in config/log4j.properties:
log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false
That means that the logs of KafkaController go to logs/controller.log file at TRACE logging level and are not added to the main logs (per log4j.additivity being off).
logIdent¶
KafkaController uses the following logging prefix (with the broker.id):
[Controller id=[brokerId]]
Review Me¶
KafkaController is in one of the <
KafkaController uses the <
Unsubscribing from Child Changes to /isr_change_notification ZNode¶
unregisterZNodeChildChangeHandler(): Unit
unregisterZNodeChildChangeHandler prints out the following DEBUG message to the logs:
De-registering IsrChangeNotificationListener
unregisterZNodeChildChangeHandler requests </isr_change_notification znode with <
unregisterZNodeChildChangeHandler is used when:
- KafkaController is requested to resign as the active controller
Unsubscribing from Child Changes to /log_dir_event_notification ZNode¶
deregisterLogDirEventNotificationListener(): Unit
deregisterLogDirEventNotificationListener prints out the following DEBUG message to the logs:
De-registering logDirEventNotificationListener
deregisterLogDirEventNotificationListener requests </log_dir_event_notification znode with <
deregisterLogDirEventNotificationListener is used when:
- KafkaController is requested to resign as the active controller
Unsubscribing from Data Changes to /admin/preferred_replica_election ZNode¶
deregisterPreferredReplicaElectionListener(): Unit
deregisterPreferredReplicaElectionListener requests </admin/preferred_replica_election znode with <
deregisterPreferredReplicaElectionListener is used when:
- KafkaController is requested to resign as the active controller
Unsubscribing from Data Changes to /admin/reassign_partitions ZNode¶
deregisterPartitionReassignmentListener(): Unit
deregisterPartitionReassignmentListener requests </admin/reassign_partitions znode with <
deregisterPartitionReassignmentListener is used when:
- KafkaController is requested to resign as the active controller
sendUpdateMetadataRequest¶
sendUpdateMetadataRequest(): Unit
sendUpdateMetadataRequest requests the <
In the end, sendUpdateMetadataRequest requests the <
In case of IllegalStateException, sendUpdateMetadataRequest <
sendUpdateMetadataRequest is used when:
- KafkaController is requested to <>, <>, <>, <>, <>, process a <> controller event
- TopicDeletionManager is requested to <>
updateLeaderEpochAndSendRequest¶
updateLeaderEpochAndSendRequest(
  partition: TopicPartition,
  replicasToReceiveRequest: Seq[Int],
  newAssignedReplicas: Seq[Int]): Unit
updateLeaderEpochAndSendRequest <
NOTE: updateLeaderEpochAndSendRequest is used when KafkaController is requested to <
LeaderIsrAndControllerEpoch¶
When <LeaderIsrAndControllerEpoch, updateLeaderEpochAndSendRequest requests the <
updateLeaderEpochAndSendRequest requests the <
In the end, updateLeaderEpochAndSendRequest prints out the following TRACE message to the logs:
Sent LeaderAndIsr request [updatedLeaderIsrAndControllerEpoch] with new assigned replica list [newAssignedReplicas] to leader [leader] for partition being reassigned [partition]
No LeaderIsrAndControllerEpoch¶
When <None, updateLeaderEpochAndSendRequest prints out the following ERROR message to the logs:
Failed to send LeaderAndIsr request with new assigned replica list [newAssignedReplicas] to leader for partition being reassigned [partition]
Controller Election¶
elect(): Unit
elect requests the <-1 if not available) and saves it to the <
elect stops the controller election if there is an active controller ID available and prints out the following DEBUG message to the logs:
Broker [activeControllerId] has been elected as the controller, so stopping the election process.
Otherwise, with no active controller, elect requests the <
elect saves the controller epoch and the zookeeper epoch as the <
elect saves the <
elect prints out the following INFO message to the logs:
[brokerId] successfully elected as the controller. Epoch incremented to [epoch] and epoch zk version is now [epochZkVersion]
In the end, elect <
NOTE: elect is used when ControllerEventThread is requested to process <
elect and ControllerMovedException¶
In case of a ControllerMovedException, elect <
Broker [activeControllerId] was elected as controller instead of broker [brokerId]
A controller has been elected but just resigned, this will result in another round of election
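The happy path of the election can be sketched against an in-memory stand-in for Zookeeper. FakeZk and ControllerModel are hypothetical, the Boolean result is added only to make the outcome observable (the documented elect returns Unit), and the sketch leaves out the ControllerMovedException path and the failover work.

```scala
object ElectSketch {
  // Hypothetical in-memory stand-in for the controller-related znodes
  final class FakeZk {
    var controllerId: Option[Int] = None
    var epoch: Int = 0
    var epochZkVersion: Int = 0

    def getControllerId: Option[Int] = controllerId

    // Registers the broker as the controller and bumps the epoch;
    // returns (epoch, epochZkVersion)
    def registerControllerAndIncrementControllerEpoch(brokerId: Int): (Int, Int) = {
      controllerId = Some(brokerId)
      epoch += 1
      epochZkVersion += 1
      (epoch, epochZkVersion)
    }
  }

  final class ControllerModel(val brokerId: Int, zk: FakeZk) {
    var activeControllerId: Int = -1
    var epoch: Int = 0
    var epochZkVersion: Int = 0

    // Returns true when this broker won the election
    def elect(): Boolean = {
      // -1 when no controller is registered
      activeControllerId = zk.getControllerId.getOrElse(-1)
      if (activeControllerId != -1) {
        println(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
        false
      } else {
        val (newEpoch, newZkVersion) = zk.registerControllerAndIncrementControllerEpoch(brokerId)
        epoch = newEpoch
        epochZkVersion = newZkVersion
        activeControllerId = brokerId
        println(s"$brokerId successfully elected as the controller. Epoch incremented to $epoch and epoch zk version is now $epochZkVersion")
        true
      }
    }
  }
}
```

With two models sharing one FakeZk, the first call to elect wins and the second one stops early after learning the winner's ID.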
Is Broker The Active Controller?¶
isActive: Boolean
isActive indicates whether the current broker (by the broker ID) hosts the active KafkaController (given the <
NOTE: isActive is on (true) after the KafkaController of a Kafka broker has been <
Note
isActive is used (as a valve to stop processing early) when:
- ControllerEventThread is requested to <> (that should only be processed on the active controller, e.g. AutoPreferredReplicaLeaderElection, UncleanLeaderElectionEnable, ControlledShutdown, LeaderAndIsrResponseReceived, TopicDeletionStopReplicaResponseReceived, BrokerChange, BrokerModifications, TopicChange)
- KafkaController is requested to <>
- KafkaApis is requested to <>, <> and <>
Starting Up¶
startup(): Unit
startup requests the <
- On afterInitializingSession, the StateChangeHandler simply puts RegisterBrokerAndReelect event on the event queue of the <>
- On beforeInitializingSession, the StateChangeHandler simply puts Expire event on the event queue of the <>
startup then puts Startup event at the end of the event queue of the <
NOTE: startup is used exclusively when KafkaServer is requested to <
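The startup flow can be sketched as a handler that turns session callbacks into queued events. EventQueue and this StateChangeHandler class are hypothetical simplifications of the event manager and the handler registered with the Zookeeper client, and the event objects are local stand-ins.

```scala
object StartupSketch {
  sealed trait ControllerEvent
  case object RegisterBrokerAndReelect extends ControllerEvent
  case object Expire extends ControllerEvent
  case object Startup extends ControllerEvent

  // Hypothetical FIFO queue standing in for the controller event queue
  final class EventQueue {
    private val queue = scala.collection.mutable.Queue.empty[ControllerEvent]
    def put(event: ControllerEvent): Unit = queue.enqueue(event)
    // Removes and returns everything queued so far, oldest first
    def drain(): List[ControllerEvent] = {
      val all = queue.toList
      queue.clear()
      all
    }
  }

  // Hypothetical model of the session state change callbacks
  final class StateChangeHandler(events: EventQueue) {
    def beforeInitializingSession(): Unit = events.put(Expire)
    def afterInitializingSession(): Unit = events.put(RegisterBrokerAndReelect)
  }

  // Registers the handler (here: just creates it) and queues Startup
  def startup(events: EventQueue): StateChangeHandler = {
    val handler = new StateChangeHandler(events)
    events.put(Startup)
    handler
  }
}
```

A session re-initialization after startup would therefore appear on the queue as Expire followed by RegisterBrokerAndReelect, both behind the initial Startup event.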
Registering SessionExpirationListener To Control Session Recreation -- registerSessionExpirationListener Internal Method¶
registerSessionExpirationListener(): Unit
registerSessionExpirationListener requests <SessionExpirationListener (with the KafkaController and <
NOTE: SessionExpirationListener puts <ControllerEventManager every time the Zookeeper session has expired and a new session has been created.
NOTE: registerSessionExpirationListener is used exclusively when <ControllerEventThread is started).
Registering ControllerChangeListener for /controller ZNode Changes -- registerControllerChangeListener Internal Method¶
registerControllerChangeListener(): Unit
registerControllerChangeListener requests </controller znode with a ControllerChangeListener (with the KafkaController and <
Note
ControllerChangeListener emits:
- <> event with the current controller ID (on the event queue of ControllerEventManager) every time the data of a znode changes
- <> event when the data associated with a znode has been deleted
NOTE: registerControllerChangeListener is used exclusively when <ControllerEventThread is started).
registerBrokerChangeListener Internal Method¶
registerBrokerChangeListener(): Option[Seq[String]]
registerBrokerChangeListener requests </brokers/ids path with <
NOTE: registerBrokerChangeListener is used exclusively when KafkaController does <
Getting Active Controller ID (from JSON under /controller znode) -- getControllerID Method¶
getControllerID(): Int
getControllerID returns the ID of the active Kafka controller that is associated with /controller znode in JSON format or -1 otherwise.
Internally, getControllerID requests </controller znode].
If available, getControllerID parses the data (being the current controller info in JSON format) to extract brokerid field.
$ ./bin/zookeeper-shell.sh :2181 get /controller
{"version":1,"brokerid":0,"timestamp":"1543499076007"}
cZxid = 0x60
ctime = Thu Nov 29 14:44:36 CET 2018
mZxid = 0x60
mtime = Thu Nov 29 14:44:36 CET 2018
pZxid = 0x60
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100073f07ba0003
dataLength = 54
numChildren = 0
Otherwise, when no /controller znode is available, getControllerID returns -1.
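The parse-or-default behaviour can be sketched without a Zookeeper connection. This is a simplification, not how Kafka itself does it: the sketch pulls the brokerid field out with a regular expression instead of a JSON parser, and parseControllerId is a hypothetical helper that takes the znode data as an Option.

```scala
object ControllerIdSketch {
  // Matches the numeric brokerid field of the /controller znode JSON
  private val BrokerIdField = "\"brokerid\"\\s*:\\s*(-?\\d+)".r

  // znodeData is None when the /controller znode does not exist
  def parseControllerId(znodeData: Option[String]): Int =
    znodeData
      .flatMap(json => BrokerIdField.findFirstMatchIn(json))
      .map(_.group(1).toInt)
      .getOrElse(-1)
}
```

Fed with the znode data shown in the zookeeper-shell output above, the helper yields 0, and it falls back to -1 when there is no data at all.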
Note
getControllerID is used when:
- Processing Reelect controller event
- <>
Registering TopicDeletionListener for Child Changes to /admin/delete_topics ZNode -- registerTopicDeletionListener Internal Method¶
registerTopicDeletionListener(): Option[Seq[String]]
registerTopicDeletionListener requests </admin/delete_topics znode with <
NOTE: registerTopicDeletionListener is used exclusively when KafkaController does <
De-Registering TopicDeletionListener for Child Changes to /admin/delete_topics ZNode -- deregisterTopicDeletionListener Internal Method¶
deregisterTopicDeletionListener(): Unit
deregisterTopicDeletionListener requests </admin/delete_topics znode with <
NOTE: deregisterTopicDeletionListener is used exclusively when KafkaController <
Initializing ControllerContext -- initializeControllerContext Internal Method¶
initializeControllerContext(): Unit
initializeControllerContext ...FIXME
In the end, initializeControllerContext prints out the following INFO messages to the logs (with the current state based on the <
Currently active brokers in the cluster: [liveBrokerIds]
Currently shutting brokers in the cluster: [shuttingDownBrokerIds]
Current list of topics in the cluster: [allTopics]
NOTE: initializeControllerContext is used exclusively when KafkaController is requested to <
updateLeaderAndIsrCache Internal Method¶
updateLeaderAndIsrCache(partitions: Seq[TopicPartition]
Unless given, updateLeaderAndIsrCache defaults to <
updateLeaderAndIsrCache requests the <
NOTE: updateLeaderAndIsrCache is used when KafkaController is requested to <
Preferred Replica Leader Election -- onPreferredReplicaElection Internal Method¶
onPreferredReplicaElection(
  partitions: Set[TopicPartition],
  electionType: ElectionType): Map[TopicPartition, Throwable]
onPreferredReplicaElection prints out the following INFO message to the logs:
Starting preferred replica leader election for partitions [partitions]
onPreferredReplicaElection requests the <OnlinePartition target state and <
(only for <
onPreferredReplicaElection <
(only for <
Note
onPreferredReplicaElection is used when KafkaController is requested for the following:
- <> (with <> election type)
- <> (with <> election type)
- <> (any election type with <> the default)
onControllerFailover Internal Method¶
onControllerFailover(): Unit
onControllerFailover prints out the following INFO message to the logs:
Registering handlers
onControllerFailover requests the <
- <>
- <>
- <>
- <>
- <>
onControllerFailover requests the <
- <>
- <>
onControllerFailover prints out the following INFO message to the logs:
Deleting log dir event notifications
onControllerFailover requests the <
onControllerFailover prints out the following INFO message to the logs:
Deleting isr change notifications
onControllerFailover requests the <
onControllerFailover prints out the following INFO message to the logs:
Initializing controller context
onControllerFailover <
onControllerFailover prints out the following INFO message to the logs:
Fetching topic deletions in progress
onControllerFailover <
onControllerFailover prints out the following INFO message to the logs:
Initializing topic deletion manager
onControllerFailover requests the <
onControllerFailover prints out the following INFO message to the logs:
Sending update metadata request
onControllerFailover <
onControllerFailover requests the <
onControllerFailover requests the <
onControllerFailover prints out the following INFO message to the logs:
Ready to serve as the new controller with epoch [epoch]
onControllerFailover <
onControllerFailover requests the <
onControllerFailover <
onControllerFailover prints out the following INFO message to the logs:
Starting the controller scheduler
onControllerFailover requests the <
With <true), onControllerFailover <
With <(empty)), onControllerFailover prints out the following INFO message to the logs:
starting the token expiry check scheduler
onControllerFailover requests the <
NOTE: onControllerFailover is used when KafkaController is requested to <
scheduleAutoLeaderRebalanceTask Internal Method¶
scheduleAutoLeaderRebalanceTask(
  delay: Long,
  unit: TimeUnit): Unit
scheduleAutoLeaderRebalanceTask simply requests the <
The auto-leader-rebalance-task simply requests the <
NOTE: scheduleAutoLeaderRebalanceTask is used when KafkaController is requested to <
processAutoPreferredReplicaLeaderElection Internal Method¶
processAutoPreferredReplicaLeaderElection(): Unit
NOTE: processAutoPreferredReplicaLeaderElection does nothing (and simply returns) unless the Kafka broker (KafkaController) is an <
processAutoPreferredReplicaLeaderElection prints out the following INFO message to the logs:
Processing automatic preferred replica leader election
processAutoPreferredReplicaLeaderElection <
In the end, processAutoPreferredReplicaLeaderElection <300 seconds).
NOTE: processAutoPreferredReplicaLeaderElection is used exclusively when KafkaController is requested to <
checkAndTriggerAutoLeaderRebalance Internal Method¶
checkAndTriggerAutoLeaderRebalance(): Unit
checkAndTriggerAutoLeaderRebalance prints out the following TRACE message to the logs:
Checking need to trigger auto leader balancing
checkAndTriggerAutoLeaderRebalance ...FIXME
checkAndTriggerAutoLeaderRebalance prints out the following DEBUG message to the logs:
Preferred replicas by broker [preferredReplicasForTopicsByBrokers]
For every broker with one or more partition leaders, checkAndTriggerAutoLeaderRebalance ...FIXME
checkAndTriggerAutoLeaderRebalance prints out the following DEBUG message to the logs:
Topics not in preferred replica for broker [leaderBroker] [topicsNotInPreferredReplica]
checkAndTriggerAutoLeaderRebalance calculates an imbalance ratio of the broker which is the number of topicsNotInPreferredReplica divided by the total number of partitions (topicPartitionsForBroker).
checkAndTriggerAutoLeaderRebalance prints out the following TRACE message to the logs:
Leader imbalance ratio for broker [leaderBroker] is [imbalanceRatio]
With the imbalance ratio greater than the desired ratio (per <10%), checkAndTriggerAutoLeaderRebalance <
NOTE: checkAndTriggerAutoLeaderRebalance is used exclusively when KafkaController is requested to <
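The imbalance check is plain arithmetic and can be sketched on its own. The helper names and the desiredPercentage parameter are hypothetical, and expressing the threshold as a whole-number percentage (10 for 10%) is an assumption based on the default mentioned above.

```scala
object ImbalanceSketch {
  // Ratio of partitions not led by their preferred replica to all partitions
  // whose preferred leader is this broker. The caller makes sure the
  // denominator is positive (only brokers with one or more partitions are checked).
  def imbalanceRatio(topicsNotInPreferredReplica: Int, topicPartitionsForBroker: Int): Double =
    topicsNotInPreferredReplica.toDouble / topicPartitionsForBroker

  // Strictly greater than the desired ratio, given as a percentage
  def shouldRebalance(ratio: Double, desiredPercentage: Int = 10): Boolean =
    ratio > desiredPercentage.toDouble / 100
}
```

For example, a broker that should lead 4 partitions but currently does not lead 1 of them has a ratio of 0.25, which exceeds a 10% threshold and would make it a candidate for preferred replica election.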
Handling Log Directory Failures for Brokers -- onBrokerLogDirFailure Internal Method¶
onBrokerLogDirFailure(
  brokerIds: Seq[Int]): Unit
onBrokerLogDirFailure prints out the following INFO message to the logs:
Handling log directory failure for brokers [brokerIds]
onBrokerLogDirFailure requests the <OnlineReplica state.
NOTE: onBrokerLogDirFailure is used exclusively when KafkaController is requested to <
enableDefaultUncleanLeaderElection Method¶
enableDefaultUncleanLeaderElection(): Unit
NOTE: enableDefaultUncleanLeaderElection does nothing (and simply returns) unless the Kafka broker (KafkaController) is an <
enableDefaultUncleanLeaderElection simply requests the <
NOTE: enableDefaultUncleanLeaderElection is used when DynamicLogConfig is requested to reconfigure (for unclean.leader.election.enable configuration property).
Preferred Replica Leader Election -- electPreferredLeaders Method¶
electPreferredLeaders(
  partitions: Set[TopicPartition],
  callback: ElectPreferredLeadersCallback = { (_, _) => }): Unit
electPreferredLeaders simply requests the <
NOTE: electPreferredLeaders is used exclusively when ReplicaManager is requested to <
processUncleanLeaderElectionEnable Internal Method¶
processUncleanLeaderElectionEnable(): Unit
NOTE: processUncleanLeaderElectionEnable does nothing (and simply returns) unless the Kafka broker (KafkaController) is an <
processUncleanLeaderElectionEnable prints out the following INFO message to the logs:
Unclean leader election has been enabled by default
processUncleanLeaderElectionEnable requests the <
NOTE: processUncleanLeaderElectionEnable is used exclusively when KafkaController is requested to <
Processing BrokerChange Controller Event (On controller-event-thread) -- processBrokerChange Internal Method¶
processBrokerChange(): Unit
NOTE: processBrokerChange does nothing (and simply returns) unless the Kafka broker (KafkaController) is an <
processBrokerChange requests the <
At this point in time, processBrokerChange knows what brokers are new, dead or bounced.
processBrokerChange prints out the following INFO message to the logs:
Newly added brokers: [ids], deleted brokers: [ids], bounced brokers: [ids], all live brokers: [ids]
processBrokerChange notifies (updates) the <
- For every newly-added broker, processBrokerChange requests to <>
- For bounced brokers, processBrokerChange requests to <> first followed by <>
- For every deleted broker, processBrokerChange requests to <>
processBrokerChange updates the <
- For newly-added brokers (if there were any), processBrokerChange requests to <> followed by <>
- For bounced brokers (if there were any), processBrokerChange first requests to <> followed by <> and then requests to <> followed by <>
- For deleted brokers (if there were any), processBrokerChange requests to <> followed by <>
In the end, only when there were any updates (new, dead or bounced brokers), processBrokerChange prints out the following INFO message to the logs:
Updated broker epochs cache: [liveBrokerIdAndEpochs]
NOTE: processBrokerChange is used exclusively when KafkaController is requested to <
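How the new, dead and bounced brokers could be told apart can be sketched as set arithmetic over broker IDs and epochs. The diff function and BrokerChanges are hypothetical helpers, and the sketch assumes that a bounced broker is one whose ID is present on both sides but whose epoch in Zookeeper is higher than the one the controller has cached.

```scala
object BrokerChangeSketch {
  final case class BrokerChanges(added: Set[Int], deleted: Set[Int], bounced: Set[Int])

  // current: broker ID -> epoch as registered in Zookeeper right now
  // known:   broker ID -> epoch as cached by the controller
  def diff(current: Map[Int, Long], known: Map[Int, Long]): BrokerChanges = {
    val added = current.keySet.diff(known.keySet)
    val deleted = known.keySet.diff(current.keySet)
    // Assumption: same ID with a newer epoch means the broker restarted
    val bounced = current.keySet
      .intersect(known.keySet)
      .filter(id => current(id) > known(id))
    BrokerChanges(added, deleted, bounced)
  }
}
```

In this sketch, a cache of brokers 1, 2 and 3 compared against a Zookeeper view of 2, 3 (with a newer epoch) and 4 gives broker 4 as added, broker 1 as deleted and broker 3 as bounced.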
Processing LogDirEventNotification Controller Event (On controller-event-thread) -- processLogDirEventNotification Internal Method¶
processLogDirEventNotification(): Unit
NOTE: processLogDirEventNotification does nothing (and simply returns) unless the Kafka broker (KafkaController) is an <
processLogDirEventNotification requests the <
processLogDirEventNotification requests the <
In the end, processLogDirEventNotification requests the <
NOTE: processLogDirEventNotification is used exclusively when KafkaController is requested to <
processStartup Internal Method¶
processStartup(): Unit
processStartup requests the <
In the end, processStartup starts <
NOTE: processStartup is used exclusively when KafkaController is requested to <
Internal Properties¶
- activeControllerId: The ID of the active KafkaController. Initialized to -1
- brokerRequestBatch: <
- controllerChangeHandler: A ZNodeChangeHandler (for the KafkaController and the </controller znode. controllerChangeHandler <
  - ControllerChange when the znode is created or the znode data changed
  - Reelect when the znode is deleted
- eventManager: <
  eventManager is used to create other internal components to allow them to emit controller events at state changes:
  - <>
  - <>
  - <>
  - <>
  - <>
  - <>
  - <>
  - <>
  - <>
  - <>
  - <>
  - <>
  - <>
  eventManager is <KafkaController is requested to <
  eventManager is <KafkaController is requested to <
- kafkaScheduler: <
- stateChangeLogger: StateChangeLogger with the <inControllerContext flag enabled
- tokenCleanScheduler: <
- topicDeletionManager: <