KafkaApis¶
KafkaApis
is responsible to handle API requests to a Kafka broker (by means of handlers).
Some requests are meant for the controller broker and simply do nothing (no-ops) when received by a regular non-controller broker.
Creating Instance¶
KafkaApis
takes the following to be created:
-
RequestChannel
-
MetadataSupport
- ReplicaManager
- GroupCoordinator
- TransactionCoordinator
- AutoTopicCreationManager
- Broker ID
- KafkaConfig
-
ConfigRepository
- MetadataCache
- Metrics
- Authorizer
-
QuotaManagers
-
FetchManager
-
BrokerTopicStats
- Cluster ID
-
Time
-
DelegationTokenManager
-
ApiVersionManager
KafkaApis
is created when:
BrokerServer
is requested to start up (for the dataPlaneRequestProcessor and the controlPlaneRequestProcessor)KafkaServer
is requested to start up (for the dataPlaneRequestProcessor and the controlPlaneRequestProcessor)
KafkaApis
can be given an Authorizer when created. The Authorizer
instance is given right from the creator (based on authorizer.class.name configuration property):
The Authorizer
is used only to create the following:
AuthHelper¶
KafkaApis
creates an AuthHelper when created.
The AuthHelper
is given the optional Authorizer.
The AuthHelper
is used to create the AclApis and authorize operations.
GroupCoordinator¶
KafkaApis
is given a GroupCoordinator when created.
The GroupCoordinator
is used for the following:
- handleAddOffsetsToTxnRequest
- handleDeleteGroupsRequest
- handleDescribeGroupRequest
- handleFindCoordinatorRequest
- handleHeartbeatRequest
- handleJoinGroupRequest
- handleLeaderAndIsrRequest
- handleLeaveGroupRequest
- handleListGroupsRequest
- handleOffsetCommitRequest
- handleOffsetDeleteRequest
- handleOffsetFetchRequest
- handleStopReplicaRequest
- handleSyncGroupRequest
- handleTxnOffsetCommitRequest
- handleUpdateMetadataRequest
- handleWriteTxnMarkersRequest
TransactionCoordinator¶
KafkaApis
is given a TransactionCoordinator when created.
The TransactionCoordinator
is used for the following:
- handleAddOffsetsToTxnRequest
- handleAddPartitionToTxnRequest
- handleEndTxnRequest
- handleFindCoordinatorRequest
- handleInitProducerIdRequest
- handleLeaderAndIsrRequest
- handleStopReplicaRequest
handleFetchRequest¶
handleFetchRequest(
request: RequestChannel.Request): Unit
handleFetchRequest
assumes that the given RequestChannel.Request
is an FetchRequest
.
handleFetchRequest
authorizes the request.
In the end, handleFetchRequest
requests the ReplicaManager to fetchMessages.
handleFetchRequest
is used when:
KafkaApis
is requested to handle a FETCH request
handleFindCoordinatorRequest¶
handleFindCoordinatorRequest(
request: RequestChannel.Request): Unit
handleFindCoordinatorRequest
converts the given RequestChannel.Request
to an FindCoordinatorRequest
.
handleFindCoordinatorRequest
finds the group or transaction coordinator.
In the end, handleFindCoordinatorRequest
prints out the following TRACE message to the logs:
Sending FindCoordinator response [response] for correlation id [correlationId] to client [clientId].
handleFindCoordinatorRequest
is used when:
KafkaApis
is requested to handle a FIND_COORDINATOR request
Finding Group or Transaction Coordinator¶
getCoordinator(
request: RequestChannel.Request,
keyType: Byte,
key: String): (Errors, Node)
For GROUP
coordinator (by the keyType
), getCoordinator
requests the AuthHelper to authorize:
DESCRIBE
operationGROUP
resource typekey
resource name
For TRANSACTION
coordinator (by the keyType
), getCoordinator
requests the AuthHelper to authorize:
DESCRIBE
operationTRANSACTIONAL_ID
resource typekey
resource name
If either fails, getCoordinator
returns the Errors
.
Partition¶
For GROUP
coordinator (by the keyType
), getCoordinator
requests the GroupCoordinator for the partition of the key
group.
For TRANSACTION
coordinator (by the keyType
), getCoordinator
requests the TransactionCoordinator for the partition of the key
transactional ID.
Topic Metadata¶
getCoordinator
requests the MetadataCache to getTopicMetadata with the name of the internal topic and then to getAliveBrokerNode.
Possible errors are COORDINATOR_NOT_AVAILABLE
s.
handleInitProducerIdRequest¶
handleInitProducerIdRequest(
request: RequestChannel.Request): Unit
handleInitProducerIdRequest
assumes that the given RequestChannel.Request
is an InitProducerIdRequest
.
handleInitProducerIdRequest
authorizes the request.
With producerId
and producerEpoch
set either to -1
s (NO_PRODUCER_ID
and NO_PRODUCER_EPOCH
) or some non--1
values, handleInitProducerIdRequest
requests the TransactionCoordinator to handleInitProducerId.
Otherwise, handleInitProducerIdRequest
sends an error back.
handleInitProducerIdRequest
is used when:
KafkaApis
is requested to handle a INIT_PRODUCER_ID request
handleLeaderAndIsrRequest¶
handleLeaderAndIsrRequest(
request: RequestChannel.Request): Unit
In summary, handleLeaderAndIsrRequest
requests the ReplicaManager to become the leader or a follower (of partitions).
handleLeaderAndIsrRequest
expects the given RequestChannel.Request
to be an LeaderAndIsrRequest.
handleLeaderAndIsrRequest
requests the AuthHelper to authorize CLUSTER_ACTION
operation.
In the end, handleLeaderAndIsrRequest
requests the ReplicaManager to become the leader or a follower (of partitions) (with a correlationId
and onLeadershipChange handler).
handleLeaderAndIsrRequest
is used when:
KafkaApis
is requested to handle a LEADER_AND_ISR request
handleIncrementalAlterConfigsRequest¶
handleIncrementalAlterConfigsRequest(
request: RequestChannel.Request): Unit
handleIncrementalAlterConfigsRequest
...FIXME
handleIncrementalAlterConfigsRequest
is used when:
KafkaApis
is requested to handle a INCREMENTAL_ALTER_CONFIGS request
processIncrementalAlterConfigsRequest¶
processIncrementalAlterConfigsRequest(
originalRequest: RequestChannel.Request,
data: IncrementalAlterConfigsRequestData): IncrementalAlterConfigsResponseData
processIncrementalAlterConfigsRequest
...FIXME
- FIXME
Handling API Request¶
handle(
request: RequestChannel.Request,
requestLocal: RequestLocal): Unit
handle
is part of the ApiRequestHandler abstraction.
handle
prints out the following TRACE message to the logs:
Handling request:[request] from connection [id];securityProtocol:[protocol],principal:[principal]
handle
handles the given RequestChannel.Request
(based on the apiKey
in the header) using the corresponding handler.
API Key | Handler |
---|---|
LeaderAndIsr | handleLeaderAndIsrRequest |
INCREMENTAL_ALTER_CONFIGS | handleIncrementalAlterConfigsRequest |
others |
Logging¶
Enable ALL
logging level for kafka.server.KafkaApis
logger to see what happens inside.
Add the following line to config/log4j.properties
:
log4j.logger.kafka.server.KafkaApis=ALL
Refer to Logging.
Please note that Kafka comes with a preconfigured kafka.server.KafkaApis
logger in config/log4j.properties
:
log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
log4j.additivity.kafka.server.KafkaApis=false
That means that the logs of KafkaApis
go to logs/kafka-request.log
file at TRACE
logging level and are not added to the main logs (per log4j.additivity
being off).