Sender¶
Sender
is a Runnable
(Java) that is executed as a separate thread alongside KafkaProducer to send records to a Kafka cluster.
Creating Instance¶
Sender
takes the following to be created:
-
LogContext
- KafkaClient
-
ProducerMetadata
- RecordAccumulator
-
guaranteeMessageOrder
flag - maxRequestSize
- acks
- retries
-
SenderMetricsRegistry
-
Time
- requestTimeoutMs
- retryBackoffMs
- TransactionManager
-
ApiVersions
Sender
is created along with KafkaProducer.
KafkaClient¶
Sender
is given a KafkaClient when created.
Running Thread¶
void run()
run
prints out the following DEBUG message to the logs:
Starting Kafka producer I/O thread.
run
runs once and repeats until the running flag is turned off.
Right after the running flag is off, run
prints out the following DEBUG message to the logs:
Beginning shutdown of Kafka producer I/O thread, sending remaining records.
run
...FIXME
In the end, run
prints out the following DEBUG message to the logs:
Shutdown of Kafka producer I/O thread has completed.
run
is part of the Runnable
(Java) abstraction.
runOnce¶
void runOnce()
If executed with a TransactionManager, runOnce
...FIXME
runOnce
sendProducerData.
runOnce
requests the KafkaClient to poll.
sendProducerData¶
long sendProducerData(
long now)
sendProducerData
requests the ProducerMetadata for the current cluster info
sendProducerData
requests the RecordAccumulator for the partitions with data ready to send.
sendProducerData
requests a metadata update when there are partitions with no leaders.
sendProducerData
removes nodes not ready to send to.
sendProducerData
requests the RecordAccumulator to drain (and create ProducerBatchs).
sendProducerData
registers the batches (in the inFlightBatches registry).
With guaranteeMessageOrder, sendProducerData
mutes all the partitions drained.
sendProducerData
requests the RecordAccumulator to resetNextBatchExpiryTime.
sendProducerData
requests the RecordAccumulator for the expired batches and adds all expired InflightBatches.
If there are any expired batches, sendProducerData
...FIXME
sendProducerData
requests the SenderMetrics
to updateProduceRequestMetrics
.
With at least one broker to send batches to, sendProducerData
prints out the following TRACE message to the logs:
Nodes with data ready to send: [readyNodes]
sendProducerData
sendProduceRequests.
sendProduceRequests¶
void sendProduceRequests(
Map<Integer, List<ProducerBatch>> collated,
long now)
For every pair of a broker node and an associated ProducerBatch (in the given collated
collection), sendProduceRequests
sendProduceRequest with the broker node, the acks, the requestTimeoutMs and the ProducerBatch
.
sendProduceRequest¶
void sendProduceRequest(
long now,
int destination,
short acks,
int timeout,
List<ProducerBatch> batches)
sendProduceRequest
creates a collection of ProducerBatches by TopicPartition
from the given batches
.
sendProduceRequest
requests the KafkaClient for a new ClientRequest (for the destination
broker) and to send it.
sendProduceRequest
registers a [handleProduceResponse] callback to invoke when a response arrives. sendProduceRequest
expects a response for all the acks
but 0
.
In the end, sendProduceRequest
prints out the following TRACE message to the logs:
Sent produce request to [nodeId]: [requestBuilder]
handleProduceResponse¶
void handleProduceResponse(
ClientResponse response,
Map<TopicPartition, ProducerBatch> batches,
long now)
maybeSendAndPollTransactionalRequest¶
boolean maybeSendAndPollTransactionalRequest()
running Flag¶
Sender
runs as long as the running
internal flag is on.
The running
flag is on from when Sender
is created until requested to initiateClose.
initiateClose¶
void initiateClose()
initiateClose
requests the RecordAccumulator to close and turns the running flag off.
In the end, initiateClose
wakes up the KafkaClient.
initiateClose
is used when:
KafkaProducer
is requested to closeSender
is requested to forceClose
Waking Up¶
void wakeup()
wakeup
requests the KafkaClient to wakeup.
wakeup
is used when:
KafkaProducer
is requested to initTransactions, sendOffsetsToTransaction, commitTransaction, abortTransaction, doSend, waitOnMetadata, flushSender
is requested to initiateClose
Logging¶
Enable ALL
logging level for org.apache.kafka.clients.producer.internals.Sender
logger to see what happens inside.
Add the following line to log4j.properties
:
log4j.logger.org.apache.kafka.clients.producer.internals.Sender=ALL
Refer to Logging.