Sender

Sender is a Runnable (Java) that is executed as a separate thread alongside KafkaProducer to send records to a Kafka cluster.

Creating Instance

Sender takes the following to be created:

Sender is created along with KafkaProducer.

KafkaClient

Sender is given a KafkaClient when created.

Running Thread

void run()

run prints out the following DEBUG message to the logs:

Starting Kafka producer I/O thread.

run executes runOnce repeatedly until the running flag is turned off.

Right after the running flag is off, run prints out the following DEBUG message to the logs:

Beginning shutdown of Kafka producer I/O thread, sending remaining records.

run...FIXME

In the end, run prints out the following DEBUG message to the logs:

Shutdown of Kafka producer I/O thread has completed.

run is part of the Runnable (Java) abstraction.
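
The loop described above can be sketched as follows. This is a minimal model, not Kafka's code: the class name SenderLoopSketch, the log list (standing in for DEBUG logging) and the three-iteration stop condition are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Sender; not the Kafka class itself.
class SenderLoopSketch implements Runnable {
    // Turned off by initiateClose() to stop the main loop.
    private volatile boolean running = true;
    // Messages collected in place of real DEBUG logging.
    final List<String> log = new ArrayList<>();
    final AtomicInteger iterations = new AtomicInteger();

    @Override
    public void run() {
        log.add("Starting Kafka producer I/O thread.");
        // Main loop: keep calling runOnce while the running flag is on.
        while (running) {
            runOnce();
        }
        log.add("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
        // The real Sender does more work here (the part marked FIXME in the text).
        log.add("Shutdown of Kafka producer I/O thread has completed.");
    }

    void runOnce() {
        // Placeholder for sendProducerData followed by a KafkaClient poll.
        if (iterations.incrementAndGet() >= 3) {
            initiateClose(); // assumption: stop after three iterations to keep the demo finite
        }
    }

    void initiateClose() {
        running = false;
    }

    public static void main(String[] args) throws InterruptedException {
        SenderLoopSketch sender = new SenderLoopSketch();
        Thread ioThread = new Thread(sender, "sketch-io-thread");
        ioThread.start();
        ioThread.join();
        sender.log.forEach(System.out::println);
    }
}
```

The running flag is volatile so that a change made by another thread is visible to the loop on its next check.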

runOnce

void runOnce()

If executed with a TransactionManager, runOnce...FIXME

runOnce calls sendProducerData.

runOnce requests the KafkaClient to poll.

sendProducerData

long sendProducerData(
  long now)

sendProducerData requests the ProducerMetadata for the current cluster info.

sendProducerData requests the RecordAccumulator for the partitions with data ready to send.

sendProducerData requests a metadata update when there are partitions with no leaders.

sendProducerData removes nodes not ready to send to.

sendProducerData requests the RecordAccumulator to drain (and create ProducerBatches).

sendProducerData registers the batches (in the inFlightBatches registry).

With guaranteeMessageOrder, sendProducerData mutes all the partitions drained.

sendProducerData requests the RecordAccumulator to resetNextBatchExpiryTime.

sendProducerData requests the RecordAccumulator for the expired batches and adds all expired InflightBatches.

If there are any expired batches, sendProducerData...FIXME

sendProducerData requests the SenderMetrics to updateProduceRequestMetrics.

With at least one broker to send batches to, sendProducerData prints out the following TRACE message to the logs:

Nodes with data ready to send: [readyNodes]

sendProducerData calls sendProduceRequests.
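
Three of the steps above (removing nodes that are not ready, draining, and muting drained partitions with guaranteeMessageOrder) may be easier to follow in a simplified model. Everything in this sketch is hypothetical: SendProducerDataSketch, collectBatches and canSendTo are illustration-only names, batches are reduced to partition names, and the metadata, expiry and metrics steps are left out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.IntPredicate;

// Hypothetical, simplified model of the steps above; none of these types are Kafka's.
class SendProducerDataSketch {
    // Partitions that must not be drained again while a batch of theirs is in flight.
    final Set<String> mutedPartitions = new HashSet<>();

    Map<Integer, List<String>> collectBatches(
            Set<Integer> readyNodes,
            IntPredicate canSendTo,
            Map<Integer, List<String>> pendingByNode,
            boolean guaranteeMessageOrder) {
        // Remove nodes that are not ready to be sent to.
        readyNodes.removeIf(node -> !canSendTo.test(node));

        // "Drain": take the pending batches of every remaining node, skipping muted partitions.
        Map<Integer, List<String>> drained = new LinkedHashMap<>();
        for (int node : readyNodes) {
            List<String> batches = new ArrayList<>();
            for (String partition : pendingByNode.getOrDefault(node, List.of())) {
                if (!mutedPartitions.contains(partition)) {
                    batches.add(partition);
                }
            }
            drained.put(node, batches);
        }

        // With ordering guaranteed, mute every partition that was just drained.
        if (guaranteeMessageOrder) {
            drained.values().forEach(mutedPartitions::addAll);
        }
        return drained;
    }
}
```

Muting keeps a second batch of the same partition from being sent while the first is still in flight, which is what preserves ordering in this model.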

sendProduceRequests

void sendProduceRequests(
  Map<Integer, List<ProducerBatch>> collated,
  long now)

For every broker node and its list of ProducerBatches (in the given collated collection), sendProduceRequests calls sendProduceRequest with the broker node, the acks, the requestTimeoutMs and the batches.
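
The fan-out can be sketched as a loop over the map entries. This is an illustration under assumptions, not Kafka's code: batches are modelled as plain strings, the acks and requestTimeoutMs values are made up, and the stub sendProduceRequest only records what it was called with.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of the per-node fan-out; batches are plain strings here.
class SendProduceRequestsSketch {
    final short acks = 1;                 // assumed value for the demo
    final int requestTimeoutMs = 30_000;  // assumed value for the demo
    final List<String> sent = new ArrayList<>();

    void sendProduceRequests(Map<Integer, List<String>> collated, long now) {
        // One produce request per broker node, carrying all batches drained for that node.
        for (Map.Entry<Integer, List<String>> entry : collated.entrySet()) {
            sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
        }
    }

    // Stub: records the call instead of building and sending a request.
    void sendProduceRequest(long now, int destination, short acks, int timeout, List<String> batches) {
        sent.add("node=" + destination + " acks=" + acks + " timeout=" + timeout + " batches=" + batches);
    }
}
```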

sendProduceRequest

void sendProduceRequest(
  long now,
  int destination,
  short acks,
  int timeout,
  List<ProducerBatch> batches)

sendProduceRequest creates a collection of ProducerBatches by TopicPartition from the given batches.

sendProduceRequest requests the KafkaClient for a new ClientRequest (for the destination broker) and to send it.

sendProduceRequest registers a handleProduceResponse callback to invoke when a response arrives. sendProduceRequest expects a response for every acks value except 0.

In the end, sendProduceRequest prints out the following TRACE message to the logs:

Sent produce request to [nodeId]: [requestBuilder]
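
Two details of sendProduceRequest lend themselves to a small sketch: indexing the batches by topic partition and deciding whether a response is expected. The Batch class and the string key below are hypothetical stand-ins (the real code uses ProducerBatch and TopicPartition), and the sketch assumes one batch per partition within a single request.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model; Batch and the "topic-partition" string key are illustration-only.
class SendProduceRequestSketch {
    static final class Batch {
        final String topic;
        final int partition;

        Batch(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        String topicPartition() {
            return topic + "-" + partition;
        }
    }

    // Index the batches by partition so a response can be matched back to its batch.
    static Map<String, Batch> byTopicPartition(List<Batch> batches) {
        Map<String, Batch> result = new LinkedHashMap<>();
        for (Batch batch : batches) {
            result.put(batch.topicPartition(), batch);
        }
        return result;
    }

    // With acks=0 the broker sends no response, so none is expected.
    static boolean expectResponse(short acks) {
        return acks != 0;
    }
}
```

With acks set to -1 (all) or 1, expectResponse returns true and the registered callback would run once the response arrives.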

handleProduceResponse

void handleProduceResponse(
  ClientResponse response,
  Map<TopicPartition, ProducerBatch> batches,
  long now)

maybeSendAndPollTransactionalRequest

boolean maybeSendAndPollTransactionalRequest()

running Flag

Sender runs as long as the running internal flag is on.

The running flag is on from when Sender is created until requested to initiateClose.

initiateClose

void initiateClose()

initiateClose requests the RecordAccumulator to close and turns the running flag off.

In the end, initiateClose wakes up the KafkaClient.
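
The final wake-up matters because the I/O thread may be blocked in a poll when the flag is turned off. The sketch below illustrates the idea with a plain java.nio Selector standing in for the KafkaClient. The class InitiateCloseSketch and the helper closesWithin are hypothetical, and closing the RecordAccumulator is omitted.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical model: a java.nio Selector plays the role of the KafkaClient.
class InitiateCloseSketch implements Runnable {
    private volatile boolean running = true;
    private final Selector selector;

    InitiateCloseSketch() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void run() {
        try {
            while (running) {
                // Blocks until a channel is ready, the timeout elapses, or wakeup() is called.
                selector.select(60_000);
            }
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void initiateClose() {
        running = false;    // turn the flag off first
        selector.wakeup();  // then unblock the poll so the loop re-checks the flag
    }

    // Starts the loop on a thread, closes it, and reports whether the thread exited in time.
    static boolean closesWithin(long millis) {
        InitiateCloseSketch sender = new InitiateCloseSketch();
        Thread ioThread = new Thread(sender, "sketch-io-thread");
        ioThread.start();
        sender.initiateClose();
        try {
            ioThread.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !ioThread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(closesWithin(5_000));
    }
}
```

Without the wakeup call, the loop in this model would notice the flag only after the 60-second select timeout.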

initiateClose is used when:

Waking Up

void wakeup()

wakeup requests the KafkaClient to wakeup.

wakeup is used when:

Logging

Enable the ALL logging level for the org.apache.kafka.clients.producer.internals.Sender logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.clients.producer.internals.Sender=ALL

Refer to Logging.