KafkaServer¶
KafkaServer
is a Server for Zookeeper mode (non-KRaft mode).
Creating Instance¶
KafkaServer
takes the following to be created:
- KafkaConfig
- Time (default: SYSTEM)
- Optional Thread Name Prefix (default: undefined)
- enableForwarding flag
KafkaServer
is created when:
Kafka command-line application is launched (and builds a server with no process.roles specified, i.e. in Zookeeper mode)
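Since KafkaServer is the server for Zookeeper mode (non-KRaft mode), the choice of server can be pictured as a check on process.roles. The following is a minimal sketch only, not Kafka's code: ServerModeSketch, ServerKind, and serverKind are hypothetical names, and treating an absent or blank process.roles as Zookeeper mode is an assumption.

```scala
// Illustrative sketch only: all names here are hypothetical, not Kafka's API.
object ServerModeSketch {
  sealed trait ServerKind
  case object ZooKeeperMode extends ServerKind // the mode KafkaServer is for
  case object KRaftMode extends ServerKind     // the mode with process.roles set

  // Assumption: an absent or blank process.roles means Zookeeper mode.
  def serverKind(props: Map[String, String]): ServerKind = {
    val roles = props
      .getOrElse("process.roles", "")
      .split(",")
      .map(_.trim)
      .filter(_.nonEmpty)
    if (roles.isEmpty) ZooKeeperMode else KRaftMode
  }
}
```

For example, `ServerModeSketch.serverKind(Map("process.roles" -> "broker,controller"))` selects the KRaft branch, while an empty property map selects the Zookeeper branch.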
enableForwarding¶
enableForwarding: Boolean
KafkaServer
can be given enableForwarding
flag when created.
enableForwarding is false unless specified explicitly, which never seems to happen.
When enabled, KafkaServer
creates a ForwardingManager and uses BrokerToControllerChannelManager.
ZkConfigManager¶
KafkaServer
creates a ZkConfigManager when started with the following:
KafkaServer
requests the ZkConfigManager
to startup immediately.
ConfigHandlers¶
dynamicConfigHandlers: Map[String, ConfigHandler]
KafkaServer uses the dynamicConfigHandlers registry of ConfigHandlers (keyed by name).
Name | ConfigHandler |
---|---|
topics | TopicConfigHandler |
clients | ClientIdConfigHandler |
users | UserConfigHandler |
brokers | BrokerConfigHandler |
ips | IpConfigHandler |
KafkaServer
uses the dynamicConfigHandlers
to create ZkConfigManager (at startup).
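A name-keyed registry like dynamicConfigHandlers can be sketched as a plain Map with a lookup by entity type. This is an illustration under stated assumptions: the ConfigHandler trait below is a hypothetical stand-in (its handle method and return value are invented for the example), and LoggingHandler and dispatch do not exist in Kafka.

```scala
object ConfigHandlerRegistrySketch {
  // Hypothetical stand-in for Kafka's ConfigHandler; the method shape is an assumption.
  trait ConfigHandler {
    def handle(entityName: String, config: Map[String, String]): String
  }

  // A trivial handler that only reports what it was asked to process.
  final class LoggingHandler(kind: String) extends ConfigHandler {
    def handle(entityName: String, config: Map[String, String]): String =
      s"$kind handler applied ${config.size} setting(s) to $entityName"
  }

  // Registry keyed by the handler names listed in the table.
  val dynamicConfigHandlers: Map[String, ConfigHandler] =
    Seq("topics", "clients", "users", "brokers", "ips")
      .map(name => name -> new LoggingHandler(name))
      .toMap

  // Look up the handler by entity type; unknown types yield None.
  def dispatch(
      entityType: String,
      entityName: String,
      config: Map[String, String]): Option[String] =
    dynamicConfigHandlers.get(entityType).map(_.handle(entityName, config))
}
```

Keying the handlers by name lets a config change notification be routed with a single map lookup, and an unknown entity type is surfaced as None rather than an exception.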
TransactionCoordinator¶
KafkaServer creates and starts a TransactionCoordinator at startup.
KafkaServer
uses the TransactionCoordinator
to create the following:
- data-plane and the control-plane request processors
- AutoTopicCreationManager
The TransactionCoordinator
is requested to shutdown along with KafkaServer.
Data-Plane Request Processor¶
KafkaServer
creates a KafkaApis for data-related communication.
KafkaApis is used to create the data-plane request handler pool.
KafkaRequestHandlerPool¶
Control-Plane Request Processor¶
KafkaServer
creates a KafkaApis for control-related communication.
Starting Up¶
startup(): Unit
startup
is part of the Server abstraction.
startup
prints out the following INFO message to the logs:
starting
startup runs initZkClient and creates a ZkConfigRepository (with a new AdminZkClient).
startup
...FIXME
startup runs getOrGenerateClusterId (the result becomes the _clusterId) and prints out the following INFO message to the logs:
Cluster ID = [clusterId]
startup runs getBrokerMetadataAndOfflineDirs with the logDirs.
startup
looks up the broker ID.
startup
...FIXME
startup
creates a LogManager that is requested to start up.
startup
...FIXME
startup
creates a TransactionCoordinator (with the ReplicaManager) and requests it to startup.
startup
...FIXME
KafkaBroker¶
KafkaServer
is a KafkaBroker.
Looking Up Broker ID¶
getOrGenerateBrokerId(
brokerMetadata: RawMetaProperties): Int
getOrGenerateBrokerId
takes the broker.id (from the KafkaConfig) and makes sure that it matches the broker ID of the RawMetaProperties (or an InconsistentBrokerIdException is thrown).
getOrGenerateBrokerId
uses the given RawMetaProperties
for the broker ID if defined.
Otherwise, if broker.id
(from the KafkaConfig) is negative and broker.id.generation.enable is enabled, getOrGenerateBrokerId
generates a broker ID.
In the end, when all the earlier attempts "fail", getOrGenerateBrokerId
uses the broker.id (from the KafkaConfig).
getOrGenerateBrokerId
is used when:
KafkaServer
is requested to start up
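The resolution order described above can be sketched as a single conditional chain. This is a self-contained illustration, not the KafkaServer method: BrokerIdSketch and resolveBrokerId are hypothetical names, the exception class is a local stand-in, the generator is passed in as a function, and applying the mismatch check only to a non-negative broker.id is an assumption.

```scala
object BrokerIdSketch {
  // Local stand-in for Kafka's InconsistentBrokerIdException.
  final class InconsistentBrokerIdException(msg: String)
      extends RuntimeException(msg)

  def resolveBrokerId(
      configuredId: Int,          // broker.id from the configuration
      storedId: Option[Int],      // broker ID found in the stored metadata, if any
      generationEnabled: Boolean, // broker.id.generation.enable
      generate: () => Int         // assumed ID generator, supplied by the caller
  ): Int =
    if (configuredId >= 0 && storedId.exists(_ != configuredId))
      // Assumption: only a non-negative configured ID is checked for a mismatch.
      throw new InconsistentBrokerIdException(
        s"Configured broker.id $configuredId does not match stored broker.id ${storedId.get}")
    else if (storedId.isDefined) storedId.get                       // stored ID wins
    else if (configuredId < 0 && generationEnabled) generate()      // generate a new ID
    else configuredId                                               // fall back to broker.id
}
```

With these assumptions, `BrokerIdSketch.resolveBrokerId(-1, Some(5), true, () => 99)` uses the stored ID, whereas the same call with `None` falls through to the generator.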
LogManager¶
logManager: LogManager
logManager
is part of the KafkaBroker abstraction.
KafkaServer
creates a LogManager at startup.
Authorizer¶
authorizer: Option[Authorizer]
authorizer
is part of the KafkaBroker abstraction.
KafkaServer is given an Authorizer at startup based on the authorizer.class.name configuration property.
KafkaController¶
KafkaServer
creates a KafkaController at startup.
The KafkaController
is requested to start up immediately and shut down alongside the KafkaServer.
The KafkaController
is used when:
- Creating a TopicConfigHandler (in the dynamicConfigHandlers)
- controlledShutdown (for the brokerEpoch)
- DynamicLogConfig is requested to reconfigure
- DynamicListenerConfig is requested to reconfigure
- KafkaServer is requested to startup
    - for the brokerEpoch for AlterIsrManager
    - for the brokerEpoch for ProducerIdManager
    - ZkSupport
Logging¶
Enable ALL
logging level for kafka.server.KafkaServer
logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.kafka.server.KafkaServer=ALL
Refer to Logging.