
KafkaApis

KafkaApis is responsible for handling API requests to a Kafka broker (by means of handlers).

KafkaApis is created for KafkaRequestHandlerPool when KafkaServer starts up.

Some requests are meant for the controller broker and simply do nothing (no-ops) when received by a regular non-controller broker.

Creating Instance

KafkaApis takes the following to be created:

KafkaApis is created when:

Authorizer

KafkaApis can be given an Authorizer when created. The Authorizer instance is passed in by the creator (based on the authorizer.class.name configuration property).

The Authorizer is used only to create the following:

AuthHelper

KafkaApis creates an AuthHelper when created.

The AuthHelper is given the optional Authorizer.

The AuthHelper is used to create the AclApis and authorize operations.
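The relationship between the optional Authorizer and the AuthHelper can be sketched as follows. This is a simplified model, not Kafka's actual API: the Operation, Resource, and Authorizer types and the isAllowed method are hypothetical stand-ins, and the sketch assumes that an absent Authorizer means every operation is allowed.

```scala
object AuthHelperSketch {
  // Hypothetical, simplified stand-ins for Kafka's types (not the real API)
  sealed trait Operation
  case object Describe extends Operation
  case object ClusterAction extends Operation

  final case class Resource(resourceType: String, name: String)

  trait Authorizer {
    def isAllowed(principal: String, op: Operation, resource: Resource): Boolean
  }

  final class AuthHelper(authorizer: Option[Authorizer]) {
    // Assumption: with no Authorizer configured, every operation is allowed.
    // Option.forall returns true for None, which models exactly that.
    def authorize(principal: String, op: Operation, resource: Resource): Boolean =
      authorizer.forall(_.isAllowed(principal, op, resource))
  }
}
```

Modelling the Authorizer as an Option keeps the "no authorizer configured" case out of every call site: handlers only ever talk to the AuthHelper.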

GroupCoordinator

KafkaApis is given a GroupCoordinator when created.

The GroupCoordinator is used for the following:

TransactionCoordinator

KafkaApis is given a TransactionCoordinator when created.

The TransactionCoordinator is used for the following:

handleFetchRequest

handleFetchRequest(
  request: RequestChannel.Request): Unit

handleFetchRequest assumes that the given RequestChannel.Request is a FetchRequest.

handleFetchRequest authorizes the request.

In the end, handleFetchRequest requests the ReplicaManager to fetchMessages.

handleFetchRequest is used when:

handleFindCoordinatorRequest

handleFindCoordinatorRequest(
  request: RequestChannel.Request): Unit

handleFindCoordinatorRequest converts the given RequestChannel.Request to a FindCoordinatorRequest.

handleFindCoordinatorRequest finds the group or transaction coordinator.

In the end, handleFindCoordinatorRequest prints out the following TRACE message to the logs:

Sending FindCoordinator response [response] for correlation id [correlationId] to client [clientId].

handleFindCoordinatorRequest is used when:

Finding Group or Transaction Coordinator

getCoordinator(
  request: RequestChannel.Request,
  keyType: Byte,
  key: String): (Errors, Node)

Authorization

For GROUP coordinator (by the keyType), getCoordinator requests the AuthHelper to authorize:

  • DESCRIBE operation
  • GROUP resource type
  • key resource name

For TRANSACTION coordinator (by the keyType), getCoordinator requests the AuthHelper to authorize:

  • DESCRIBE operation
  • TRANSACTIONAL_ID resource type
  • key resource name

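If the authorization fails, getCoordinator returns the corresponding authorization error (GROUP_AUTHORIZATION_FAILED or TRANSACTIONAL_ID_AUTHORIZATION_FAILED as Errors) with no coordinator node.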

Partition

For GROUP coordinator (by the keyType), getCoordinator requests the GroupCoordinator for the partition of the key group.

For TRANSACTION coordinator (by the keyType), getCoordinator requests the TransactionCoordinator for the partition of the key transactional ID.
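The text above does not say how a coordinator maps a key to a partition. A minimal sketch, assuming the partition is the non-negative hash of the key modulo the number of partitions of the internal topic (the helper names and the partitionCount parameter are hypothetical):

```scala
object CoordinatorPartitionSketch {
  // Non-negative absolute value. Int.MinValue is mapped to 0 because
  // math.abs(Int.MinValue) overflows and stays negative.
  def nonNegative(n: Int): Int =
    if (n == Int.MinValue) 0 else math.abs(n)

  // Hypothetical helper: the partition of the internal topic that owns the key
  // (a group ID or a transactional ID).
  def partitionFor(key: String, partitionCount: Int): Int =
    nonNegative(key.hashCode) % partitionCount
}
```

With this scheme the same key always lands on the same partition, so the leader of that partition can act as the coordinator for the key.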

Topic Metadata

getCoordinator requests the MetadataCache to getTopicMetadata with the name of the internal topic and then to getAliveBrokerNode.

The possible error is COORDINATOR_NOT_AVAILABLE (when the topic metadata or an alive broker node for the partition is not available).
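The three steps of getCoordinator (authorization, partition, topic metadata) can be put together in a self-contained sketch. All types here (CoordinatorError, Node, CoordinatorType, Env) are simplified, hypothetical stand-ins for the Kafka classes, and the collaborators are reduced to plain functions so that the control flow is easy to follow.

```scala
object GetCoordinatorSketch {
  sealed trait CoordinatorError
  case object NoError extends CoordinatorError
  case object GroupAuthorizationFailed extends CoordinatorError
  case object TransactionalIdAuthorizationFailed extends CoordinatorError
  case object CoordinatorNotAvailable extends CoordinatorError

  final case class Node(id: Int, host: String, port: Int)
  val NoNode: Node = Node(-1, "", -1)

  sealed trait CoordinatorType
  case object Group extends CoordinatorType
  case object Transaction extends CoordinatorType

  // Hypothetical collaborators, reduced to plain functions
  final case class Env(
    // DESCRIBE check on the GROUP or TRANSACTIONAL_ID resource named by the key
    authorize: (CoordinatorType, String) => Boolean,
    // partition of the internal topic that owns the key
    partitionFor: (CoordinatorType, String) => Int,
    // leader broker ID per partition of the internal topic; None if no metadata
    partitionLeaders: CoordinatorType => Option[Map[Int, Int]],
    // alive broker node by broker ID
    aliveBroker: Int => Option[Node])

  def getCoordinator(env: Env, keyType: CoordinatorType, key: String): (CoordinatorError, Node) =
    if (!env.authorize(keyType, key)) {
      // Step 1: authorization failed, so report the error that matches the key type
      val error = keyType match {
        case Group       => GroupAuthorizationFailed
        case Transaction => TransactionalIdAuthorizationFailed
      }
      (error, NoNode)
    } else {
      // Step 2: find the partition of the key
      val partition = env.partitionFor(keyType, key)
      // Step 3: look up the leader of that partition and its alive broker node
      val coordinator = for {
        leaders  <- env.partitionLeaders(keyType)
        leaderId <- leaders.get(partition)
        node     <- env.aliveBroker(leaderId)
      } yield node
      coordinator match {
        case Some(node) => (NoError, node)
        case None       => (CoordinatorNotAvailable, NoNode)
      }
    }
}
```

Any missing piece in step 3 (no topic metadata, no leader for the partition, or a leader that is not alive) collapses into the same CoordinatorNotAvailable result, which matches the single error listed above.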

handleInitProducerIdRequest

handleInitProducerIdRequest(
  request: RequestChannel.Request): Unit

handleInitProducerIdRequest assumes that the given RequestChannel.Request is an InitProducerIdRequest.

handleInitProducerIdRequest authorizes the request.

When producerId and producerEpoch are either both unset (-1, i.e. NO_PRODUCER_ID and NO_PRODUCER_EPOCH) or both set to values other than -1, handleInitProducerIdRequest requests the TransactionCoordinator to handleInitProducerId.

Otherwise, handleInitProducerIdRequest sends an error back.
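The producerId and producerEpoch check can be sketched as a small validation function. The validate function, the ProducerIdAndEpoch case class, and the "INVALID_REQUEST" error label are assumptions made for illustration; the text above only says that an error is sent back.

```scala
object InitProducerIdSketch {
  val NoProducerId: Long = -1L
  val NoProducerEpoch: Short = -1

  final case class ProducerIdAndEpoch(producerId: Long, epoch: Short)

  // Right(None)    : neither value is set (both are -1)
  // Right(Some(_)) : both values are set
  // Left(error)    : only one of the two is set (error label is an assumption)
  def validate(producerId: Long, producerEpoch: Short): Either[String, Option[ProducerIdAndEpoch]] = {
    val idSet = producerId != NoProducerId
    val epochSet = producerEpoch != NoProducerEpoch
    if (!idSet && !epochSet) Right(None)
    else if (idSet && epochSet) Right(Some(ProducerIdAndEpoch(producerId, producerEpoch)))
    else Left("INVALID_REQUEST")
  }
}
```

Only the two Right cases would be forwarded to the TransactionCoordinator; the Left case corresponds to sending an error back.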

handleInitProducerIdRequest is used when:

handleLeaderAndIsrRequest

handleLeaderAndIsrRequest(
  request: RequestChannel.Request): Unit

In summary, handleLeaderAndIsrRequest requests the ReplicaManager to become the leader or a follower (of partitions).


handleLeaderAndIsrRequest expects the given RequestChannel.Request to be a LeaderAndIsrRequest.

handleLeaderAndIsrRequest requests the AuthHelper to authorize CLUSTER_ACTION operation.

In the end, handleLeaderAndIsrRequest requests the ReplicaManager to become the leader or a follower of partitions (with the correlationId and an onLeadershipChange handler).


handleLeaderAndIsrRequest is used when:

handleIncrementalAlterConfigsRequest

handleIncrementalAlterConfigsRequest(
  request: RequestChannel.Request): Unit

handleIncrementalAlterConfigsRequest...FIXME


handleIncrementalAlterConfigsRequest is used when:

processIncrementalAlterConfigsRequest

processIncrementalAlterConfigsRequest(
  originalRequest: RequestChannel.Request,
  data: IncrementalAlterConfigsRequestData): IncrementalAlterConfigsResponseData

processIncrementalAlterConfigsRequest...FIXME

  • FIXME

Handling API Request

handle(
  request: RequestChannel.Request,
  requestLocal: RequestLocal): Unit

handle is part of the ApiRequestHandler abstraction.


handle prints out the following TRACE message to the logs:

Handling request:[request] from connection [id];securityProtocol:[protocol],principal:[principal]

handle handles the given RequestChannel.Request (based on the apiKey in the header) using the corresponding handler.

API Key                      Handler
LeaderAndIsr                 handleLeaderAndIsrRequest
INCREMENTAL_ALTER_CONFIGS    handleIncrementalAlterConfigsRequest
others
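Dispatching on the API key can be sketched with a pattern match over a sealed set of keys. The ApiKey and Request types below are hypothetical stand-ins, and the handlers only return the name of the method that would run, so the mapping itself is easy to inspect.

```scala
object HandleDispatchSketch {
  // Hypothetical stand-in for the API keys covered on this page
  sealed trait ApiKey
  case object Fetch extends ApiKey
  case object FindCoordinator extends ApiKey
  case object InitProducerId extends ApiKey
  case object LeaderAndIsr extends ApiKey
  case object IncrementalAlterConfigs extends ApiKey

  final case class Request(apiKey: ApiKey, correlationId: Int)

  // Placeholder for a real handler: just reports which handler was chosen
  private def handled(name: String, request: Request): String =
    s"$name(correlationId=${request.correlationId})"

  // Selects the handler based on the API key of the request
  def handle(request: Request): String = request.apiKey match {
    case Fetch                   => handled("handleFetchRequest", request)
    case FindCoordinator         => handled("handleFindCoordinatorRequest", request)
    case InitProducerId          => handled("handleInitProducerIdRequest", request)
    case LeaderAndIsr            => handled("handleLeaderAndIsrRequest", request)
    case IncrementalAlterConfigs => handled("handleIncrementalAlterConfigsRequest", request)
  }
}
```

Because ApiKey is sealed, the compiler can warn when a key has no handler, which is one reason a single match expression is a convenient shape for this kind of dispatch.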

Logging

Enable the ALL logging level for the kafka.server.KafkaApis logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.server.KafkaApis=ALL

Refer to Logging.


Note that Kafka comes with a preconfigured kafka.server.KafkaApis logger in config/log4j.properties:

log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
log4j.additivity.kafka.server.KafkaApis=false

That means that the logs of KafkaApis go to the logs/kafka-request.log file at the TRACE logging level and are not added to the main logs (since log4j.additivity is false for this logger).