KafkaConfig¶
KafkaConfig holds the configuration properties of a Kafka broker.
KafkaConfig is an AbstractConfig.
Accessing KafkaConfig¶
import kafka.server.KafkaConfig
import java.util.Properties
val props = new Properties()
props.put("zookeeper.connect", ":2181") // a required property
val config = KafkaConfig.fromProps(props, doLog = true)
assert(config.uncleanLeaderElectionEnable == false)
authorizer.class.name¶
The fully-qualified name of a class that implements the Authorizer interface, which the broker uses for request authorization.
Default: empty
Use KafkaConfig.authorizerClassName to access the current value.
Used when:
- BrokerServer is requested to start up
- ControllerServer is requested to start up
- KafkaServer is requested to start up
auto.leader.rebalance.enable¶
Enables auto partition leader balancing. A background thread checks the distribution of partition leaders at regular intervals (leader.imbalance.check.interval.seconds). If the leader imbalance exceeds leader.imbalance.per.broker.percentage, leader rebalance to the preferred leader for partitions is triggered.
Default: true
Importance: High
Used when:
- KafkaController is requested to onControllerFailover
- ControllerServer is requested to startup
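The imbalance check described above can be sketched as follows. This is a minimal model, not the controller's code: PartitionState, imbalanceRatio and needsRebalance are hypothetical names, and the sketch assumes the imbalance of a broker is the share of partitions that prefer it as leader but are currently led by another broker.

```scala
object LeaderImbalanceSketch {
  // Hypothetical model of a partition: its current leader and its preferred leader.
  final case class PartitionState(name: String, leader: Int, preferredLeader: Int)

  // Share of the partitions preferring `broker` that are led by some other broker.
  def imbalanceRatio(broker: Int, partitions: Seq[PartitionState]): Double = {
    val preferred = partitions.filter(_.preferredLeader == broker)
    if (preferred.isEmpty) 0.0
    else preferred.count(_.leader != broker).toDouble / preferred.size
  }

  // True when the ratio exceeds the configured percentage (10 means 10%).
  def needsRebalance(broker: Int, partitions: Seq[PartitionState], imbalancePercentage: Int): Boolean =
    imbalanceRatio(broker, partitions) > imbalancePercentage.toDouble / 100
}
```

With two partitions preferring broker 1 and only one of them led by it, the ratio is 0.5, which exceeds a 10 percent threshold.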
broker.id¶
The broker ID of this Kafka server.
Default: -1
If unset or negative, a unique broker ID is generated (when KafkaServer is requested to start up).
To avoid conflicts between ZooKeeper-generated broker IDs and user-configured broker IDs, generated broker IDs start from reserved.broker.max.id + 1.
Use KafkaConfig.brokerId to access the current value.
import kafka.server.KafkaConfig
// zookeeper.connect is required when process.roles is not set (ZooKeeper mode)
val m = Map(
"zookeeper.connect" -> "xxx"
)
val props = new java.util.Properties()
import scala.jdk.CollectionConverters._
props.putAll(m.asJava)
val config = KafkaConfig.fromProps(props)
assert(config.brokerId == -1)
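The ID generation rule can be sketched as follows. This is a minimal model under stated assumptions: 1000 is used as the value of reserved.broker.max.id, the sequence parameter stands in for a 1-based counter kept by ZooKeeper, and all names (BrokerIdSketch, generatedBrokerId, resolveBrokerId) are hypothetical.

```scala
object BrokerIdSketch {
  // Assumption: 1000 is used here as the value of reserved.broker.max.id.
  val ReservedBrokerMaxId = 1000

  // `sequence` models a 1-based, monotonically increasing counter (hypothetical parameter).
  def generatedBrokerId(sequence: Int, reservedMaxId: Int = ReservedBrokerMaxId): Int = {
    require(sequence >= 1, "sequence must start at 1")
    reservedMaxId + sequence
  }

  // Keep a configured non-negative ID; otherwise generate one if generation is enabled.
  def resolveBrokerId(configured: Int, generationEnabled: Boolean, nextSequence: => Int): Int =
    if (configured >= 0) configured
    else {
      require(generationEnabled, "broker.id must be set when broker.id.generation.enable is false")
      generatedBrokerId(nextSequence)
    }
}
```

Because generated IDs are always above the reserved maximum, they cannot collide with user-configured IDs that stay at or below it.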
broker.id.generation.enable¶
Enables broker id generation on a server. When enabled, reserved.broker.max.id should be reviewed.
Default: true
Use KafkaConfig.brokerIdGenerationEnable to access the current value.
default.replication.factor¶
inter.broker.protocol.version¶
Specifies which version of the inter-broker protocol to use. Typically bumped after all brokers have been upgraded to a new version.
Default: the latest ApiVersion supported by the broker
log.cleaner.enable¶
Enables LogCleaner
Default: true
- Should be enabled if any topic uses cleanup.policy=compact, including the internal offsets topic
- If disabled, those topics will not be compacted and will continually grow in size.
Used when:
- CleanerConfig is created
log.cleaner.threads¶
The number of background threads to use for log cleaning (by LogCleaner)
Default: 1
Must be at least 0
Used when:
- CleanerConfig is created
log.cleaner.backoff.ms¶
How long to pause a CleanerThread (until next log cleaning attempt) when there are no logs to clean
Default: 15 * 1000
Must be at least 0
Used when:
- CleanerConfig is created
metadata.log.dir¶
The directory of the metadata log of a Kafka cluster in KRaft mode.
Unless specified, the metadata log is placed in the first log directory from log.dirs.
Default: (unspecified)
Available as metadataLogDir
metadata.log.max.record.bytes.between.snapshots¶
The maximum number of bytes in the log between the latest snapshot and the high-watermark needed before generating a new snapshot.
To generate snapshots based on the time elapsed use metadata.log.max.snapshot.interval.ms configuration.
The Kafka node will generate a snapshot when either the maximum time interval is reached or the maximum bytes limit is reached.
Default: 20 * 1024 * 1024
At least 1
Priority: HIGH
Available as KafkaConfig.metadataSnapshotMaxNewRecordBytes
Used when:
- SharedServer is requested to start
metadata.log.max.snapshot.interval.ms¶
The maximum number of milliseconds to wait to generate a snapshot if there are committed records in the log that are not included in the latest snapshot.
A value of zero disables time-based snapshot generation.
To generate snapshots based on the number of metadata bytes use metadata.log.max.record.bytes.between.snapshots configuration.
The Kafka node will generate a snapshot when either the maximum time interval is reached or the maximum bytes limit is reached.
Default: 1 hour
At least 0
Priority: HIGH
Available as KafkaConfig.metadataSnapshotMaxIntervalMs
Used when:
- SharedServer is requested to start
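How the two snapshot limits combine can be sketched as follows. This is a minimal model, not the broker's code: SnapshotTriggerSketch and shouldSnapshot are hypothetical names, and the inputs (bytes and milliseconds since the latest snapshot) are assumed to be tracked elsewhere.

```scala
object SnapshotTriggerSketch {
  // Defaults of the two properties as listed on this page.
  val MaxBytesBetweenSnapshots: Long = 20L * 1024 * 1024
  val MaxSnapshotIntervalMs: Long = 60L * 60 * 1000 // 1 hour

  def shouldSnapshot(
      bytesSinceLastSnapshot: Long,
      msSinceLastSnapshot: Long,
      maxBytes: Long = MaxBytesBetweenSnapshots,
      maxIntervalMs: Long = MaxSnapshotIntervalMs): Boolean = {
    val bytesLimitReached = bytesSinceLastSnapshot >= maxBytes
    // Zero disables the time-based trigger; it also needs records not yet in a snapshot.
    val timeLimitReached =
      maxIntervalMs > 0 && bytesSinceLastSnapshot > 0 && msSinceLastSnapshot >= maxIntervalMs
    bytesLimitReached || timeLimitReached
  }
}
```

Either limit is sufficient on its own, so a quiet cluster still snapshots once per interval while a busy one snapshots as soon as enough bytes accumulate.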
num.partitions¶
offsets.topic.num.partitions¶
The number of partitions of the __consumer_offsets offset commit topic (should not be changed after deployment)
For every partition there is a GroupCoordinator elected to handle consumer groups that are "assigned" to this partition.
Default: 50
Must be at least 1
Use KafkaConfig.offsetsTopicPartitions to access the current value.
Used when:
- GroupCoordinator is requested to create an OffsetConfig
- DefaultAutoTopicCreationManager is requested to creatableTopic
- KafkaServer is requested to start up (and starts up the GroupCoordinator)
- BrokerMetadataPublisher is requested to initializeManagers (and starts up the GroupCoordinator)
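The mapping of consumer groups to partitions can be sketched as follows. This is a minimal sketch, assuming a group is assigned to a partition by hashing its ID modulo the partition count; that assumption is not stated on this page, and OffsetsPartitionSketch and partitionFor are illustrative names.

```scala
object OffsetsPartitionSketch {
  // Non-negative hash: Int.MinValue has no positive counterpart, so it maps to 0.
  private def nonNegative(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // Partition of the offsets topic that a consumer group is "assigned" to.
  def partitionFor(groupId: String, offsetsTopicPartitions: Int = 50): Int = {
    require(offsetsTopicPartitions >= 1, "offsets.topic.num.partitions must be at least 1")
    nonNegative(groupId.hashCode) % offsetsTopicPartitions
  }
}
```

Under this assumption the same group ID lands on a different partition once the partition count changes, which is one way to read the advice not to change the value after deployment.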
offsets.topic.replication.factor¶
controller.quorum.voters¶
process.roles¶
A comma-separated list of the roles that this Kafka server plays in a Kafka cluster.
Supported values:
- broker
- controller
- broker,controller
Default: (empty)
- When empty, the process requires ZooKeeper (runs in ZooKeeper mode).
- Only applicable to clusters in KRaft (Kafka Raft) mode
- If used, controller.quorum.voters must contain a parseable set of voters
- advertised.listeners config must not contain KRaft controller listeners from controller.listener.names when process.roles contains the broker role, because Kafka clients that send requests via advertised listeners do not send requests to KRaft controllers; they only send requests to KRaft brokers
- If process.roles contains the controller role, the node.id must be included in the set of voters controller.quorum.voters
- If process.roles contains just the broker role, the node.id must not be included in the set of voters controller.quorum.voters
- If controller.listener.names has multiple entries, only the first will be used when process.roles is broker
- The advertised listeners (advertised.listeners or listeners) config must only contain KRaft controller listeners from controller.listener.names when process.roles is controller
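The role and voter rules above can be sketched as follows. This is a minimal model that covers only the node.id rules and leaves out the listener rules; it assumes controller.quorum.voters entries have the form id@host:port, and ProcessRolesSketch and its methods are hypothetical names.

```scala
object ProcessRolesSketch {
  // Parse a process.roles value into a set of role names; empty input gives no roles.
  def parseRoles(value: String): Set[String] =
    value.split(",").map(_.trim).filter(_.nonEmpty).toSet

  // Voter IDs from entries of the form id@host:port (throws on a non-numeric ID).
  def voterIds(voters: String): Set[Int] =
    voters.split(",").map(_.trim).filter(_.nonEmpty).map(_.takeWhile(_ != '@').toInt).toSet

  def requiresZookeeper(roles: Set[String]): Boolean = roles.isEmpty

  // Returns a description of the first violated rule, if any.
  def validate(processRoles: String, nodeId: Int, quorumVoters: String): Option[String] = {
    val roles = parseRoles(processRoles)
    val unknown = roles -- Set("broker", "controller")
    if (unknown.nonEmpty) Some(s"unsupported roles: ${unknown.mkString(",")}")
    else if (roles.isEmpty) None // ZooKeeper mode: the KRaft rules do not apply
    else {
      val voters = voterIds(quorumVoters)
      if (voters.isEmpty) Some("controller.quorum.voters must contain a parseable set of voters")
      else if (roles.contains("controller") && !voters.contains(nodeId))
        Some("node.id must be included in controller.quorum.voters")
      else if (roles == Set("broker") && voters.contains(nodeId))
        Some("node.id must not be included in controller.quorum.voters")
      else None
    }
  }
}
```

For example, a node with process.roles set to broker,controller and node.id 1 passes with the voter entry 1@localhost:9093, while a broker-only node with the same ID does not.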
request.timeout.ms¶
transaction.abort.timed.out.transaction.cleanup.interval.ms¶
transactional.id.expiration.ms¶
transaction.max.timeout.ms¶
transaction.remove.expired.transaction.cleanup.interval.ms¶
transaction.state.log.num.partitions¶
The number of partitions for the transaction topic
Default: 50
Must be at least 1
transaction.state.log.replication.factor¶
transaction.state.log.segment.bytes¶
transaction.state.log.load.buffer.size¶
transaction.state.log.min.isr¶
unclean.leader.election.enable¶
Enables replicas not in the ISR to be elected as leaders as a last resort, even though they are not guaranteed to have every committed message (which may result in data loss).
It supports use cases where uptime and availability are preferred over consistency by allowing non-in-sync replicas to become partition leaders.
Default: false (disabled)
Unclean leader election is automatically enabled by the controller when this config is dynamically updated using a per-topic config override.
Use KafkaConfig.uncleanLeaderElectionEnable to access the current value.
Per-topic configuration: unclean.leader.election.enable
Used when:
- TopicConfigHandler is requested to processConfigChanges (to enableTopicUncleanLeaderElection on an active controller)
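The trade-off can be sketched as follows. This is a minimal model of choosing a leader for a partition whose leader went offline, not the controller's election code; LeaderElectionSketch and electLeader are hypothetical names.

```scala
object LeaderElectionSketch {
  // replicas: assignment in preference order; isr: in-sync replicas; live: brokers that are up.
  def electLeader(
      replicas: Seq[Int],
      isr: Set[Int],
      live: Set[Int],
      uncleanEnabled: Boolean): Option[Int] =
    // Clean election: the first live replica that is also in the ISR.
    replicas.find(r => live(r) && isr(r)).orElse {
      // Last resort: any live replica, which may be missing committed messages.
      if (uncleanEnabled) replicas.find(live) else None
    }
}
```

With the setting disabled, a partition whose in-sync replicas are all offline stays without a leader (None) rather than risk data loss.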
Utilities¶
DynamicBrokerConfig¶
dynamicConfig: DynamicBrokerConfig
KafkaConfig initializes dynamicConfig when created (based on the optional dynamicConfigOverride).
The DynamicBrokerConfig is used when:
- FIXME
interBrokerProtocolVersion¶
interBrokerProtocolVersion: ApiVersion
interBrokerProtocolVersion creates an ApiVersion for the inter.broker.protocol.version.
requiresZookeeper¶
requiresZookeeper: Boolean
requiresZookeeper is true when process.roles is empty.
Creating Instance¶
KafkaConfig takes the following to be created:
- doLog flag
- Properties
- DynamicBrokerConfig
KafkaConfig is created when:
- DynamicBrokerConfig is requested to initialize, reloadUpdatedFilesWithoutConfigChange, processReconfiguration
- KafkaConfig is requested to fromProps, apply
dynamicConfigOverride¶
KafkaConfig can be given a DynamicBrokerConfig when created.
Note
DynamicBrokerConfig seems never to be given.
KafkaConfig creates a new DynamicBrokerConfig for dynamicConfig unless given.
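The "unless given" behaviour can be sketched with Option.getOrElse. Config and DynamicConfig below are hypothetical stand-ins for KafkaConfig and DynamicBrokerConfig, which have far more state than this sketch models.

```scala
object DynamicConfigSketch {
  // Hypothetical stand-in for DynamicBrokerConfig: it keeps a reference to its owning config.
  final class DynamicConfig(val owner: Config)

  // Hypothetical stand-in for KafkaConfig.
  final class Config(dynamicConfigOverride: Option[DynamicConfig] = None) {
    // Use the override when given, otherwise create a new instance bound to this config.
    val dynamicConfig: DynamicConfig =
      dynamicConfigOverride.getOrElse(new DynamicConfig(this))
  }
}
```

Passing an existing instance as the override lets two configs share one dynamic configuration, while the default gives each config its own.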
Creating KafkaConfig Instance¶
fromProps¶
fromProps(
props: Properties): KafkaConfig
fromProps(
props: Properties,
doLog: Boolean): KafkaConfig
fromProps(
defaults: Properties,
overrides: Properties): KafkaConfig
fromProps(
defaults: Properties,
overrides: Properties,
doLog: Boolean): KafkaConfig
fromProps...FIXME
fromProps is used when:
- Kafka is requested to build a Server
- AclAuthorizer is requested to configure
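One plausible reading of the variants that take defaults and overrides is that both sets are merged, with overrides winning on a key clash, before the configuration is built. The sketch below shows only that merge under this assumption; FromPropsSketch, props and merge are hypothetical helpers, not part of KafkaConfig, and only string keys and values are handled.

```scala
import java.util.Properties

object FromPropsSketch {
  // Convenience builder for the example (hypothetical helper).
  def props(pairs: (String, String)*): Properties = {
    val p = new Properties()
    pairs.foreach { case (k, v) => p.setProperty(k, v) }
    p
  }

  // Copy defaults first, then overrides, so that an override replaces a default.
  def merge(defaults: Properties, overrides: Properties): Properties = {
    val merged = new Properties()
    Seq(defaults, overrides).foreach { source =>
      source.stringPropertyNames().forEach(name => merged.setProperty(name, source.getProperty(name)))
    }
    merged
  }
}
```

The merged Properties could then be handed to the single-argument fromProps.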
apply¶
apply(
props: Map[_, _],
doLog: Boolean = true): KafkaConfig
apply...FIXME
apply seems to be used for testing only.
metadataLogDir¶
metadataLogDir: String
metadataLogDir is the value of metadata.log.dir, if defined, or the first directory from logDirs.
metadataLogDir is used when:
- KafkaRaftManager is created and requested to createDataDir
- KafkaRaftServer is requested to initializeLogDirs
- StorageTool is requested to configToLogDirectories
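The fallback can be sketched as follows. This is a minimal model in which an Option stands for the possibly undefined metadata.log.dir; MetadataLogDirSketch and the directory paths in the usage note are made up for illustration.

```scala
object MetadataLogDirSketch {
  // metadataLogDirValue models metadata.log.dir (None when unspecified);
  // logDirs models the directories configured through log.dirs.
  def metadataLogDir(metadataLogDirValue: Option[String], logDirs: Seq[String]): String =
    metadataLogDirValue.getOrElse(logDirs.head)
}
```

For example, metadataLogDir(None, Seq("/tmp/kraft-a", "/tmp/kraft-b")) gives "/tmp/kraft-a", while an explicit Some("/tmp/meta") takes precedence over any log directory.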