KafkaConfig¶
KafkaConfig is the configuration properties of a Kafka broker.
KafkaConfig is an AbstractConfig.
Accessing KafkaConfig¶
import kafka.server.KafkaConfig
import java.util.Properties
val props = new Properties()
props.put("zookeeper.connect", ":2181") // a required property
val config = KafkaConfig.fromProps(props, doLog = true)
assert(config.uncleanLeaderElectionEnable == false)
authorizer.class.name¶
The fully-qualified name of a class that implements the Authorizer interface, which is used by the broker for request authorization.
Default: empty
Use KafkaConfig.authorizerClassName to access the current value (see the example after the list below).
Used when:
- BrokerServer is requested to start up
- ControllerServer is requested to start up
- KafkaServer is requested to start up
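A minimal sketch of setting the property and reading it back through KafkaConfig.authorizerClassName (kafka.security.authorizer.AclAuthorizer and the ZooKeeper address are just example values):
import java.util.Properties
import kafka.server.KafkaConfig

val props = new Properties()
props.put("zookeeper.connect", ":2181") // a required property
props.put("authorizer.class.name", "kafka.security.authorizer.AclAuthorizer")
val config = KafkaConfig.fromProps(props, doLog = false)
assert(config.authorizerClassName == "kafka.security.authorizer.AclAuthorizer")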
auto.leader.rebalance.enable¶
Enables auto partition leader balancing. A background thread checks the distribution of partition leaders at regular intervals (leader.imbalance.check.interval.seconds). If the leader imbalance exceeds leader.imbalance.per.broker.percentage, a leader rebalance to the preferred leaders of the partitions is triggered.
Default: true
Importance: High
Used when:
- KafkaController is requested to onControllerFailover
- ControllerServer is requested to startup
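A quick sketch of overriding the default (the autoLeaderRebalanceEnable accessor is an assumption, named after the property):
import java.util.Properties
import kafka.server.KafkaConfig

val props = new Properties()
props.put("zookeeper.connect", ":2181")
props.put("auto.leader.rebalance.enable", "false")
val config = KafkaConfig.fromProps(props, doLog = false)
assert(!config.autoLeaderRebalanceEnable)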
broker.id¶
The broker ID of this Kafka server.
Default: -1
If unset or negative, a unique broker id will be generated (when KafkaServer is requested to start up).
To avoid conflicts between zookeeper-generated broker ids and user-configured broker ids, generated broker ids start from reserved.broker.max.id + 1.
Use KafkaConfig.brokerId to access the current value.
import kafka.server.KafkaConfig
// zookeeper.connect is a required property
val m = Map(
  "zookeeper.connect" -> "xxx"
)
val props = new java.util.Properties()
import scala.jdk.CollectionConverters._
props.putAll(m.asJava)
val config = KafkaConfig.fromProps(props)
assert(config.brokerId == -1)
broker.id.generation.enable¶
Enables broker id generation on a server. When enabled, reserved.broker.max.id should be reviewed.
Default: true
Use brokerIdGenerationEnable to access the current value.
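A sketch showing that, with generation disabled, an explicit broker.id is used as-is (property and accessor names come from this page):
import java.util.Properties
import kafka.server.KafkaConfig

val props = new Properties()
props.put("zookeeper.connect", ":2181")
props.put("broker.id.generation.enable", "false")
props.put("broker.id", "1")
val config = KafkaConfig.fromProps(props, doLog = false)
assert(!config.brokerIdGenerationEnable)
assert(config.brokerId == 1)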
default.replication.factor¶
inter.broker.protocol.version¶
Specifies which version of the inter-broker protocol to use. Typically bumped after all brokers have been upgraded to a new version.
Default: the latest version of ApiVersion of the broker
log.cleaner.enable¶
Enables LogCleaner
Default: true
- Should be enabled when using any topic with cleanup.policy=compact, including the internal offsets topic
- If disabled, those topics will not be compacted and will continually grow in size
Used when:
- CleanerConfig is created
log.cleaner.threads¶
The number of background threads to use for log cleaning (by LogCleaner)
Default: 1
Must be at least 0
Used when:
- CleanerConfig is created
log.cleaner.backoff.ms¶
How long to pause a CleanerThread (until the next log cleaning attempt) when there are no logs to clean
Default: 15 * 1000 (15 seconds)
Must be at least 0
Used when:
- CleanerConfig is created
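The three cleaner settings above all end up in a CleanerConfig. A sketch of overriding them (the logCleanerEnable, logCleanerThreads, and logCleanerBackoffMs accessors are assumptions, named after the properties):
import java.util.Properties
import kafka.server.KafkaConfig

val props = new Properties()
props.put("zookeeper.connect", ":2181")
props.put("log.cleaner.enable", "true")
props.put("log.cleaner.threads", "2")
props.put("log.cleaner.backoff.ms", "30000")
val config = KafkaConfig.fromProps(props, doLog = false)
assert(config.logCleanerEnable)
assert(config.logCleanerThreads == 2)
assert(config.logCleanerBackoffMs == 30000)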
metadata.log.dir¶
The directory of the metadata log of a Kafka cluster in KRaft mode.
Unless specified, the metadata log is placed in the first log directory from log.dirs.
Default: (unspecified)
Available as metadataLogDir
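A sketch of the fallback behaviour (the directory paths are made up):
import java.util.Properties
import kafka.server.KafkaConfig

val props = new Properties()
props.put("zookeeper.connect", ":2181")
props.put("log.dirs", "/tmp/kafka-logs-1,/tmp/kafka-logs-2")
val config = KafkaConfig.fromProps(props, doLog = false)
// metadata.log.dir is unspecified, so the first directory from log.dirs is used
assert(config.metadataLogDir == "/tmp/kafka-logs-1")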
metadata.log.max.record.bytes.between.snapshots¶
The maximum number of bytes in the log between the latest snapshot and the high-watermark needed before generating a new snapshot.
To generate snapshots based on the time elapsed, use the metadata.log.max.snapshot.interval.ms configuration.
The Kafka node will generate a snapshot when either the maximum time interval is reached or the maximum bytes limit is reached.
Default: 20 * 1024 * 1024
At least 1
Priority: HIGH
Available as KafkaConfig.metadataSnapshotMaxNewRecordBytes
Used when:
- SharedServer is requested to start
metadata.log.max.snapshot.interval.ms¶
The maximum number of milliseconds to wait to generate a snapshot if there are committed records in the log that are not included in the latest snapshot.
A value of zero disables time-based snapshot generation.
To generate snapshots based on the number of metadata bytes, use the metadata.log.max.record.bytes.between.snapshots configuration.
The Kafka node will generate a snapshot when either the maximum time interval is reached or the maximum bytes limit is reached.
Default: 1 hour
At least 0
Priority: HIGH
Available as KafkaConfig.metadataSnapshotMaxIntervalMs
Used when:
- SharedServer is requested to start
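A sketch of overriding both snapshot thresholds (the values are arbitrary; the accessors come from this page):
import java.util.Properties
import kafka.server.KafkaConfig

val props = new Properties()
props.put("zookeeper.connect", ":2181")
props.put("metadata.log.max.record.bytes.between.snapshots", (40 * 1024 * 1024).toString)
props.put("metadata.log.max.snapshot.interval.ms", (30 * 60 * 1000).toString)
val config = KafkaConfig.fromProps(props, doLog = false)
assert(config.metadataSnapshotMaxNewRecordBytes == 40 * 1024 * 1024)
assert(config.metadataSnapshotMaxIntervalMs == 30 * 60 * 1000)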
num.partitions¶
offsets.topic.num.partitions¶
The number of partitions of the __consumer_offsets offset commit topic (should not change after deployment).
For every partition there is a GroupCoordinator elected to handle the consumer groups that are "assigned" to this partition (see the sketch after the list below).
Default: 50
Must be at least 1
Use KafkaConfig.offsetsTopicPartitions to access the current value.
Used when:
- GroupCoordinator is requested to create an OffsetConfig
- DefaultAutoTopicCreationManager is requested to creatableTopic
- KafkaServer is requested to start up (and starts up the GroupCoordinator)
- BrokerMetadataPublisher is requested to initializeManagers (and starts up the GroupCoordinator)
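To illustrate how a group is "assigned" to a partition, here is a standalone sketch of the hash-modulo mapping (the helper below is hypothetical; it mirrors the idea behind GroupMetadataManager.partitionFor):
// Hypothetical standalone version of the group-to-partition mapping:
// a non-negative hash of the group id modulo offsets.topic.num.partitions
def partitionFor(groupId: String, offsetsTopicPartitions: Int): Int =
  (groupId.hashCode & 0x7fffffff) % offsetsTopicPartitions

// With the default of 50 partitions, every group lands on one fixed partition
// whose leader broker hosts the group's GroupCoordinator
println(partitionFor("my-consumer-group", 50))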
offsets.topic.replication.factor¶
controller.quorum.voters¶
process.roles¶
A comma-separated list of the roles that this Kafka server plays in a Kafka cluster.
Supported values:
- broker
- controller
- broker,controller
Default: (empty)
- When empty, the process requires Zookeeper (runs with Zookeeper)
- Only applicable for clusters in KRaft (Kafka Raft) mode
- If used, controller.quorum.voters must contain a parseable set of voters
- The advertised.listeners config must not contain KRaft controller listeners from controller.listener.names when process.roles contains the broker role, because Kafka clients that send requests via advertised listeners only send requests to KRaft brokers, never to KRaft controllers
- If process.roles contains the controller role, the node.id must be included in the set of voters in controller.quorum.voters
- If process.roles contains just the broker role, the node.id must not be included in the set of voters in controller.quorum.voters
- If controller.listener.names has multiple entries, only the first will be used when process.roles is broker
- The advertised listeners (advertised.listeners or listeners) config must only contain KRaft controller listeners from controller.listener.names when process.roles is controller
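A minimal combined-mode sketch that follows the rules above (the node id and ports are made up, and the properties are assumed sufficient to pass validation in a recent Kafka 3.x):
import java.util.Properties
import kafka.server.KafkaConfig

val props = new Properties()
props.put("process.roles", "broker,controller")
props.put("node.id", "1")
// the controller role requires node.id to be among the voters
props.put("controller.quorum.voters", "1@localhost:9093")
props.put("controller.listener.names", "CONTROLLER")
props.put("listeners", "PLAINTEXT://:9092,CONTROLLER://:9093")
val config = KafkaConfig.fromProps(props, doLog = false)
assert(!config.requiresZookeeper)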
request.timeout.ms¶
transaction.abort.timed.out.transaction.cleanup.interval.ms¶
transactional.id.expiration.ms¶
transaction.max.timeout.ms¶
transaction.remove.expired.transaction.cleanup.interval.ms¶
transaction.state.log.num.partitions¶
The number of partitions for the transaction topic
Default: 50
Must be at least 1
transaction.state.log.replication.factor¶
transaction.state.log.segment.bytes¶
transaction.state.log.load.buffer.size¶
transaction.state.log.min.isr¶
unclean.leader.election.enable¶
Enables replicas not in the ISR to be elected as leaders as a last resort, even though such replicas are not guaranteed to have every committed message (and electing them may even result in data loss).
It supports use cases where uptime and availability are preferable over consistency, by allowing non-in-sync replicas to become partition leaders.
Default: false (disabled)
Unclean leader election is automatically enabled by the controller when this config is dynamically updated using the per-topic config override.
Use KafkaConfig.uncleanLeaderElectionEnable to access the current value.
Per-topic configuration: unclean.leader.election.enable
Used when:
- TopicConfigHandler is requested to processConfigChanges (to enableTopicUncleanLeaderElection on an active controller)
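A sketch of such a dynamic per-topic override with the Admin API (the broker address and topic name are made up):
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

val adminProps = new Properties()
adminProps.put("bootstrap.servers", "localhost:9092")
val admin = Admin.create(adminProps)

// enable unclean leader election for a single topic only
val resource = new ConfigResource(ConfigResource.Type.TOPIC, "t1")
val ops: java.util.Collection[AlterConfigOp] = Collections.singletonList(
  new AlterConfigOp(
    new ConfigEntry("unclean.leader.election.enable", "true"),
    AlterConfigOp.OpType.SET))
admin.incrementalAlterConfigs(Collections.singletonMap(resource, ops)).all().get()
admin.close()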
Utilities¶
DynamicBrokerConfig¶
dynamicConfig: DynamicBrokerConfig
KafkaConfig initializes dynamicConfig when created (based on the optional dynamicConfigOverride).
The DynamicBrokerConfig is used when:
- FIXME
interBrokerProtocolVersion¶
interBrokerProtocolVersion: ApiVersion
interBrokerProtocolVersion creates an ApiVersion for the inter.broker.protocol.version.
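A sketch of pinning the protocol version (the "2.8" value is just an example):
import java.util.Properties
import kafka.server.KafkaConfig

val props = new Properties()
props.put("zookeeper.connect", ":2181")
props.put("inter.broker.protocol.version", "2.8")
val config = KafkaConfig.fromProps(props, doLog = false)
// an ApiVersion for the requested protocol version
println(config.interBrokerProtocolVersion)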
requiresZookeeper¶
requiresZookeeper: Boolean
requiresZookeeper is true when process.roles is empty.
Creating Instance¶
KafkaConfig takes the following to be created:
- doLog flag
- Properties
- DynamicBrokerConfig
KafkaConfig is created when:
- DynamicBrokerConfig is requested to initialize, reloadUpdatedFilesWithoutConfigChange, processReconfiguration
- KafkaConfig is requested to fromProps, apply
dynamicConfigOverride¶
KafkaConfig can be given a DynamicBrokerConfig when created.
Note
DynamicBrokerConfig seems to never be given.
KafkaConfig creates a new DynamicBrokerConfig for dynamicConfig unless given one.
Creating KafkaConfig Instance¶
fromProps¶
fromProps(
  props: Properties): KafkaConfig
fromProps(
  props: Properties,
  doLog: Boolean): KafkaConfig
fromProps(
  defaults: Properties,
  overrides: Properties): KafkaConfig
fromProps(
  defaults: Properties,
  overrides: Properties,
  doLog: Boolean): KafkaConfig
fromProps ...FIXME
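A sketch of the defaults-plus-overrides variant (assuming overrides take precedence over defaults):
import java.util.Properties
import kafka.server.KafkaConfig

val defaults = new Properties()
defaults.put("zookeeper.connect", ":2181")
defaults.put("log.cleaner.threads", "1")

val overrides = new Properties()
overrides.put("log.cleaner.threads", "4")

val config = KafkaConfig.fromProps(defaults, overrides, doLog = false)
assert(config.logCleanerThreads == 4)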
fromProps is used when:
- Kafka is requested to build a Server
- AclAuthorizer is requested to configure
apply¶
apply(
  props: Map[_, _],
  doLog: Boolean = true): KafkaConfig
apply ...FIXME
apply seems to be used for testing only.
metadataLogDir¶
metadataLogDir: String
metadataLogDir is the value of metadata.log.dir, if defined, or the first directory from logDirs.
metadataLogDir is used when:
- KafkaRaftManager is created and requested to createDataDir
- KafkaRaftServer is requested to initializeLogDirs
- StorageTool is requested to configToLogDirectories