KafkaApis¶
KafkaApis is responsible to handle API requests to a Kafka broker (by means of handlers).

Some requests are meant for the controller broker and simply do nothing (no-ops) when received by a regular non-controller broker.
Creating Instance¶
KafkaApis takes the following to be created:
-
RequestChannel -
MetadataSupport - ReplicaManager
- GroupCoordinator
- TransactionCoordinator
- AutoTopicCreationManager
- Broker ID
- KafkaConfig
-
ConfigRepository - MetadataCache
- Metrics
- Authorizer
-
QuotaManagers -
FetchManager -
BrokerTopicStats - Cluster ID
-
Time -
DelegationTokenManager -
ApiVersionManager
KafkaApis is created when:
BrokerServeris requested to start up (for the dataPlaneRequestProcessor and the controlPlaneRequestProcessor)KafkaServeris requested to start up (for the dataPlaneRequestProcessor and the controlPlaneRequestProcessor)
KafkaApis can be given an Authorizer when created. The Authorizer instance is given right from the creator (based on authorizer.class.name configuration property):
The Authorizer is used only to create the following:
AuthHelper¶
KafkaApis creates an AuthHelper when created.
The AuthHelper is given the optional Authorizer.
The AuthHelper is used to create the AclApis and authorize operations.
GroupCoordinator¶
KafkaApis is given a GroupCoordinator when created.
The GroupCoordinator is used for the following:
- handleAddOffsetsToTxnRequest
- handleDeleteGroupsRequest
- handleDescribeGroupRequest
- handleFindCoordinatorRequest
- handleHeartbeatRequest
- handleJoinGroupRequest
- handleLeaderAndIsrRequest
- handleLeaveGroupRequest
- handleListGroupsRequest
- handleOffsetCommitRequest
- handleOffsetDeleteRequest
- handleOffsetFetchRequest
- handleStopReplicaRequest
- handleSyncGroupRequest
- handleTxnOffsetCommitRequest
- handleUpdateMetadataRequest
- handleWriteTxnMarkersRequest
TransactionCoordinator¶
KafkaApis is given a TransactionCoordinator when created.
The TransactionCoordinator is used for the following:
- handleAddOffsetsToTxnRequest
- handleAddPartitionToTxnRequest
- handleEndTxnRequest
- handleFindCoordinatorRequest
- handleInitProducerIdRequest
- handleLeaderAndIsrRequest
- handleStopReplicaRequest
handleFetchRequest¶
handleFetchRequest(
request: RequestChannel.Request): Unit
handleFetchRequest assumes that the given RequestChannel.Request is an FetchRequest.
handleFetchRequest authorizes the request.
In the end, handleFetchRequest requests the ReplicaManager to fetchMessages.
handleFetchRequest is used when:
KafkaApisis requested to handle a FETCH request
handleFindCoordinatorRequest¶
handleFindCoordinatorRequest(
request: RequestChannel.Request): Unit
handleFindCoordinatorRequest converts the given RequestChannel.Request to an FindCoordinatorRequest.
handleFindCoordinatorRequest finds the group or transaction coordinator.
In the end, handleFindCoordinatorRequest prints out the following TRACE message to the logs:
Sending FindCoordinator response [response] for correlation id [correlationId] to client [clientId].
handleFindCoordinatorRequest is used when:
KafkaApisis requested to handle a FIND_COORDINATOR request
Finding Group or Transaction Coordinator¶
getCoordinator(
request: RequestChannel.Request,
keyType: Byte,
key: String): (Errors, Node)
For GROUP coordinator (by the keyType), getCoordinator requests the AuthHelper to authorize:
DESCRIBEoperationGROUPresource typekeyresource name
For TRANSACTION coordinator (by the keyType), getCoordinator requests the AuthHelper to authorize:
DESCRIBEoperationTRANSACTIONAL_IDresource typekeyresource name
If either fails, getCoordinator returns the Errors.
Partition¶
For GROUP coordinator (by the keyType), getCoordinator requests the GroupCoordinator for the partition of the key group.
For TRANSACTION coordinator (by the keyType), getCoordinator requests the TransactionCoordinator for the partition of the key transactional ID.
Topic Metadata¶
getCoordinator requests the MetadataCache to getTopicMetadata with the name of the internal topic and then to getAliveBrokerNode.
Possible errors are COORDINATOR_NOT_AVAILABLEs.
handleInitProducerIdRequest¶
handleInitProducerIdRequest(
request: RequestChannel.Request): Unit
handleInitProducerIdRequest assumes that the given RequestChannel.Request is an InitProducerIdRequest.
handleInitProducerIdRequest authorizes the request.
With producerId and producerEpoch set either to -1s (NO_PRODUCER_ID and NO_PRODUCER_EPOCH) or some non--1 values, handleInitProducerIdRequest requests the TransactionCoordinator to handleInitProducerId.
Otherwise, handleInitProducerIdRequest sends an error back.
handleInitProducerIdRequest is used when:
KafkaApisis requested to handle a INIT_PRODUCER_ID request
handleLeaderAndIsrRequest¶
handleLeaderAndIsrRequest(
request: RequestChannel.Request): Unit
In summary, handleLeaderAndIsrRequest requests the ReplicaManager to become the leader or a follower (of partitions).
handleLeaderAndIsrRequest expects the given RequestChannel.Request to be an LeaderAndIsrRequest.
handleLeaderAndIsrRequest requests the AuthHelper to authorize CLUSTER_ACTION operation.
In the end, handleLeaderAndIsrRequest requests the ReplicaManager to become the leader or a follower (of partitions) (with a correlationId and onLeadershipChange handler).
handleLeaderAndIsrRequest is used when:
KafkaApisis requested to handle a LEADER_AND_ISR request
handleIncrementalAlterConfigsRequest¶
handleIncrementalAlterConfigsRequest(
request: RequestChannel.Request): Unit
handleIncrementalAlterConfigsRequest...FIXME
handleIncrementalAlterConfigsRequest is used when:
KafkaApisis requested to handle a INCREMENTAL_ALTER_CONFIGS request
processIncrementalAlterConfigsRequest¶
processIncrementalAlterConfigsRequest(
originalRequest: RequestChannel.Request,
data: IncrementalAlterConfigsRequestData): IncrementalAlterConfigsResponseData
processIncrementalAlterConfigsRequest...FIXME
- FIXME
Handling API Request¶
handle(
request: RequestChannel.Request,
requestLocal: RequestLocal): Unit
handle is part of the ApiRequestHandler abstraction.
handle prints out the following TRACE message to the logs:
Handling request:[request] from connection [id];securityProtocol:[protocol],principal:[principal]
handle handles the given RequestChannel.Request (based on the apiKey in the header) using the corresponding handler.
| API Key | Handler |
|---|---|
| LeaderAndIsr | handleLeaderAndIsrRequest |
INCREMENTAL_ALTER_CONFIGS | handleIncrementalAlterConfigsRequest |
| others |
Logging¶
Enable ALL logging level for kafka.server.KafkaApis logger to see what happens inside.
Add the following line to config/log4j.properties:
log4j.logger.kafka.server.KafkaApis=ALL
Refer to Logging.
Please note that Kafka comes with a preconfigured kafka.server.KafkaApis logger in config/log4j.properties:
log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
log4j.additivity.kafka.server.KafkaApis=false
That means that the logs of KafkaApis go to logs/kafka-request.log file at TRACE logging level and are not added to the main logs (per log4j.additivity being off).