Sender¶
Sender is a Runnable (Java) that is executed as a separate thread alongside KafkaProducer to send records to a Kafka cluster.
Creating Instance¶
Sender takes the following to be created:
- LogContext
- KafkaClient
- ProducerMetadata
- RecordAccumulator
- guaranteeMessageOrder flag
- maxRequestSize
- acks
- retries
- SenderMetricsRegistry
- Time
- requestTimeoutMs
- retryBackoffMs
- TransactionManager
- ApiVersions
Sender is created along with KafkaProducer.
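Several of the arguments above look like they are derived from producer configuration properties. The following is a minimal sketch of such properties, using only `java.util.Properties` and the standard producer property names. The mapping from each property to a Sender argument is an assumption made for illustration (this page does not state it), as is the idea that `guaranteeMessageOrder` follows from allowing a single in-flight request per connection. `SenderRelatedConfigSketch` is a hypothetical name.

```java
import java.util.Properties;

class SenderRelatedConfigSketch {
    // Builds producer properties that (by assumption) feed the Sender arguments listed above.
    static Properties senderRelatedProps() {
        Properties props = new Properties();
        props.setProperty("acks", "all");                 // assumed source of acks
        props.setProperty("retries", "3");                // assumed source of retries
        props.setProperty("max.request.size", "1048576"); // assumed source of maxRequestSize
        props.setProperty("request.timeout.ms", "30000"); // assumed source of requestTimeoutMs
        props.setProperty("retry.backoff.ms", "100");     // assumed source of retryBackoffMs
        // Assumption: one in-flight request per connection is what enables guaranteeMessageOrder.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }
}
```

The values are arbitrary examples, not recommendations.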
KafkaClient¶
Sender is given a KafkaClient when created.
Running Thread¶
void run()
run prints out the following DEBUG message to the logs:
Starting Kafka producer I/O thread.
run executes runOnce repeatedly until the running flag is turned off.
Right after the running flag is off, run prints out the following DEBUG message to the logs:
Beginning shutdown of Kafka producer I/O thread, sending remaining records.
run...FIXME
In the end, run prints out the following DEBUG message to the logs:
Shutdown of Kafka producer I/O thread has completed.
run is part of the Runnable (Java) abstraction.
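The overall shape of run (log, loop while the running flag is on, log again around shutdown) can be modelled roughly as below. This is a simplified sketch and not the Kafka source: `IoThreadSketch`, its `log` list and the `demo` helper are hypothetical, the shutdown work marked FIXME above is left out, and `runOnce` only parks briefly instead of sending and polling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.LockSupport;

class IoThreadSketch implements Runnable {
    // Stand-in for the running flag; volatile so a close request from another thread becomes visible.
    private volatile boolean running = true;
    // Messages are collected in a list here instead of going to a logger.
    final List<String> log = new ArrayList<>();

    @Override
    public void run() {
        log.add("Starting Kafka producer I/O thread.");
        while (running) {
            runOnce();
        }
        log.add("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
        // Shutdown work is intentionally left out of this sketch.
        log.add("Shutdown of Kafka producer I/O thread has completed.");
    }

    // Placeholder for one send-and-poll iteration; parks briefly instead of doing network I/O.
    void runOnce() {
        LockSupport.parkNanos(1_000_000L);
    }

    // Turns the flag off so the loop in run() exits after the current iteration.
    void initiateClose() {
        running = false;
    }

    // Starts the sketch on its own thread, requests a close, and waits for the thread to finish.
    static List<String> demo() {
        IoThreadSketch sketch = new IoThreadSketch();
        Thread ioThread = new Thread(sketch, "io-thread-sketch");
        ioThread.start();
        sketch.initiateClose();
        try {
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sketch.log;
    }
}
```

Calling `IoThreadSketch.demo()` returns the three messages in order, regardless of how many loop iterations ran before the close request was seen.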
runOnce¶
void runOnce()
If executed with a TransactionManager, runOnce...FIXME
runOnce calls sendProducerData.
runOnce requests the KafkaClient to poll.
sendProducerData¶
long sendProducerData(
long now)
sendProducerData requests the ProducerMetadata for the current cluster info.
sendProducerData requests the RecordAccumulator for the partitions with data ready to send.
sendProducerData requests a metadata update when there are partitions with no leaders.
sendProducerData removes nodes not ready to send to.
sendProducerData requests the RecordAccumulator to drain (and create ProducerBatches).
sendProducerData registers the batches (in the inFlightBatches registry).
With guaranteeMessageOrder, sendProducerData mutes all the partitions drained.
sendProducerData requests the RecordAccumulator to resetNextBatchExpiryTime.
sendProducerData requests the RecordAccumulator for the expired batches and adds all expired InflightBatches.
If there are any expired batches, sendProducerData...FIXME
sendProducerData requests the SenderMetrics to updateProduceRequestMetrics.
With at least one broker to send batches to, sendProducerData prints out the following TRACE message to the logs:
Nodes with data ready to send: [readyNodes]
sendProducerData calls sendProduceRequests.
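The first half of the steps above (find ready partitions, request a metadata update for unknown leaders, drop nodes that cannot be sent to, drain, mute) can be illustrated with a toy model. Everything in this sketch is a hypothetical stand-in: partitions and batches are plain strings, `leaderOf`, `queued`, `sendable` and `muted` replace the real ProducerMetadata, RecordAccumulator and KafkaClient state, and expired-batch handling and metrics are omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class SendProducerDataSketch {
    final Map<String, Integer> leaderOf = new HashMap<>();     // partition -> leader node id (null = unknown)
    final Map<String, Deque<String>> queued = new TreeMap<>(); // partition -> queued batches
    final Set<Integer> sendable = new HashSet<>();             // nodes that are ready to send to
    final Set<String> muted = new HashSet<>();                 // partitions with a batch in flight
    boolean metadataUpdateRequested = false;

    // Returns drained batches grouped by destination node, one batch per partition at most.
    Map<Integer, List<String>> collate(boolean guaranteeMessageOrder) {
        Set<Integer> readyNodes = new HashSet<>();
        for (Map.Entry<String, Deque<String>> e : queued.entrySet()) {
            if (e.getValue().isEmpty() || muted.contains(e.getKey())) continue;
            Integer leader = leaderOf.get(e.getKey());
            if (leader == null) metadataUpdateRequested = true; // data but no known leader
            else readyNodes.add(leader);
        }
        readyNodes.retainAll(sendable); // remove nodes not ready to send to

        Map<Integer, List<String>> collated = new TreeMap<>();
        for (Map.Entry<String, Deque<String>> e : queued.entrySet()) {
            String partition = e.getKey();
            Integer leader = leaderOf.get(partition);
            if (leader == null || !readyNodes.contains(leader) || muted.contains(partition)) continue;
            String batch = e.getValue().poll();
            if (batch == null) continue;
            collated.computeIfAbsent(leader, k -> new ArrayList<>()).add(batch);
            if (guaranteeMessageOrder) muted.add(partition); // no second batch until this one completes
        }
        return collated;
    }

    // Three partitions with two batches each; only node 1 is sendable and t-2 has no leader.
    static SendProducerDataSketch demo() {
        SendProducerDataSketch s = new SendProducerDataSketch();
        s.leaderOf.put("t-0", 1);
        s.leaderOf.put("t-1", 2);
        s.leaderOf.put("t-2", null);
        for (String p : List.of("t-0", "t-1", "t-2")) {
            s.queued.put(p, new ArrayDeque<>(List.of(p + "/batch-0", p + "/batch-1")));
        }
        s.sendable.add(1);
        return s;
    }
}
```

With `demo()`, a first `collate(true)` drains only `t-0/batch-0` for node 1, flags a metadata update because of `t-2`, and mutes `t-0`, so a second call returns nothing until the partition is unmuted.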
sendProduceRequests¶
void sendProduceRequests(
Map<Integer, List<ProducerBatch>> collated,
long now)
For every broker node and its associated list of ProducerBatches (in the given collated collection), sendProduceRequests calls sendProduceRequest with the broker node, the acks, the requestTimeoutMs and the batches.
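The fan-out described above amounts to one call per entry of the collated map. The sketch below shows that loop with the same parameter order as the signatures on this page. Batches are plain strings and `sendProduceRequest` only records what it was called with, so the class name, the `sent` list and the string format are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class SendProduceRequestsSketch {
    final short acks;
    final int requestTimeoutMs;
    // Records each call so the fan-out can be inspected; the real method sends a request instead.
    final List<String> sent = new ArrayList<>();

    SendProduceRequestsSketch(short acks, int requestTimeoutMs) {
        this.acks = acks;
        this.requestTimeoutMs = requestTimeoutMs;
    }

    // One sendProduceRequest call per destination node, passing that node's batches.
    void sendProduceRequests(Map<Integer, List<String>> collated, long now) {
        for (Map.Entry<Integer, List<String>> entry : collated.entrySet()) {
            sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
        }
    }

    // Stand-in that only notes the arguments it received.
    void sendProduceRequest(long now, int destination, short acks, int timeout, List<String> batches) {
        sent.add("node=" + destination + " acks=" + acks + " timeout=" + timeout + " batches=" + batches);
    }
}
```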
sendProduceRequest¶
void sendProduceRequest(
long now,
int destination,
short acks,
int timeout,
List<ProducerBatch> batches)
sendProduceRequest creates a collection of ProducerBatches by TopicPartition from the given batches.
sendProduceRequest requests the KafkaClient for a new ClientRequest (for the destination broker) and to send it.
sendProduceRequest registers handleProduceResponse as the callback to invoke when a response arrives. sendProduceRequest expects a response for every acks value except 0.
In the end, sendProduceRequest prints out the following TRACE message to the logs:
Sent produce request to [nodeId]: [requestBuilder]
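Two details of sendProduceRequest lend themselves to a small illustration: indexing the batches by topic partition and deciding from acks whether a response is expected. The sketch below uses a hypothetical `Batch` class with a string partition name in place of ProducerBatch and TopicPartition. It does not build or send a real ClientRequest.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SendProduceRequestSketch {
    // Hypothetical minimal batch: the partition it belongs to and a payload label.
    static final class Batch {
        final String topicPartition;
        final String payload;

        Batch(String topicPartition, String payload) {
            this.topicPartition = topicPartition;
            this.payload = payload;
        }
    }

    // Indexes the batches by partition, keeping the order in which they were given.
    static Map<String, Batch> byTopicPartition(List<Batch> batches) {
        Map<String, Batch> result = new LinkedHashMap<>();
        for (Batch batch : batches) {
            result.put(batch.topicPartition, batch);
        }
        return result;
    }

    // With acks set to 0 the producer does not wait for a broker response.
    static boolean expectsResponse(short acks) {
        return acks != 0;
    }
}
```

The partition-keyed map has the same shape as the batches parameter of handleProduceResponse, which is what lets the callback match each partition in a response to its batch.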
handleProduceResponse¶
void handleProduceResponse(
ClientResponse response,
Map<TopicPartition, ProducerBatch> batches,
long now)
maybeSendAndPollTransactionalRequest¶
boolean maybeSendAndPollTransactionalRequest()
running Flag¶
Sender runs as long as the running internal flag is on.
The running flag is turned on when Sender is created and stays on until Sender is requested to initiateClose.
initiateClose¶
void initiateClose()
initiateClose requests the RecordAccumulator to close and turns the running flag off.
In the end, initiateClose wakes up the KafkaClient.
initiateClose is used when:
- KafkaProducer is requested to close
- Sender is requested to forceClose
Waking Up¶
void wakeup()
wakeup requests the KafkaClient to wakeup.
wakeup is used when:
- KafkaProducer is requested to initTransactions, sendOffsetsToTransaction, commitTransaction, abortTransaction, doSend, waitOnMetadata, flush
- Sender is requested to initiateClose
Logging¶
Enable ALL logging level for org.apache.kafka.clients.producer.internals.Sender logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.org.apache.kafka.clients.producer.internals.Sender=ALL
Refer to Logging.