KafkaProducer¶
KafkaProducer<K, V> is a concrete Producer.
Creating Instance¶
KafkaProducer takes the following to be created:
- ProducerConfig
- Key Serializer<K>
- Value Serializer<V>
- ProducerMetadata
- KafkaClient
- ProducerInterceptor<K, V>s
- Time
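The constructor above is internal. From the user side, a KafkaProducer is typically created from configuration properties, as in the following sketch (the broker address and client id are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "demo-producer");

// Key and value serializers can be given either as configuration properties
// or directly to the constructor, as below.
KafkaProducer<String, String> producer =
  new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());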
configureTransactionState¶
TransactionManager configureTransactionState(
  ProducerConfig config,
  LogContext logContext)
configureTransactionState creates a new TransactionManager or returns null.
configureTransactionState checks whether the transactional.id and enable.idempotence configuration properties are specified in the given ProducerConfig.
With transactional.id specified, configureTransactionState turns enable.idempotence on and prints out the following INFO message to the logs:
Overriding the default [enable.idempotence] to true since transactional.id is specified.
With idempotence enabled, configureTransactionState creates a TransactionManager with the values of the transaction-related configuration properties (incl. transactional.id and transaction.timeout.ms).
When the TransactionManager is transactional, configureTransactionState prints out the following INFO message to the logs:
Instantiated a transactional producer.
Otherwise, configureTransactionState prints out the following INFO message to the logs:
Instantiated an idempotent producer.
In the end, configureTransactionState returns the TransactionManager or null.
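For reference, the two configuration properties involved can be set from the user side as follows (a minimal sketch; props is the Properties used to create the producer and the transactional id is a placeholder):

// transactional.id makes the producer transactional (and implicitly idempotent)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-transactional-id");
// enable.idempotence alone gives an idempotent, non-transactional producer
// props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");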
newSender¶
Sender newSender(
  LogContext logContext,
  KafkaClient kafkaClient,
  ProducerMetadata metadata)
newSender ...FIXME
configureInflightRequests¶
int configureInflightRequests(
  ProducerConfig config)
configureInflightRequests gives the value of the max.in.flight.requests.per.connection configuration property (in the given ProducerConfig).
configureInflightRequests throws a ConfigException when idempotence is enabled and the value of max.in.flight.requests.per.connection is above 5:
Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.
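User-side, the constraint amounts to the following (a sketch; props is the Properties used to create the producer):

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");   // at most 5 with idempotence
// props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "6"); // would fail with the ConfigException above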
configureAcks¶
short configureAcks(
  ProducerConfig config,
  Logger log)
configureAcks returns the value of the acks configuration property (in the given ProducerConfig).
With idempotenceEnabled, configureAcks prints out the following INFO message to the logs when there is no acks configuration property defined:
Overriding the default [acks] to all since idempotence is enabled.
With idempotenceEnabled and acks other than -1, configureAcks throws a ConfigException:
Must set acks to all in order to use the idempotent producer.
Otherwise we cannot guarantee idempotence.
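User-side (a sketch): leave acks unset (it is then overridden to all) or set it explicitly to all.

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, "all");  // the only value allowed with idempotence
// props.put(ProducerConfig.ACKS_CONFIG, "1");  // would fail with the ConfigException above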
configureDeliveryTimeout¶
int configureDeliveryTimeout(
  ProducerConfig config,
  Logger log)
configureDeliveryTimeout ...FIXME
TransactionManager¶
KafkaProducer may create a TransactionManager when created (with idempotenceEnabled).
The TransactionManager is used to create the RecordAccumulator and the Sender.
KafkaProducer uses the TransactionManager for the following transactional methods:
- abortTransaction
- beginTransaction
- commitTransaction
- initTransactions
- sendOffsetsToTransaction
- doSend
throwIfNoTransactionManager¶
KafkaProducer throws an IllegalStateException when any of the transactional methods is used but no TransactionManager is configured:
Cannot use transactional methods without enabling transactions by setting the transactional.id configuration property
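For example, calling a transactional method on a producer created without transactional.id fails (a sketch; the broker address is a placeholder):

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// No transactional.id, so no TransactionManager is created
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

producer.initTransactions();
// java.lang.IllegalStateException: Cannot use transactional methods without enabling
// transactions by setting the transactional.id configuration property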
Sender Thread¶
KafkaProducer creates a Sender when created.
The Sender is immediately started as a daemon thread with the following name (using the clientId):
kafka-producer-network-thread | [clientId]
KafkaProducer is actually considered open (and usable) as long as the Sender is running.
KafkaProducer simply requests the Sender to wake up for the following:
- initTransactions
- sendOffsetsToTransaction
- commitTransaction
- abortTransaction
- doSend
- waitOnMetadata
- flush
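A minimal sketch of how such a named daemon thread can be started (the Runnable below merely stands in for the actual Sender; the clientId is a placeholder):

import org.apache.kafka.common.utils.KafkaThread;

Runnable sender = () -> { /* the Sender's run loop */ };  // stand-in for the actual Sender
String clientId = "demo-producer";
KafkaThread ioThread = new KafkaThread("kafka-producer-network-thread | " + clientId, sender, true);  // daemon = true
ioThread.start();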
RecordAccumulator¶
KafkaProducer creates a RecordAccumulator when created.
This RecordAccumulator is used for the following:
- Create a Sender
- append when doSend
- beginFlush when flush
max.block.ms¶
KafkaProducer uses the max.block.ms configuration property.
Transactional Methods¶
abortTransaction¶
void abortTransaction()
abortTransaction prints out the following INFO message to the logs:
Aborting incomplete transaction
abortTransaction ...FIXME
abortTransaction is part of the Producer abstraction.
beginTransaction¶
void beginTransaction()
beginTransaction requests the TransactionManager to beginTransaction.
beginTransaction is part of the Producer abstraction.
initTransactions¶
void initTransactions()
initTransactions requests the TransactionManager to initializeTransactions and requests the Sender to wakeup.
In the end, initTransactions waits max.block.ms until transaction initialization is completed (successfully or not).
initTransactions is part of the Producer abstraction.
sendOffsetsToTransaction¶
void sendOffsetsToTransaction(
  Map<TopicPartition, OffsetAndMetadata> offsets,
  ConsumerGroupMetadata groupMetadata)
sendOffsetsToTransaction requests the TransactionManager to sendOffsetsToTransaction and requests the Sender to wakeup.
In the end, sendOffsetsToTransaction waits max.block.ms for the send to be completed (successfully or not).
sendOffsetsToTransaction is part of the Producer abstraction.
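Put together, the transactional methods are typically used as follows (a sketch; producer, the topic, the offsets map and the consumer group metadata are assumed to come from the application, e.g. from a KafkaConsumer):

producer.initTransactions();
try {
  producer.beginTransaction();
  producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
  // optionally commit consumed offsets as part of the same transaction
  producer.sendOffsetsToTransaction(offsets, groupMetadata);
  producer.commitTransaction();
} catch (KafkaException e) {
  // the transaction cannot be completed; abort it
  producer.abortTransaction();
}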
Sending Record¶
Future<RecordMetadata> send(
  ProducerRecord<K, V> record)  // (1)
Future<RecordMetadata> send(
  ProducerRecord<K, V> record,
  Callback callback)
- (1) Uses an uninitialized (null) Callback
send requests the interceptors to onSend with the given record (possibly modifying it) followed by doSend.
send is part of the Producer abstraction.
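A user-side sketch of both overloads (the topic, key and value are placeholders):

ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key", "value");

// Without a Callback (the Callback is null)
Future<RecordMetadata> future = producer.send(record);

// With a Callback invoked once the record is acknowledged (or the send fails)
producer.send(record, (metadata, exception) -> {
  if (exception != null)
    exception.printStackTrace();
  else
    System.out.printf("Sent to %s-%d@%d%n",
      metadata.topic(), metadata.partition(), metadata.offset());
});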
doSend¶
Future<RecordMetadata> doSend(
  ProducerRecord<K, V> record,
  Callback callback)
doSend waitOnMetadata for the topic and partition of the given record.
doSend requests the key Serializer to serialize the key (passing in the topic, the headers and the key of the record).
doSend requests the value Serializer to serialize the value (passing in the topic, the headers and the value of the record).
doSend determines the partition for the record.
doSend ensureValidRecordSize for the record (upper bound estimate).
doSend prints out the following TRACE message to the logs:
Attempting to append record [r] with callback [c] to topic [t] partition [p]
doSend requests the RecordAccumulator to append the record (with the abortOnNewBatch flag enabled).
When aborted for a new batch, doSend ...FIXME (repeats the steps)... and prints out the following TRACE message to the logs:
Retrying append due to new batch creation for topic [t] partition [p].
The old partition was [prev]
When transactional, doSend requests the TransactionManager to maybeAddPartitionToTransaction.
For batchIsFull or a new batch created, doSend prints out the following TRACE message to the logs and requests the Sender to wakeup:
Waking up the sender since topic [t] partition [p] is either full or getting a new batch
partition¶
int partition(
  ProducerRecord<K, V> record,
  byte[] serializedKey,
  byte[] serializedValue,
  Cluster cluster)
partition returns the partition of the given ProducerRecord if defined, or requests the Partitioner for one.
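A simplified sketch of that decision (record, serializedKey, serializedValue and cluster are the method arguments; the partitioner field name is illustrative):

Integer recordPartition = record.partition();
int partition = (recordPartition != null)
  ? recordPartition
  : partitioner.partition(
      record.topic(), record.key(), serializedKey,
      record.value(), serializedValue, cluster);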
Flushing¶
void flush()
flush requests the RecordAccumulator to beginFlush.
flush requests the Sender to wakeup.
flush requests the RecordAccumulator to awaitFlushCompletion.
flush is part of the Producer abstraction.
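User-side, flush is typically used to make the (asynchronous) sends complete before moving on (a sketch):

producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
producer.flush();  // blocks until all previously sent records are completed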
waitOnMetadata¶
ClusterAndWaitTime waitOnMetadata(
  String topic,
  Integer partition,
  long nowMs,
  long maxWaitMs)
waitOnMetadata requests the ProducerMetadata for the current cluster info.
waitOnMetadata ...FIXME
waitOnMetadata is used when:
- KafkaProducer is requested to doSend and partitionsFor
lingerMs¶
lingerMs(
  ProducerConfig config)
lingerMs gives the value of the linger.ms configuration property capped at the maximum integer value.
lingerMs is used when:
- KafkaProducer is created (to create the RecordAccumulator) and requested to configureDeliveryTimeout
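A minimal sketch of the capping, assuming linger.ms is a long-typed property (as in recent Kafka versions) and config is the given ProducerConfig:

int lingerMs = (int) Math.min(config.getLong(ProducerConfig.LINGER_MS_CONFIG), Integer.MAX_VALUE);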
Logging¶
Enable ALL logging level for org.apache.kafka.clients.producer.KafkaProducer logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.org.apache.kafka.clients.producer.KafkaProducer=ALL
Refer to Logging.