KafkaConfig

KafkaConfig holds the configuration properties of a Kafka broker.

KafkaConfig is an AbstractConfig.

Accessing KafkaConfig

import kafka.server.KafkaConfig
import java.util.Properties
val props = new Properties()
props.put("zookeeper.connect", ":2181") // a required property
val config = KafkaConfig.fromProps(props, doLog = true)
assert(config.uncleanLeaderElectionEnable == false)

authorizer.class.name

The fully-qualified name of a class that implements the Authorizer interface, which the broker uses for request authorization.

Default: empty

Use KafkaConfig.authorizerClassName to access the current value.

Used when:

auto.leader.rebalance.enable

Enables auto partition leader balancing. A background thread checks the distribution of partition leaders at regular intervals (leader.imbalance.check.interval.seconds). If the leader imbalance exceeds leader.imbalance.per.broker.percentage, leader rebalance to the preferred leader for partitions is triggered.

Default: true

Importance: High
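
The imbalance check described above can be sketched as follows. This is a minimal illustration, not Kafka's controller code: the object and method names are hypothetical, and the ratio is assumed to be the share of a broker's preferred partitions that are currently led by another replica, compared against leader.imbalance.per.broker.percentage as a whole-number percentage.

```scala
// Hypothetical helper for illustration only; not part of Kafka's API.
object LeaderImbalanceSketch {

  /** Share of a broker's preferred partitions whose current leader is a different replica. */
  def imbalanceRatio(offPreferredLeader: Int, preferredPartitions: Int): Double =
    if (preferredPartitions == 0) 0.0
    else offPreferredLeader.toDouble / preferredPartitions

  /** Assumption: a rebalance is triggered only when the ratio strictly exceeds the percentage. */
  def shouldRebalance(offPreferredLeader: Int, preferredPartitions: Int, imbalancePercentage: Int): Boolean =
    imbalanceRatio(offPreferredLeader, preferredPartitions) > imbalancePercentage.toDouble / 100
}
```

With a threshold of 10 (an illustrative value), a broker with 2 of its 10 preferred partitions led elsewhere would qualify for a rebalance, while 1 of 10 would not.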

Used when:

broker.id

The broker ID of this Kafka server.

Default: -1

If unset or negative, a unique broker id will be generated (when KafkaServer is requested to start up).

To avoid conflicts between ZooKeeper-generated broker IDs and user-configured broker IDs, generated broker IDs start from reserved.broker.max.id + 1.

Use KafkaConfig.brokerId to access the current value.


import kafka.server.KafkaConfig
// zookeeper.connect is required when process.roles is empty (ZooKeeper mode)
val m = Map(
  "zookeeper.connect" -> "xxx"
)
val props = new java.util.Properties()
import scala.jdk.CollectionConverters._
props.putAll(m.asJava)
val config = KafkaConfig.fromProps(props)
assert(config.brokerId == -1)

broker.id.generation.enable

Enables broker id generation on a server. When enabled, reserved.broker.max.id should be reviewed.

Default: true

Use brokerIdGenerationEnable to access the current value.
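
The relationship between broker.id, broker.id.generation.enable and reserved.broker.max.id can be sketched with plain functions. The names below are hypothetical, and the range check is an assumption about how a user-configured ID is kept apart from generated ones; it is not a copy of Kafka's validation code.

```scala
// Hypothetical helper for illustration only; not part of Kafka's API.
object BrokerIdSketch {

  /** First ID available for generation: reserved.broker.max.id + 1. */
  def firstGeneratedId(reservedBrokerMaxId: Int): Int = reservedBrokerMaxId + 1

  /** An ID is generated only when generation is enabled and broker.id is unset or negative. */
  def needsGeneratedId(brokerId: Int, generationEnabled: Boolean): Boolean =
    generationEnabled && brokerId < 0

  /** Assumed check: a configured ID must not reach into the range used for generated IDs. */
  def isValidConfiguredId(brokerId: Int, reservedBrokerMaxId: Int): Boolean =
    brokerId >= -1 && brokerId <= reservedBrokerMaxId
}
```

For example, with reserved.broker.max.id set to 1000 (a value chosen for illustration), generated IDs would start at 1001, and a configured broker.id of 1001 would collide with that range.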

default.replication.factor

inter.broker.protocol.version

Specifies which version of the inter-broker protocol to use. Typically bumped after all brokers have been upgraded to a new version.

Default: The latest version of ApiVersion of the broker

log.cleaner.enable

Enables LogCleaner

Default: true

  • Should be enabled if any topic uses cleanup.policy=compact, including the internal offsets topic
  • If disabled, those topics are not compacted and continually grow in size

Used when:

log.cleaner.threads

The number of background threads to use for log cleaning (by LogCleaner)

Default: 1

Must be at least 0

Reconfigurable Config

Used when:

log.cleaner.backoff.ms

How long to pause a CleanerThread (until next log cleaning attempt) when there are no logs to clean

Default: 15 * 1000

Must be at least 0

Reconfigurable Config

Used when:

metadata.log.dir

The directory of the metadata log of a Kafka cluster in KRaft mode.

Unless specified, the metadata log is placed in the first log directory from log.dirs.

Default: (unspecified)

Available as metadataLogDir

metadata.log.max.record.bytes.between.snapshots

The maximum number of bytes in the log between the latest snapshot and the high-watermark needed before generating a new snapshot.

To generate snapshots based on the time elapsed use metadata.log.max.snapshot.interval.ms configuration.

The Kafka node will generate a snapshot when either the maximum time interval is reached or the maximum bytes limit is reached.

Default: 20 * 1024 * 1024

At least 1

Priority: HIGH

Available as KafkaConfig.metadataSnapshotMaxNewRecordBytes

Used when:

  • SharedServer is requested to start

metadata.log.max.snapshot.interval.ms

The maximum number of milliseconds to wait to generate a snapshot if there are committed records in the log that are not included in the latest snapshot.

A value of zero disables time-based snapshot generation.

To generate snapshots based on the number of metadata bytes use metadata.log.max.record.bytes.between.snapshots configuration.

The Kafka node will generate a snapshot when either the maximum time interval is reached or the maximum bytes limit is reached.

Default: 1 hour

At least 0

Priority: HIGH

Available as KafkaConfig.metadataSnapshotMaxIntervalMs
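
The "either limit" rule shared by metadata.log.max.record.bytes.between.snapshots and metadata.log.max.snapshot.interval.ms can be sketched as a single predicate. The object, method and parameter names are hypothetical, and the use of >= for both comparisons is an assumption; the sketch only mirrors the behavior described above.

```scala
// Hypothetical helper for illustration only; not part of Kafka's API.
object SnapshotTriggerSketch {

  /**
   * @param bytesSinceSnapshot  committed bytes not yet covered by the latest snapshot
   * @param millisSinceSnapshot time elapsed since the latest snapshot
   * @param maxBytes            metadata.log.max.record.bytes.between.snapshots
   * @param maxIntervalMs       metadata.log.max.snapshot.interval.ms (0 disables the time-based trigger)
   */
  def shouldSnapshot(
      bytesSinceSnapshot: Long,
      millisSinceSnapshot: Long,
      maxBytes: Long,
      maxIntervalMs: Long): Boolean = {
    val bytesLimitReached = bytesSinceSnapshot >= maxBytes
    // The time-based trigger applies only when there is something new to snapshot.
    val timeLimitReached =
      maxIntervalMs > 0 && bytesSinceSnapshot > 0 && millisSinceSnapshot >= maxIntervalMs
    bytesLimitReached || timeLimitReached
  }
}
```

With the defaults listed on this page (20 * 1024 * 1024 bytes and 1 hour), a node with a single new byte of committed records would still snapshot once an hour has passed, unless the interval is set to zero.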

Used when:

  • SharedServer is requested to start

num.partitions

offsets.topic.num.partitions

The number of partitions for the __consumer_offsets offset commit topic (should not change after deployment).

For every partition there is a GroupCoordinator elected to handle consumer groups that are "assigned" to this partition.

Default: 50

Must be at least 1

Use KafkaConfig.offsetsTopicPartitions to access the current value.
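
A sketch of how a consumer group could be "assigned" to a partition of the offsets topic is given below. It assumes the mapping is a non-negative hash of the group ID modulo the partition count; the object and method names are hypothetical. Under that assumption it also shows why the partition count should not change after deployment: a different count would map existing groups to different partitions.

```scala
// Hypothetical helper for illustration only; not part of Kafka's API.
object GroupPartitionSketch {

  /** Maps a group ID to one of the offsets topic partitions (assumed hash-modulo scheme). */
  def partitionFor(groupId: String, offsetsTopicPartitions: Int): Int = {
    require(offsetsTopicPartitions >= 1, "offsets.topic.num.partitions must be at least 1")
    // Clear the sign bit so the result is never negative.
    (groupId.hashCode & 0x7fffffff) % offsetsTopicPartitions
  }
}
```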

Used when:

offsets.topic.replication.factor

controller.quorum.voters

process.roles

A comma-separated list of the roles that this Kafka server plays in a Kafka cluster.

Supported values:

  • broker
  • controller
  • broker,controller

Default: (empty)

  1. When empty, the process requires ZooKeeper (runs in ZooKeeper mode)
  2. Only applicable for clusters in KRaft (Kafka Raft) mode
  3. If used, controller.quorum.voters must contain a parseable set of voters
  4. When process.roles contains the broker role, advertised.listeners must not contain KRaft controller listeners from controller.listener.names. Kafka clients that send requests via advertised listeners only send them to KRaft brokers, never to KRaft controllers
  5. If process.roles contains the controller role, the node.id must be included in the set of voters controller.quorum.voters
  6. If process.roles contains just the broker role, the node.id must not be included in the set of voters controller.quorum.voters
  7. If controller.listener.names has multiple entries, only the first is used when process.roles is broker
  8. When process.roles is controller, the advertised listeners (advertised.listeners or listeners) must contain only KRaft controller listeners from controller.listener.names
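
Rules 3, 5 and 6 above can be sketched as a small validation function. This is an illustration under stated assumptions, not Kafka's validation code: the object and method names are hypothetical, voter entries are assumed to look like 1@localhost:9093 (the ID before the @), and a malformed ID simply throws NumberFormatException.

```scala
// Hypothetical helper for illustration only; not part of Kafka's API.
object ProcessRolesSketch {

  /** Parses "broker,controller" into a set of role names; empty input means ZooKeeper mode. */
  def parseRoles(processRoles: String): Set[String] =
    processRoles.split(",").map(_.trim).filter(_.nonEmpty).toSet

  /** Extracts voter IDs from entries assumed to look like "1@localhost:9093". */
  def voterIds(quorumVoters: String): Set[Int] =
    quorumVoters.split(",").map(_.trim).filter(_.nonEmpty)
      .map(_.takeWhile(_ != '@').toInt).toSet

  /** Returns the violated rules; an empty result means the combination is acceptable. */
  def violations(processRoles: String, nodeId: Int, quorumVoters: String): Seq[String] = {
    val roles = parseRoles(processRoles)
    if (roles.isEmpty) Seq.empty // ZooKeeper mode: the KRaft rules do not apply
    else {
      val voters = voterIds(quorumVoters)
      Seq(
        (voters.isEmpty, "controller.quorum.voters must contain a parseable set of voters"),
        (roles.contains("controller") && !voters.contains(nodeId),
          "node.id must be included in controller.quorum.voters"),
        (roles == Set("broker") && voters.contains(nodeId),
          "node.id must not be included in controller.quorum.voters")
      ).collect { case (true, message) => message }
    }
  }
}
```

For instance, a broker-only node whose node.id also appears among the voters would be reported under rule 6, while a combined broker,controller node with the same settings would pass.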

request.timeout.ms

transaction.abort.timed.out.transaction.cleanup.interval.ms

transactional.id.expiration.ms

transaction.max.timeout.ms

transaction.remove.expired.transaction.cleanup.interval.ms

transaction.state.log.num.partitions

The number of partitions for the transaction topic

Default: 50

Must be at least 1

transaction.state.log.replication.factor

transaction.state.log.segment.bytes

transaction.state.log.load.buffer.size

transaction.state.log.min.isr

unclean.leader.election.enable

Enables replicas not in the ISR to be elected as leaders as a last resort, even though they are not guaranteed to have every committed message (and electing them may result in data loss).

This supports use cases where uptime and availability are preferred over consistency, allowing non-in-sync replicas to become partition leaders.

Default: false (disabled)

Unclean leader election is automatically enabled by the controller when this config is dynamically updated by using per-topic config override.

Use KafkaConfig.uncleanLeaderElectionEnable to access the current value.

Per-topic configuration: unclean.leader.election.enable

Used when:

Utilities

DynamicBrokerConfig

dynamicConfig: DynamicBrokerConfig

KafkaConfig initializes dynamicConfig when created (based on the optional dynamicConfigOverride).

The DynamicBrokerConfig is used when:

  • FIXME

interBrokerProtocolVersion

interBrokerProtocolVersion: ApiVersion

interBrokerProtocolVersion creates an ApiVersion for the inter.broker.protocol.version.

requiresZookeeper

requiresZookeeper: Boolean

requiresZookeeper is true when process.roles is empty.

Creating Instance

KafkaConfig takes the following to be created:

KafkaConfig is created when:

dynamicConfigOverride

KafkaConfig can be given a DynamicBrokerConfig when created.

Note

DynamicBrokerConfig seems never to be given.

KafkaConfig creates a new DynamicBrokerConfig for dynamicConfig unless given.

Creating KafkaConfig Instance

fromProps

fromProps(
  props: Properties): KafkaConfig
fromProps(
  props: Properties,
  doLog: Boolean): KafkaConfig
fromProps(
  defaults: Properties,
  overrides: Properties): KafkaConfig
fromProps(
  defaults: Properties,
  overrides: Properties,
  doLog: Boolean): KafkaConfig

fromProps...FIXME
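
As an illustration of the two-argument variants, the sketch below merges defaults and overrides with plain java.util.Properties. It assumes that overrides take precedence over defaults for the same key; MergePropsSketch and merge are hypothetical names, and the sketch does not call KafkaConfig itself.

```scala
import java.util.Properties
import scala.jdk.CollectionConverters._

// Hypothetical helper for illustration only; not part of Kafka's API.
object MergePropsSketch {

  /** Copies defaults first, then overrides, so an override wins for a duplicate key. */
  def merge(defaults: Properties, overrides: Properties): Properties = {
    val merged = new Properties()
    defaults.stringPropertyNames().asScala.foreach { key =>
      merged.setProperty(key, defaults.getProperty(key))
    }
    overrides.stringPropertyNames().asScala.foreach { key =>
      merged.setProperty(key, overrides.getProperty(key))
    }
    merged
  }
}
```

The merged Properties could then be handed to the single-Properties fromProps variant shown in the signatures above.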


fromProps is used when:

apply

apply(
  props: Map[_, _],
  doLog: Boolean = true): KafkaConfig

apply...FIXME


apply seems to be used for testing only.

metadataLogDir

metadataLogDir: String

metadataLogDir is the value of metadata.log.dir, if defined, or the first directory from logDirs.
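
The fallback can be sketched in a couple of lines. The object name and the Option-based parameter are assumptions made for illustration, and logDirs is assumed to be non-empty.

```scala
// Hypothetical helper for illustration only; not part of Kafka's API.
object MetadataLogDirSketch {

  /** Uses metadata.log.dir when defined, otherwise the first entry of log.dirs. */
  def metadataLogDir(metadataLogDirProp: Option[String], logDirs: Seq[String]): String =
    metadataLogDirProp.getOrElse(logDirs.head)
}
```

The directory paths in any call are up to the caller; only the ordering of logDirs matters for the fallback.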


metadataLogDir is used when: