ProducerConfig¶
acks¶
batch.size¶
The buffer size allocated for a partition. When records are received (which are smaller than this size) KafkaProducer will attempt to optimistically group them together until this size is reached.
Default: 16384
Must be at least 0. If 0
, KafkaProducer
will assume 1
anyway (to create the RecordAccumulator)
Related to:
- linger.ms
max-partition-memory-bytes
(ConsoleProducer
)
Used when:
KafkaProducer
is created (to create a RecordAccumulator and an accompanying BufferPool)KafkaLog4jAppender
is requested toactivateOptions
enable.idempotence¶
Default: false
Used when:
KafkaProducer
is requested to configureTransactionStateProducerConfig
is requested to maybeOverrideEnableIdempotence and idempotenceEnabled
linger.ms¶
A delay for how long a KafkaProducer should wait before sending out a record to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP.
This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up.
Default: 0
(no delay)
For example, 5
would have the effect of reducing the number of requests sent out but would add up to 5
ms of latency to records sent in the absence of load.
Kafka Streams
Kafka Streams prefers linger.ms
to be 100
.
Available as lingerMs
Correlated with:
max.block.ms¶
max.in.flight.requests.per.connection¶
The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).
Default: 5
Must be at least 1
Related to:
Used when:
KafkaProducer
is requested to configureInflightRequests
partitioner.class¶
The class of the Partitioner for a KafkaProducer
Default: DefaultPartitioner
retries¶
retry.backoff.ms¶
transactional.id¶
The ID of a KafkaProducer for transactional delivery
Default: (undefined)
This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same transactional.id
have been completed prior to starting any new transactions.
With no transactional.id
, a producer is limited to idempotent delivery.
When configured, enable.idempotence is implied (and configured when KafkaProducer
is created).
With transactional.id
, KafkaProducer
uses a modified client.id (that includes the ID).
Note that, by default, transactions require a cluster of at least three brokers which is the recommended setting for production; for development you can change this, by adjusting broker setting transaction.state.log.replication.factor.
transactional.id
is required for the transactional methods.
Used when:
- KafkaProducer prints out log messages (with the transactional ID included in the log prefix)
KafkaProducer
is created (and creates a TransactionManager)
transaction.state.log.replication.factor¶
transaction.timeout.ms¶
idempotenceEnabled¶
boolean idempotenceEnabled()
idempotenceEnabled
is enabled (true
) when one of the following holds:
- transactional.id is defined
- enable.idempotence is enabled
idempotenceEnabled
throws a ConfigException
when enable.idempotence is disabled but transactional.id is defined:
Cannot set a transactional.id without also enabling idempotence.
idempotenceEnabled
is used when:
KafkaProducer
is created (and requested to configureTransactionState, configureInflightRequests, configureAcks)ProducerConfig
is requested to maybeOverrideAcksAndRetries
postProcessParsedConfig¶
Map<String, Object> postProcessParsedConfig(
Map<String, Object> parsedValues)
postProcessParsedConfig
maybeOverrideEnableIdempotence. postProcessParsedConfig
maybeOverrideClientId. postProcessParsedConfig
maybeOverrideAcksAndRetries.
postProcessParsedConfig
is part of the AbstractConfig abstraction.
maybeOverrideClientId¶
maybeOverrideAcksAndRetries
overrides client.id configuration property unless already defined.
The new value uses transactional.id (if defined) or the next available ID with the producer-
prefix.
maybeOverrideAcksAndRetries¶
void maybeOverrideAcksAndRetries(
Map<String, Object> configs)
maybeOverrideAcksAndRetries
...FIXME
maybeOverrideEnableIdempotence¶
void maybeOverrideEnableIdempotence(
Map<String, Object> configs)
maybeOverrideEnableIdempotence
sets enable.idempotence configuration property to true
when transactional.id is defined with no enable.idempotence.