LogManager¶

Creating Instance¶
LogManager takes the following to be created:
- Log directories
- Initial offline directories
-
ConfigRepository -
LogConfig -
CleanerConfig - num.recovery.threads.per.data.dir
- log.flush.scheduler.interval.ms
- log.flush.offset.checkpoint.interval.ms
- log.flush.start.offset.checkpoint.interval.ms
- log.retention.check.interval.ms
- transaction.max.timeout.ms
- transactional.id.expiration.ms
- inter.broker.protocol.version
-
Scheduler -
BrokerTopicStats -
LogDirFailureChannel -
Time -
keepPartitionMetadataFile
LogManager is created using apply factory method.
Creating LogManager¶
apply(
config: KafkaConfig,
initialOfflineDirs: Seq[String],
configRepository: ConfigRepository,
kafkaScheduler: KafkaScheduler,
time: Time,
brokerTopicStats: BrokerTopicStats,
logDirFailureChannel: LogDirFailureChannel,
keepPartitionMetadataFile: Boolean): LogManager
apply extracts log-related configuration properties (from the given KafkaConfig) and creates a LogConfig.
apply creates a LogCleaner.
In the end, apply creates a LogManager based on some configuration properties.
apply is used when:
KafkaMetricsGroup¶
LogManager is a KafkaMetricsGroup.
Current Logs¶
currentLogs: Pool[TopicPartition, UnifiedLog]
LogManager defines currentLogs internal registry of UnifiedLogs per TopicPartition.
LogManager uses the currentLogs registry when:
- startupWithConfigOverrides (to create a LogCleaner when enabled)
- handleLogDirFailure
- loadLog
- truncateTo
- truncateFullyAndStartAt
- getLog
- getOrCreateLog
- and many more
Looking Up Log¶
getLog(
topicPartition: TopicPartition,
isFuture: Boolean = false): Option[UnifiedLog]
With the input isFuture enabled, getLog uses the futureLogs registry to look up the UnifiedLog for the input TopicPartition (if available). Otherwise, getLog uses the currentLogs registry.
getLog is used when:
Partitionis requested to topicId and getOffsetByTimestampLogManageris requested to maybeUpdatePreferredLogDir, getOrCreateLog, asyncDeleteReplicaManageris requested to getLog, maybeAddLogDirFetchers
getOrCreateLog¶
getOrCreateLog(
topicPartition: TopicPartition,
isNew: Boolean = false,
isFuture: Boolean = false,
topicId: Option[Uuid]): UnifiedLog
getOrCreateLog...FIXME
getOrCreateLog is used when:
Partitionis requested to createLog
startup¶
startup(
topicNames: Set[String]): Unit
startup startupWithConfigOverrides with the currentDefaultConfig and fetchTopicConfigOverrides.
startup is used when:
KafkaServeris requested to startupBrokerMetadataPublisheris requested to initializeManagers
startupWithConfigOverrides¶
startupWithConfigOverrides(
defaultConfig: LogConfig,
topicConfigOverrides: Map[String, LogConfig]): Unit
startupWithConfigOverrides...FIXME
loadLogs¶
loadLogs(
defaultConfig: LogConfig,
topicConfigOverrides: Map[String, LogConfig]): Unit
loadLogs...FIXME
loadLog¶
loadLog(
logDir: File,
hadCleanShutdown: Boolean,
recoveryPoints: Map[TopicPartition, Long],
logStartOffsets: Map[TopicPartition, Long],
defaultConfig: LogConfig,
topicConfigOverrides: Map[String, LogConfig],
numRemainingSegments: ConcurrentMap[String, Int]): UnifiedLog
loadLog...FIXME
Logging¶
Enable ALL logging level for kafka.log.LogManager logger to see what happens inside.
Add the following line to config/log4j.properties:
log4j.logger.kafka.log.LogManager=ALL
Refer to Logging.