Skip to content

ControllerChannelManager

Review Me

ControllerChannelManager manages <> for link:kafka-controller-KafkaController.adoc#controllerChannelManager[KafkaController].

ControllerChannelManager is <> when <> is created (to <> or <> brokers as they "announce" themselves in Zookeeper) for <> of the <> itself, the <> and <> (so they can send broker and partition changes out to all brokers in a Kafka cluster).

When <>, ControllerChannelManager establishes connection to every broker and starts a corresponding <> to keep sending queued controller requests.

[[logIdent]] ControllerChannelManager uses [Channel manager on controller [brokerId]] as the logging prefix (aka logIdent).

[[logging]] [TIP] ==== Enable ALL logging levels for kafka.controller.ControllerChannelManager logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.controller.ControllerChannelManager=ALL

Refer to <>.

=== [[brokerStateInfo]] Connection Metadata of All Brokers -- brokerStateInfo Internal Registry

[source, scala]

brokerStateInfo: HashMap[Int, ControllerBrokerStateInfo]

brokerStateInfo is <> by broker ID.

Request threads for brokers are all <> when ControllerChannelManager is requested to <>.

A new broker is added when ControllerChannelManager is requested to <> (when <>).

A broker is removed when ControllerChannelManager is requested to <> (when <> or <>).

Use the <> gauge metric for the queue depth (i.e. how many link:kafka-controller-AbstractControlRequest.adoc[controller requests] are waiting to be sent out to all brokers).

=== [[KafkaMetricsGroup]][[metrics]] Performance Metrics

ControllerChannelManager is a <> with the following performance metrics.

.ControllerChannelManager's Performance Metrics [cols="30m,70",options="header",width="100%"] |=== | Metric Name | Description

| QueueSize a| [[QueueSize]] Controller requests (<>) queue size (per broker)

| RequestRateAndQueueTimeMs a| [[RequestRateAndQueueTimeMs]][[requestRateAndQueueTimeMetrics]] For every broker

| TotalQueueSize a| [[TotalQueueSize]] Total number of controller requests (<>) to be sent out to brokers

|===

The performance metrics are registered in kafka.controller:type=ControllerChannelManager group.

.ControllerChannelManager in jconsole image::images/ControllerChannelManager-jconsole.png[align="center"]

=== [[creating-instance]] Creating ControllerChannelManager Instance

ControllerChannelManager takes the following to be created:

  • [[controllerContext]] <>
  • [[config]] <>
  • [[time]] Time
  • [[metrics]] <>
  • [[stateChangeLogger]] link:kafka-controller-StateChangeLogger.adoc[StateChangeLogger]
  • [[threadNamePrefix]] Thread name prefix (default: (empty))

=== [[addNewBroker]] Registering New Broker -- addNewBroker Internal Method

[source, scala]

addNewBroker( broker: Broker): Unit


addNewBroker prints out the following DEBUG message to the logs:

Controller [brokerId] trying to connect to broker [id]

addNewBroker finds the name of the listener to use for communication with the broker based on link:kafka-properties.adoc#control.plane.listener.name[control.plane.listener.name] configuration property (if defined) or link:kafka-properties.adoc#inter.broker.listener.name[inter.broker.listener.name].

addNewBroker finds the security protocol to use for communication with the broker based on link:kafka-properties.adoc#control.plane.listener.name[control.plane.listener.name] and link:kafka-properties.adoc#listener.security.protocol.map[listener.security.protocol.map] configuration properties (if defined) or link:kafka-properties.adoc#security.inter.broker.protocol[security.inter.broker.protocol].

addNewBroker requests the Broker for the link:kafka-cluster-Broker.adoc#node[node] for the listener name.

addNewBroker creates a new LogContext to use the prefix:

[Controller id=[brokerId], targetBrokerId=[brokerNode]]

