ControllerChannelManager
Review Me
`ControllerChannelManager` manages <

`ControllerChannelManager` is <

When <`ControllerChannelManager` establishes a connection to every broker and starts a corresponding <
[[logIdent]] `ControllerChannelManager` uses `[Channel manager on controller [brokerId]]` as the logging prefix (aka `logIdent`).
[[logging]]
[TIP]
====
Enable the `ALL` logging level for the `kafka.controller.ControllerChannelManager` logger to see what happens inside.

Add the following line to `config/log4j.properties`:

log4j.logger.kafka.controller.ControllerChannelManager=ALL

Refer to <>.
====
=== [[brokerStateInfo]] Connection Metadata of All Brokers -- `brokerStateInfo` Internal Registry

[source, scala]
brokerStateInfo: HashMap[Int, ControllerBrokerStateInfo]
`brokerStateInfo` is <

Request threads for brokers are all <`ControllerChannelManager` is requested to <

A new broker is added when `ControllerChannelManager` is requested to <

A broker is removed when `ControllerChannelManager` is requested to <

Use the <
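As a rough illustration of how such a registry behaves, the following sketch models `brokerStateInfo` as a mutable `HashMap` keyed by broker ID. `BrokerStateSketch` is a hypothetical stand-in for `ControllerBrokerStateInfo`, and both the lock and the guard against registering the same broker twice are assumptions made for the example, not details confirmed by this page.

```scala
import scala.collection.mutable

object BrokerRegistrySketch {
  // Hypothetical stand-in for ControllerBrokerStateInfo (the real class holds more).
  final case class BrokerStateSketch(brokerId: Int, threadName: String)

  final class Registry {
    private val lock = new Object
    private val brokerStateInfo = mutable.HashMap.empty[Int, BrokerStateSketch]

    // Registers a broker; returns false when the broker ID is already known (assumed guard).
    def add(brokerId: Int, threadName: String): Boolean = lock.synchronized {
      if (brokerStateInfo.contains(brokerId)) false
      else {
        brokerStateInfo.put(brokerId, BrokerStateSketch(brokerId, threadName))
        true
      }
    }

    // Deregisters a broker and hands back whatever metadata was stored for it.
    def remove(brokerId: Int): Option[BrokerStateSketch] = lock.synchronized {
      brokerStateInfo.remove(brokerId)
    }

    // Snapshot of the currently registered broker IDs.
    def brokerIds: Set[Int] = lock.synchronized(brokerStateInfo.keySet.toSet)
  }
}
```

Returning the removed entry from `remove` lets a caller release the resources tied to it (for example, a queue or a thread) after the entry has left the map.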
=== [[KafkaMetricsGroup]][[metrics]] Performance Metrics
`ControllerChannelManager` is a <
.ControllerChannelManager's Performance Metrics
[cols="30m,70",options="header",width="100%"]
|===
| Metric Name
| Description

| QueueSize
a| [[QueueSize]] Controller requests (<

| RequestRateAndQueueTimeMs
a| [[RequestRateAndQueueTimeMs]][[requestRateAndQueueTimeMetrics]] For every broker

| TotalQueueSize
a| [[TotalQueueSize]] Total number of controller requests (<
|===
The performance metrics are registered in kafka.controller:type=ControllerChannelManager group.
.ControllerChannelManager in jconsole
image::images/ControllerChannelManager-jconsole.png[align="center"]
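To make the relationship between the two queue-size metrics concrete, here is a minimal sketch. It assumes that `QueueSize` reflects the number of requests waiting in a single broker's queue and that `TotalQueueSize` is the sum over all brokers. `QueueSizeSketch` and its methods are hypothetical names, and the real gauges are registered through Kafka's metrics classes rather than computed this way.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

object QueueSizeSketch {
  // Per-broker value: requests still waiting in that broker's queue.
  def queueSize[A](queue: BlockingQueue[A]): Int = queue.size

  // Aggregate value: the sum of the sizes of all brokers' queues (assumed definition).
  def totalQueueSize[A](queues: Iterable[BlockingQueue[A]]): Int =
    queues.iterator.map(_.size).sum

  // Convenience factory so callers do not need the concrete queue type.
  def newQueue[A](): BlockingQueue[A] = new LinkedBlockingQueue[A]()
}
```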
=== [[creating-instance]] Creating ControllerChannelManager Instance
`ControllerChannelManager` takes the following to be created:

- [[controllerContext]] <>
- [[config]] <>
- [[time]] `Time`
- [[metrics]] <>
- [[stateChangeLogger]] link:kafka-controller-StateChangeLogger.adoc[StateChangeLogger]
- [[threadNamePrefix]] Thread name prefix (default: `(empty)`)
=== [[addNewBroker]] Registering New Broker -- `addNewBroker` Internal Method

[source, scala]
addNewBroker(broker: Broker): Unit

`addNewBroker` prints out the following DEBUG message to the logs:

Controller [brokerId] trying to connect to broker [id]

`addNewBroker` finds the name of the listener to use for communication with the broker: the link:kafka-properties.adoc#control.plane.listener.name[control.plane.listener.name] configuration property if defined, or link:kafka-properties.adoc#inter.broker.listener.name[inter.broker.listener.name] otherwise.

`addNewBroker` finds the security protocol to use for communication with the broker: the link:kafka-properties.adoc#control.plane.listener.name[control.plane.listener.name] and link:kafka-properties.adoc#listener.security.protocol.map[listener.security.protocol.map] configuration properties if defined, or link:kafka-properties.adoc#security.inter.broker.protocol[security.inter.broker.protocol] otherwise.
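The fallback described in the two paragraphs above can be sketched as follows. This is a simplified model, not Kafka's configuration code: `ListenerResolutionSketch`, `Resolved` and `resolve` are hypothetical names, the properties are passed in as plain values, and throwing on a missing map entry is an assumption.

```scala
object ListenerResolutionSketch {
  final case class Resolved(listenerName: String, securityProtocol: String)

  // controlPlaneListenerName models control.plane.listener.name (optional),
  // listenerSecurityProtocolMap models listener.security.protocol.map,
  // the last two arguments model the inter-broker fallbacks.
  def resolve(
      controlPlaneListenerName: Option[String],
      listenerSecurityProtocolMap: Map[String, String],
      interBrokerListenerName: String,
      interBrokerSecurityProtocol: String): Resolved =
    controlPlaneListenerName match {
      case Some(name) =>
        // Control-plane listener is defined: look up its protocol in the map.
        val protocol = listenerSecurityProtocolMap.getOrElse(
          name,
          throw new IllegalArgumentException(s"No security protocol mapped for listener $name"))
        Resolved(name, protocol)
      case None =>
        // Not defined: fall back to the inter-broker settings.
        Resolved(interBrokerListenerName, interBrokerSecurityProtocol)
    }
}
```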
`addNewBroker` requests the `Broker` for the link:kafka-cluster-Broker.adoc#node[node] for the listener name.

`addNewBroker` creates a new `LogContext` to use the prefix:

[Controller id=[brokerId], targetBrokerId=[brokerNode]]

`addNewBroker` creates a link:kafka-clients-NetworkClient.adoc[NetworkClient]. First, `addNewBroker` creates a link:kafka-common-network-ChannelBuilders.adoc#clientChannelBuilder[ChannelBuilder] (for the security protocol, the listener name, `SERVER` JAAS context type and SASL-related properties) and, if it is a link:kafka-common-Reconfigurable.adoc[Reconfigurable], adds it to the <

`addNewBroker` then creates a link:kafka-common-network-Selector.adoc[Selector] with the `controller-channel` metric group (and `broker-id` of the broker node).

`addNewBroker` builds a thread name per the optional <

[threadNamePrefix]:Controller-[brokerId]-to-broker-[id]-send-thread
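The thread-name pattern above can be sketched as a small helper. `ThreadNameSketch` and `requestSendThreadName` are hypothetical names for illustration. The sketch assumes the prefix and the colon are left out when no thread name prefix is given.

```scala
object ThreadNameSketch {
  // controllerId is the broker ID of the controller, brokerId the ID of the target broker.
  def requestSendThreadName(
      threadNamePrefix: Option[String],
      controllerId: Int,
      brokerId: Int): String = {
    val base = s"Controller-$controllerId-to-broker-$brokerId-send-thread"
    // Prepend "<prefix>:" only when a prefix is defined (assumption for the empty case).
    threadNamePrefix.fold(base)(prefix => s"$prefix:$base")
  }
}
```

For example, controller `0` connecting to broker `1` with the prefix `test` would give `test:Controller-0-to-broker-1-send-thread`.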
`addNewBroker` creates a new <

`addNewBroker` creates a daemon link:kafka-controller-RequestSendThread.adoc[RequestSendThread] for the broker ID (of the controller broker), the <`NetworkClient`, the `RequestRateAndQueueTimeMs` metric, the <

`addNewBroker` creates a <

In the end, `addNewBroker` registers (adds) the id of the broker to connect with a new <

NOTE: `addNewBroker` is used when `ControllerChannelManager` is requested to <
=== [[addBroker]] Registering Newly-Added Broker -- `addBroker` Method

[source, scala]
addBroker(broker: Broker): Unit

`addBroker`...FIXME

NOTE: `addBroker` is used when `KafkaController` is requested to link:kafka-controller-KafkaController.adoc#processBrokerChange[process a BrokerChange controller event].
=== [[removeBroker]] Deregistering Broker -- `removeBroker` Method

[source, scala]
removeBroker(brokerId: Int): Unit

`removeBroker` finds the broker metadata in the <

NOTE: `removeBroker` is used exclusively when `KafkaController` is requested to <
=== [[startup]] Starting Up -- `startup` Method

[source, scala]
startup(): Unit

`startup`...FIXME

NOTE: `startup` is used when `KafkaController` is requested to link:kafka-controller-KafkaController.adoc#initializeControllerContext[initializeControllerContext].
=== [[shutdown]] Shutting Down -- `shutdown` Method

[source, scala]
shutdown(): Unit

`shutdown`...FIXME

NOTE: `shutdown` is used when...FIXME
=== [[sendRequest]] Sending AbstractControlRequest Out to Broker -- `sendRequest` Method

[source, scala]
sendRequest(
  brokerId: Int,
  request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
  callback: AbstractResponse => Unit = null): Unit

`sendRequest`...FIXME

NOTE: `sendRequest` is used exclusively when `ControllerBrokerRequestBatch` is requested to link:kafka-controller-ControllerBrokerRequestBatch.adoc#sendRequest[send a controller request to a broker].
=== [[removeExistingBroker]] `removeExistingBroker` Internal Method

[source, scala]
removeExistingBroker(brokerState: ControllerBrokerStateInfo): Unit

`removeExistingBroker`...FIXME

NOTE: `removeExistingBroker` is used when...FIXME
=== [[startRequestSendThread]] Starting RequestSendThread -- `startRequestSendThread` Internal Method

[source, scala]
startRequestSendThread(brokerId: Int): Unit

`startRequestSendThread` finds the `RequestSendThread` in the broker metadata in the <`startRequestSendThread` <

NOTE: `startRequestSendThread` is used when `ControllerChannelManager` is requested to <
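The lookup-then-start step could look roughly like the sketch below. The description above is incomplete, so the condition is an assumption: the sketch starts a thread only if it has not been started yet (`Thread.State.NEW`). `StartThreadSketch`, `startIfNew` and `idleThread` are hypothetical names, and a plain map of `Thread` objects stands in for the broker metadata registry.

```scala
import scala.collection.mutable

object StartThreadSketch {
  // Starts the thread registered for brokerId if it exists and was never started.
  // Returns true only when this call actually started the thread.
  def startIfNew(threads: mutable.Map[Int, Thread], brokerId: Int): Boolean =
    threads.get(brokerId) match {
      case Some(thread) if thread.getState == Thread.State.NEW =>
        thread.start()
        true
      case _ =>
        false
    }

  // Helper for experiments: a daemon thread that does nothing when run.
  def idleThread(name: String): Thread = {
    val thread = new Thread(new Runnable { def run(): Unit = () }, name)
    thread.setDaemon(true)
    thread
  }
}
```

Checking the thread state first makes the call safe to repeat, since `Thread.start` throws `IllegalThreadStateException` when invoked on a thread that was already started.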
=== [[ControllerBrokerStateInfo]] ControllerBrokerStateInfo
`ControllerBrokerStateInfo` is broker metadata that holds the following:

- [[networkClient]] <>
- [[brokerNode]] Broker Node
- [[messageQueue]] Message Queue (`BlockingQueue[QueueItem]`)
- [[requestSendThread]] `RequestSendThread`
- [[queueSizeGauge]] Queue Size (`Gauge[Int]`)
- [[requestRateAndTimeMetrics]] RequestRateAndTime Metrics
- [[reconfigurableChannelBuilder]] <>
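A pared-down model of such a holder is sketched below, purely for illustration. It covers only three of the listed members (the broker node, the message queue and the send thread). All names (`BrokerStateInfoSketch`, `StateInfo`, `QueueItemSketch`, `create`) and the field types are hypothetical simplifications. The real class uses Kafka's own `Node`, `QueueItem`, `RequestSendThread` and metrics types.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

object BrokerStateInfoSketch {
  // Hypothetical stand-in for QueueItem; its real fields are not described on this page.
  final case class QueueItemSketch(apiName: String, enqueueTimeMs: Long)

  // Simplified holder: broker node (as a host:port string), message queue and send thread.
  final case class StateInfo(
      brokerNode: String,
      messageQueue: BlockingQueue[QueueItemSketch],
      requestSendThread: Thread) {
    // The value a per-broker queue-size gauge would report.
    def queueSize: Int = messageQueue.size
  }

  // Builds the holder with an empty queue and a not-yet-started daemon thread.
  def create(brokerNode: String, threadName: String): StateInfo = {
    val queue = new LinkedBlockingQueue[QueueItemSketch]()
    val thread = new Thread(new Runnable { def run(): Unit = () }, threadName)
    thread.setDaemon(true)
    StateInfo(brokerNode, queue, thread)
  }
}
```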