
KafkaProducer

KafkaProducer<K, V> is a concrete Producer.

Creating Instance

KafkaProducer takes the following to be created:

configureTransactionState

TransactionManager configureTransactionState(
  ProducerConfig config,
  LogContext logContext)

configureTransactionState creates a new TransactionManager or returns null.


configureTransactionState checks whether the following configuration properties are specified in the given ProducerConfig:

  1. enable.idempotence
  2. transactional.id

With transactional.id specified, configureTransactionState turns enable.idempotence on and prints out the following INFO message to the logs:

Overriding the default [enable.idempotence] to true since transactional.id is specified.

With idempotence enabled, configureTransactionState creates a TransactionManager with the values of the following configuration properties:

  1. transactional.id
  2. transaction.timeout.ms
  3. retry.backoff.ms

When the TransactionManager is transactional, configureTransactionState prints out the following INFO message to the logs:

Instantiated a transactional producer.

Otherwise, configureTransactionState prints out the following INFO message to the logs:

Instantiated an idempotent producer.

In the end, configureTransactionState returns the TransactionManager or null.
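The decision above can be modeled in a few lines. This is a simplified sketch, not Kafka's code: properties are a plain Map<String, String>, the hypothetical Mode enum stands for "no TransactionManager" (null), an idempotent one or a transactional one, and enable.idempotence is assumed to default to false (as the override message implies).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified model of the configureTransactionState decision (hypothetical names).
// A real KafkaProducer works with ProducerConfig and creates a TransactionManager.
final class TransactionStateSketch {

    // NONE stands for "no TransactionManager" (null)
    enum Mode { NONE, IDEMPOTENT, TRANSACTIONAL }

    // Collects the INFO messages instead of logging them
    static final List<String> LOG = new ArrayList<>();

    static Mode configure(Map<String, String> props) {
        String transactionalId = props.get("transactional.id");
        // Assumption: enable.idempotence defaults to false
        boolean idempotence = Boolean.parseBoolean(props.getOrDefault("enable.idempotence", "false"));

        if (transactionalId != null) {
            // A transactional.id turns idempotence on
            idempotence = true;
            LOG.add("Overriding the default [enable.idempotence] to true since transactional.id is specified.");
        }
        if (!idempotence) {
            return Mode.NONE;
        }
        if (transactionalId != null) {
            LOG.add("Instantiated a transactional producer.");
            return Mode.TRANSACTIONAL;
        }
        LOG.add("Instantiated an idempotent producer.");
        return Mode.IDEMPOTENT;
    }

    public static void main(String[] args) {
        System.out.println(configure(Map.of()));                             // NONE
        System.out.println(configure(Map.of("enable.idempotence", "true"))); // IDEMPOTENT
        System.out.println(configure(Map.of("transactional.id", "tx-1")));   // TRANSACTIONAL
        LOG.forEach(System.out::println);
    }
}
```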

newSender

Sender newSender(
  LogContext logContext,
  KafkaClient kafkaClient,
  ProducerMetadata metadata)

newSender...FIXME

configureInflightRequests

int configureInflightRequests(
  ProducerConfig config)

configureInflightRequests returns the value of the max.in.flight.requests.per.connection configuration property (in the given ProducerConfig).

configureInflightRequests throws a ConfigException when idempotence is enabled and the value of max.in.flight.requests.per.connection is above 5:

Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.
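A minimal sketch of the check, assuming properties in a plain Map, a default of 5 when the property is absent, and IllegalArgumentException standing in for ConfigException (the class and method names are hypothetical):

```java
import java.util.Map;

// Hypothetical sketch of the max.in.flight.requests.per.connection check.
// IllegalArgumentException stands in for Kafka's ConfigException.
final class InflightRequestsSketch {

    static final int MAX_IDEMPOTENT_IN_FLIGHT = 5;

    static int configureInflightRequests(Map<String, String> props, boolean idempotenceEnabled) {
        // Assumption: 5 is used when the property is not set
        int maxInFlight = Integer.parseInt(
            props.getOrDefault("max.in.flight.requests.per.connection", "5"));
        if (idempotenceEnabled && maxInFlight > MAX_IDEMPOTENT_IN_FLIGHT) {
            throw new IllegalArgumentException(
                "Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.");
        }
        return maxInFlight;
    }

    public static void main(String[] args) {
        System.out.println(configureInflightRequests(Map.of(), true)); // 5
        System.out.println(configureInflightRequests(
            Map.of("max.in.flight.requests.per.connection", "10"), false)); // 10
    }
}
```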

configureAcks

short configureAcks(
  ProducerConfig config,
  Logger log)

configureAcks returns the value of acks configuration property (in the given ProducerConfig).

When idempotence is enabled (idempotenceEnabled) and the acks configuration property is not set explicitly, configureAcks prints out the following INFO message to the logs:

Overriding the default [acks] to all since idempotence is enabled.

When idempotence is enabled and acks is not -1 (all), configureAcks throws a ConfigException:

Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.
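The acks rules can be sketched the same way. Assumptions: the hypothetical AcksSketch reads a plain Map, maps "all" to -1, treats "1" as the default when idempotence is off, prints the INFO message to standard output, and uses IllegalArgumentException in place of ConfigException.

```java
import java.util.Map;

// Hypothetical sketch of configureAcks ("all" is mapped to -1).
// IllegalArgumentException stands in for Kafka's ConfigException.
final class AcksSketch {

    static short parseAcks(String acks) {
        String value = acks.trim();
        return "all".equals(value) ? (short) -1 : Short.parseShort(value);
    }

    static short configureAcks(Map<String, String> props, boolean idempotenceEnabled) {
        String userAcks = props.get("acks");
        if (idempotenceEnabled && userAcks == null) {
            // No explicit acks: idempotence forces acks=all
            System.out.println("Overriding the default [acks] to all since idempotence is enabled.");
            return -1;
        }
        // Assumption: "1" is the default when idempotence is disabled
        short acks = parseAcks(userAcks == null ? "1" : userAcks);
        if (idempotenceEnabled && acks != -1) {
            throw new IllegalArgumentException("Must set acks to all in order to use the idempotent producer. "
                + "Otherwise we cannot guarantee idempotence.");
        }
        return acks;
    }

    public static void main(String[] args) {
        System.out.println(configureAcks(Map.of(), true));             // logs the override, then -1
        System.out.println(configureAcks(Map.of("acks", "0"), false)); // 0
    }
}
```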

configureDeliveryTimeout

int configureDeliveryTimeout(
  ProducerConfig config,
  Logger log)

configureDeliveryTimeout...FIXME

TransactionManager

KafkaProducer creates a TransactionManager when created, but only when idempotence is enabled (see configureTransactionState).

TransactionManager is used to create the following:

  1. RecordAccumulator
  2. Sender

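KafkaProducer uses the TransactionManager for the following transactional methods:

  1. abortTransaction
  2. beginTransaction
  3. commitTransaction
  4. initTransactions
  5. sendOffsetsToTransaction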

throwIfNoTransactionManager

throwIfNoTransactionManager throws an IllegalStateException when a transactional method is called but no TransactionManager is configured:

Cannot use transactional methods without enabling transactions by setting the transactional.id configuration property
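A sketch of the guard pattern, with a plain Object standing in for the TransactionManager and a hypothetical TransactionGuardSketch class: each transactional method checks for a configured TransactionManager before doing anything else.

```java
// Hypothetical sketch of the guard used by the transactional methods.
// Object stands in for a TransactionManager; null means "not configured".
final class TransactionGuardSketch {

    private final Object transactionManager;

    TransactionGuardSketch(Object transactionManager) {
        this.transactionManager = transactionManager;
    }

    private void throwIfNoTransactionManager() {
        if (transactionManager == null) {
            throw new IllegalStateException("Cannot use transactional methods without enabling transactions "
                + "by setting the transactional.id configuration property");
        }
    }

    // A transactional method calls the guard before using the TransactionManager
    void beginTransaction() {
        throwIfNoTransactionManager();
        // ...a real producer would now request the TransactionManager to beginTransaction
    }

    public static void main(String[] args) {
        try {
            new TransactionGuardSketch(null).beginTransaction();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```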

Sender Thread

KafkaProducer creates a Sender when created.

Sender is immediately started as a daemon thread with the following name (using the clientId):

kafka-producer-network-thread | [clientId]

KafkaProducer is actually considered open (and usable) as long as the Sender is running.
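The thread setup can be illustrated with a plain java.lang.Thread (Kafka uses its own thread class, so this is only an approximation). The hypothetical startSender marks the thread as a daemon, so it does not keep the JVM alive by itself, and names it after the clientId:

```java
// Hypothetical sketch: starting a network loop as a named daemon thread,
// the way the text describes the Sender. Plain java.lang.Thread is used.
final class SenderThreadSketch {

    static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";

    static Thread startSender(String clientId, Runnable sender) {
        Thread t = new Thread(sender, NETWORK_THREAD_PREFIX + " | " + clientId);
        t.setDaemon(true); // does not prevent the JVM from exiting
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t = startSender("producer-1", () -> { });
        t.join();
        System.out.println(t.getName()); // kafka-producer-network-thread | producer-1
    }
}
```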

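KafkaProducer simply requests the Sender to wake up for the following:

  1. abortTransaction
  2. commitTransaction
  3. doSend (when a batch is full or a new batch was created)
  4. flush
  5. initTransactions
  6. sendOffsetsToTransaction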

RecordAccumulator

KafkaProducer creates a RecordAccumulator when created.

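This RecordAccumulator is used for the following:

  1. doSend (to append records)
  2. flush (to beginFlush and awaitFlushCompletion)
  3. Sender (which is created with the RecordAccumulator)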

max.block.ms

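KafkaProducer uses the max.block.ms configuration property as the upper bound on how long its blocking operations may wait, e.g. the waits in initTransactions and sendOffsetsToTransaction, and the metadata wait (waitOnMetadata) in doSend.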

Transactional Methods

abortTransaction

void abortTransaction()

abortTransaction prints out the following INFO message to the logs:

Aborting incomplete transaction

abortTransaction...FIXME

abortTransaction is part of the Producer abstraction.

beginTransaction

void beginTransaction()

beginTransaction requests the TransactionManager to beginTransaction.

beginTransaction is part of the Producer abstraction.

initTransactions

void initTransactions()

initTransactions requests the TransactionManager to initializeTransactions and requests the Sender to wakeup.

In the end, initTransactions waits up to max.block.ms for the transaction initialization to complete (successfully or not).

initTransactions is part of the Producer abstraction.
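The "wake the Sender up, then wait at most max.block.ms" pattern can be sketched with a CountDownLatch. RequestResult, requestAndAwait and the worker thread are hypothetical stand-ins for the TransactionManager's pending result and the Sender; in this sketch a timeout surfaces as java.util.concurrent.TimeoutException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of a request completed by a background thread
// while the caller blocks for at most maxBlockMs.
final class BoundedWaitSketch {

    static final class RequestResult {
        private final CountDownLatch latch = new CountDownLatch(1);

        void done() { latch.countDown(); }

        void await(long timeoutMs) throws InterruptedException, TimeoutException {
            if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new TimeoutException("Timeout expired after " + timeoutMs + " ms");
            }
        }
    }

    // workMs simulates how long the woken-up "Sender" needs to finish the request
    static void requestAndAwait(long workMs, long maxBlockMs) throws InterruptedException, TimeoutException {
        RequestResult result = new RequestResult();
        Thread sender = new Thread(() -> {
            try {
                Thread.sleep(workMs);
                result.done();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.setDaemon(true);
        sender.start();           // plays the role of waking up the Sender
        result.await(maxBlockMs); // blocks for at most maxBlockMs
    }

    public static void main(String[] args) throws Exception {
        requestAndAwait(10, 1_000); // completes in time
        try {
            requestAndAwait(1_000, 10); // gives up after about 10 ms
        } catch (TimeoutException e) {
            System.out.println(e.getMessage());
        }
    }
}
```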

sendOffsetsToTransaction

void sendOffsetsToTransaction(
  Map<TopicPartition, OffsetAndMetadata> offsets,
  ConsumerGroupMetadata groupMetadata)

sendOffsetsToTransaction requests the TransactionManager to sendOffsetsToTransaction and requests the Sender to wakeup.

In the end, sendOffsetsToTransaction waits up to max.block.ms for the request to complete (successfully or not).

sendOffsetsToTransaction is part of the Producer abstraction.

Sending Record

Future<RecordMetadata> send(
  ProducerRecord<K, V> record) // (1) passes a null Callback
Future<RecordMetadata> send(
  ProducerRecord<K, V> record,
  Callback callback)

send requests the interceptors to onSend with the given record (which may return a modified record) and then calls doSend with the result.

send is part of the Producer abstraction.
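A minimal sketch of the interceptor chain, with strings standing in for ProducerRecords and functions standing in for the interceptors' onSend and for doSend (all names are hypothetical and error handling is left out). Every interceptor sees the record returned by the previous one:

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical sketch of send: interceptors run in order, then doSend
// receives the final (possibly modified) record.
final class InterceptorChainSketch {

    static <R, F> F send(R record, List<UnaryOperator<R>> interceptors, Function<R, F> doSend) {
        R intercepted = record;
        for (UnaryOperator<R> onSend : interceptors) {
            intercepted = onSend.apply(intercepted);
        }
        return doSend.apply(intercepted);
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> interceptors = List.of(
            r -> r + " [stamped]",
            String::toUpperCase);
        String sent = send("hello", interceptors, r -> "doSend(" + r + ")");
        System.out.println(sent); // doSend(HELLO [STAMPED])
    }
}
```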

doSend

Future<RecordMetadata> doSend(
  ProducerRecord<K, V> record,
  Callback callback)

doSend calls waitOnMetadata for the topic and partition of the given record.

doSend requests the key Serializer to serialize the key of the record (passing in the topic, the headers and the key).

doSend requests the value Serializer to serialize the value of the record (passing in the topic, the headers and the value).

doSend determines the partition for the record (see partition).

doSend calls ensureValidRecordSize with an upper-bound estimate of the serialized size of the record.

doSend prints out the following TRACE message to the logs:

Attempting to append record [r] with callback [c] to topic [t] partition [p]

doSend requests the RecordAccumulator to append the record (with the abortOnNewBatch flag enabled).

When the append is aborted for a new batch, doSend repeats the steps (determining the partition again and re-appending the record)...FIXME, and prints out the following TRACE message to the logs:

Retrying append due to new batch creation for topic [t] partition [p]. The old partition was [prev]

When the producer is transactional, doSend requests the TransactionManager to maybeAddPartitionToTransaction.

When the batch is full (batchIsFull) or a new batch was created, doSend prints out the following TRACE message to the logs and requests the Sender to wakeup:

Waking up the sender since topic [t] partition [p] is either full or getting a new batch
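The append-and-retry part of doSend can be modeled as follows. This toy version is a simplification built on assumptions: AppendResult, Accumulator and the IntSupplier partitioner are hypothetical, the retry is assumed to re-append with abortOnNewBatch disabled, and TRACE messages are collected in a list instead of being logged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntSupplier;

// Toy model of doSend's append step (hypothetical names throughout).
final class AppendRetrySketch {

    static final class AppendResult {
        final boolean batchIsFull, newBatchCreated, abortForNewBatch;

        AppendResult(boolean full, boolean created, boolean abort) {
            batchIsFull = full;
            newBatchCreated = created;
            abortForNewBatch = abort;
        }
    }

    interface Accumulator {
        AppendResult append(int partition, String record, boolean abortOnNewBatch);
    }

    static final List<String> TRACE = new ArrayList<>();

    // Returns true when the Sender would be woken up
    static boolean doSend(String topic, String record, IntSupplier partitioner, Accumulator accumulator) {
        int partition = partitioner.getAsInt();
        // First attempt: allowed to abort instead of opening a new batch
        AppendResult result = accumulator.append(partition, record, true);
        if (result.abortForNewBatch) {
            int prevPartition = partition;
            partition = partitioner.getAsInt(); // determine the partition again
            TRACE.add("Retrying append due to new batch creation for topic " + topic + " partition " + partition
                + ". The old partition was " + prevPartition);
            result = accumulator.append(partition, record, false);
        }
        if (result.batchIsFull || result.newBatchCreated) {
            TRACE.add("Waking up the sender since topic " + topic + " partition " + partition
                + " is either full or getting a new batch");
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        int[] next = {0};
        IntSupplier partitioner = () -> next[0]++; // hands out 0, 1, 2, ...
        // Aborts whenever allowed to, otherwise reports a newly created batch
        Accumulator acc = (p, r, abortOnNewBatch) -> abortOnNewBatch
            ? new AppendResult(false, false, true)
            : new AppendResult(false, true, false);
        System.out.println(doSend("t", "v", partitioner, acc)); // true
        TRACE.forEach(System.out::println);
    }
}
```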

partition

int partition(
  ProducerRecord<K, V> record,
  byte[] serializedKey,
  byte[] serializedValue,
  Cluster cluster)

partition returns the partition of the given ProducerRecord, if defined. Otherwise, partition requests the Partitioner for one.
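A sketch of the "explicit partition wins" rule. The Partitioner interface and the HASH_OF_KEY stand-in are hypothetical, and hashing the key bytes with Arrays.hashCode is not what Kafka does (its default partitioner uses a murmur2 hash for keyed records); the point is only that the same key always maps to the same partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of partition(): an explicit partition on the record
// is used as-is; otherwise a partitioner is asked.
final class PartitionSketch {

    interface Partitioner {
        int partition(String topic, byte[] keyBytes, int numPartitions);
    }

    // Stand-in partitioner (NOT Kafka's algorithm): non-negative hash modulo partition count
    static final Partitioner HASH_OF_KEY =
        (topic, keyBytes, numPartitions) -> (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;

    static int partition(Integer recordPartition, String topic, byte[] serializedKey,
                         int numPartitions, Partitioner partitioner) {
        return recordPartition != null
            ? recordPartition
            : partitioner.partition(topic, serializedKey, numPartitions);
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(partition(2, "t", key, 6, HASH_OF_KEY));    // 2 (explicit partition)
        System.out.println(partition(null, "t", key, 6, HASH_OF_KEY)); // same value for the same key
    }
}
```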

Flushing

void flush()

flush requests the RecordAccumulator to beginFlush.

flush requests the Sender to wakeup.

flush requests the RecordAccumulator to awaitFlushCompletion.

flush is part of the Producer abstraction.
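The three flush steps can be modeled with futures. In this toy (hypothetical names), incomplete batches are CompletableFutures, beginFlush bumps a counter, the Runnable plays the woken-up Sender, and awaitFlushCompletion blocks on every incomplete batch before decrementing the counter.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of flush = beginFlush, wake up the sender, awaitFlushCompletion.
final class FlushSketch {

    final AtomicInteger flushesInProgress = new AtomicInteger();
    final List<CompletableFuture<Void>> incomplete = new CopyOnWriteArrayList<>();

    void beginFlush() {
        flushesInProgress.incrementAndGet();
    }

    void awaitFlushCompletion() {
        try {
            for (CompletableFuture<Void> batch : incomplete) {
                batch.join(); // blocks until this batch is done
            }
        } finally {
            flushesInProgress.decrementAndGet();
        }
    }

    // "sender" stands in for the woken-up Sender that completes pending batches
    void flush(Runnable sender) {
        beginFlush();
        sender.run();
        awaitFlushCompletion();
    }

    public static void main(String[] args) {
        FlushSketch acc = new FlushSketch();
        acc.incomplete.add(new CompletableFuture<>());
        acc.incomplete.add(new CompletableFuture<>());
        acc.flush(() -> acc.incomplete.forEach(b -> b.complete(null)));
        System.out.println(acc.flushesInProgress.get()); // 0
    }
}
```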

waitOnMetadata

ClusterAndWaitTime waitOnMetadata(
  String topic,
  Integer partition,
  long nowMs,
  long maxWaitMs)

waitOnMetadata requests the ProducerMetadata for the current cluster info.

waitOnMetadata...FIXME
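The following is only a guess at the general shape of a bounded metadata wait, not Kafka's implementation: return as soon as the topic (and, if given, the partition) is known, otherwise wait for an update until maxWaitMs elapses. MetadataWaitSketch and its methods are hypothetical, and it returns the time spent waiting instead of a ClusterAndWaitTime.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

// Toy model of waiting on metadata with an upper bound (hypothetical names).
final class MetadataWaitSketch {

    // topic -> number of partitions, filled in by some background updater
    final Map<String, Integer> partitionCounts = new ConcurrentHashMap<>();
    private final Object lock = new Object();

    void update(String topic, int partitions) {
        synchronized (lock) {
            partitionCounts.put(topic, partitions);
            lock.notifyAll(); // wake up any waiting callers
        }
    }

    private boolean known(String topic, Integer partition) {
        Integer count = partitionCounts.get(topic);
        return count != null && (partition == null || partition < count);
    }

    // Returns the time spent waiting, in milliseconds
    long waitOnMetadata(String topic, Integer partition, long maxWaitMs)
            throws InterruptedException, TimeoutException {
        long begin = System.currentTimeMillis();
        long deadline = begin + maxWaitMs;
        synchronized (lock) {
            while (!known(topic, partition)) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    throw new TimeoutException("Topic " + topic + " not known after " + maxWaitMs + " ms");
                }
                lock.wait(remaining);
            }
        }
        return System.currentTimeMillis() - begin;
    }

    public static void main(String[] args) throws Exception {
        MetadataWaitSketch metadata = new MetadataWaitSketch();
        new Thread(() -> metadata.update("orders", 3)).start(); // simulated metadata response
        System.out.println("waited " + metadata.waitOnMetadata("orders", 1, 1_000) + " ms");
    }
}
```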

waitOnMetadata is used when:

  1. KafkaProducer is requested to doSend
  2. KafkaProducer is requested to partitionsFor

Demo

// Necessary imports
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

// Creating a KafkaProducer (assumes a broker listening on localhost:9092)
import java.util.Properties
val props = new Properties()
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val producer = new KafkaProducer[String, String](props)

// Creating a record to be sent (to the topic "0", with no key)
import org.apache.kafka.clients.producer.ProducerRecord
val r = new ProducerRecord[String, String]("0", "this is a message")

// Sending the record (with no Callback)
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata
val metadataF: Future[RecordMetadata] = producer.send(r)

// Blocking until the record is acknowledged (or the send fails)
val metadata = metadataF.get()
println(s"Sent to ${metadata.topic}-${metadata.partition} at offset ${metadata.offset}")

// Closing the producer (sends any pending records and stops the Sender thread)
producer.close()

Logging

Enable ALL logging level for org.apache.kafka.clients.producer.KafkaProducer logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.clients.producer.KafkaProducer=ALL

Refer to Logging.