addNewBroker creates a link:kafka-clients-NetworkClient.adoc[NetworkClient]. Firstly, addNewBroker creates a link:kafka-common-network-ChannelBuilders.adoc#clientChannelBuilder[ChannelBuilder] (for the security protocol, the listener name, SERVER JAAS context type and SASL-related properties) and, if it is a link:kafka-common-Reconfigurable.adoc[Reconfigurable], adds it to the <> as link:kafka-server-KafkaConfig.adoc#addReconfigurable[reconfigurable]. addNewBroker then creates a link:kafka-common-network-Selector.adoc[Selector] with controller-channel metric group (and broker-id of the broker node).

addNewBroker builds a thread name per the optional <>:

[threadNamePrefix]:Controller-[brokerId]-to-broker-[id]-send-thread

addNewBroker creates a new <> timer metric with the id of the broker to connect to.

addNewBroker creates a daemon link:kafka-controller-RequestSendThread.adoc[RequestSendThread] for the broker ID (of the controller broker), the <>, the NetworkClient, the RequestRateAndQueueTimeMs metric, the <>, and the thread name.

addNewBroker creates a <> gauge metric (with the id of the broker to connect) that is the number of the link:kafka-controller-AbstractControlRequest.adoc[AbstractControlRequest] messages in the queue.

In the end, addNewBroker registers (adds) the id of the broker to connect with a new <> to the <> internal registry.

NOTE: addNewBroker is used when ControllerChannelManager is requested to <> (and connect to brokers) and <>.

=== [[addBroker]] Registering Newly-Added Broker -- addBroker Method

[source, scala]

addBroker(broker: Broker): Unit

addBroker...FIXME

NOTE: addBroker is used when KafkaController is requested to link:kafka-controller-KafkaController.adoc#processBrokerChange[process a BrokerChange controller event].

=== [[removeBroker]] Deregistering Broker -- removeBroker Method

[source, scala]

removeBroker(brokerId: Int): Unit

removeBroker finds the broker metadata in the <> internal registry that is then used to <>.

NOTE: removeBroker is used exclusively when KafkaController is requested to <>.

=== [[startup]] Starting Up -- startup Method

[source, scala]

startup(): Unit

startup...FIXME

NOTE: startup is used when KafkaController is requested to link:kafka-controller-KafkaController.adoc#initializeControllerContext[initializeControllerContext].

=== [[shutdown]] Shutting Down -- shutdown Method

[source, scala]

shutdown(): Unit

shutdown...FIXME

NOTE: shutdown is used when...FIXME

=== [[sendRequest]] Sending AbstractControlRequest Out to Broker -- sendRequest Method

[source, scala]

sendRequest( brokerId: Int, request: AbstractControlRequest.Builder[_ <: AbstractControlRequest], callback: AbstractResponse => Unit = null)


sendRequest...FIXME

NOTE: sendRequest is used exclusively when ControllerBrokerRequestBatch is requested to link:kafka-controller-ControllerBrokerRequestBatch.adoc#sendRequest[send a controller request to a broker].

=== [[removeExistingBroker]] removeExistingBroker Internal Method

[source, scala]

removeExistingBroker( brokerState: ControllerBrokerStateInfo): Unit


removeExistingBroker...FIXME

NOTE: removeExistingBroker is used when...FIXME

=== [[startRequestSendThread]] Starting RequestSendThread -- startRequestSendThread Internal Method

[source, scala]

startRequestSendThread( brokerId: Int): Unit


startRequestSendThread finds the RequestSendThread in the broker metadata in the <> internal registry and, if the thread has not started yet, startRequestSendThread <>.

NOTE: startRequestSendThread is used when ControllerChannelManager is requested to <> and <>.

=== [[ControllerBrokerStateInfo]] ControllerBrokerStateInfo

ControllerBrokerStateInfo is a broker metadata that holds the following:

  • [[networkClient]] <>
  • [[brokerNode]] Broker Node
  • [[messageQueue]] Message Queue (BlockingQueue[QueueItem])
  • [[requestSendThread]] RequestSendThread
  • [[queueSizeGauge]] Queue Size (Gauge[Int])
  • [[requestRateAndTimeMetrics]] RequestRateAndTime Metrics
  • [[reconfigurableChannelBuilder]] <>