ProducerConfig

acks

batch.size

The size (in bytes) of the buffer allocated for a partition. When records smaller than this size are received, KafkaProducer attempts to optimistically group them together into one batch until this size is reached.

Default: 16384

Must be at least 0. If 0, KafkaProducer uses 1 instead (to create the RecordAccumulator).
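
The rule above can be sketched as a simple clamp. This is a minimal illustration, assuming the effective size is the configured value raised to at least 1; BatchSizeSketch and effectiveBatchSize are hypothetical names, not part of the Kafka API.

```java
class BatchSizeSketch {
    // Hypothetical helper (not Kafka API): a configured batch.size of 0
    // is treated as 1, and negative values are rejected.
    static int effectiveBatchSize(int configured) {
        if (configured < 0) {
            throw new IllegalArgumentException("batch.size must be at least 0");
        }
        return Math.max(1, configured);
    }

    public static void main(String[] args) {
        System.out.println(effectiveBatchSize(0));
        System.out.println(effectiveBatchSize(16384));
    }
}
```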

Related to:

  • linger.ms
  • max-partition-memory-bytes (ConsoleProducer)

Used when:

enable.idempotence

Default: false

Used when:

linger.ms

How long (in milliseconds) a KafkaProducer waits before sending out a record, so that other records can arrive and the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP.

Cambridge Dictionary

linger
verb

to take a long time to leave or disappear

This setting gives the upper bound on the delay for batching. Once batch.size worth of records has accumulated for a partition, the batch is sent immediately regardless of this setting. If fewer bytes have accumulated for the partition, the producer 'lingers' for the specified time, waiting for more records to show up.

Default: 0 (no delay)

For example, a value of 5 reduces the number of requests sent out but adds up to 5 ms of latency to records sent in the absence of load.
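
The interplay of batch.size and linger.ms can be sketched as follows. This is a simplified model, assuming a batch is ready as soon as it is full or the linger time has elapsed; the real producer considers further conditions. LingerSketch, readyToSend and producerProps are hypothetical names, and the property keys are the plain string names used on this page.

```java
import java.util.Properties;

class LingerSketch {
    // Simplified rule (an assumption, not the actual implementation):
    // send when the batch is full OR the linger time has passed.
    static boolean readyToSend(int accumulatedBytes, int batchSize,
                               long waitedMs, long lingerMs) {
        return accumulatedBytes >= batchSize || waitedMs >= lingerMs;
    }

    // Producer properties trading up to 5 ms of latency for fewer requests.
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("batch.size", "16384"); // the default
        props.setProperty("linger.ms", "5");      // default is 0 (no delay)
        return props;
    }

    public static void main(String[] args) {
        // Full batch: sent immediately, regardless of linger.ms.
        System.out.println(readyToSend(16384, 16384, 0, 5));
        // Small batch, 2 ms waited out of 5: keep lingering.
        System.out.println(readyToSend(100, 16384, 2, 5));
        System.out.println(producerProps());
    }
}
```

With the default linger.ms of 0 the second condition always holds, so in this model a record never waits for others.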

Kafka Streams

Kafka Streams prefers linger.ms to be 100.

Available as lingerMs

Correlated with:

max.block.ms

max.in.flight.requests.per.connection

The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).

Default: 5

Must be at least 1
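
As an illustration of the re-ordering note above, a producer that retries but must keep records in order could limit itself to one in-flight request per connection. This is a minimal configuration sketch; OrderingSketch and orderedProducerProps are hypothetical names, and the retries value is an arbitrary example.

```java
import java.util.Properties;

class OrderingSketch {
    // Retries enabled, but only one unacknowledged request at a time,
    // so a retried request cannot be overtaken by a later one.
    static Properties orderedProducerProps() {
        Properties props = new Properties();
        props.setProperty("retries", "3"); // arbitrary illustrative value
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(orderedProducerProps());
    }
}
```

The trade-off is throughput: with a single in-flight request the client blocks until each request is acknowledged.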

Related to:

Used when:

partitioner.class

The class of the Partitioner for a KafkaProducer

Default: DefaultPartitioner

retries

retry.backoff.ms

transactional.id

The ID of a KafkaProducer for transactional delivery

Default: (undefined)

This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same transactional.id have been completed prior to starting any new transactions.

With no transactional.id, a producer is limited to idempotent delivery.

When configured, enable.idempotence is implied (and configured when KafkaProducer is created).

With transactional.id, KafkaProducer uses a modified client.id (that includes the ID).

Note that, by default, transactions require a cluster of at least three brokers, which is the recommended setting for production. For development, you can change this by adjusting the broker setting transaction.state.log.replication.factor.

transactional.id is required for the transactional methods.

Used when:

transaction.state.log.replication.factor

transaction.timeout.ms

idempotenceEnabled

boolean idempotenceEnabled()

idempotenceEnabled is true when one of the following holds:

  1. transactional.id is defined
  2. enable.idempotence is enabled

idempotenceEnabled throws a ConfigException when enable.idempotence is explicitly disabled (false) but transactional.id is defined:

Cannot set a transactional.id without also enabling idempotence.
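
The rules above can be sketched over a plain map of user-supplied settings. This is a standalone illustration, not the ProducerConfig code: IdempotenceSketch is a hypothetical name, IllegalStateException stands in for ConfigException, and "explicitly disabled" is assumed to mean the user set enable.idempotence to false.

```java
import java.util.HashMap;
import java.util.Map;

class IdempotenceSketch {
    // Sketch only: IllegalStateException stands in for Kafka's ConfigException.
    static boolean idempotenceEnabled(Map<String, Object> userConfigs) {
        boolean hasTransactionalId = userConfigs.containsKey("transactional.id");
        boolean hasIdempotenceSetting = userConfigs.containsKey("enable.idempotence");
        boolean idempotence = hasIdempotenceSetting
                && Boolean.TRUE.equals(userConfigs.get("enable.idempotence"));

        // transactional.id together with an explicit enable.idempotence=false is an error.
        if (hasTransactionalId && hasIdempotenceSetting && !idempotence) {
            throw new IllegalStateException(
                    "Cannot set a transactional.id without also enabling idempotence.");
        }
        return hasTransactionalId || idempotence;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "tx-1"); // hypothetical ID
        System.out.println(idempotenceEnabled(configs));

        configs.put("enable.idempotence", false);
        try {
            idempotenceEnabled(configs);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```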

idempotenceEnabled is used when:

postProcessParsedConfig

Map<String, Object> postProcessParsedConfig(
  Map<String, Object> parsedValues)

postProcessParsedConfig calls maybeOverrideEnableIdempotence, maybeOverrideClientId and maybeOverrideAcksAndRetries, in that order.

postProcessParsedConfig is part of the AbstractConfig abstraction.

maybeOverrideClientId

maybeOverrideClientId overrides the client.id configuration property unless already defined.

The new value uses transactional.id (if defined) or the next available ID with the producer- prefix.
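
A minimal sketch of this behaviour over a plain map, assuming the producer- prefix applies to both the transactional.id and the sequence number; ClientIdSketch and its counter are hypothetical stand-ins, not the ProducerConfig code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

class ClientIdSketch {
    // Hypothetical stand-in for the "next available ID" sequence.
    private static final AtomicInteger NEXT_ID = new AtomicInteger(1);

    static void maybeOverrideClientId(Map<String, Object> configs) {
        if (configs.containsKey("client.id")) {
            return; // already defined by the user: leave untouched
        }
        Object transactionalId = configs.get("transactional.id");
        Object suffix = transactionalId != null
                ? transactionalId
                : NEXT_ID.getAndIncrement();
        // Assumption: the prefix is applied in both cases.
        configs.put("client.id", "producer-" + suffix);
    }

    public static void main(String[] args) {
        Map<String, Object> transactional = new HashMap<>();
        transactional.put("transactional.id", "tx-1"); // hypothetical ID
        maybeOverrideClientId(transactional);
        System.out.println(transactional.get("client.id"));

        Map<String, Object> plain = new HashMap<>();
        maybeOverrideClientId(plain);
        System.out.println(plain.get("client.id"));
    }
}
```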

maybeOverrideAcksAndRetries

void maybeOverrideAcksAndRetries(
  Map<String, Object> configs)

maybeOverrideAcksAndRetries...FIXME

maybeOverrideEnableIdempotence

void maybeOverrideEnableIdempotence(
  Map<String, Object> configs)

maybeOverrideEnableIdempotence sets enable.idempotence configuration property to true when transactional.id is defined with no enable.idempotence.
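
The override can be sketched over a plain map of settings. This is an illustration under the assumption that "defined" means the key is present among the user-supplied settings; EnableIdempotenceSketch is a hypothetical name, not the ProducerConfig code.

```java
import java.util.HashMap;
import java.util.Map;

class EnableIdempotenceSketch {
    // Turns idempotence on only when transactional.id is present
    // and the user has not set enable.idempotence at all.
    static void maybeOverrideEnableIdempotence(Map<String, Object> configs) {
        boolean hasTransactionalId = configs.containsKey("transactional.id");
        boolean hasIdempotenceSetting = configs.containsKey("enable.idempotence");
        if (hasTransactionalId && !hasIdempotenceSetting) {
            configs.put("enable.idempotence", true);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "tx-1"); // hypothetical ID
        maybeOverrideEnableIdempotence(configs);
        System.out.println(configs.get("enable.idempotence"));
    }
}
```

An explicit enable.idempotence value is left as the user set it, which is why the conflicting combination can still reach idempotenceEnabled and fail there.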