KafkaController

KafkaController is created and immediately started alongside KafkaServer.

KafkaController uses listeners as a notification system to react to changes in Zookeeper.

KafkaController is a state machine (using controller events).

Creating Instance

KafkaController takes the following to be created:

  • KafkaConfig
  • KafkaZkClient
  • Time
  • Metrics
  • BrokerInfo
  • initialBrokerEpoch
  • DelegationTokenManager
  • BrokerFeatures
  • FinalizedFeatureCache
  • Thread name prefix (default: undefined)

KafkaController is created when:

ControllerContext

KafkaController creates a ControllerContext when created.

ControllerChannelManager

KafkaController creates a ControllerChannelManager when created.

ControllerChannelManager is used to create separate ControllerBrokerRequestBatches of the KafkaController itself, the ZkReplicaStateMachine and ZkPartitionStateMachine.

ControllerChannelManager is requested to start up when KafkaController is requested to start controller election (and a broker is successfully elected as the active controller).

KafkaController uses the ControllerChannelManager to add or remove brokers when processing broker changes in Zookeeper (a new or updated znode under /brokers/ids path).

ControllerChannelManager is requested to shut down when KafkaController is requested to resign as the active controller.

Performance Metrics

KafkaController is a KafkaMetricsGroup.

ActiveControllerCount

1 if isActive. 0 otherwise.

OfflinePartitionsCount

PreferredReplicaImbalanceCount

ControllerState

GlobalTopicCount

GlobalPartitionCount

TopicsToDeleteCount

ReplicasToDeleteCount

TopicsIneligibleToDeleteCount

ReplicasIneligibleToDeleteCount

ActiveBrokerCount

FencedBrokerCount

ControllerEventProcessor

KafkaController is a ControllerEventProcessor to process and preempt controller events.

Resigning As Active Controller

onControllerResignation(): Unit

onControllerResignation starts by printing out the following DEBUG message to the logs:

Resigning

onControllerResignation unsubscribes from intercepting Zookeeper events for the following znodes in order:

  1. Child changes to /isr_change_notification znode
  2. Data changes to /admin/reassign_partitions znode
  3. Data changes to /admin/preferred_replica_election znode
  4. Child changes to /log_dir_event_notification znode

onControllerResignation executes unregisterBrokerModificationsHandler.

onControllerResignation requests KafkaScheduler to shutdown.

onControllerResignation resets internal counters.

onControllerResignation executes unregisterPartitionReassignmentIsrChangeHandlers.

onControllerResignation requests the PartitionStateMachine to shutdown.

onControllerResignation unsubscribes from intercepting Zookeeper events for the following znode:

onControllerResignation executes unregisterPartitionModificationsHandlers.

onControllerResignation unsubscribes from intercepting Zookeeper events for the following znode:

onControllerResignation requests the ReplicaStateMachine to shutdown.

onControllerResignation unsubscribes from intercepting Zookeeper events for the following znode:

onControllerResignation requests the ControllerChannelManager to shutdown.

onControllerResignation requests the ControllerContext to resetContext.

In the end, onControllerResignation prints out the following DEBUG message to the logs:

Resigned

onControllerResignation is used when:

Processing Expire Event

processExpire(): Unit

processExpire sets the activeControllerId to -1 followed by onControllerResignation.


processExpire is used when:

onBrokerStartup

onBrokerStartup(
  newBrokers: Seq[Int]): Unit

onBrokerStartup prints out the following INFO message to the logs:

New broker startup callback for [comma-separated list of newBrokers]

onBrokerStartup removes the newBrokers from the replicasOnOfflineDirs of the ControllerContext.

onBrokerStartup sends update metadata request to all the existing brokers in the cluster (of the ControllerContext).

onBrokerStartup sends update metadata request to the new brokers with the full set of partition states (for initialization).

onBrokerStartup requests the ControllerContext for the replicas on the given newBrokers (the entire list of partitions that it is supposed to host).

onBrokerStartup requests the ReplicaStateMachine to handleStateChanges with the replicas on the new brokers and OnlineReplica target state.

onBrokerStartup requests the PartitionStateMachine to triggerOnlinePartitionStateChange.

onBrokerStartup checks if topic deletion can be resumed. onBrokerStartup collects replicas (on the new brokers) that are scheduled to be deleted by requesting the TopicDeletionManager to isTopicQueuedUpForDeletion. If there are any, onBrokerStartup prints out the following INFO message to the logs and requests the TopicDeletionManager to resumeDeletionForTopics.

Some replicas [replicasForTopicsToBeDeleted] for topics scheduled for deletion [topicsToBeDeleted]
are on the newly restarted brokers [newBrokers].
Signaling restart of topic deletion for these topics

In the end, onBrokerStartup executes registerBrokerModificationsHandler with the new brokers.


onBrokerStartup is used when:

ReplicaStateMachine

KafkaController creates a ZkReplicaStateMachine when created.

ZkReplicaStateMachine is requested to start up at onControllerFailover (when a broker is successfully elected as the controller) and shut down at controller resignation.

ZkReplicaStateMachine is requested to <> at the following events:

  • <> to transition replicas to OnlineReplica state

  • <> to transition replicas to OnlineReplica state

  • <> to transition replicas to OfflineReplica state

  • <> to transition replicas to NewReplica state first and then to OnlineReplica

  • <> to transition replicas to OnlineReplica state

  • <> to transition replicas to OfflineReplica state first and then to ReplicaDeletionStarted, ReplicaDeletionSuccessful, and NonExistentReplica in the end

  • <> to transition replicas to NewReplica state

  • <> to transition replicas to OfflineReplica state

KafkaController uses the ZkReplicaStateMachine to create the <>.

Shutting Down

shutdown(): Unit

shutdown requests the ControllerEventManager to close followed by onControllerResignation.


shutdown is used when:

ControllerEventManager

KafkaController creates a ControllerEventManager when created (with broker.id configuration property).

The ControllerEventManager is used to create the following services:

Processing Controller Events

process(
  event: ControllerEvent): Unit

process is part of the ControllerEventProcessor abstraction.


process handles the input ControllerEvent and updates the metrics.

ControllerEvent                           Handler
AllocateProducerIds                       processAllocateProducerIds
AlterPartitionReceived                    processAlterPartition
ApiPartitionReassignment                  processApiPartitionReassignment
AutoPreferredReplicaLeaderElection        processAutoPreferredReplicaLeaderElection
BrokerChange                              processBrokerChange
BrokerModifications                       processBrokerModification
ControlledShutdown                        processControlledShutdown
ControllerChange                          processControllerChange
Expire                                    processExpire
IsrChangeNotification                     processIsrChangeNotification
LeaderAndIsrResponseReceived              processLeaderAndIsrResponseReceived
ListPartitionReassignments                processListPartitionReassignments
LogDirEventNotification                   processLogDirEventNotification
PartitionModifications                    processPartitionModifications
PartitionReassignmentIsrChange            processPartitionReassignmentIsrChange
Reelect                                   processReelect
RegisterBrokerAndReelect                  processRegisterBrokerAndReelect
ReplicaLeaderElection                     processReplicaLeaderElection
Startup                                   processStartup
TopicChange                               processTopicChange
TopicDeletion                             processTopicDeletion
TopicDeletionStopReplicaResponseReceived  processTopicDeletionStopReplicaResponseReceived
TopicUncleanLeaderElectionEnable          processTopicUncleanLeaderElectionEnable
UncleanLeaderElectionEnable               processUncleanLeaderElectionEnable
UpdateFeatures                            processFeatureUpdates
UpdateMetadataResponseReceived            processUpdateMetadataResponseReceived
ZkPartitionReassignment                   processZkPartitionReassignment

updateMetrics

updateMetrics(): Unit

updateMetrics updates the metrics (using the ControllerContext).

ControllerMovedException

In case of a ControllerMovedException, process prints out the following INFO message to the logs and executes maybeResign.

Controller moved to another broker when processing [event].

Throwable

In case of any other error (Throwable), process simply prints out the following ERROR message to the logs:

Error processing event [event]
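The error handling described above can be sketched as follows. This is a minimal illustration rather than the actual KafkaController code: the ControllerEvent hierarchy, the ControllerMovedException class, the injected handler function and the in-memory logs buffer are all stand-ins defined only for this example.

```scala
import scala.collection.mutable.ListBuffer

object ProcessSketch {
  // Hypothetical stand-ins for Kafka's ControllerEvent and ControllerMovedException
  sealed trait ControllerEvent
  case object Startup extends ControllerEvent
  case object BrokerChange extends ControllerEvent
  case object Expire extends ControllerEvent

  final class ControllerMovedException(msg: String) extends RuntimeException(msg)

  // Log lines are collected in memory so the behaviour can be inspected
  val logs: ListBuffer[String] = ListBuffer.empty
  var resignations: Int = 0
  var metricsUpdates: Int = 0

  def maybeResign(): Unit = resignations += 1
  def updateMetrics(): Unit = metricsUpdates += 1

  // The handler is injected (an assumption of this sketch) so failures can be simulated
  def process(event: ControllerEvent)(handler: ControllerEvent => Unit): Unit = {
    try {
      handler(event)
    } catch {
      case _: ControllerMovedException =>
        logs += s"INFO Controller moved to another broker when processing $event."
        maybeResign()
      case _: Throwable =>
        logs += s"ERROR Error processing event $event"
    } finally {
      // Metrics are refreshed whether or not the handler failed
      updateMetrics()
    }
  }
}
```

Placing updateMetrics in a finally block is one way to keep the metrics current even when an event fails; the sketch assumes that is the intent of "handles the input ControllerEvent and updates the metrics".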

PartitionStateMachine

KafkaController creates a ZkPartitionStateMachine when created with the following:

KafkaController uses this ZkPartitionStateMachine to create the TopicDeletionManager.

ZkPartitionStateMachine is requested to start up at onControllerFailover (when a broker is successfully elected as the controller) and shut down at controller resignation.

ZkPartitionStateMachine is requested to triggerOnlinePartitionStateChange at the following events:

ZkPartitionStateMachine is requested to handleStateChanges at the following events:

enableTopicUncleanLeaderElection

enableTopicUncleanLeaderElection(
  topic: String): Unit

enableTopicUncleanLeaderElection does nothing on an inactive controller.

enableTopicUncleanLeaderElection requests the ControllerEventManager to enqueue a TopicUncleanLeaderElectionEnable event.


enableTopicUncleanLeaderElection is used when:

processTopicUncleanLeaderElectionEnable

processTopicUncleanLeaderElectionEnable(
  topic: String): Unit

processTopicUncleanLeaderElectionEnable is only executed on an active controller broker and does nothing otherwise.

processTopicUncleanLeaderElectionEnable prints out the following INFO message to the logs:

Unclean leader election has been enabled for topic [topic]

In the end, processTopicUncleanLeaderElectionEnable requests the ZkPartitionStateMachine to triggerOnlinePartitionStateChange.
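The enqueue-then-process pattern shared by enableTopicUncleanLeaderElection and processTopicUncleanLeaderElectionEnable can be illustrated with a small sketch. The Controller class, its queue, the drain helper and the triggeredTopics buffer (a stand-in for the call to triggerOnlinePartitionStateChange) are hypothetical and exist only for this example.

```scala
import scala.collection.mutable

object UncleanElectionSketch {
  sealed trait ControllerEvent
  final case class TopicUncleanLeaderElectionEnable(topic: String) extends ControllerEvent

  final class Controller(val brokerId: Int) {
    var activeControllerId: Int = -1
    val queue: mutable.Queue[ControllerEvent] = mutable.Queue.empty
    // Stand-in for the partition state machine: records which topics were triggered
    val triggeredTopics: mutable.ListBuffer[String] = mutable.ListBuffer.empty

    def isActive: Boolean = activeControllerId == brokerId

    // Caller side: does nothing on an inactive controller, otherwise only enqueues
    def enableTopicUncleanLeaderElection(topic: String): Unit = {
      if (isActive) queue.enqueue(TopicUncleanLeaderElectionEnable(topic))
    }

    // Event side: checks isActive again, since the role may have moved while the event was queued
    def processTopicUncleanLeaderElectionEnable(topic: String): Unit = {
      if (isActive) triggeredTopics += topic
    }

    // Hypothetical helper that plays the role of the event thread
    def drain(): Unit = {
      while (queue.nonEmpty) {
        queue.dequeue() match {
          case TopicUncleanLeaderElectionEnable(t) => processTopicUncleanLeaderElectionEnable(t)
        }
      }
    }
  }
}
```

Checking isActive on both sides is deliberate in this sketch: the first check avoids queuing useless work, and the second protects against a controller change between enqueueing and processing.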


processTopicUncleanLeaderElectionEnable is used when:

Logging

Enable ALL logging level for kafka.controller.KafkaController logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.controller.KafkaController=ALL

Refer to Logging

Note

Please note that Kafka comes with a preconfigured kafka.controller logger in config/log4j.properties:

log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false

That means that the logs of KafkaController go to the logs/controller.log file at the TRACE logging level and are not added to the main logs (since log4j.additivity is off).

logIdent

KafkaController uses the following logging prefix (with the broker.id):

[Controller id=[brokerId]]

Review Me

KafkaController is in one of the <> (that is the <> of the <>).

KafkaController uses the <> to be notified about changes in the state of a Kafka cluster (that are reflected in changes in znodes of Apache Zookeeper) and propagate the state changes to other brokers.

Unsubscribing from Child Changes to /isr_change_notification ZNode

unregisterZNodeChildChangeHandler(): Unit

unregisterZNodeChildChangeHandler prints out the following DEBUG message to the logs:

De-registering IsrChangeNotificationListener

unregisterZNodeChildChangeHandler requests <> to unsubscribe from intercepting changes to /isr_change_notification znode with <>.


unregisterZNodeChildChangeHandler is used when:

Unsubscribing from Child Changes to /log_dir_event_notification ZNode

deregisterLogDirEventNotificationListener(): Unit

deregisterLogDirEventNotificationListener prints out the following DEBUG message to the logs:

De-registering logDirEventNotificationListener

deregisterLogDirEventNotificationListener requests <> to unsubscribe from intercepting changes to /log_dir_event_notification znode with <>.


deregisterLogDirEventNotificationListener is used when:

Unsubscribing from Data Changes to /admin/preferred_replica_election ZNode

deregisterPreferredReplicaElectionListener(): Unit

deregisterPreferredReplicaElectionListener requests <> to unsubscribe from intercepting data changes to /admin/preferred_replica_election znode with <>.

deregisterPreferredReplicaElectionListener is used when:

Unsubscribing from Data Changes to /admin/reassign_partitions ZNode

deregisterPartitionReassignmentListener(): Unit

deregisterPartitionReassignmentListener requests <> to unsubscribe from intercepting data changes to /admin/reassign_partitions znode with <>.

deregisterPartitionReassignmentListener is used when:

sendUpdateMetadataRequest

sendUpdateMetadataRequest(): Unit

sendUpdateMetadataRequest requests the <> to <> and <>.

In the end, sendUpdateMetadataRequest requests the <> to <> with the current epoch.

In case of IllegalStateException, sendUpdateMetadataRequest <> (that <>).


sendUpdateMetadataRequest is used when:

  • KafkaController is requested to <>, <>, <>, <>, <>, process a <> controller event

  • TopicDeletionManager is requested to <>

updateLeaderEpochAndSendRequest

updateLeaderEpochAndSendRequest(
  partition: TopicPartition,
  replicasToReceiveRequest: Seq[Int],
  newAssignedReplicas: Seq[Int]): Unit

updateLeaderEpochAndSendRequest <> and branches off per result: a <> or <>.

NOTE: updateLeaderEpochAndSendRequest is used when KafkaController is requested to <> and <>.

LeaderIsrAndControllerEpoch

When <> returns a LeaderIsrAndControllerEpoch, updateLeaderEpochAndSendRequest requests the <> to <>. updateLeaderEpochAndSendRequest requests the <> to <> followed by <>.

In the end, updateLeaderEpochAndSendRequest prints out the following TRACE message to the logs:

Sent LeaderAndIsr request [updatedLeaderIsrAndControllerEpoch] with new assigned replica list [newAssignedReplicas] to leader [leader] for partition being reassigned [partition]

No LeaderIsrAndControllerEpoch

When <> returns None, updateLeaderEpochAndSendRequest prints out the following ERROR message to the logs:

Failed to send LeaderAndIsr request with new assigned replica list [newAssignedReplicas] to leader for partition being reassigned [partition]

Controller Election

elect(): Unit

elect requests the <> for the <> (or assumes -1 if not available) and saves it to the <> internal registry.

elect stops the controller election if there is an active controller ID available and prints out the following DEBUG message to the logs:

Broker [activeControllerId] has been elected as the controller, so stopping the election process.

Otherwise, with no active controller, elect requests the <> to <> (with the <>).

elect saves the controller epoch and the zookeeper epoch as the <> and <> of the <>, respectively.

elect saves the <> as the <> internal registry.

elect prints out the following INFO message to the logs:

[brokerId] successfully elected as the controller. Epoch incremented to [epoch] and epoch zk version is now [epochZkVersion]

In the end, elect <>.

NOTE: elect is used when ControllerEventThread is requested to process <> and <> controller events.

elect and ControllerMovedException

In case of a ControllerMovedException, elect <> and prints out either DEBUG or WARN message to the logs per the <> internal registry:

Broker [activeControllerId] was elected as controller instead of broker [brokerId]
A controller has been elected but just resigned, this will result in another round of election
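The election flow above can be approximated with an in-memory sketch. FakeZk is a hypothetical stand-in for the Zookeeper client (its raceWinner field only simulates another broker winning the race), the ZooKeeper epoch version is omitted for brevity, and re-reading the controller ID after a ControllerMovedException is an assumption of this sketch, since the source does not spell out that step.

```scala
import scala.collection.mutable.ListBuffer

object ElectSketch {
  final class ControllerMovedException(msg: String) extends RuntimeException(msg)

  // Hypothetical in-memory stand-in for the controller znode and its epoch
  final class FakeZk {
    var controllerId: Option[Int] = None
    var epoch: Int = 0
    // When set, simulates another broker winning the race during registration
    var raceWinner: Option[Int] = None

    def registerControllerAndIncrementControllerEpoch(brokerId: Int): Int = {
      raceWinner.foreach { winner =>
        controllerId = Some(winner)
        throw new ControllerMovedException(s"Broker $winner won the election")
      }
      controllerId = Some(brokerId)
      epoch += 1
      epoch
    }
  }

  final class Controller(val brokerId: Int, zk: FakeZk) {
    var activeControllerId: Int = -1
    var epoch: Int = 0
    val logs: ListBuffer[String] = ListBuffer.empty

    def elect(): Unit = {
      activeControllerId = zk.controllerId.getOrElse(-1)
      if (activeControllerId != -1) {
        logs += s"DEBUG Broker $activeControllerId has been elected as the controller, so stopping the election process."
      } else {
        try {
          epoch = zk.registerControllerAndIncrementControllerEpoch(brokerId)
          activeControllerId = brokerId
          logs += s"INFO $brokerId successfully elected as the controller. Epoch incremented to $epoch"
        } catch {
          case _: ControllerMovedException =>
            // Assumption: re-read the current controller before choosing the log message
            activeControllerId = zk.controllerId.getOrElse(-1)
            if (activeControllerId != -1) {
              logs += s"DEBUG Broker $activeControllerId was elected as controller instead of broker $brokerId"
            } else {
              logs += "WARN A controller has been elected but just resigned, this will result in another round of election"
            }
        }
      }
    }
  }
}
```

With two controllers sharing one FakeZk, the first call to elect wins and increments the epoch, while the second stops early because a controller ID is already registered.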

Is Broker The Active Controller?

isActive: Boolean

isActive indicates whether the current broker (by the broker ID) hosts the active KafkaController (given the <>) or not.

NOTE: isActive is on (true) after the KafkaController of a Kafka broker has been <>.

isActive is used (as a valve to stop processing early) when:

  • ControllerEventThread is requested to <> (that should only be processed on the active controller, e.g. AutoPreferredReplicaLeaderElection, UncleanLeaderElectionEnable, ControlledShutdown, LeaderAndIsrResponseReceived, TopicDeletionStopReplicaResponseReceived, BrokerChange, BrokerModifications, TopicChange)

  • KafkaController is requested to <>

  • KafkaApis is requested to <>, <> and <>
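A minimal sketch of the isActive check and the ActiveControllerCount gauge that depends on it, assuming that a broker is active when the activeControllerId internal registry equals its own broker ID. The Controller class and the onlyIfActive helper are hypothetical and serve only to show the "valve" usage.

```scala
object IsActiveSketch {
  // brokerId stands in for the broker.id configuration property
  final class Controller(val brokerId: Int) {
    // -1 means that no active controller is known
    var activeControllerId: Int = -1

    def isActive: Boolean = activeControllerId == brokerId

    // Value reported by the ActiveControllerCount gauge: 1 if isActive, 0 otherwise
    def activeControllerCount: Int = if (isActive) 1 else 0

    // Hypothetical guard: runs the body only on the active controller
    def onlyIfActive(body: => Unit): Unit = if (isActive) body
  }
}
```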

Starting Up

startup(): Unit

startup requests the <> to <> (under the name controller-state-change-handler) that does the following:

  • On afterInitializingSession, the StateChangeHandler simply puts RegisterBrokerAndReelect event on the event queue of the <>

  • On beforeInitializingSession, the StateChangeHandler simply puts Expire event on the event queue of the <>

startup then puts Startup event at the end of the event queue of the <> and immediately requests it to <>.
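The startup sequence can be sketched as below. The StateChangeHandler trait, the EventManager class and the handlers list are simplified, hypothetical stand-ins written for this example; only the handler name and the three event names come from the text above.

```scala
import scala.collection.mutable

object StartupSketch {
  sealed trait ControllerEvent
  case object Startup extends ControllerEvent
  case object Expire extends ControllerEvent
  case object RegisterBrokerAndReelect extends ControllerEvent

  // Hypothetical, minimal session state-change callback interface
  trait StateChangeHandler {
    def name: String
    def beforeInitializingSession(): Unit
    def afterInitializingSession(): Unit
  }

  // Hypothetical event manager: a FIFO queue plus a started flag
  final class EventManager {
    val queue: mutable.Queue[ControllerEvent] = mutable.Queue.empty
    var started: Boolean = false
    def put(event: ControllerEvent): Unit = { queue.enqueue(event) }
    def start(): Unit = { started = true }
  }

  final class Controller(val eventManager: EventManager) {
    var handlers: List[StateChangeHandler] = Nil

    def startup(): Unit = {
      val handler = new StateChangeHandler {
        val name: String = "controller-state-change-handler"
        def beforeInitializingSession(): Unit = eventManager.put(Expire)
        def afterInitializingSession(): Unit = eventManager.put(RegisterBrokerAndReelect)
      }
      handlers = handler :: handlers
      // Startup is queued first, then the event manager is started
      eventManager.put(Startup)
      eventManager.start()
    }
  }
}
```

Because both callbacks only enqueue events, session expiry is handled on the same event queue as every other controller event, in the order it was observed.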

NOTE: startup is used exclusively when KafkaServer is requested to <>.

Registering SessionExpirationListener To Control Session Recreation -- registerSessionExpirationListener Internal Method

registerSessionExpirationListener(): Unit

registerSessionExpirationListener requests <> to subscribe to state changes with a SessionExpirationListener (with the KafkaController and <>).

NOTE: SessionExpirationListener puts <> event on the event queue of ControllerEventManager every time the Zookeeper session has expired and a new session has been created.

NOTE: registerSessionExpirationListener is used exclusively when <> event is processed (after ControllerEventThread is started).

Registering ControllerChangeListener for /controller ZNode Changes -- registerControllerChangeListener Internal Method

registerControllerChangeListener(): Unit

registerControllerChangeListener requests <> to subscribe to data changes for /controller znode with a ControllerChangeListener (with the KafkaController and <>).

ControllerChangeListener emits:

  1. <> event with the current controller ID (on the event queue of ControllerEventManager) every time the data of a znode changes

  2. <> event when the data associated with a znode has been deleted

NOTE: registerControllerChangeListener is used exclusively when <> event is processed (after ControllerEventThread is started).

registerBrokerChangeListener Internal Method

registerBrokerChangeListener(): Option[Seq[String]]

registerBrokerChangeListener requests <> to subscribeChildChanges for /brokers/ids path with <>.

NOTE: registerBrokerChangeListener is used exclusively when KafkaController does <>.

Getting Active Controller ID (from JSON under /controller znode) -- getControllerID Method

getControllerID(): Int

getControllerID returns the ID of the active Kafka controller that is associated with /controller znode in JSON format or -1 otherwise.

Internally, getControllerID requests <> for data associated with /controller znode.

If available, getControllerID parses the data (being the current controller info in JSON format) to extract brokerid field.

$ ./bin/zookeeper-shell.sh :2181 get /controller

{"version":1,"brokerid":0,"timestamp":"1543499076007"}
cZxid = 0x60
ctime = Thu Nov 29 14:44:36 CET 2018
mZxid = 0x60
mtime = Thu Nov 29 14:44:36 CET 2018
pZxid = 0x60
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100073f07ba0003
dataLength = 54
numChildren = 0

Otherwise, when no /controller znode is available, getControllerID returns -1.
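The lookup can be illustrated with a dependency-free sketch. It takes the znode data as an Option[String] (None when the /controller znode does not exist) and extracts the brokerid field with a regular expression. The regular expression is a simplification chosen for this example, not how Kafka parses the JSON.

```scala
object ControllerIdSketch {
  // Matches the numeric "brokerid" field of the controller JSON
  private val BrokerId = """"brokerid"\s*:\s*(-?\d+)""".r

  // znodeData is None when the /controller znode is not available
  def getControllerID(znodeData: Option[String]): Int =
    znodeData
      .flatMap(json => BrokerId.findFirstMatchIn(json))
      .map(_.group(1).toInt)
      .getOrElse(-1)
}
```

For the zookeeper-shell output shown above, the sketch yields 0, and it yields -1 when no data is passed in.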

getControllerID is used when:

  1. Processing Reelect controller event

  2. <>

Registering TopicDeletionListener for Child Changes to /admin/delete_topics ZNode -- registerTopicDeletionListener Internal Method

registerTopicDeletionListener(): Option[Seq[String]]

registerTopicDeletionListener requests <> to subscribeChildChanges to /admin/delete_topics znode with <>.

NOTE: registerTopicDeletionListener is used exclusively when KafkaController does <>.

De-Registering TopicDeletionListener for Child Changes to /admin/delete_topics ZNode -- deregisterTopicDeletionListener Internal Method

deregisterTopicDeletionListener(): Unit

deregisterTopicDeletionListener requests <> to unsubscribeChildChanges to /admin/delete_topics znode with <>.

NOTE: deregisterTopicDeletionListener is used exclusively when KafkaController <>.

Initializing ControllerContext -- initializeControllerContext Internal Method

initializeControllerContext(): Unit

initializeControllerContext...FIXME

In the end, initializeControllerContext prints out the following INFO messages to the logs (with the current state based on the <>):

Currently active brokers in the cluster: [liveBrokerIds]
Currently shutting brokers in the cluster: [shuttingDownBrokerIds]
Current list of topics in the cluster: [allTopics]


NOTE: initializeControllerContext is used exclusively when KafkaController is requested to <>.

updateLeaderAndIsrCache Internal Method

updateLeaderAndIsrCache(
  partitions: Seq[TopicPartition]): Unit

Unless given, updateLeaderAndIsrCache defaults to <> of the <> for the partitions.

updateLeaderAndIsrCache requests the <> to <> (with the given partitions) and updates the <> of the <>.

NOTE: updateLeaderAndIsrCache is used when KafkaController is requested to <> (with no partitions) and <> (with partitions given).

Preferred Replica Leader Election -- onPreferredReplicaElection Internal Method

onPreferredReplicaElection(
  partitions: Set[TopicPartition],
  electionType: ElectionType): Map[TopicPartition, Throwable]


onPreferredReplicaElection prints out the following INFO message to the logs:

Starting preferred replica leader election for partitions [partitions]

onPreferredReplicaElection requests the <> to <> for the partitions (with OnlinePartition target state and <>).

(only for <> that are not <>) In the end, onPreferredReplicaElection <>.

(only for <> that are not <>) In case of an error

onPreferredReplicaElection is used when KafkaController is requested for the following:

  • <> (with <> election type)

  • <> (with <> election type)

  • <> (any election type with <> the default)

onControllerFailover Internal Method

onControllerFailover(): Unit

onControllerFailover prints out the following INFO message to the logs:

Registering handlers

onControllerFailover requests the <> to <>:

  • <>
  • <>
  • <>
  • <>
  • <>

onControllerFailover requests the <> to <>:

  • <>
  • <>

onControllerFailover prints out the following INFO message to the logs:

Deleting log dir event notifications

onControllerFailover requests the <> to <> (with the <> of the <>).

onControllerFailover prints out the following INFO message to the logs:

Deleting isr change notifications

onControllerFailover requests the <> to <> (with the <> of the <>).

onControllerFailover prints out the following INFO message to the logs:

Initializing controller context

onControllerFailover <>.

onControllerFailover prints out the following INFO message to the logs:

Fetching topic deletions in progress

onControllerFailover <>.

onControllerFailover prints out the following INFO message to the logs:

Initializing topic deletion manager

onControllerFailover requests the <> to <> (with the topics to be deleted and ineligible for deletion).

onControllerFailover prints out the following INFO message to the logs:

Sending update metadata request

onControllerFailover <> (with the <> of the <>).

onControllerFailover requests the <> to <>.

onControllerFailover requests the <> to <>.

onControllerFailover prints out the following INFO message to the logs:

Ready to serve as the new controller with epoch [epoch]

onControllerFailover <> (with the <> of the <>).

onControllerFailover requests the <> to <>.

onControllerFailover <> with the <>.

onControllerFailover prints out the following INFO message to the logs:

Starting the controller scheduler

onControllerFailover requests the <> to <>.

With <> enabled (default: true), onControllerFailover <> with the initial delay of 5 seconds.

With <> password set (default: (empty)), onControllerFailover prints out the following INFO message to the logs:

starting the token expiry check scheduler

onControllerFailover requests the <> to <> and requests it to <> the delete-expired-tokens task (FIXME).

NOTE: onControllerFailover is used when KafkaController is requested to <> (and a broker is successfully elected as the active controller).

scheduleAutoLeaderRebalanceTask Internal Method

scheduleAutoLeaderRebalanceTask(
  delay: Long,
  unit: TimeUnit): Unit


scheduleAutoLeaderRebalanceTask simply requests the <> to <> called auto-leader-rebalance-task with the given initial delay.

The auto-leader-rebalance-task simply requests the <> to <> an <> controller event.

NOTE: scheduleAutoLeaderRebalanceTask is used when KafkaController is requested to <> and <>.

processAutoPreferredReplicaLeaderElection Internal Method

processAutoPreferredReplicaLeaderElection(): Unit

NOTE: processAutoPreferredReplicaLeaderElection does nothing (and simply returns) unless the Kafka broker (KafkaController) is an <>.

processAutoPreferredReplicaLeaderElection prints out the following INFO message to the logs:

Processing automatic preferred replica leader election

processAutoPreferredReplicaLeaderElection <>.

In the end, processAutoPreferredReplicaLeaderElection <> with the initial delay based on <> configuration property (default: 300 seconds).

NOTE: processAutoPreferredReplicaLeaderElection is used exclusively when KafkaController is requested to <> a <> event.

checkAndTriggerAutoLeaderRebalance Internal Method

checkAndTriggerAutoLeaderRebalance(): Unit

checkAndTriggerAutoLeaderRebalance prints out the following TRACE message to the logs:

Checking need to trigger auto leader balancing

checkAndTriggerAutoLeaderRebalance...FIXME

checkAndTriggerAutoLeaderRebalance prints out the following DEBUG message to the logs:

Preferred replicas by broker [preferredReplicasForTopicsByBrokers]

For every broker with one or more partition leaders, checkAndTriggerAutoLeaderRebalance...FIXME

checkAndTriggerAutoLeaderRebalance prints out the following DEBUG message to the logs:

Topics not in preferred replica for broker [leaderBroker] [topicsNotInPreferredReplica]

checkAndTriggerAutoLeaderRebalance calculates an imbalance ratio of the broker, which is the number of topicsNotInPreferredReplica divided by the total number of partitions (topicPartitionsForBroker).

checkAndTriggerAutoLeaderRebalance prints out the following TRACE message to the logs:

Leader imbalance ratio for broker [leaderBroker] is [imbalanceRatio]

With the imbalance ratio greater than the desired ratio (per <> configuration property with the default: 10%), checkAndTriggerAutoLeaderRebalance <> for...FIXME (with <> election type).
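The imbalance check reduces to a ratio and a threshold comparison, sketched below. The function names and the thresholdPercent parameter are hypothetical; the inputs are the two counts named above, and the default of 10 mirrors the 10% default mentioned in the text.

```scala
object ImbalanceSketch {
  // topicsNotInPreferredReplica: partitions of the broker not led by their preferred replica
  // topicPartitionsForBroker: all partitions counted for the broker (at least one)
  def imbalanceRatio(topicsNotInPreferredReplica: Int, topicPartitionsForBroker: Int): Double = {
    require(topicPartitionsForBroker > 0, "a broker without partitions has no ratio")
    topicsNotInPreferredReplica.toDouble / topicPartitionsForBroker
  }

  // The threshold is configured as a percentage, so it is scaled to a fraction first
  def shouldRebalance(ratio: Double, thresholdPercent: Int = 10): Boolean =
    ratio > thresholdPercent.toDouble / 100
}
```

For example, a broker with 3 of its 10 partitions led elsewhere has a ratio of 0.3 and exceeds the 10% threshold, while a ratio of exactly 0.1 does not, because the comparison is strict.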

NOTE: checkAndTriggerAutoLeaderRebalance is used exclusively when KafkaController is requested to <>.

Handling Log Directory Failures for Brokers -- onBrokerLogDirFailure Internal Method

onBrokerLogDirFailure(
  brokerIds: Seq[Int]): Unit


onBrokerLogDirFailure prints out the following INFO message to the logs:

Handling log directory failure for brokers [brokerIds]

onBrokerLogDirFailure requests the <> for the <> and then requests the <> to <> for the replicas to enter OnlineReplica state.

NOTE: onBrokerLogDirFailure is used exclusively when KafkaController is requested to <>.

enableDefaultUncleanLeaderElection Method

enableDefaultUncleanLeaderElection(): Unit

NOTE: enableDefaultUncleanLeaderElection does nothing (and simply returns) unless the Kafka broker (KafkaController) is an <>.

enableDefaultUncleanLeaderElection simply requests the <> to enqueue an UncleanLeaderElectionEnable event.

NOTE: enableDefaultUncleanLeaderElection is used when DynamicLogConfig is requested to reconfigure (for unclean.leader.election.enable configuration property).

Preferred Replica Leader Election -- electPreferredLeaders Method

electPreferredLeaders(
  partitions: Set[TopicPartition],
  callback: ElectPreferredLeadersCallback = { (_, _) => }): Unit

electPreferredLeaders simply requests the <> to <> an <> event (with <> election type).

NOTE: electPreferredLeaders is used exclusively when ReplicaManager is requested to <>.

processUncleanLeaderElectionEnable Internal Method

processUncleanLeaderElectionEnable(): Unit

NOTE: processUncleanLeaderElectionEnable does nothing (and simply returns) unless the Kafka broker (KafkaController) is an <>.

processUncleanLeaderElectionEnable prints out the following INFO message to the logs:

Unclean leader election has been enabled by default

processUncleanLeaderElectionEnable requests the <> to triggerOnlinePartitionStateChange.

NOTE: processUncleanLeaderElectionEnable is used exclusively when KafkaController is requested to <>.

Processing BrokerChange Controller Event (On controller-event-thread) -- processBrokerChange Internal Method

processBrokerChange(): Unit

NOTE: processBrokerChange does nothing (and simply returns) unless the Kafka broker (KafkaController) is an <>.

processBrokerChange requests the <> for the <> and compares the broker list with the <> (of the <>).

At this point in time, processBrokerChange knows what brokers are new, dead or bounced.
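One way to derive the three groups is a set comparison between the brokers currently registered in Zookeeper and the controller's cached view. The sketch below assumes both sides are available as maps from broker ID to broker epoch, and that a bounced broker is one whose ID is unchanged but whose registered epoch is newer than the cached one. BrokerDiff and diff are hypothetical names.

```scala
object BrokerChangeSketch {
  final case class BrokerDiff(newBrokers: Set[Int], deadBrokers: Set[Int], bouncedBrokers: Set[Int])

  // current: broker id -> epoch as currently registered in Zookeeper
  // cached:  broker id -> epoch as last seen by the controller
  def diff(current: Map[Int, Long], cached: Map[Int, Long]): BrokerDiff = {
    val currentIds = current.keySet
    val cachedIds = cached.keySet
    BrokerDiff(
      newBrokers = currentIds.diff(cachedIds),
      deadBrokers = cachedIds.diff(currentIds),
      // Same id with a newer epoch: the broker re-registered between two notifications
      bouncedBrokers = currentIds.intersect(cachedIds).filter(id => current(id) > cached(id))
    )
  }
}
```

With current brokers 1, 2 and 4 and cached brokers 1, 2 and 3, where only broker 2 has a newer epoch, broker 4 is new, broker 3 is dead and broker 2 is bounced.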

processBrokerChange prints out the following INFO message to the logs:

Newly added brokers: [ids], deleted brokers: [ids], bounced brokers: [ids], all live brokers: [ids]

processBrokerChange notifies (updates) the <>:

  • For every newly-added broker, processBrokerChange requests to <>

  • For bounced brokers, processBrokerChange requests to <> first followed by <>

  • For every deleted broker, processBrokerChange requests to <>

processBrokerChange updates the <>:

  • For newly-added brokers (if there were any), processBrokerChange requests to <> followed by <>

  • For bounced brokers (if there were any), processBrokerChange first requests to <> followed by <> and then requests to <> followed by <>

  • For deleted brokers (if there were any), processBrokerChange requests to <> followed by <>

In the end, only when there were any updates (new, dead or bounced brokers), processBrokerChange prints out the following INFO message to the logs:

Updated broker epochs cache: [liveBrokerIdAndEpochs]

NOTE: processBrokerChange is used exclusively when KafkaController is requested to <> (on the <>).

Processing LogDirEventNotification Controller Event (On controller-event-thread) -- processLogDirEventNotification Internal Method

processLogDirEventNotification(): Unit

NOTE: processLogDirEventNotification does nothing (and simply returns) unless the Kafka broker (KafkaController) is an <>.

processLogDirEventNotification requests the <> for the <> (sequence numbers).

processLogDirEventNotification requests the <> for the <> and then <>.

In the end, processLogDirEventNotification requests the <> to <>.

NOTE: processLogDirEventNotification is used exclusively when KafkaController is requested to <>.

processStartup Internal Method

processStartup(): Unit

processStartup requests the <> to <> (with the <>).

In the end, processStartup starts <>.

NOTE: processStartup is used exclusively when KafkaController is requested to <> (on the <>).

Internal Properties

activeControllerId

The ID of the active KafkaController

  • Initialized to -1

brokerRequestBatch

<> (with the <>, <>, and <>)

controllerChangeHandler

A ZNodeChangeHandler (for the KafkaController and the <>) that listens to change events on /controller znode.

controllerChangeHandler <> as follows:

  • ControllerChange when the znode is created or the znode data changes

  • Reelect when the znode is deleted

eventManager

<> (with <> of the <>, the <> as the <> and the <> as the <>)

eventManager is used to create other internal components to allow them for emitting controller events at state changes:

  • <>
  • <>
  • <>
  • <>
  • <>
  • <>
  • <>
  • <>
  • <>
  • <>
  • <>
  • <>
  • <>

eventManager is <> when KafkaController is requested to <>.

eventManager is <> when KafkaController is requested to <>.

kafkaScheduler

<> with 1 daemon thread with kafka-scheduler prefix

stateChangeLogger

StateChangeLogger with the <> and inControllerContext flag enabled

tokenCleanScheduler

<> with 1 daemon thread with delegation-token-cleaner prefix

topicDeletionManager

<>