KafkaServer¶
KafkaServer is a Server for Zookeeper mode (non-KRaft mode).
Creating Instance¶
KafkaServer takes the following to be created:
- KafkaConfig
-
Time(default:SYSTEM) - Optional Thread Name Prefix (default: undefined)
- enableForwarding flag
KafkaServer is created when:
Kafkacommand-line application is launched (and builds a server with process.roles specified)
enableForwarding¶
enableForwarding: Boolean
KafkaServer can be given enableForwarding flag when created.
enableForwarding is false unless specified explicitly that seems never happen.
When enabled, KafkaServer creates a ForwardingManager and uses BrokerToControllerChannelManager.
ZkConfigManager¶
KafkaServer creates a ZkConfigManager when started with the following:
KafkaServer requests the ZkConfigManager to startup immediately.
ConfigHandlers¶
dynamicConfigHandlers: Map[String, ConfigHandler]
KafkaServer uses dynamicConfigHandlers registry of ConfigHandlers (by their name).
| Name | ConfigHandler |
|---|---|
| topics | TopicConfigHandler |
| clients | ClientIdConfigHandler |
| users | UserConfigHandler |
| brokers | BrokerConfigHandler |
| ips | IpConfigHandler |
KafkaServer uses the dynamicConfigHandlers to create ZkConfigManager (at startup).
TransactionCoordinator¶
KafkaServer creates and starts a TransactionCoordinator when created.
KafkaServer uses the TransactionCoordinator to create the following:
- data-plane and the control-plane request processors
- AutoTopicCreationManager
The TransactionCoordinator is requested to shutdown along with KafkaServer.
Data-Plane Request Processor¶
KafkaServer creates a KafkaApis for data-related communication.
KafkaApis is used to create data-plane request handler pool.
KafkaRequestHandlerPool¶
Control-Plane Request Processor¶
KafkaServer creates a KafkaApis for control-related communication.
Starting Up¶
startup(): Unit
startup is part of the Server abstraction.
startup prints out the following INFO message to the logs:
starting
startup initZkClient and creates a ZkConfigRepository (with a new AdminZkClient).
startup...FIXME
startup getOrGenerateClusterId (that becomes the _clusterId) and prints out the following INFO message to the logs:
Cluster ID = [clusterId]
startup getBrokerMetadataAndOfflineDirs with the logDirs.
startup looks up the broker ID.
startup...FIXME
startup creates a LogManager that is requested to start up.
startup...FIXME
startup creates a TransactionCoordinator (with the ReplicaManager) and requests it to startup.
startup...FIXME
KafkaBroker¶
KafkaServer is a KafkaBroker.
Looking Up Broker ID¶
getOrGenerateBrokerId(
brokerMetadata: RawMetaProperties): Int
getOrGenerateBrokerId takes the broker.id (from the KafkaConfig) and makes sure that it matches the RawMetaProperties's (or an InconsistentBrokerIdException is thrown).
getOrGenerateBrokerId uses the given RawMetaProperties for the broker ID if defined.
Otherwise, if broker.id (from the KafkaConfig) is negative and broker.id.generation.enable is enabled, getOrGenerateBrokerId generates a broker ID.
In the end, when all the earlier attempts "fail", getOrGenerateBrokerId uses the broker.id (from the KafkaConfig).
getOrGenerateBrokerId is used when:
KafkaServeris requested to start up
LogManager¶
logManager: LogManager
logManager is part of the KafkaBroker abstraction.
KafkaServer creates a LogManager at startup.
authorizer: Option[Authorizer]
authorizer is part of the KafkaBroker abstraction.
KafkaServer is given an Authorizer at startup based on authorizer.class.name configuration property.
KafkaController¶
KafkaServer creates a KafkaController at startup.
The KafkaController is requested to start up immediately and shut down alongside the KafkaServer.
The KafkaController is used when:
- Creating a
TopicConfigHandler(in the dynamicConfigHandlers) - controlledShutdown (for the brokerEpoch)
DynamicLogConfigis requested toreconfigureDynamicListenerConfigis requested toreconfigureKafkaServeris requested to startup- for the brokerEpoch for AlterIsrManager
- for the brokerEpoch for ProducerIdManager
ZkSupport
Logging¶
Enable ALL logging level for kafka.server.KafkaServer logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.kafka.server.KafkaServer=ALL
Refer to Logging.