KafkaServer

KafkaServer is a Server for ZooKeeper mode (non-KRaft mode).

Creating Instance

KafkaServer takes the following to be created:

KafkaServer is created when:

enableForwarding

enableForwarding: Boolean

KafkaServer can be given enableForwarding flag when created.

enableForwarding is false unless specified explicitly, which never seems to happen.

When enabled, KafkaServer creates a ForwardingManager and uses BrokerToControllerChannelManager.
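The effect of the flag can be pictured with a minimal sketch. This is not Kafka's code: ChannelManager, ForwardingManager and ServerSketch below are simplified, hypothetical stand-ins, and the only behavior shown is that the forwarding manager exists when the flag is set and is absent by default.

```scala
object ForwardingSketch {
  // Simplified stand-ins for BrokerToControllerChannelManager and ForwardingManager
  // (assumptions for illustration, not the real classes).
  final class ChannelManager(val name: String)
  final class ForwardingManager(val channel: ChannelManager)

  // Hypothetical server skeleton: the flag defaults to false.
  final class ServerSketch(enableForwarding: Boolean = false) {
    // The forwarding manager (and its channel manager) is created only when enabled.
    val forwardingManager: Option[ForwardingManager] =
      if (enableForwarding) Some(new ForwardingManager(new ChannelManager("forwarding")))
      else None
  }
}
```

Modelling the manager as an Option keeps the disabled case explicit: callers have to handle the absence of forwarding rather than check a separate flag.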

ZkConfigManager

KafkaServer creates a ZkConfigManager when started with the following:

KafkaServer requests the ZkConfigManager to start up immediately.

ConfigHandlers

dynamicConfigHandlers: Map[String, ConfigHandler]

KafkaServer uses the dynamicConfigHandlers registry of ConfigHandlers (keyed by name).

Name      ConfigHandler
topics    TopicConfigHandler
clients   ClientIdConfigHandler
users     UserConfigHandler
brokers   BrokerConfigHandler
ips       IpConfigHandler

KafkaServer uses the dynamicConfigHandlers to create ZkConfigManager (at startup).
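The registry can be thought of as a plain map from entity type name to handler. The following is a minimal sketch under stated assumptions, not Kafka's implementation: the ConfigHandler trait is a simplified stand-in (its method takes a Map[String, String] here), and RecordingHandler and dispatch are hypothetical helpers added only for illustration.

```scala
object ConfigHandlerRegistrySketch {
  // Simplified stand-in for Kafka's ConfigHandler (an assumption for illustration).
  trait ConfigHandler {
    def processConfigChanges(entityName: String, props: Map[String, String]): Unit
  }

  // Hypothetical handler that only records the entity names it was asked to process.
  final class RecordingHandler(val label: String) extends ConfigHandler {
    private val buffer = scala.collection.mutable.ListBuffer.empty[String]
    def seen: List[String] = buffer.toList
    override def processConfigChanges(entityName: String, props: Map[String, String]): Unit =
      buffer += entityName
  }

  // Registry keyed by entity type name, using the names listed above.
  val dynamicConfigHandlers: Map[String, RecordingHandler] =
    Seq("topics", "clients", "users", "brokers", "ips")
      .map(name => name -> new RecordingHandler(name))
      .toMap

  // Looks up the handler by entity type; returns false when none is registered.
  def dispatch(entityType: String, entityName: String, props: Map[String, String]): Boolean =
    dynamicConfigHandlers.get(entityType) match {
      case Some(handler) =>
        handler.processConfigChanges(entityName, props)
        true
      case None => false
    }
}
```

For example, dispatch("topics", "orders", Map("retention.ms" -> "1000")) routes the change to the handler registered under topics, while an unknown entity type is ignored.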

TransactionCoordinator

KafkaServer creates and starts a TransactionCoordinator at startup.

KafkaServer uses the TransactionCoordinator to create the following:

The TransactionCoordinator is requested to shut down along with KafkaServer.

Data-Plane Request Processor

KafkaServer creates a KafkaApis for data-related communication.

KafkaApis is used to create the data-plane request handler pool.

KafkaRequestHandlerPool

Control-Plane Request Processor

KafkaServer creates a KafkaApis for control-related communication.

Starting Up

startup(): Unit

startup is part of the Server abstraction.


startup prints out the following INFO message to the logs:

starting

startup calls initZkClient and creates a ZkConfigRepository (with a new AdminZkClient).

startup...FIXME

startup calls getOrGenerateClusterId (the result becomes the _clusterId) and prints out the following INFO message to the logs:

Cluster ID = [clusterId]

startup calls getBrokerMetadataAndOfflineDirs with the logDirs.

startup looks up the broker ID.

startup...FIXME

startup creates a LogManager that is requested to start up.

startup...FIXME

startup creates a TransactionCoordinator (with the ReplicaManager) and requests it to start up.

startup...FIXME

KafkaBroker

KafkaServer is a KafkaBroker.

Looking Up Broker ID

getOrGenerateBrokerId(
  brokerMetadata: RawMetaProperties): Int

getOrGenerateBrokerId takes the broker.id (from the KafkaConfig) and makes sure that it matches the broker ID in the given RawMetaProperties (otherwise an InconsistentBrokerIdException is thrown).

getOrGenerateBrokerId uses the given RawMetaProperties for the broker ID if defined.

Otherwise, if broker.id (from the KafkaConfig) is negative and broker.id.generation.enable is enabled, getOrGenerateBrokerId generates a broker ID.

Finally, when none of the earlier cases applies, getOrGenerateBrokerId uses the broker.id (from the KafkaConfig).
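The decision order described above can be sketched as a single pattern match. This is an illustration, not Kafka's source: resolveBrokerId, its parameters, the generate callback and InconsistentBrokerIdSketchException are hypothetical names, and the sketch assumes the consistency check applies only when the configured broker.id is non-negative.

```scala
object BrokerIdSketch {
  // Hypothetical exception standing in for Kafka's InconsistentBrokerIdException.
  final class InconsistentBrokerIdSketchException(msg: String) extends RuntimeException(msg)

  /**
   * @param configuredId      broker.id from the configuration (negative when unset)
   * @param storedId          broker ID found in the metadata properties, if any
   * @param generationEnabled value of broker.id.generation.enable
   * @param generate          hypothetical generator of a fresh broker ID
   */
  def resolveBrokerId(
      configuredId: Int,
      storedId: Option[Int],
      generationEnabled: Boolean,
      generate: () => Int): Int =
    storedId match {
      // 1. A configured ID must agree with the stored one (assumed: only when non-negative).
      case Some(stored) if configuredId >= 0 && stored != configuredId =>
        throw new InconsistentBrokerIdSketchException(
          s"Configured broker.id $configuredId does not match stored broker.id $stored")
      // 2. A stored ID wins when defined.
      case Some(stored) => stored
      // 3. No stored ID, negative configured ID, generation enabled: generate one.
      case None if configuredId < 0 && generationEnabled => generate()
      // 4. Fall back to the configured ID.
      case None => configuredId
    }
}
```

With this sketch, resolveBrokerId(-1, Some(7), true, () => 1001) yields the stored ID, whereas resolveBrokerId(-1, None, true, () => 1001) falls through to the generator.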


getOrGenerateBrokerId is used when:

LogManager

logManager: LogManager

logManager is part of the KafkaBroker abstraction.


KafkaServer creates a LogManager at startup.

Authorizer

authorizer: Option[Authorizer]

authorizer is part of the KafkaBroker abstraction.


KafkaServer is given an Authorizer at startup based on authorizer.class.name configuration property.

KafkaController

KafkaServer creates a KafkaController at startup.

The KafkaController is requested to start up immediately and shut down alongside the KafkaServer.

The KafkaController is used when:

Logging

Enable the ALL logging level for the kafka.server.KafkaServer logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.kafka.server.KafkaServer=ALL

Refer to Logging.